Showing error 1341

User: Jiri Slaby
Error type: Leaving function in locked state
Error type description: Some lock is not unlocked on all paths of a function, so it is leaked
File location: fs/dlm/lock.c
Line in file: 940
Project: Linux Kernel
Project version: 2.6.28
Tools: Stanse (1.2)
Entered: 2012-05-21 20:30:05 UTC


Source:

   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
   5**
   6**  This copyrighted material is made available to anyone wishing to use,
   7**  modify, copy, or redistribute it subject to the terms and conditions
   8**  of the GNU General Public License v.2.
   9**
  10*******************************************************************************
  11******************************************************************************/
  12
  13/* Central locking logic has four stages:
  14
  15   dlm_lock()
  16   dlm_unlock()
  17
  18   request_lock(ls, lkb)
  19   convert_lock(ls, lkb)
  20   unlock_lock(ls, lkb)
  21   cancel_lock(ls, lkb)
  22
  23   _request_lock(r, lkb)
  24   _convert_lock(r, lkb)
  25   _unlock_lock(r, lkb)
  26   _cancel_lock(r, lkb)
  27
  28   do_request(r, lkb)
  29   do_convert(r, lkb)
  30   do_unlock(r, lkb)
  31   do_cancel(r, lkb)
  32
  33   Stage 1 (lock, unlock) is mainly about checking input args and
  34   splitting into one of the four main operations:
  35
  36       dlm_lock          = request_lock
  37       dlm_lock+CONVERT  = convert_lock
  38       dlm_unlock        = unlock_lock
  39       dlm_unlock+CANCEL = cancel_lock
  40
  41   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
  42   provided to the next stage.
  43
  44   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
  45   When remote, it calls send_xxxx(), when local it calls do_xxxx().
  46
  47   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
  48   given rsb and lkb and queues callbacks.
  49
  50   For remote operations, send_xxxx() results in the corresponding do_xxxx()
  51   function being executed on the remote node.  The connecting send/receive
  52   calls on local (L) and remote (R) nodes:
  53
  54   L: send_xxxx()              ->  R: receive_xxxx()
  55                                   R: do_xxxx()
  56   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
  57*/
  58#include <linux/types.h>
  59#include "dlm_internal.h"
  60#include <linux/dlm_device.h>
  61#include "memory.h"
  62#include "lowcomms.h"
  63#include "requestqueue.h"
  64#include "util.h"
  65#include "dir.h"
  66#include "member.h"
  67#include "lockspace.h"
  68#include "ast.h"
  69#include "lock.h"
  70#include "rcom.h"
  71#include "recover.h"
  72#include "lvb_table.h"
  73#include "user.h"
  74#include "config.h"
  75
  /* Forward declarations: senders of messages to other nodes. */
  static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
  static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
  static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
  static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
  static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
  static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
  static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
  static int send_remove(struct dlm_rsb *r);

  /* Forward declarations: stage 3 operations used before their definition. */
  static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
  static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);

  /* Forward declarations: receive-side and miscellaneous helpers. */
  static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
                                      struct dlm_message *ms);
  static int receive_extralen(struct dlm_message *ms);
  static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
  static void del_timeout(struct dlm_lkb *lkb);
  91
  92/*
  93 * Lock compatibility matrix - thanks Steve
  94 * UN = Unlocked state. Not really a state, used as a flag
  95 * PD = Padding. Used to make the matrix a nice power of two in size
  96 * Other states are the same as the VMS DLM.
  97 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
  98 */
  99
 /* Indexed as [grmode + 1][rqmode + 1]; a 1 entry marks a compatible pair. */
 static const int __dlm_compat_matrix[8][8] = {
         /*         UN NL CR CW PR PW EX PD */
         /* UN */ { 1, 1, 1, 1, 1, 1, 1, 0 },
         /* NL */ { 1, 1, 1, 1, 1, 1, 1, 0 },
         /* CR */ { 1, 1, 1, 1, 1, 1, 0, 0 },
         /* CW */ { 1, 1, 1, 1, 0, 0, 0, 0 },
         /* PR */ { 1, 1, 1, 0, 1, 0, 0, 0 },
         /* PW */ { 1, 1, 1, 0, 0, 0, 0, 0 },
         /* EX */ { 1, 1, 0, 0, 0, 0, 0, 0 },
         /* PD */ { 0, 0, 0, 0, 0, 0, 0, 0 },
 };
 111
 112/*
 113 * This defines the direction of transfer of LVB data.
 114 * Granted mode is the row; requested mode is the column.
 115 * Usage: matrix[grmode+1][rqmode+1]
 116 * 1 = LVB is returned to the caller
 117 * 0 = LVB is written to the resource
 118 * -1 = nothing happens to the LVB
 119 */
 120
 /*
  * LVB transfer direction, indexed as [grmode + 1][rqmode + 1]:
  *   1 = LVB returned to caller, 0 = LVB written to resource, -1 = no-op.
  */
 const int dlm_lvb_operations[8][8] = {
         /*          UN  NL  CR  CW  PR  PW  EX  PD */
         /* UN */ { -1,  1,  1,  1,  1,  1,  1, -1 },
         /* NL */ { -1,  1,  1,  1,  1,  1,  1,  0 },
         /* CR */ { -1, -1,  1,  1,  1,  1,  1,  0 },
         /* CW */ { -1, -1, -1,  1,  1,  1,  1,  0 },
         /* PR */ { -1, -1, -1, -1,  1,  1,  1,  0 },
         /* PW */ { -1,  0,  0,  0,  0,  0,  1,  0 },
         /* EX */ { -1,  0,  0,  0,  0,  0,  0,  0 },
         /* PD */ { -1,  0,  0,  0,  0,  0,  0,  0 },
 };
 132
 /* Compatibility of lkb gr's granted mode with lkb rq's requested mode. */
 #define modes_compat(gr, rq)                                            \
         (__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1])
 135
 136int dlm_modes_compat(int mode1, int mode2)
 137{
 138        return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
 139}
 140
 141/*
 142 * Compatibility matrix for conversions with QUECVT set.
 143 * Granted mode is the row; requested mode is the column.
 144 * Usage: matrix[grmode+1][rqmode+1]
 145 */
 146
 /* QUECVT conversion table, indexed as [grmode + 1][rqmode + 1]. */
 static const int __quecvt_compat_matrix[8][8] = {
         /*         UN NL CR CW PR PW EX PD */
         /* UN */ { 0, 0, 0, 0, 0, 0, 0, 0 },
         /* NL */ { 0, 0, 1, 1, 1, 1, 1, 0 },
         /* CR */ { 0, 0, 0, 1, 1, 1, 1, 0 },
         /* CW */ { 0, 0, 0, 0, 1, 1, 1, 0 },
         /* PR */ { 0, 0, 0, 1, 0, 1, 1, 0 },
         /* PW */ { 0, 0, 0, 0, 0, 0, 1, 0 },
         /* EX */ { 0, 0, 0, 0, 0, 0, 0, 0 },
         /* PD */ { 0, 0, 0, 0, 0, 0, 0, 0 },
 };
 158
 159void dlm_print_lkb(struct dlm_lkb *lkb)
 160{
 161        printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
 162               "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
 163               lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
 164               lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
 165               lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
 166}
 167
 168static void dlm_print_rsb(struct dlm_rsb *r)
 169{
 170        printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
 171               r->res_nodeid, r->res_flags, r->res_first_lkid,
 172               r->res_recover_locks_count, r->res_name);
 173}
 174
 175void dlm_dump_rsb(struct dlm_rsb *r)
 176{
 177        struct dlm_lkb *lkb;
 178
 179        dlm_print_rsb(r);
 180
 181        printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
 182               list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
 183        printk(KERN_ERR "rsb lookup list\n");
 184        list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
 185                dlm_print_lkb(lkb);
 186        printk(KERN_ERR "rsb grant queue:\n");
 187        list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
 188                dlm_print_lkb(lkb);
 189        printk(KERN_ERR "rsb convert queue:\n");
 190        list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
 191                dlm_print_lkb(lkb);
 192        printk(KERN_ERR "rsb wait queue:\n");
 193        list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
 194                dlm_print_lkb(lkb);
 195}
 196
 197/* Threads cannot use the lockspace while it's being recovered */
 198
 199static inline void dlm_lock_recovery(struct dlm_ls *ls)
 200{
 201        down_read(&ls->ls_in_recovery);
 202}
 203
 204void dlm_unlock_recovery(struct dlm_ls *ls)
 205{
 206        up_read(&ls->ls_in_recovery);
 207}
 208
 209int dlm_lock_recovery_try(struct dlm_ls *ls)
 210{
 211        return down_read_trylock(&ls->ls_in_recovery);
 212}
 213
 214static inline int can_be_queued(struct dlm_lkb *lkb)
 215{
 216        return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
 217}
 218
 219static inline int force_blocking_asts(struct dlm_lkb *lkb)
 220{
 221        return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
 222}
 223
 224static inline int is_demoted(struct dlm_lkb *lkb)
 225{
 226        return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
 227}
 228
 229static inline int is_altmode(struct dlm_lkb *lkb)
 230{
 231        return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
 232}
 233
 234static inline int is_granted(struct dlm_lkb *lkb)
 235{
 236        return (lkb->lkb_status == DLM_LKSTS_GRANTED);
 237}
 238
 239static inline int is_remote(struct dlm_rsb *r)
 240{
 241        DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
 242        return !!r->res_nodeid;
 243}
 244
 245static inline int is_process_copy(struct dlm_lkb *lkb)
 246{
 247        return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
 248}
 249
 250static inline int is_master_copy(struct dlm_lkb *lkb)
 251{
 252        if (lkb->lkb_flags & DLM_IFL_MSTCPY)
 253                DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
 254        return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
 255}
 256
 257static inline int middle_conversion(struct dlm_lkb *lkb)
 258{
 259        if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
 260            (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
 261                return 1;
 262        return 0;
 263}
 264
 265static inline int down_conversion(struct dlm_lkb *lkb)
 266{
 267        return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
 268}
 269
 270static inline int is_overlap_unlock(struct dlm_lkb *lkb)
 271{
 272        return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
 273}
 274
 275static inline int is_overlap_cancel(struct dlm_lkb *lkb)
 276{
 277        return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
 278}
 279
 280static inline int is_overlap(struct dlm_lkb *lkb)
 281{
 282        return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
 283                                  DLM_IFL_OVERLAP_CANCEL));
 284}
 285
 286static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
 287{
 288        if (is_master_copy(lkb))
 289                return;
 290
 291        del_timeout(lkb);
 292
 293        DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
 294
 295        /* if the operation was a cancel, then return -DLM_ECANCEL, if a
 296           timeout caused the cancel then return -ETIMEDOUT */
 297        if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
 298                lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
 299                rv = -ETIMEDOUT;
 300        }
 301
 302        if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
 303                lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
 304                rv = -EDEADLK;
 305        }
 306
 307        lkb->lkb_lksb->sb_status = rv;
 308        lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
 309
 310        dlm_add_ast(lkb, AST_COMP);
 311}
 312
 313static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
 314{
 315        queue_cast(r, lkb,
 316                   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
 317}
 318
 319static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
 320{
 321        if (is_master_copy(lkb))
 322                send_bast(r, lkb, rqmode);
 323        else {
 324                lkb->lkb_bastmode = rqmode;
 325                dlm_add_ast(lkb, AST_BAST);
 326        }
 327}
 328
 329/*
 330 * Basic operations on rsb's and lkb's
 331 */
 332
 333static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
 334{
 335        struct dlm_rsb *r;
 336
 337        r = dlm_allocate_rsb(ls, len);
 338        if (!r)
 339                return NULL;
 340
 341        r->res_ls = ls;
 342        r->res_length = len;
 343        memcpy(r->res_name, name, len);
 344        mutex_init(&r->res_mutex);
 345
 346        INIT_LIST_HEAD(&r->res_lookup);
 347        INIT_LIST_HEAD(&r->res_grantqueue);
 348        INIT_LIST_HEAD(&r->res_convertqueue);
 349        INIT_LIST_HEAD(&r->res_waitqueue);
 350        INIT_LIST_HEAD(&r->res_root_list);
 351        INIT_LIST_HEAD(&r->res_recover_list);
 352
 353        return r;
 354}
 355
 356static int search_rsb_list(struct list_head *head, char *name, int len,
 357                           unsigned int flags, struct dlm_rsb **r_ret)
 358{
 359        struct dlm_rsb *r;
 360        int error = 0;
 361
 362        list_for_each_entry(r, head, res_hashchain) {
 363                if (len == r->res_length && !memcmp(name, r->res_name, len))
 364                        goto found;
 365        }
 366        *r_ret = NULL;
 367        return -EBADR;
 368
 369 found:
 370        if (r->res_nodeid && (flags & R_MASTER))
 371                error = -ENOTBLK;
 372        *r_ret = r;
 373        return error;
 374}
 375
 376static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
 377                       unsigned int flags, struct dlm_rsb **r_ret)
 378{
 379        struct dlm_rsb *r;
 380        int error;
 381
 382        error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
 383        if (!error) {
 384                kref_get(&r->res_ref);
 385                goto out;
 386        }
 387        error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
 388        if (error)
 389                goto out;
 390
 391        list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
 392
 393        if (dlm_no_directory(ls))
 394                goto out;
 395
 396        if (r->res_nodeid == -1) {
 397                rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
 398                r->res_first_lkid = 0;
 399        } else if (r->res_nodeid > 0) {
 400                rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
 401                r->res_first_lkid = 0;
 402        } else {
 403                DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
 404                DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
 405        }
 406 out:
 407        *r_ret = r;
 408        return error;
 409}
 410
 411static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
 412                      unsigned int flags, struct dlm_rsb **r_ret)
 413{
 414        int error;
 415        write_lock(&ls->ls_rsbtbl[b].lock);
 416        error = _search_rsb(ls, name, len, b, flags, r_ret);
 417        write_unlock(&ls->ls_rsbtbl[b].lock);
 418        return error;
 419}
 420
 421/*
 422 * Find rsb in rsbtbl and potentially create/add one
 423 *
 424 * Delaying the release of rsb's has a similar benefit to applications keeping
 425 * NL locks on an rsb, but without the guarantee that the cached master value
 426 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 427 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 428 * to excessive master lookups and removals if we don't delay the release.
 429 *
 430 * Searching for an rsb means looking through both the normal list and toss
 431 * list.  When found on the toss list the rsb is moved to the normal list with
 432 * ref count of 1; when found on normal list the ref count is incremented.
 433 */
 434
 435static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
 436                    unsigned int flags, struct dlm_rsb **r_ret)
 437{
 438        struct dlm_rsb *r, *tmp;
 439        uint32_t hash, bucket;
 440        int error = -EINVAL;
 441
 442        if (namelen > DLM_RESNAME_MAXLEN)
 443                goto out;
 444
 445        if (dlm_no_directory(ls))
 446                flags |= R_CREATE;
 447
 448        error = 0;
 449        hash = jhash(name, namelen, 0);
 450        bucket = hash & (ls->ls_rsbtbl_size - 1);
 451
 452        error = search_rsb(ls, name, namelen, bucket, flags, &r);
 453        if (!error)
 454                goto out;
 455
 456        if (error == -EBADR && !(flags & R_CREATE))
 457                goto out;
 458
 459        /* the rsb was found but wasn't a master copy */
 460        if (error == -ENOTBLK)
 461                goto out;
 462
 463        error = -ENOMEM;
 464        r = create_rsb(ls, name, namelen);
 465        if (!r)
 466                goto out;
 467
 468        r->res_hash = hash;
 469        r->res_bucket = bucket;
 470        r->res_nodeid = -1;
 471        kref_init(&r->res_ref);
 472
 473        /* With no directory, the master can be set immediately */
 474        if (dlm_no_directory(ls)) {
 475                int nodeid = dlm_dir_nodeid(r);
 476                if (nodeid == dlm_our_nodeid())
 477                        nodeid = 0;
 478                r->res_nodeid = nodeid;
 479        }
 480
 481        write_lock(&ls->ls_rsbtbl[bucket].lock);
 482        error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
 483        if (!error) {
 484                write_unlock(&ls->ls_rsbtbl[bucket].lock);
 485                dlm_free_rsb(r);
 486                r = tmp;
 487                goto out;
 488        }
 489        list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
 490        write_unlock(&ls->ls_rsbtbl[bucket].lock);
 491        error = 0;
 492 out:
 493        *r_ret = r;
 494        return error;
 495}
 496
 497/* This is only called to add a reference when the code already holds
 498   a valid reference to the rsb, so there's no need for locking. */
 499
 500static inline void hold_rsb(struct dlm_rsb *r)
 501{
 502        kref_get(&r->res_ref);
 503}
 504
 /* Non-static wrapper around hold_rsb(). */
 void dlm_hold_rsb(struct dlm_rsb *r)
 {
         hold_rsb(r);
         return;
 }
 509
 510static void toss_rsb(struct kref *kref)
 511{
 512        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
 513        struct dlm_ls *ls = r->res_ls;
 514
 515        DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
 516        kref_init(&r->res_ref);
 517        list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
 518        r->res_toss_time = jiffies;
 519        if (r->res_lvbptr) {
 520                dlm_free_lvb(r->res_lvbptr);
 521                r->res_lvbptr = NULL;
 522        }
 523}
 524
 525/* When all references to the rsb are gone it's transferred to
 526   the tossed list for later disposal. */
 527
 528static void put_rsb(struct dlm_rsb *r)
 529{
 530        struct dlm_ls *ls = r->res_ls;
 531        uint32_t bucket = r->res_bucket;
 532
 533        write_lock(&ls->ls_rsbtbl[bucket].lock);
 534        kref_put(&r->res_ref, toss_rsb);
 535        write_unlock(&ls->ls_rsbtbl[bucket].lock);
 536}
 537
 /* Non-static wrapper around put_rsb(). */
 void dlm_put_rsb(struct dlm_rsb *r)
 {
         put_rsb(r);
         return;
 }
 542
 543/* See comment for unhold_lkb */
 544
 545static void unhold_rsb(struct dlm_rsb *r)
 546{
 547        int rv;
 548        rv = kref_put(&r->res_ref, toss_rsb);
 549        DLM_ASSERT(!rv, dlm_dump_rsb(r););
 550}
 551
 552static void kill_rsb(struct kref *kref)
 553{
 554        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
 555
 556        /* All work is done after the return from kref_put() so we
 557           can release the write_lock before the remove and free. */
 558
 559        DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
 560        DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
 561        DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
 562        DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
 563        DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
 564        DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
 565}
 566
 567/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
 568   The rsb must exist as long as any lkb's for it do. */
 569
 570static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
 571{
 572        hold_rsb(r);
 573        lkb->lkb_resource = r;
 574}
 575
 576static void detach_lkb(struct dlm_lkb *lkb)
 577{
 578        if (lkb->lkb_resource) {
 579                put_rsb(lkb->lkb_resource);
 580                lkb->lkb_resource = NULL;
 581        }
 582}
 583
 584static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
 585{
 586        struct dlm_lkb *lkb, *tmp;
 587        uint32_t lkid = 0;
 588        uint16_t bucket;
 589
 590        lkb = dlm_allocate_lkb(ls);
 591        if (!lkb)
 592                return -ENOMEM;
 593
 594        lkb->lkb_nodeid = -1;
 595        lkb->lkb_grmode = DLM_LOCK_IV;
 596        kref_init(&lkb->lkb_ref);
 597        INIT_LIST_HEAD(&lkb->lkb_ownqueue);
 598        INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
 599        INIT_LIST_HEAD(&lkb->lkb_time_list);
 600
 601        get_random_bytes(&bucket, sizeof(bucket));
 602        bucket &= (ls->ls_lkbtbl_size - 1);
 603
 604        write_lock(&ls->ls_lkbtbl[bucket].lock);
 605
 606        /* counter can roll over so we must verify lkid is not in use */
 607
 608        while (lkid == 0) {
 609                lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
 610
 611                list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
 612                                    lkb_idtbl_list) {
 613                        if (tmp->lkb_id != lkid)
 614                                continue;
 615                        lkid = 0;
 616                        break;
 617                }
 618        }
 619
 620        lkb->lkb_id = lkid;
 621        list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
 622        write_unlock(&ls->ls_lkbtbl[bucket].lock);
 623
 624        *lkb_ret = lkb;
 625        return 0;
 626}
 627
 628static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
 629{
 630        struct dlm_lkb *lkb;
 631        uint16_t bucket = (lkid >> 16);
 632
 633        list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
 634                if (lkb->lkb_id == lkid)
 635                        return lkb;
 636        }
 637        return NULL;
 638}
 639
 640static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
 641{
 642        struct dlm_lkb *lkb;
 643        uint16_t bucket = (lkid >> 16);
 644
 645        if (bucket >= ls->ls_lkbtbl_size)
 646                return -EBADSLT;
 647
 648        read_lock(&ls->ls_lkbtbl[bucket].lock);
 649        lkb = __find_lkb(ls, lkid);
 650        if (lkb)
 651                kref_get(&lkb->lkb_ref);
 652        read_unlock(&ls->ls_lkbtbl[bucket].lock);
 653
 654        *lkb_ret = lkb;
 655        return lkb ? 0 : -ENOENT;
 656}
 657
 658static void kill_lkb(struct kref *kref)
 659{
 660        struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
 661
 662        /* All work is done after the return from kref_put() so we
 663           can release the write_lock before the detach_lkb */
 664
 665        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
 666}
 667
 668/* __put_lkb() is used when an lkb may not have an rsb attached to
 669   it so we need to provide the lockspace explicitly */
 670
 671static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
 672{
 673        uint16_t bucket = (lkb->lkb_id >> 16);
 674
 675        write_lock(&ls->ls_lkbtbl[bucket].lock);
 676        if (kref_put(&lkb->lkb_ref, kill_lkb)) {
 677                list_del(&lkb->lkb_idtbl_list);
 678                write_unlock(&ls->ls_lkbtbl[bucket].lock);
 679
 680                detach_lkb(lkb);
 681
 682                /* for local/process lkbs, lvbptr points to caller's lksb */
 683                if (lkb->lkb_lvbptr && is_master_copy(lkb))
 684                        dlm_free_lvb(lkb->lkb_lvbptr);
 685                dlm_free_lkb(lkb);
 686                return 1;
 687        } else {
 688                write_unlock(&ls->ls_lkbtbl[bucket].lock);
 689                return 0;
 690        }
 691}
 692
 693int dlm_put_lkb(struct dlm_lkb *lkb)
 694{
 695        struct dlm_ls *ls;
 696
 697        DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
 698        DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
 699
 700        ls = lkb->lkb_resource->res_ls;
 701        return __put_lkb(ls, lkb);
 702}
 703
 704/* This is only called to add a reference when the code already holds
 705   a valid reference to the lkb, so there's no need for locking. */
 706
 707static inline void hold_lkb(struct dlm_lkb *lkb)
 708{
 709        kref_get(&lkb->lkb_ref);
 710}
 711
 712/* This is called when we need to remove a reference and are certain
 713   it's not the last ref.  e.g. del_lkb is always called between a
 714   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
 715   put_lkb would work fine, but would involve unnecessary locking */
 716
 717static inline void unhold_lkb(struct dlm_lkb *lkb)
 718{
 719        int rv;
 720        rv = kref_put(&lkb->lkb_ref, kill_lkb);
 721        DLM_ASSERT(!rv, dlm_print_lkb(lkb););
 722}
 723
 724static void lkb_add_ordered(struct list_head *new, struct list_head *head,
 725                            int mode)
 726{
 727        struct dlm_lkb *lkb = NULL;
 728
 729        list_for_each_entry(lkb, head, lkb_statequeue)
 730                if (lkb->lkb_rqmode < mode)
 731                        break;
 732
 733        if (!lkb)
 734                list_add_tail(new, head);
 735        else
 736                __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
 737}
 738
 739/* add/remove lkb to rsb's grant/convert/wait queue */
 740
 741static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
 742{
 743        kref_get(&lkb->lkb_ref);
 744
 745        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
 746
 747        lkb->lkb_status = status;
 748
 749        switch (status) {
 750        case DLM_LKSTS_WAITING:
 751                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
 752                        list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
 753                else
 754                        list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
 755                break;
 756        case DLM_LKSTS_GRANTED:
 757                /* convention says granted locks kept in order of grmode */
 758                lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
 759                                lkb->lkb_grmode);
 760                break;
 761        case DLM_LKSTS_CONVERT:
 762                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
 763                        list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
 764                else
 765                        list_add_tail(&lkb->lkb_statequeue,
 766                                      &r->res_convertqueue);
 767                break;
 768        default:
 769                DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
 770        }
 771}
 772
 773static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
 774{
 775        lkb->lkb_status = 0;
 776        list_del(&lkb->lkb_statequeue);
 777        unhold_lkb(lkb);
 778}
 779
 /*
  * Move an lkb to the queue for status 'sts'.  A temporary reference is
  * held across the del/add pair and dropped afterwards.
  */
 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
 {
         hold_lkb(lkb);

         del_lkb(r, lkb);
         add_lkb(r, lkb, sts);

         unhold_lkb(lkb);
 }
 787
 788static int msg_reply_type(int mstype)
 789{
 790        switch (mstype) {
 791        case DLM_MSG_REQUEST:
 792                return DLM_MSG_REQUEST_REPLY;
 793        case DLM_MSG_CONVERT:
 794                return DLM_MSG_CONVERT_REPLY;
 795        case DLM_MSG_UNLOCK:
 796                return DLM_MSG_UNLOCK_REPLY;
 797        case DLM_MSG_CANCEL:
 798                return DLM_MSG_CANCEL_REPLY;
 799        case DLM_MSG_LOOKUP:
 800                return DLM_MSG_LOOKUP_REPLY;
 801        }
 802        return -1;
 803}
 804
 805/* add/remove lkb from global waiters list of lkb's waiting for
 806   a reply from a remote node */
 807
 808static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
 809{
 810        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 811        int error = 0;
 812
 813        mutex_lock(&ls->ls_waiters_mutex);
 814
 815        if (is_overlap_unlock(lkb) ||
 816            (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
 817                error = -EINVAL;
 818                goto out;
 819        }
 820
 821        if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
 822                switch (mstype) {
 823                case DLM_MSG_UNLOCK:
 824                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
 825                        break;
 826                case DLM_MSG_CANCEL:
 827                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
 828                        break;
 829                default:
 830                        error = -EBUSY;
 831                        goto out;
 832                }
 833                lkb->lkb_wait_count++;
 834                hold_lkb(lkb);
 835
 836                log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
 837                          lkb->lkb_id, lkb->lkb_wait_type, mstype,
 838                          lkb->lkb_wait_count, lkb->lkb_flags);
 839                goto out;
 840        }
 841
 842        DLM_ASSERT(!lkb->lkb_wait_count,
 843                   dlm_print_lkb(lkb);
 844                   printk("wait_count %d\n", lkb->lkb_wait_count););
 845
 846        lkb->lkb_wait_count++;
 847        lkb->lkb_wait_type = mstype;
 848        hold_lkb(lkb);
 849        list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 850 out:
 851        if (error)
 852                log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
 853                          lkb->lkb_id, error, lkb->lkb_flags, mstype,
 854                          lkb->lkb_wait_type, lkb->lkb_resource->res_name);
 855        mutex_unlock(&ls->ls_waiters_mutex);
 856        return error;
 857}
 858
 859/* We clear the RESEND flag because we might be taking an lkb off the waiters
 860   list as part of process_requestqueue (e.g. a lookup that has an optimized
 861   request reply on the requestqueue) between dlm_recover_waiters_pre() which
 862   set RESEND and dlm_recover_waiters_post() */
 863
 864static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
 865{
 866        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 867        int overlap_done = 0;
 868
 869        if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
 870                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
 871                overlap_done = 1;
 872                goto out_del;
 873        }
 874
 875        if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
 876                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
 877                overlap_done = 1;
 878                goto out_del;
 879        }
 880
 881        /* N.B. type of reply may not always correspond to type of original
 882           msg due to lookup->request optimization, verify others? */
 883
 884        if (lkb->lkb_wait_type) {
 885                lkb->lkb_wait_type = 0;
 886                goto out_del;
 887        }
 888
 889        log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
 890                  lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
 891        return -1;
 892
 893 out_del:
 894        /* the force-unlock/cancel has completed and we haven't recvd a reply
 895           to the op that was in progress prior to the unlock/cancel; we
 896           give up on any reply to the earlier op.  FIXME: not sure when/how
 897           this would happen */
 898
 899        if (overlap_done && lkb->lkb_wait_type) {
 900                log_error(ls, "remove_from_waiters %x reply %d give up on %d",
 901                          lkb->lkb_id, mstype, lkb->lkb_wait_type);
 902                lkb->lkb_wait_count--;
 903                lkb->lkb_wait_type = 0;
 904        }
 905
 906        DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
 907
 908        lkb->lkb_flags &= ~DLM_IFL_RESEND;
 909        lkb->lkb_wait_count--;
 910        if (!lkb->lkb_wait_count)
 911                list_del_init(&lkb->lkb_wait_reply);
 912        unhold_lkb(lkb);
 913        return 0;
 914}
 915
 916static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
 917{
 918        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 919        int error;
 920
 921        mutex_lock(&ls->ls_waiters_mutex);
 922        error = _remove_from_waiters(lkb, mstype);
 923        mutex_unlock(&ls->ls_waiters_mutex);
 924        return error;
 925}
 926
 927/* Handles situations where we might be processing a "fake" or "stub" reply in
 928   which we can't try to take waiters_mutex again. */
 929
 930static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
 931{
 932        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 933        int error;
 934
 935        if (ms != &ls->ls_stub_ms)
 936                mutex_lock(&ls->ls_waiters_mutex);
 937        error = _remove_from_waiters(lkb, ms->m_type);
 938        if (ms != &ls->ls_stub_ms)
 939                mutex_unlock(&ls->ls_waiters_mutex);
 940        return error;
 941}
 942
 943static void dir_remove(struct dlm_rsb *r)
 944{
 945        int to_nodeid;
 946
 947        if (dlm_no_directory(r->res_ls))
 948                return;
 949
 950        to_nodeid = dlm_dir_nodeid(r);
 951        if (to_nodeid != dlm_our_nodeid())
 952                send_remove(r);
 953        else
 954                dlm_dir_remove_entry(r->res_ls, to_nodeid,
 955                                     r->res_name, r->res_length);
 956}
 957
 958/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
 959   found since they are in order of newest to oldest? */
 960
 961static int shrink_bucket(struct dlm_ls *ls, int b)
 962{
 963        struct dlm_rsb *r;
 964        int count = 0, found;
 965
 966        for (;;) {
 967                found = 0;
 968                write_lock(&ls->ls_rsbtbl[b].lock);
 969                list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
 970                                            res_hashchain) {
 971                        if (!time_after_eq(jiffies, r->res_toss_time +
 972                                           dlm_config.ci_toss_secs * HZ))
 973                                continue;
 974                        found = 1;
 975                        break;
 976                }
 977
 978                if (!found) {
 979                        write_unlock(&ls->ls_rsbtbl[b].lock);
 980                        break;
 981                }
 982
 983                if (kref_put(&r->res_ref, kill_rsb)) {
 984                        list_del(&r->res_hashchain);
 985                        write_unlock(&ls->ls_rsbtbl[b].lock);
 986
 987                        if (is_master(r))
 988                                dir_remove(r);
 989                        dlm_free_rsb(r);
 990                        count++;
 991                } else {
 992                        write_unlock(&ls->ls_rsbtbl[b].lock);
 993                        log_error(ls, "tossed rsb in use %s", r->res_name);
 994                }
 995        }
 996
 997        return count;
 998}
 999
1000void dlm_scan_rsbs(struct dlm_ls *ls)
1001{
1002        int i;
1003
1004        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1005                shrink_bucket(ls, i);
1006                if (dlm_locking_stopped(ls))
1007                        break;
1008                cond_resched();
1009        }
1010}
1011
1012static void add_timeout(struct dlm_lkb *lkb)
1013{
1014        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1015
1016        if (is_master_copy(lkb)) {
1017                lkb->lkb_timestamp = jiffies;
1018                return;
1019        }
1020
1021        if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1022            !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1023                lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1024                goto add_it;
1025        }
1026        if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1027                goto add_it;
1028        return;
1029
1030 add_it:
1031        DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1032        mutex_lock(&ls->ls_timeout_mutex);
1033        hold_lkb(lkb);
1034        lkb->lkb_timestamp = jiffies;
1035        list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1036        mutex_unlock(&ls->ls_timeout_mutex);
1037}
1038
1039static void del_timeout(struct dlm_lkb *lkb)
1040{
1041        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1042
1043        mutex_lock(&ls->ls_timeout_mutex);
1044        if (!list_empty(&lkb->lkb_time_list)) {
1045                list_del_init(&lkb->lkb_time_list);
1046                unhold_lkb(lkb);
1047        }
1048        mutex_unlock(&ls->ls_timeout_mutex);
1049}
1050
1051/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1052   lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1053   and then lock rsb because of lock ordering in add_timeout.  We may need
1054   to specify some special timeout-related bits in the lkb that are just to
1055   be accessed under the timeout_mutex. */
1056
1057void dlm_scan_timeout(struct dlm_ls *ls)
1058{
1059        struct dlm_rsb *r;
1060        struct dlm_lkb *lkb;
1061        int do_cancel, do_warn;
1062
1063        for (;;) {
1064                if (dlm_locking_stopped(ls))
1065                        break;
1066
1067                do_cancel = 0;
1068                do_warn = 0;
1069                mutex_lock(&ls->ls_timeout_mutex);
1070                list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1071
1072                        if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1073                            time_after_eq(jiffies, lkb->lkb_timestamp +
1074                                          lkb->lkb_timeout_cs * HZ/100))
1075                                do_cancel = 1;
1076
1077                        if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1078                            time_after_eq(jiffies, lkb->lkb_timestamp +
1079                                              dlm_config.ci_timewarn_cs * HZ/100))
1080                                do_warn = 1;
1081
1082                        if (!do_cancel && !do_warn)
1083                                continue;
1084                        hold_lkb(lkb);
1085                        break;
1086                }
1087                mutex_unlock(&ls->ls_timeout_mutex);
1088
1089                if (!do_cancel && !do_warn)
1090                        break;
1091
1092                r = lkb->lkb_resource;
1093                hold_rsb(r);
1094                lock_rsb(r);
1095
1096                if (do_warn) {
1097                        /* clear flag so we only warn once */
1098                        lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1099                        if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1100                                del_timeout(lkb);
1101                        dlm_timeout_warn(lkb);
1102                }
1103
1104                if (do_cancel) {
1105                        log_debug(ls, "timeout cancel %x node %d %s",
1106                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1107                        lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1108                        lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1109                        del_timeout(lkb);
1110                        _cancel_lock(r, lkb);
1111                }
1112
1113                unlock_rsb(r);
1114                unhold_rsb(r);
1115                dlm_put_lkb(lkb);
1116        }
1117}
1118
1119/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1120   dlm_recoverd before checking/setting ls_recover_begin. */
1121
1122void dlm_adjust_timeouts(struct dlm_ls *ls)
1123{
1124        struct dlm_lkb *lkb;
1125        long adj = jiffies - ls->ls_recover_begin;
1126
1127        ls->ls_recover_begin = 0;
1128        mutex_lock(&ls->ls_timeout_mutex);
1129        list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1130                lkb->lkb_timestamp += adj;
1131        mutex_unlock(&ls->ls_timeout_mutex);
1132}
1133
1134/* lkb is master or local copy */
1135
1136static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1137{
1138        int b, len = r->res_ls->ls_lvblen;
1139
1140        /* b=1 lvb returned to caller
1141           b=0 lvb written to rsb or invalidated
1142           b=-1 do nothing */
1143
1144        b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1145
1146        if (b == 1) {
1147                if (!lkb->lkb_lvbptr)
1148                        return;
1149
1150                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1151                        return;
1152
1153                if (!r->res_lvbptr)
1154                        return;
1155
1156                memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1157                lkb->lkb_lvbseq = r->res_lvbseq;
1158
1159        } else if (b == 0) {
1160                if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1161                        rsb_set_flag(r, RSB_VALNOTVALID);
1162                        return;
1163                }
1164
1165                if (!lkb->lkb_lvbptr)
1166                        return;
1167
1168                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1169                        return;
1170
1171                if (!r->res_lvbptr)
1172                        r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1173
1174                if (!r->res_lvbptr)
1175                        return;
1176
1177                memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1178                r->res_lvbseq++;
1179                lkb->lkb_lvbseq = r->res_lvbseq;
1180                rsb_clear_flag(r, RSB_VALNOTVALID);
1181        }
1182
1183        if (rsb_flag(r, RSB_VALNOTVALID))
1184                lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1185}
1186
1187static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1188{
1189        if (lkb->lkb_grmode < DLM_LOCK_PW)
1190                return;
1191
1192        if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1193                rsb_set_flag(r, RSB_VALNOTVALID);
1194                return;
1195        }
1196
1197        if (!lkb->lkb_lvbptr)
1198                return;
1199
1200        if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1201                return;
1202
1203        if (!r->res_lvbptr)
1204                r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1205
1206        if (!r->res_lvbptr)
1207                return;
1208
1209        memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1210        r->res_lvbseq++;
1211        rsb_clear_flag(r, RSB_VALNOTVALID);
1212}
1213
1214/* lkb is process copy (pc) */
1215
1216static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1217                            struct dlm_message *ms)
1218{
1219        int b;
1220
1221        if (!lkb->lkb_lvbptr)
1222                return;
1223
1224        if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1225                return;
1226
1227        b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1228        if (b == 1) {
1229                int len = receive_extralen(ms);
1230                if (len > DLM_RESNAME_MAXLEN)
1231                        len = DLM_RESNAME_MAXLEN;
1232                memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1233                lkb->lkb_lvbseq = ms->m_lvbseq;
1234        }
1235}
1236
1237/* Manipulate lkb's on rsb's convert/granted/waiting queues
1238   remove_lock -- used for unlock, removes lkb from granted
1239   revert_lock -- used for cancel, moves lkb from convert to granted
1240   grant_lock  -- used for request and convert, adds lkb to granted or
1241                  moves lkb from convert or waiting to granted
1242
1243   Each of these is used for master or local copy lkb's.  There is
1244   also a _pc() variation used to make the corresponding change on
1245   a process copy (pc) lkb. */
1246
1247static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1248{
1249        del_lkb(r, lkb);
1250        lkb->lkb_grmode = DLM_LOCK_IV;
1251        /* this unhold undoes the original ref from create_lkb()
1252           so this leads to the lkb being freed */
1253        unhold_lkb(lkb);
1254}
1255
/* Remove a master or local copy lkb: the rsb lvb is updated from the lock
   first, then the lkb is removed. */
static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        set_lvb_unlock(r, lkb);

        _remove_lock(r, lkb);
}
1261
/* Remove a process copy lkb; unlike remove_lock() there is no lvb update. */
static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        _remove_lock(r, lkb);
}
1266
1267/* returns: 0 did nothing
1268            1 moved lock to granted
1269           -1 removed lock */
1270
1271static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1272{
1273        int rv = 0;
1274
1275        lkb->lkb_rqmode = DLM_LOCK_IV;
1276
1277        switch (lkb->lkb_status) {
1278        case DLM_LKSTS_GRANTED:
1279                break;
1280        case DLM_LKSTS_CONVERT:
1281                move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1282                rv = 1;
1283                break;
1284        case DLM_LKSTS_WAITING:
1285                del_lkb(r, lkb);
1286                lkb->lkb_grmode = DLM_LOCK_IV;
1287                /* this unhold undoes the original ref from create_lkb()
1288                   so this leads to the lkb being freed */
1289                unhold_lkb(lkb);
1290                rv = -1;
1291                break;
1292        default:
1293                log_print("invalid status for revert %d", lkb->lkb_status);
1294        }
1295        return rv;
1296}
1297
/* Process copy variant of revert_lock(); currently identical, including
   the return value. */
static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        int rv = revert_lock(r, lkb);

        return rv;
}
1302
1303static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1304{
1305        if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1306                lkb->lkb_grmode = lkb->lkb_rqmode;
1307                if (lkb->lkb_status)
1308                        move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1309                else
1310                        add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1311        }
1312
1313        lkb->lkb_rqmode = DLM_LOCK_IV;
1314}
1315
1316static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1317{
1318        set_lvb_lock(r, lkb);
1319        _grant_lock(r, lkb);
1320        lkb->lkb_highbast = 0;
1321}
1322
/* Grant a process copy lkb: the lvb is taken from message ms, then the
   grant is performed. */
static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
                          struct dlm_message *ms)
{
        set_lvb_lock_pc(r, lkb, ms);

        _grant_lock(r, lkb);
}
1329
/* Grant a lock that had been queued.  Called by grant_pending_locks(),
   which means an async grant message must be sent to the requesting node,
   in addition to granting the lock, if the lkb belongs to a remote node
   (i.e. it is a master copy).  For any other lkb a completion cast is
   queued instead. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        grant_lock(r, lkb);

        if (!is_master_copy(lkb)) {
                queue_cast(r, lkb, 0);
                return;
        }

        send_grant(r, lkb);
}
1342
1343/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1344   change the granted/requested modes.  We're munging things accordingly in
1345   the process copy.
1346   CONVDEADLK: our grmode may have been forced down to NL to resolve a
1347   conversion deadlock
1348   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1349   compatible with other granted locks */
1350
1351static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1352{
1353        if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1354                log_print("munge_demoted %x invalid reply type %d",
1355                          lkb->lkb_id, ms->m_type);
1356                return;
1357        }
1358
1359        if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1360                log_print("munge_demoted %x invalid modes gr %d rq %d",
1361                          lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1362                return;
1363        }
1364
1365        lkb->lkb_grmode = DLM_LOCK_NL;
1366}
1367
1368static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1369{
1370        if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1371            ms->m_type != DLM_MSG_GRANT) {
1372                log_print("munge_altmode %x invalid reply type %d",
1373                          lkb->lkb_id, ms->m_type);
1374                return;
1375        }
1376
1377        if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1378                lkb->lkb_rqmode = DLM_LOCK_PR;
1379        else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1380                lkb->lkb_rqmode = DLM_LOCK_CW;
1381        else {
1382                log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1383                dlm_print_lkb(lkb);
1384        }
1385}
1386
1387static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1388{
1389        struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1390                                           lkb_statequeue);
1391        if (lkb->lkb_id == first->lkb_id)
1392                return 1;
1393
1394        return 0;
1395}
1396
1397/* Check if the given lkb conflicts with another lkb on the queue. */
1398
1399static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1400{
1401        struct dlm_lkb *this;
1402
1403        list_for_each_entry(this, head, lkb_statequeue) {
1404                if (this == lkb)
1405                        continue;
1406                if (!modes_compat(this, lkb))
1407                        return 1;
1408        }
1409        return 0;
1410}
1411
1412/*
1413 * "A conversion deadlock arises with a pair of lock requests in the converting
1414 * queue for one resource.  The granted mode of each lock blocks the requested
1415 * mode of the other lock."
1416 *
1417 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1418 * convert queue from being granted, then deadlk/demote lkb.
1419 *
1420 * Example:
1421 * Granted Queue: empty
1422 * Convert Queue: NL->EX (first lock)
1423 *                PR->EX (second lock)
1424 *
1425 * The first lock can't be granted because of the granted mode of the second
1426 * lock and the second lock can't be granted because it's not first in the
1427 * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1428 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1429 * flag set and return DEMOTED in the lksb flags.
1430 *
1431 * Originally, this function detected conv-deadlk in a more limited scope:
1432 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1433 * - if lkb1 was the first entry in the queue (not just earlier), and was
1434 *   blocked by the granted mode of lkb2, and there was nothing on the
1435 *   granted queue preventing lkb1 from being granted immediately, i.e.
1436 *   lkb2 was the only thing preventing lkb1 from being granted.
1437 *
1438 * That second condition meant we'd only say there was conv-deadlk if
1439 * resolving it (by demotion) would lead to the first lock on the convert
1440 * queue being granted right away.  It allowed conversion deadlocks to exist
1441 * between locks on the convert queue while they couldn't be granted anyway.
1442 *
1443 * Now, we detect and take action on conversion deadlocks immediately when
1444 * they're created, even if they may not be immediately consequential.  If
1445 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1446 * mode that would prevent lkb1's conversion from being granted, we do a
1447 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1448 * I think this means that the lkb_is_ahead condition below should always
1449 * be zero, i.e. there will never be conv-deadlk between two locks that are
1450 * both already on the convert queue.
1451 */
1452
1453static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1454{
1455        struct dlm_lkb *lkb1;
1456        int lkb_is_ahead = 0;
1457
1458        list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1459                if (lkb1 == lkb2) {
1460                        lkb_is_ahead = 1;
1461                        continue;
1462                }
1463
1464                if (!lkb_is_ahead) {
1465                        if (!modes_compat(lkb2, lkb1))
1466                                return 1;
1467                } else {
1468                        if (!modes_compat(lkb2, lkb1) &&
1469                            !modes_compat(lkb1, lkb2))
1470                                return 1;
1471                }
1472        }
1473        return 0;
1474}
1475
1476/*
1477 * Return 1 if the lock can be granted, 0 otherwise.
1478 * Also detect and resolve conversion deadlocks.
1479 *
1480 * lkb is the lock to be granted
1481 *
1482 * now is 1 if the function is being called in the context of the
1483 * immediate request, it is 0 if called later, after the lock has been
1484 * queued.
1485 *
1486 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1487 */
1488
1489static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1490{
1491        int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1492
1493        /*
1494         * 6-10: Version 5.4 introduced an option to address the phenomenon of
1495         * a new request for a NL mode lock being blocked.
1496         *
1497         * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1498         * request, then it would be granted.  In essence, the use of this flag
1499         * tells the Lock Manager to expedite theis request by not considering
1500         * what may be in the CONVERTING or WAITING queues...  As of this
1501         * writing, the EXPEDITE flag can be used only with new requests for NL
1502         * mode locks.  This flag is not valid for conversion requests.
1503         *
1504         * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1505         * conversion or used with a non-NL requested mode.  We also know an
1506         * EXPEDITE request is always granted immediately, so now must always
1507         * be 1.  The full condition to grant an expedite request: (now &&
1508         * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1509         * therefore be shortened to just checking the flag.
1510         */
1511
1512        if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1513                return 1;
1514
1515        /*
1516         * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1517         * added to the remaining conditions.
1518         */
1519
1520        if (queue_conflict(&r->res_grantqueue, lkb))
1521                goto out;
1522
1523        /*
1524         * 6-3: By default, a conversion request is immediately granted if the
1525         * requested mode is compatible with the modes of all other granted
1526         * locks
1527         */
1528
1529        if (queue_conflict(&r->res_convertqueue, lkb))
1530                goto out;
1531
1532        /*
1533         * 6-5: But the default algorithm for deciding whether to grant or
1534         * queue conversion requests does not by itself guarantee that such
1535         * requests are serviced on a "first come first serve" basis.  This, in
1536         * turn, can lead to a phenomenon known as "indefinate postponement".
1537         *
1538         * 6-7: This issue is dealt with by using the optional QUECVT flag with
1539         * the system service employed to request a lock conversion.  This flag
1540         * forces certain conversion requests to be queued, even if they are
1541         * compatible with the granted modes of other locks on the same
1542         * resource.  Thus, the use of this flag results in conversion requests
1543         * being ordered on a "first come first servce" basis.
1544         *
1545         * DCT: This condition is all about new conversions being able to occur
1546         * "in place" while the lock remains on the granted queue (assuming
1547         * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1548         * doesn't _have_ to go onto the convert queue where it's processed in
1549         * order.  The "now" variable is necessary to distinguish converts
1550         * being received and processed for the first time now, because once a
1551         * convert is moved to the conversion queue the condition below applies
1552         * requiring fifo granting.
1553         */
1554
1555        if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1556                return 1;
1557
1558        /*
1559         * The NOORDER flag is set to avoid the standard vms rules on grant
1560         * order.
1561         */
1562
1563        if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1564                return 1;
1565
1566        /*
1567         * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1568         * granted until all other conversion requests ahead of it are granted
1569         * and/or canceled.
1570         */
1571
1572        if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1573                return 1;
1574
1575        /*
1576         * 6-4: By default, a new request is immediately granted only if all
1577         * three of the following conditions are satisfied when the request is
1578         * issued:
1579         * - The queue of ungranted conversion requests for the resource is
1580         *   empty.
1581         * - The queue of ungranted new requests for the resource is empty.
1582         * - The mode of the new request is compatible with the most
1583         *   restrictive mode of all granted locks on the resource.
1584         */
1585
1586        if (now && !conv && list_empty(&r->res_convertqueue) &&
1587            list_empty(&r->res_waitqueue))
1588                return 1;
1589
1590        /*
1591         * 6-4: Once a lock request is in the queue of ungranted new requests,
1592         * it cannot be granted until the queue of ungranted conversion
1593         * requests is empty, all ungranted new requests ahead of it are
1594         * granted and/or canceled, and it is compatible with the granted mode
1595         * of the most restrictive lock granted on the resource.
1596         */
1597
1598        if (!now && !conv && list_empty(&r->res_convertqueue) &&
1599            first_in_list(lkb, &r->res_waitqueue))
1600                return 1;
1601 out:
1602        return 0;
1603}
1604
1605static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1606                          int *err)
1607{
1608        int rv;
1609        int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1610        int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1611
1612        if (err)
1613                *err = 0;
1614
1615        rv = _can_be_granted(r, lkb, now);
1616        if (rv)
1617                goto out;
1618
1619        /*
1620         * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1621         * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1622         * cancels one of the locks.
1623         */
1624
1625        if (is_convert && can_be_queued(lkb) &&
1626            conversion_deadlock_detect(r, lkb)) {
1627                if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1628                        lkb->lkb_grmode = DLM_LOCK_NL;
1629                        lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1630                } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1631                        if (err)
1632                                *err = -EDEADLK;
1633                        else {
1634                                log_print("can_be_granted deadlock %x now %d",
1635                                          lkb->lkb_id, now);
1636                                dlm_dump_rsb(r);
1637                        }
1638                }
1639                goto out;
1640        }
1641
1642        /*
1643         * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1644         * to grant a request in a mode other than the normal rqmode.  It's a
1645         * simple way to provide a big optimization to applications that can
1646         * use them.
1647         */
1648
1649        if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1650                alt = DLM_LOCK_PR;
1651        else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1652                alt = DLM_LOCK_CW;
1653
1654        if (alt) {
1655                lkb->lkb_rqmode = alt;
1656                rv = _can_be_granted(r, lkb, now);
1657                if (rv)
1658                        lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1659                else
1660                        lkb->lkb_rqmode = rqmode;
1661        }
1662 out:
1663        return rv;
1664}
1665
1666/* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1667   for locks pending on the convert list.  Once verified (watch for these
1668   log_prints), we should be able to just call _can_be_granted() and not
1669   bother with the demote/deadlk cases here (and there's no easy way to deal
1670   with a deadlk here, we'd have to generate something like grant_lock with
1671   the deadlk error.) */
1672
1673/* Returns the highest requested mode of all blocked conversions; sets
1674   cw if there's a blocked conversion to DLM_LOCK_CW. */
1675
1676static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1677{
1678        struct dlm_lkb *lkb, *s;
1679        int hi, demoted, quit, grant_restart, demote_restart;
1680        int deadlk;
1681
1682        quit = 0;
1683 restart:
1684        grant_restart = 0;
1685        demote_restart = 0;
1686        hi = DLM_LOCK_IV;
1687
1688        list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1689                demoted = is_demoted(lkb);
1690                deadlk = 0;
1691
1692                if (can_be_granted(r, lkb, 0, &deadlk)) {
1693                        grant_lock_pending(r, lkb);
1694                        grant_restart = 1;
1695                        continue;
1696                }
1697
1698                if (!demoted && is_demoted(lkb)) {
1699                        log_print("WARN: pending demoted %x node %d %s",
1700                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1701                        demote_restart = 1;
1702                        continue;
1703                }
1704
1705                if (deadlk) {
1706                        log_print("WARN: pending deadlock %x node %d %s",
1707                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1708                        dlm_dump_rsb(r);
1709                        continue;
1710                }
1711
1712                hi = max_t(int, lkb->lkb_rqmode, hi);
1713
1714                if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1715                        *cw = 1;
1716        }
1717
1718        if (grant_restart)
1719                goto restart;
1720        if (demote_restart && !quit) {
1721                quit = 1;
1722                goto restart;
1723        }
1724
1725        return max_t(int, high, hi);
1726}
1727
1728static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1729{
1730        struct dlm_lkb *lkb, *s;
1731
1732        list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1733                if (can_be_granted(r, lkb, 0, NULL))
1734                        grant_lock_pending(r, lkb);
1735                else {
1736                        high = max_t(int, lkb->lkb_rqmode, high);
1737                        if (lkb->lkb_rqmode == DLM_LOCK_CW)
1738                                *cw = 1;
1739                }
1740        }
1741
1742        return high;
1743}
1744
1745/* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1746   on either the convert or waiting queue.
1747   high is the largest rqmode of all locks blocked on the convert or
1748   waiting queue. */
1749
1750static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1751{
1752        if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1753                if (gr->lkb_highbast < DLM_LOCK_EX)
1754                        return 1;
1755                return 0;
1756        }
1757
1758        if (gr->lkb_highbast < high &&
1759            !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1760                return 1;
1761        return 0;
1762}
1763
1764static void grant_pending_locks(struct dlm_rsb *r)
1765{
1766        struct dlm_lkb *lkb, *s;
1767        int high = DLM_LOCK_IV;
1768        int cw = 0;
1769
1770        DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1771
1772        high = grant_pending_convert(r, high, &cw);
1773        high = grant_pending_wait(r, high, &cw);
1774
1775        if (high == DLM_LOCK_IV)
1776                return;
1777
1778        /*
1779         * If there are locks left on the wait/convert queue then send blocking
1780         * ASTs to granted locks based on the largest requested mode (high)
1781         * found above.
1782         */
1783
1784        list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1785                if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1786                        if (cw && high == DLM_LOCK_PR &&
1787                            lkb->lkb_grmode == DLM_LOCK_PR)
1788                                queue_bast(r, lkb, DLM_LOCK_CW);
1789                        else
1790                                queue_bast(r, lkb, high);
1791                        lkb->lkb_highbast = high;
1792                }
1793        }
1794}
1795
1796static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1797{
1798        if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1799            (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1800                if (gr->lkb_highbast < DLM_LOCK_EX)
1801                        return 1;
1802                return 0;
1803        }
1804
1805        if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1806                return 1;
1807        return 0;
1808}
1809
1810static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1811                            struct dlm_lkb *lkb)
1812{
1813        struct dlm_lkb *gr;
1814
1815        list_for_each_entry(gr, head, lkb_statequeue) {
1816                if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1817                        queue_bast(r, gr, lkb->lkb_rqmode);
1818                        gr->lkb_highbast = lkb->lkb_rqmode;
1819                }
1820        }
1821}
1822
1823static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1824{
1825        send_bast_queue(r, &r->res_grantqueue, lkb);
1826}
1827
1828static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1829{
1830        send_bast_queue(r, &r->res_grantqueue, lkb);
1831        send_bast_queue(r, &r->res_convertqueue, lkb);
1832}
1833
1834/* set_master(r, lkb) -- set the master nodeid of a resource
1835
1836   The purpose of this function is to set the nodeid field in the given
1837   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1838   known, it can just be copied to the lkb and the function will return
1839   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1840   before it can be copied to the lkb.
1841
1842   When the rsb nodeid is being looked up remotely, the initial lkb
1843   causing the lookup is kept on the ls_waiters list waiting for the
1844   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1845   on the rsb's res_lookup list until the master is verified.
1846
1847   Return values:
1848   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1849   1: the rsb master is not available and the lkb has been placed on
1850      a wait queue
1851*/
1852
1853static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1854{
1855        struct dlm_ls *ls = r->res_ls;
1856        int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1857
1858        if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1859                rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1860                r->res_first_lkid = lkb->lkb_id;
1861                lkb->lkb_nodeid = r->res_nodeid;
1862                return 0;
1863        }
1864
1865        if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1866                list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1867                return 1;
1868        }
1869
1870        if (r->res_nodeid == 0) {
1871                lkb->lkb_nodeid = 0;
1872                return 0;
1873        }
1874
1875        if (r->res_nodeid > 0) {
1876                lkb->lkb_nodeid = r->res_nodeid;
1877                return 0;
1878        }
1879
1880        DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1881
1882        dir_nodeid = dlm_dir_nodeid(r);
1883
1884        if (dir_nodeid != our_nodeid) {
1885                r->res_first_lkid = lkb->lkb_id;
1886                send_lookup(r, lkb);
1887                return 1;
1888        }
1889
1890        for (i = 0; i < 2; i++) {
1891                /* It's possible for dlm_scand to remove an old rsb for
1892                   this same resource from the toss list, us to create
1893                   a new one, look up the master locally, and find it
1894                   already exists just before dlm_scand does the
1895                   dir_remove() on the previous rsb. */
1896
1897                error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1898                                       r->res_length, &ret_nodeid);
1899                if (!error)
1900                        break;
1901                log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1902                schedule();
1903        }
1904        if (error && error != -EEXIST)
1905                return error;
1906
1907        if (ret_nodeid == our_nodeid) {
1908                r->res_first_lkid = 0;
1909                r->res_nodeid = 0;
1910                lkb->lkb_nodeid = 0;
1911        } else {
1912                r->res_first_lkid = lkb->lkb_id;
1913                r->res_nodeid = ret_nodeid;
1914                lkb->lkb_nodeid = ret_nodeid;
1915        }
1916        return 0;
1917}
1918
1919static void process_lookup_list(struct dlm_rsb *r)
1920{
1921        struct dlm_lkb *lkb, *safe;
1922
1923        list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1924                list_del_init(&lkb->lkb_rsb_lookup);
1925                _request_lock(r, lkb);
1926                schedule();
1927        }
1928}
1929
1930/* confirm_master -- confirm (or deny) an rsb's master nodeid */
1931
1932static void confirm_master(struct dlm_rsb *r, int error)
1933{
1934        struct dlm_lkb *lkb;
1935
1936        if (!r->res_first_lkid)
1937                return;
1938
1939        switch (error) {
1940        case 0:
1941        case -EINPROGRESS:
1942                r->res_first_lkid = 0;
1943                process_lookup_list(r);
1944                break;
1945
1946        case -EAGAIN:
1947        case -EBADR:
1948        case -ENOTBLK:
1949                /* the remote request failed and won't be retried (it was
1950                   a NOQUEUE, or has been canceled/unlocked); make a waiting
1951                   lkb the first_lkid */
1952
1953                r->res_first_lkid = 0;
1954
1955                if (!list_empty(&r->res_lookup)) {
1956                        lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1957                                         lkb_rsb_lookup);
1958                        list_del_init(&lkb->lkb_rsb_lookup);
1959                        r->res_first_lkid = lkb->lkb_id;
1960                        _request_lock(r, lkb);
1961                }
1962                break;
1963
1964        default:
1965                log_error(r->res_ls, "confirm_master unknown error %d", error);
1966        }
1967}
1968
1969static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1970                         int namelen, unsigned long timeout_cs,
1971                         void (*ast) (void *astparam),
1972                         void *astparam,
1973                         void (*bast) (void *astparam, int mode),
1974                         struct dlm_args *args)
1975{
1976        int rv = -EINVAL;
1977
1978        /* check for invalid arg usage */
1979
1980        if (mode < 0 || mode > DLM_LOCK_EX)
1981                goto out;
1982
1983        if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1984                goto out;
1985
1986        if (flags & DLM_LKF_CANCEL)
1987                goto out;
1988
1989        if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1990                goto out;
1991
1992        if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1993                goto out;
1994
1995        if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1996                goto out;
1997
1998        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1999                goto out;
2000
2001        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2002                goto out;
2003
2004        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2005                goto out;
2006
2007        if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2008                goto out;
2009
2010        if (!ast || !lksb)
2011                goto out;
2012
2013        if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2014                goto out;
2015
2016        if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2017                goto out;
2018
2019        /* these args will be copied to the lkb in validate_lock_args,
2020           it cannot be done now because when converting locks, fields in
2021           an active lkb cannot be modified before locking the rsb */
2022
2023        args->flags = flags;
2024        args->astfn = ast;
2025        args->astparam = astparam;
2026        args->bastfn = bast;
2027        args->timeout = timeout_cs;
2028        args->mode = mode;
2029        args->lksb = lksb;
2030        rv = 0;
2031 out:
2032        return rv;
2033}
2034
2035static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2036{
2037        if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2038                       DLM_LKF_FORCEUNLOCK))
2039                return -EINVAL;
2040
2041        if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2042                return -EINVAL;
2043
2044        args->flags = flags;
2045        args->astparam = astarg;
2046        return 0;
2047}
2048
2049static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2050                              struct dlm_args *args)
2051{
2052        int rv = -EINVAL;
2053
2054        if (args->flags & DLM_LKF_CONVERT) {
2055                if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2056                        goto out;
2057
2058                if (args->flags & DLM_LKF_QUECVT &&
2059                    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2060                        goto out;
2061
2062                rv = -EBUSY;
2063                if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2064                        goto out;
2065
2066                if (lkb->lkb_wait_type)
2067                        goto out;
2068
2069                if (is_overlap(lkb))
2070                        goto out;
2071        }
2072
2073        lkb->lkb_exflags = args->flags;
2074        lkb->lkb_sbflags = 0;
2075        lkb->lkb_astfn = args->astfn;
2076        lkb->lkb_astparam = args->astparam;
2077        lkb->lkb_bastfn = args->bastfn;
2078        lkb->lkb_rqmode = args->mode;
2079        lkb->lkb_lksb = args->lksb;
2080        lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2081        lkb->lkb_ownpid = (int) current->pid;
2082        lkb->lkb_timeout_cs = args->timeout;
2083        rv = 0;
2084 out:
2085        return rv;
2086}
2087
2088/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2089   for success */
2090
2091/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2092   because there may be a lookup in progress and it's valid to do
2093   cancel/unlockf on it */
2094
2095static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2096{
2097        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2098        int rv = -EINVAL;
2099
2100        if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2101                log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2102                dlm_print_lkb(lkb);
2103                goto out;
2104        }
2105
2106        /* an lkb may still exist even though the lock is EOL'ed due to a
2107           cancel, unlock or failed noqueue request; an app can't use these
2108           locks; return same error as if the lkid had not been found at all */
2109
2110        if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2111                log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2112                rv = -ENOENT;
2113                goto out;
2114        }
2115
2116        /* an lkb may be waiting for an rsb lookup to complete where the
2117           lookup was initiated by another lock */
2118
2119        if (!list_empty(&lkb->lkb_rsb_lookup)) {
2120                if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2121                        log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2122                        list_del_init(&lkb->lkb_rsb_lookup);
2123                        queue_cast(lkb->lkb_resource, lkb,
2124                                   args->flags & DLM_LKF_CANCEL ?
2125                                   -DLM_ECANCEL : -DLM_EUNLOCK);
2126                        unhold_lkb(lkb); /* undoes create_lkb() */
2127                }
2128                /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2129                rv = -EBUSY;
2130                goto out;
2131        }
2132
2133        /* cancel not allowed with another cancel/unlock in progress */
2134
2135        if (args->flags & DLM_LKF_CANCEL) {
2136                if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2137                        goto out;
2138
2139                if (is_overlap(lkb))
2140                        goto out;
2141
2142                /* don't let scand try to do a cancel */
2143                del_timeout(lkb);
2144
2145                if (lkb->lkb_flags & DLM_IFL_RESEND) {
2146                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2147                        rv = -EBUSY;
2148                        goto out;
2149                }
2150
2151                switch (lkb->lkb_wait_type) {
2152                case DLM_MSG_LOOKUP:
2153                case DLM_MSG_REQUEST:
2154                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2155                        rv = -EBUSY;
2156                        goto out;
2157                case DLM_MSG_UNLOCK:
2158                case DLM_MSG_CANCEL:
2159                        goto out;
2160                }
2161                /* add_to_waiters() will set OVERLAP_CANCEL */
2162                goto out_ok;
2163        }
2164
2165        /* do we need to allow a force-unlock if there's a normal unlock
2166           already in progress?  in what conditions could the normal unlock
2167           fail such that we'd want to send a force-unlock to be sure? */
2168
2169        if (args->flags & DLM_LKF_FORCEUNLOCK) {
2170                if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2171                        goto out;
2172
2173                if (is_overlap_unlock(lkb))
2174                        goto out;
2175
2176                /* don't let scand try to do a cancel */
2177                del_timeout(lkb);
2178
2179                if (lkb->lkb_flags & DLM_IFL_RESEND) {
2180                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2181                        rv = -EBUSY;
2182                        goto out;
2183                }
2184
2185                switch (lkb->lkb_wait_type) {
2186                case DLM_MSG_LOOKUP:
2187                case DLM_MSG_REQUEST:
2188                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2189                        rv = -EBUSY;
2190                        goto out;
2191                case DLM_MSG_UNLOCK:
2192                        goto out;
2193                }
2194                /* add_to_waiters() will set OVERLAP_UNLOCK */
2195                goto out_ok;
2196        }
2197
2198        /* normal unlock not allowed if there's any op in progress */
2199        rv = -EBUSY;
2200        if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2201                goto out;
2202
2203 out_ok:
2204        /* an overlapping op shouldn't blow away exflags from other op */
2205        lkb->lkb_exflags |= args->flags;
2206        lkb->lkb_sbflags = 0;
2207        lkb->lkb_astparam = args->astparam;
2208        rv = 0;
2209 out:
2210        if (rv)
2211                log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2212                          lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2213                          args->flags, lkb->lkb_wait_type,
2214                          lkb->lkb_resource->res_name);
2215        return rv;
2216}
2217
2218/*
2219 * Four stage 4 varieties:
2220 * do_request(), do_convert(), do_unlock(), do_cancel()
2221 * These are called on the master node for the given lock and
2222 * from the central locking logic.
2223 */
2224
2225static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2226{
2227        int error = 0;
2228
2229        if (can_be_granted(r, lkb, 1, NULL)) {
2230                grant_lock(r, lkb);
2231                queue_cast(r, lkb, 0);
2232                goto out;
2233        }
2234
2235        if (can_be_queued(lkb)) {
2236                error = -EINPROGRESS;
2237                add_lkb(r, lkb, DLM_LKSTS_WAITING);
2238                send_blocking_asts(r, lkb);
2239                add_timeout(lkb);
2240                goto out;
2241        }
2242
2243        error = -EAGAIN;
2244        if (force_blocking_asts(lkb))
2245                send_blocking_asts_all(r, lkb);
2246        queue_cast(r, lkb, -EAGAIN);
2247
2248 out:
2249        return error;
2250}
2251
2252static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2253{
2254        int error = 0;
2255        int deadlk = 0;
2256
2257        /* changing an existing lock may allow others to be granted */
2258
2259        if (can_be_granted(r, lkb, 1, &deadlk)) {
2260                grant_lock(r, lkb);
2261                queue_cast(r, lkb, 0);
2262                grant_pending_locks(r);
2263                goto out;
2264        }
2265
2266        /* can_be_granted() detected that this lock would block in a conversion
2267           deadlock, so we leave it on the granted queue and return EDEADLK in
2268           the ast for the convert. */
2269
2270        if (deadlk) {
2271                /* it's left on the granted queue */
2272                log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2273                          lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2274                          lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2275                revert_lock(r, lkb);
2276                queue_cast(r, lkb, -EDEADLK);
2277                error = -EDEADLK;
2278                goto out;
2279        }
2280
2281        /* is_demoted() means the can_be_granted() above set the grmode
2282           to NL, and left us on the granted queue.  This auto-demotion
2283           (due to CONVDEADLK) might mean other locks, and/or this lock, are
2284           now grantable.  We have to try to grant other converting locks
2285           before we try again to grant this one. */
2286
2287        if (is_demoted(lkb)) {
2288                grant_pending_convert(r, DLM_LOCK_IV, NULL);
2289                if (_can_be_granted(r, lkb, 1)) {
2290                        grant_lock(r, lkb);
2291                        queue_cast(r, lkb, 0);
2292                        grant_pending_locks(r);
2293                        goto out;
2294                }
2295                /* else fall through and move to convert queue */
2296        }
2297
2298        if (can_be_queued(lkb)) {
2299                error = -EINPROGRESS;
2300                del_lkb(r, lkb);
2301                add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2302                send_blocking_asts(r, lkb);
2303                add_timeout(lkb);
2304                goto out;
2305        }
2306
2307        error = -EAGAIN;
2308        if (force_blocking_asts(lkb))
2309                send_blocking_asts_all(r, lkb);
2310        queue_cast(r, lkb, -EAGAIN);
2311
2312 out:
2313        return error;
2314}
2315
2316static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2317{
2318        remove_lock(r, lkb);
2319        queue_cast(r, lkb, -DLM_EUNLOCK);
2320        grant_pending_locks(r);
2321        return -DLM_EUNLOCK;
2322}
2323
2324/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2325 
2326static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2327{
2328        int error;
2329
2330        error = revert_lock(r, lkb);
2331        if (error) {
2332                queue_cast(r, lkb, -DLM_ECANCEL);
2333                grant_pending_locks(r);
2334                return -DLM_ECANCEL;
2335        }
2336        return 0;
2337}
2338
2339/*
2340 * Four stage 3 varieties:
2341 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2342 */
2343
/* Stage 3 of a new request: add a new lkb to a possibly new rsb, called by
   the requesting process.

   Returns a negative error from set_master(), 0 when set_master() reports
   that the lkb must wait for the master to become known, otherwise the
   result of send_request() (remote master) or do_request() (local). */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* set_master: sets lkb nodeid from r */
	rv = set_master(r, lkb);

	if (rv < 0)
		return rv;

	/* positive: the lkb is waiting on a lookup, nothing more to do now */
	if (rv > 0)
		return 0;

	/* receive_request() calls do_request() on remote node */
	if (is_remote(r))
		return send_request(r, lkb);

	return do_request(r, lkb);
}
2368
/* Stage 3 of a conversion: change some property of an existing lkb, e.g.
   its mode.  Returns the result of send_convert() when the rsb is remote,
   of do_convert() otherwise. */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_convert() calls do_convert() on remote node */
	if (is_remote(r))
		return send_convert(r, lkb);

	return do_convert(r, lkb);
}
2383
/* Stage 3 of an unlock: remove an existing lkb from the granted queue.
   Returns the result of send_unlock() when the rsb is remote, of
   do_unlock() otherwise. */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_unlock() calls do_unlock() on remote node */
	if (is_remote(r))
		return send_unlock(r, lkb);

	return do_unlock(r, lkb);
}
2398
/* Stage 3 of a cancel: remove an existing lkb from the convert or wait
   queue.  Returns the result of send_cancel() when the rsb is remote, of
   do_cancel() otherwise. */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_cancel() calls do_cancel() on remote node */
	if (is_remote(r))
		return send_cancel(r, lkb);

	return do_cancel(r, lkb);
}
2413
2414/*
2415 * Four stage 2 varieties:
2416 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2417 */
2418
2419static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2420                        int len, struct dlm_args *args)
2421{
2422        struct dlm_rsb *r;
2423        int error;
2424
2425        error = validate_lock_args(ls, lkb, args);
2426        if (error)
2427                goto out;
2428
2429        error = find_rsb(ls, name, len, R_CREATE, &r);
2430        if (error)
2431                goto out;
2432
2433        lock_rsb(r);
2434
2435        attach_lkb(r, lkb);
2436        lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2437
2438        error = _request_lock(r, lkb);
2439
2440        unlock_rsb(r);
2441        put_rsb(r);
2442
2443 out:
2444        return error;
2445}
2446
2447static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2448                        struct dlm_args *args)
2449{
2450        struct dlm_rsb *r;
2451        int error;
2452
2453        r = lkb->lkb_resource;
2454
2455        hold_rsb(r);
2456        lock_rsb(r);
2457
2458        error = validate_lock_args(ls, lkb, args);
2459        if (error)
2460                goto out;
2461
2462        error = _convert_lock(r, lkb);
2463 out:
2464        unlock_rsb(r);
2465        put_rsb(r);
2466        return error;
2467}
2468
2469static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2470                       struct dlm_args *args)
2471{
2472        struct dlm_rsb *r;
2473        int error;
2474
2475        r = lkb->lkb_resource;
2476
2477        hold_rsb(r);
2478        lock_rsb(r);
2479
2480        error = validate_unlock_args(lkb, args);
2481        if (error)
2482                goto out;
2483
2484        error = _unlock_lock(r, lkb);
2485 out:
2486        unlock_rsb(r);
2487        put_rsb(r);
2488        return error;
2489}
2490
2491static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2492                       struct dlm_args *args)
2493{
2494        struct dlm_rsb *r;
2495        int error;
2496
2497        r = lkb->lkb_resource;
2498
2499        hold_rsb(r);
2500        lock_rsb(r);
2501
2502        error = validate_unlock_args(lkb, args);
2503        if (error)
2504                goto out;
2505
2506        error = _cancel_lock(r, lkb);
2507 out:
2508        unlock_rsb(r);
2509        put_rsb(r);
2510        return error;
2511}
2512
2513/*
2514 * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2515 */
2516
2517int dlm_lock(dlm_lockspace_t *lockspace,
2518             int mode,
2519             struct dlm_lksb *lksb,
2520             uint32_t flags,
2521             void *name,
2522             unsigned int namelen,
2523             uint32_t parent_lkid,
2524             void (*ast) (void *astarg),
2525             void *astarg,
2526             void (*bast) (void *astarg, int mode))
2527{
2528        struct dlm_ls *ls;
2529        struct dlm_lkb *lkb;
2530        struct dlm_args args;
2531        int error, convert = flags & DLM_LKF_CONVERT;
2532
2533        ls = dlm_find_lockspace_local(lockspace);
2534        if (!ls)
2535                return -EINVAL;
2536
2537        dlm_lock_recovery(ls);
2538
2539        if (convert)
2540                error = find_lkb(ls, lksb->sb_lkid, &lkb);
2541        else
2542                error = create_lkb(ls, &lkb);
2543
2544        if (error)
2545                goto out;
2546
2547        error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2548                              astarg, bast, &args);
2549        if (error)
2550                goto out_put;
2551
2552        if (convert)
2553                error = convert_lock(ls, lkb, &args);
2554        else
2555                error = request_lock(ls, lkb, name, namelen, &args);
2556
2557        if (error == -EINPROGRESS)
2558                error = 0;
2559 out_put:
2560        if (convert || error)
2561                __put_lkb(ls, lkb);
2562        if (error == -EAGAIN || error == -EDEADLK)
2563                error = 0;
2564 out:
2565        dlm_unlock_recovery(ls);
2566        dlm_put_lockspace(ls);
2567        return error;
2568}
2569
2570int dlm_unlock(dlm_lockspace_t *lockspace,
2571               uint32_t lkid,
2572               uint32_t flags,
2573               struct dlm_lksb *lksb,
2574               void *astarg)
2575{
2576        struct dlm_ls *ls;
2577        struct dlm_lkb *lkb;
2578        struct dlm_args args;
2579        int error;
2580
2581        ls = dlm_find_lockspace_local(lockspace);
2582        if (!ls)
2583                return -EINVAL;
2584
2585        dlm_lock_recovery(ls);
2586
2587        error = find_lkb(ls, lkid, &lkb);
2588        if (error)
2589                goto out;
2590
2591        error = set_unlock_args(flags, astarg, &args);
2592        if (error)
2593                goto out_put;
2594
2595        if (flags & DLM_LKF_CANCEL)
2596                error = cancel_lock(ls, lkb, &args);
2597        else
2598                error = unlock_lock(ls, lkb, &args);
2599
2600        if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2601                error = 0;
2602        if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2603                error = 0;
2604 out_put:
2605        dlm_put_lkb(lkb);
2606 out:
2607        dlm_unlock_recovery(ls);
2608        dlm_put_lockspace(ls);
2609        return error;
2610}
2611
2612/*
2613 * send/receive routines for remote operations and replies
2614 *
2615 * send_args
2616 * send_common
2617 * send_request                        receive_request
2618 * send_convert                        receive_convert
2619 * send_unlock                        receive_unlock
2620 * send_cancel                        receive_cancel
2621 * send_grant                        receive_grant
2622 * send_bast                        receive_bast
2623 * send_lookup                        receive_lookup
2624 * send_remove                        receive_remove
2625 *
2626 *                                 send_common_reply
2627 * receive_request_reply        send_request_reply
2628 * receive_convert_reply        send_convert_reply
2629 * receive_unlock_reply                send_unlock_reply
2630 * receive_cancel_reply                send_cancel_reply
2631 * receive_lookup_reply                send_lookup_reply
2632 */
2633
2634static int _create_message(struct dlm_ls *ls, int mb_len,
2635                           int to_nodeid, int mstype,
2636                           struct dlm_message **ms_ret,
2637                           struct dlm_mhandle **mh_ret)
2638{
2639        struct dlm_message *ms;
2640        struct dlm_mhandle *mh;
2641        char *mb;
2642
2643        /* get_buffer gives us a message handle (mh) that we need to
2644           pass into lowcomms_commit and a message buffer (mb) that we
2645           write our data into */
2646
2647        mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb);
2648        if (!mh)
2649                return -ENOBUFS;
2650
2651        memset(mb, 0, mb_len);
2652
2653        ms = (struct dlm_message *) mb;
2654
2655        ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2656        ms->m_header.h_lockspace = ls->ls_global_id;
2657        ms->m_header.h_nodeid = dlm_our_nodeid();
2658        ms->m_header.h_length = mb_len;
2659        ms->m_header.h_cmd = DLM_MSG;
2660
2661        ms->m_type = mstype;
2662
2663        *mh_ret = mh;
2664        *ms_ret = ms;
2665        return 0;
2666}
2667
2668static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2669                          int to_nodeid, int mstype,
2670                          struct dlm_message **ms_ret,
2671                          struct dlm_mhandle **mh_ret)
2672{
2673        int mb_len = sizeof(struct dlm_message);
2674
2675        switch (mstype) {
2676        case DLM_MSG_REQUEST:
2677        case DLM_MSG_LOOKUP:
2678        case DLM_MSG_REMOVE:
2679                mb_len += r->res_length;
2680                break;
2681        case DLM_MSG_CONVERT:
2682        case DLM_MSG_UNLOCK:
2683        case DLM_MSG_REQUEST_REPLY:
2684        case DLM_MSG_CONVERT_REPLY:
2685        case DLM_MSG_GRANT:
2686                if (lkb && lkb->lkb_lvbptr)
2687                        mb_len += r->res_ls->ls_lvblen;
2688                break;
2689        }
2690
2691        return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2692                               ms_ret, mh_ret);
2693}
2694
/* Hand a fully built message to lowcomms: dlm_message_out() is applied to
   ms and then the buffer behind mh is committed.  Always returns 0 today;
   the int return is kept because further lowcomms enhancements or
   alternate implementations may make it useful at some point. */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);

	return 0;
}
2704
2705static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2706                      struct dlm_message *ms)
2707{
2708        ms->m_nodeid   = lkb->lkb_nodeid;
2709        ms->m_pid      = lkb->lkb_ownpid;
2710        ms->m_lkid     = lkb->lkb_id;
2711        ms->m_remid    = lkb->lkb_remid;
2712        ms->m_exflags  = lkb->lkb_exflags;
2713        ms->m_sbflags  = lkb->lkb_sbflags;
2714        ms->m_flags    = lkb->lkb_flags;
2715        ms->m_lvbseq   = lkb->lkb_lvbseq;
2716        ms->m_status   = lkb->lkb_status;
2717        ms->m_grmode   = lkb->lkb_grmode;
2718        ms->m_rqmode   = lkb->lkb_rqmode;
2719        ms->m_hash     = r->res_hash;
2720
2721        /* m_result and m_bastmode are set from function args,
2722           not from lkb fields */
2723
2724        if (lkb->lkb_bastfn)
2725                ms->m_asts |= AST_BAST;
2726        if (lkb->lkb_astfn)
2727                ms->m_asts |= AST_COMP;
2728
2729        /* compare with switch in create_message; send_remove() doesn't
2730           use send_args() */
2731
2732        switch (ms->m_type) {
2733        case DLM_MSG_REQUEST:
2734        case DLM_MSG_LOOKUP:
2735                memcpy(ms->m_extra, r->res_name, r->res_length);
2736                break;
2737        case DLM_MSG_CONVERT:
2738        case DLM_MSG_UNLOCK:
2739        case DLM_MSG_REQUEST_REPLY:
2740        case DLM_MSG_CONVERT_REPLY:
2741        case DLM_MSG_GRANT:
2742                if (!lkb->lkb_lvbptr)
2743                        break;
2744                memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2745                break;
2746        }
2747}
2748
2749static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2750{
2751        struct dlm_message *ms;
2752        struct dlm_mhandle *mh;
2753        int to_nodeid, error;
2754
2755        error = add_to_waiters(lkb, mstype);
2756        if (error)
2757                return error;
2758
2759        to_nodeid = r->res_nodeid;
2760
2761        error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2762        if (error)
2763                goto fail;
2764
2765        send_args(r, lkb, ms);
2766
2767        error = send_message(mh, ms);
2768        if (error)
2769                goto fail;
2770        return 0;
2771
2772 fail:
2773        remove_from_waiters(lkb, msg_reply_type(mstype));
2774        return error;
2775}
2776
2777static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2778{
2779        return send_common(r, lkb, DLM_MSG_REQUEST);
2780}
2781
2782static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2783{
2784        int error;
2785
2786        error = send_common(r, lkb, DLM_MSG_CONVERT);
2787
2788        /* down conversions go without a reply from the master */
2789        if (!error && down_conversion(lkb)) {
2790                remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2791                r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2792                r->res_ls->ls_stub_ms.m_result = 0;
2793                r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2794                __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2795        }
2796
2797        return error;
2798}
2799
2800/* FIXME: if this lkb is the only lock we hold on the rsb, then set
2801   MASTER_UNCERTAIN to force the next request on the rsb to confirm
2802   that the master is still correct. */
2803
2804static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2805{
2806        return send_common(r, lkb, DLM_MSG_UNLOCK);
2807}
2808
2809static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2810{
2811        return send_common(r, lkb, DLM_MSG_CANCEL);
2812}
2813
2814static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2815{
2816        struct dlm_message *ms;
2817        struct dlm_mhandle *mh;
2818        int to_nodeid, error;
2819
2820        to_nodeid = lkb->lkb_nodeid;
2821
2822        error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2823        if (error)
2824                goto out;
2825
2826        send_args(r, lkb, ms);
2827
2828        ms->m_result = 0;
2829
2830        error = send_message(mh, ms);
2831 out:
2832        return error;
2833}
2834
2835static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2836{
2837        struct dlm_message *ms;
2838        struct dlm_mhandle *mh;
2839        int to_nodeid, error;
2840
2841        to_nodeid = lkb->lkb_nodeid;
2842
2843        error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2844        if (error)
2845                goto out;
2846
2847        send_args(r, lkb, ms);
2848
2849        ms->m_bastmode = mode;
2850
2851        error = send_message(mh, ms);
2852 out:
2853        return error;
2854}
2855
2856static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2857{
2858        struct dlm_message *ms;
2859        struct dlm_mhandle *mh;
2860        int to_nodeid, error;
2861
2862        error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2863        if (error)
2864                return error;
2865
2866        to_nodeid = dlm_dir_nodeid(r);
2867
2868        error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2869        if (error)
2870                goto fail;
2871
2872        send_args(r, lkb, ms);
2873
2874        error = send_message(mh, ms);
2875        if (error)
2876                goto fail;
2877        return 0;
2878
2879 fail:
2880        remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2881        return error;
2882}
2883
2884static int send_remove(struct dlm_rsb *r)
2885{
2886        struct dlm_message *ms;
2887        struct dlm_mhandle *mh;
2888        int to_nodeid, error;
2889
2890        to_nodeid = dlm_dir_nodeid(r);
2891
2892        error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2893        if (error)
2894                goto out;
2895
2896        memcpy(ms->m_extra, r->res_name, r->res_length);
2897        ms->m_hash = r->res_hash;
2898
2899        error = send_message(mh, ms);
2900 out:
2901        return error;
2902}
2903
2904static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2905                             int mstype, int rv)
2906{
2907        struct dlm_message *ms;
2908        struct dlm_mhandle *mh;
2909        int to_nodeid, error;
2910
2911        to_nodeid = lkb->lkb_nodeid;
2912
2913        error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2914        if (error)
2915                goto out;
2916
2917        send_args(r, lkb, ms);
2918
2919        ms->m_result = rv;
2920
2921        error = send_message(mh, ms);
2922 out:
2923        return error;
2924}
2925
2926static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2927{
2928        return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2929}
2930
2931static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2932{
2933        return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2934}
2935
2936static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2937{
2938        return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2939}
2940
2941static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2942{
2943        return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2944}
2945
2946static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2947                             int ret_nodeid, int rv)
2948{
2949        struct dlm_rsb *r = &ls->ls_stub_rsb;
2950        struct dlm_message *ms;
2951        struct dlm_mhandle *mh;
2952        int error, nodeid = ms_in->m_header.h_nodeid;
2953
2954        error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2955        if (error)
2956                goto out;
2957
2958        ms->m_lkid = ms_in->m_lkid;
2959        ms->m_result = rv;
2960        ms->m_nodeid = ret_nodeid;
2961
2962        error = send_message(mh, ms);
2963 out:
2964        return error;
2965}
2966
2967/* which args we save from a received message depends heavily on the type
2968   of message, unlike the send side where we can safely send everything about
2969   the lkb for any type of message */
2970
2971static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2972{
2973        lkb->lkb_exflags = ms->m_exflags;
2974        lkb->lkb_sbflags = ms->m_sbflags;
2975        lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2976                         (ms->m_flags & 0x0000FFFF);
2977}
2978
2979static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2980{
2981        lkb->lkb_sbflags = ms->m_sbflags;
2982        lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2983                         (ms->m_flags & 0x0000FFFF);
2984}
2985
2986static int receive_extralen(struct dlm_message *ms)
2987{
2988        return (ms->m_header.h_length - sizeof(struct dlm_message));
2989}
2990
2991static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2992                       struct dlm_message *ms)
2993{
2994        int len;
2995
2996        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2997                if (!lkb->lkb_lvbptr)
2998                        lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
2999                if (!lkb->lkb_lvbptr)
3000                        return -ENOMEM;
3001                len = receive_extralen(ms);
3002                if (len > DLM_RESNAME_MAXLEN)
3003                        len = DLM_RESNAME_MAXLEN;
3004                memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3005        }
3006        return 0;
3007}
3008
/* Placeholder blocking-ast callback; receive_request_args() installs it
   on an lkb when the request message has AST_BAST set.  It only logs a
   complaint if it is ever invoked. */

static void fake_bastfn(void *astparam, int mode)
{
	log_print("fake_bastfn should not be called");
}
3013
/* Placeholder completion-ast callback; receive_request_args() installs it
   on an lkb when the request message has AST_COMP set.  It only logs a
   complaint if it is ever invoked. */

static void fake_astfn(void *astparam)
{
	log_print("fake_astfn should not be called");
}
3018
3019static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3020                                struct dlm_message *ms)
3021{
3022        lkb->lkb_nodeid = ms->m_header.h_nodeid;
3023        lkb->lkb_ownpid = ms->m_pid;
3024        lkb->lkb_remid = ms->m_lkid;
3025        lkb->lkb_grmode = DLM_LOCK_IV;
3026        lkb->lkb_rqmode = ms->m_rqmode;
3027
3028        lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
3029        lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;
3030
3031        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3032                /* lkb was just created so there won't be an lvb yet */
3033                lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3034                if (!lkb->lkb_lvbptr)
3035                        return -ENOMEM;
3036        }
3037
3038        return 0;
3039}
3040
3041static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3042                                struct dlm_message *ms)
3043{
3044        if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3045                return -EBUSY;
3046
3047        if (receive_lvb(ls, lkb, ms))
3048                return -ENOMEM;
3049
3050        lkb->lkb_rqmode = ms->m_rqmode;
3051        lkb->lkb_lvbseq = ms->m_lvbseq;
3052
3053        return 0;
3054}
3055
/* Take the arguments of an unlock message into lkb; the only thing an
   unlock carries is the lvb.  Returns 0, or -ENOMEM if receive_lvb()
   fails. */

static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			       struct dlm_message *ms)
{
	return receive_lvb(ls, lkb, ms) ? -ENOMEM : 0;
}
3063
3064/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3065   uses to send a reply and that the remote end uses to process the reply. */
3066
3067static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3068{
3069        struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3070        lkb->lkb_nodeid = ms->m_header.h_nodeid;
3071        lkb->lkb_remid = ms->m_lkid;
3072}
3073
3074/* This is called after the rsb is locked so that we can safely inspect
3075   fields in the lkb. */
3076
3077static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3078{
3079        int from = ms->m_header.h_nodeid;
3080        int error = 0;
3081
3082        switch (ms->m_type) {
3083        case DLM_MSG_CONVERT:
3084        case DLM_MSG_UNLOCK:
3085        case DLM_MSG_CANCEL:
3086                if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3087                        error = -EINVAL;
3088                break;
3089
3090        case DLM_MSG_CONVERT_REPLY:
3091        case DLM_MSG_UNLOCK_REPLY:
3092        case DLM_MSG_CANCEL_REPLY:
3093        case DLM_MSG_GRANT:
3094        case DLM_MSG_BAST:
3095                if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3096                        error = -EINVAL;
3097                break;
3098
3099        case DLM_MSG_REQUEST_REPLY:
3100                if (!is_process_copy(lkb))
3101                        error = -EINVAL;
3102                else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3103                        error = -EINVAL;
3104                break;
3105
3106        default:
3107                error = -EINVAL;
3108        }
3109
3110        if (error)
3111                log_error(lkb->lkb_resource->res_ls,
3112                          "ignore invalid message %d from %d %x %x %x %d",
3113                          ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3114                          lkb->lkb_flags, lkb->lkb_nodeid);
3115        return error;
3116}
3117
3118static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3119{
3120        struct dlm_lkb *lkb;
3121        struct dlm_rsb *r;
3122        int error, namelen;
3123
3124        error = create_lkb(ls, &lkb);
3125        if (error)
3126                goto fail;
3127
3128        receive_flags(lkb, ms);
3129        lkb->lkb_flags |= DLM_IFL_MSTCPY;
3130        error = receive_request_args(ls, lkb, ms);
3131        if (error) {
3132                __put_lkb(ls, lkb);
3133                goto fail;
3134        }
3135
3136        namelen = receive_extralen(ms);
3137
3138        error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3139        if (error) {
3140                __put_lkb(ls, lkb);
3141                goto fail;
3142        }
3143
3144        lock_rsb(r);
3145
3146        attach_lkb(r, lkb);
3147        error = do_request(r, lkb);
3148        send_request_reply(r, lkb, error);
3149
3150        unlock_rsb(r);
3151        put_rsb(r);
3152
3153        if (error == -EINPROGRESS)
3154                error = 0;
3155        if (error)
3156                dlm_put_lkb(lkb);
3157        return;
3158
3159 fail:
3160        setup_stub_lkb(ls, ms);
3161        send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3162}
3163
3164static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3165{
3166        struct dlm_lkb *lkb;
3167        struct dlm_rsb *r;
3168        int error, reply = 1;
3169
3170        error = find_lkb(ls, ms->m_remid, &lkb);
3171        if (error)
3172                goto fail;
3173
3174        r = lkb->lkb_resource;
3175
3176        hold_rsb(r);
3177        lock_rsb(r);
3178
3179        error = validate_message(lkb, ms);
3180        if (error)
3181                goto out;
3182
3183        receive_flags(lkb, ms);
3184        error = receive_convert_args(ls, lkb, ms);
3185        if (error)
3186                goto out_reply;
3187        reply = !down_conversion(lkb);
3188
3189        error = do_convert(r, lkb);
3190 out_reply:
3191        if (reply)
3192                send_convert_reply(r, lkb, error);
3193 out:
3194        unlock_rsb(r);
3195        put_rsb(r);
3196        dlm_put_lkb(lkb);
3197        return;
3198
3199 fail:
3200        setup_stub_lkb(ls, ms);
3201        send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3202}
3203
3204static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3205{
3206        struct dlm_lkb *lkb;
3207        struct dlm_rsb *r;
3208        int error;
3209
3210        error = find_lkb(ls, ms->m_remid, &lkb);
3211        if (error)
3212                goto fail;
3213
3214        r = lkb->lkb_resource;
3215
3216        hold_rsb(r);
3217        lock_rsb(r);
3218
3219        error = validate_message(lkb, ms);
3220        if (error)
3221                goto out;
3222
3223        receive_flags(lkb, ms);
3224        error = receive_unlock_args(ls, lkb, ms);
3225        if (error)
3226                goto out_reply;
3227
3228        error = do_unlock(r, lkb);
3229 out_reply:
3230        send_unlock_reply(r, lkb, error);
3231 out:
3232        unlock_rsb(r);
3233        put_rsb(r);
3234        dlm_put_lkb(lkb);
3235        return;
3236
3237 fail:
3238        setup_stub_lkb(ls, ms);
3239        send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3240}
3241
3242static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3243{
3244        struct dlm_lkb *lkb;
3245        struct dlm_rsb *r;
3246        int error;
3247
3248        error = find_lkb(ls, ms->m_remid, &lkb);
3249        if (error)
3250                goto fail;
3251
3252        receive_flags(lkb, ms);
3253
3254        r = lkb->lkb_resource;
3255
3256        hold_rsb(r);
3257        lock_rsb(r);
3258
3259        error = validate_message(lkb, ms);
3260        if (error)
3261                goto out;
3262
3263        error = do_cancel(r, lkb);
3264        send_cancel_reply(r, lkb, error);
3265 out:
3266        unlock_rsb(r);
3267        put_rsb(r);
3268        dlm_put_lkb(lkb);
3269        return;
3270
3271 fail:
3272        setup_stub_lkb(ls, ms);
3273        send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3274}
3275
3276static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3277{
3278        struct dlm_lkb *lkb;
3279        struct dlm_rsb *r;
3280        int error;
3281
3282        error = find_lkb(ls, ms->m_remid, &lkb);
3283        if (error) {
3284                log_debug(ls, "receive_grant from %d no lkb %x",
3285                          ms->m_header.h_nodeid, ms->m_remid);
3286                return;
3287        }
3288
3289        r = lkb->lkb_resource;
3290
3291        hold_rsb(r);
3292        lock_rsb(r);
3293
3294        error = validate_message(lkb, ms);
3295        if (error)
3296                goto out;
3297
3298        receive_flags_reply(lkb, ms);
3299        if (is_altmode(lkb))
3300                munge_altmode(lkb, ms);
3301        grant_lock_pc(r, lkb, ms);
3302        queue_cast(r, lkb, 0);
3303 out:
3304        unlock_rsb(r);
3305        put_rsb(r);
3306        dlm_put_lkb(lkb);
3307}
3308
3309static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3310{
3311        struct dlm_lkb *lkb;
3312        struct dlm_rsb *r;
3313        int error;
3314
3315        error = find_lkb(ls, ms->m_remid, &lkb);
3316        if (error) {
3317                log_debug(ls, "receive_bast from %d no lkb %x",
3318                          ms->m_header.h_nodeid, ms->m_remid);
3319                return;
3320        }
3321
3322        r = lkb->lkb_resource;
3323
3324        hold_rsb(r);
3325        lock_rsb(r);
3326
3327        error = validate_message(lkb, ms);
3328        if (error)
3329                goto out;
3330
3331        queue_bast(r, lkb, ms->m_bastmode);
3332 out:
3333        unlock_rsb(r);
3334        put_rsb(r);
3335        dlm_put_lkb(lkb);
3336}
3337
3338static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3339{
3340        int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3341
3342        from_nodeid = ms->m_header.h_nodeid;
3343        our_nodeid = dlm_our_nodeid();
3344
3345        len = receive_extralen(ms);
3346
3347        dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3348        if (dir_nodeid != our_nodeid) {
3349                log_error(ls, "lookup dir_nodeid %d from %d",
3350                          dir_nodeid, from_nodeid);
3351                error = -EINVAL;
3352                ret_nodeid = -1;
3353                goto out;
3354        }
3355
3356        error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3357
3358        /* Optimization: we're master so treat lookup as a request */
3359        if (!error && ret_nodeid == our_nodeid) {
3360                receive_request(ls, ms);
3361                return;
3362        }
3363 out:
3364        send_lookup_reply(ls, ms, ret_nodeid, error);
3365}
3366
3367static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3368{
3369        int len, dir_nodeid, from_nodeid;
3370
3371        from_nodeid = ms->m_header.h_nodeid;
3372
3373        len = receive_extralen(ms);
3374
3375        dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3376        if (dir_nodeid != dlm_our_nodeid()) {
3377                log_error(ls, "remove dir entry dir_nodeid %d from %d",
3378                          dir_nodeid, from_nodeid);
3379                return;
3380        }
3381
3382        dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3383}
3384
3385static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3386{
3387        do_purge(ls, ms->m_nodeid, ms->m_pid);
3388}
3389
3390static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3391{
3392        struct dlm_lkb *lkb;
3393        struct dlm_rsb *r;
3394        int error, mstype, result;
3395
3396        error = find_lkb(ls, ms->m_remid, &lkb);
3397        if (error) {
3398                log_debug(ls, "receive_request_reply from %d no lkb %x",
3399                          ms->m_header.h_nodeid, ms->m_remid);
3400                return;
3401        }
3402
3403        r = lkb->lkb_resource;
3404        hold_rsb(r);
3405        lock_rsb(r);
3406
3407        error = validate_message(lkb, ms);
3408        if (error)
3409                goto out;
3410
3411        mstype = lkb->lkb_wait_type;
3412        error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3413        if (error)
3414                goto out;
3415
3416        /* Optimization: the dir node was also the master, so it took our
3417           lookup as a request and sent request reply instead of lookup reply */
3418        if (mstype == DLM_MSG_LOOKUP) {
3419                r->res_nodeid = ms->m_header.h_nodeid;
3420                lkb->lkb_nodeid = r->res_nodeid;
3421        }
3422
3423        /* this is the value returned from do_request() on the master */
3424        result = ms->m_result;
3425
3426        switch (result) {
3427        case -EAGAIN:
3428                /* request would block (be queued) on remote master */
3429                queue_cast(r, lkb, -EAGAIN);
3430                confirm_master(r, -EAGAIN);
3431                unhold_lkb(lkb); /* undoes create_lkb() */
3432                break;
3433
3434        case -EINPROGRESS:
3435        case 0:
3436                /* request was queued or granted on remote master */
3437                receive_flags_reply(lkb, ms);
3438                lkb->lkb_remid = ms->m_lkid;
3439                if (is_altmode(lkb))
3440                        munge_altmode(lkb, ms);
3441                if (result) {
3442                        add_lkb(r, lkb, DLM_LKSTS_WAITING);
3443                        add_timeout(lkb);
3444                } else {
3445                        grant_lock_pc(r, lkb, ms);
3446                        queue_cast(r, lkb, 0);
3447                }
3448                confirm_master(r, result);
3449                break;
3450
3451        case -EBADR:
3452        case -ENOTBLK:
3453                /* find_rsb failed to find rsb or rsb wasn't master */
3454                log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3455                          lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3456                r->res_nodeid = -1;
3457                lkb->lkb_nodeid = -1;
3458
3459                if (is_overlap(lkb)) {
3460                        /* we'll ignore error in cancel/unlock reply */
3461                        queue_cast_overlap(r, lkb);
3462                        confirm_master(r, result);
3463                        unhold_lkb(lkb); /* undoes create_lkb() */
3464                } else
3465                        _request_lock(r, lkb);
3466                break;
3467
3468        default:
3469                log_error(ls, "receive_request_reply %x error %d",
3470                          lkb->lkb_id, result);
3471        }
3472
3473        if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3474                log_debug(ls, "receive_request_reply %x result %d unlock",
3475                          lkb->lkb_id, result);
3476                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3477                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3478                send_unlock(r, lkb);
3479        } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3480                log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3481                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3482                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3483                send_cancel(r, lkb);
3484        } else {
3485                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3486                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3487        }
3488 out:
3489        unlock_rsb(r);
3490        put_rsb(r);
3491        dlm_put_lkb(lkb);
3492}
3493
3494static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3495                                    struct dlm_message *ms)
3496{
3497        /* this is the value returned from do_convert() on the master */
3498        switch (ms->m_result) {
3499        case -EAGAIN:
3500                /* convert would block (be queued) on remote master */
3501                queue_cast(r, lkb, -EAGAIN);
3502                break;
3503
3504        case -EDEADLK:
3505                receive_flags_reply(lkb, ms);
3506                revert_lock_pc(r, lkb);
3507                queue_cast(r, lkb, -EDEADLK);
3508                break;
3509
3510        case -EINPROGRESS:
3511                /* convert was queued on remote master */
3512                receive_flags_reply(lkb, ms);
3513                if (is_demoted(lkb))
3514                        munge_demoted(lkb, ms);
3515                del_lkb(r, lkb);
3516                add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3517                add_timeout(lkb);
3518                break;
3519
3520        case 0:
3521                /* convert was granted on remote master */
3522                receive_flags_reply(lkb, ms);
3523                if (is_demoted(lkb))
3524                        munge_demoted(lkb, ms);
3525                grant_lock_pc(r, lkb, ms);
3526                queue_cast(r, lkb, 0);
3527                break;
3528
3529        default:
3530                log_error(r->res_ls, "receive_convert_reply %x error %d",
3531                          lkb->lkb_id, ms->m_result);
3532        }
3533}
3534
3535static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3536{
3537        struct dlm_rsb *r = lkb->lkb_resource;
3538        int error;
3539
3540        hold_rsb(r);
3541        lock_rsb(r);
3542
3543        error = validate_message(lkb, ms);
3544        if (error)
3545                goto out;
3546
3547        /* stub reply can happen with waiters_mutex held */
3548        error = remove_from_waiters_ms(lkb, ms);
3549        if (error)
3550                goto out;
3551
3552        __receive_convert_reply(r, lkb, ms);
3553 out:
3554        unlock_rsb(r);
3555        put_rsb(r);
3556}
3557
3558static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3559{
3560        struct dlm_lkb *lkb;
3561        int error;
3562
3563        error = find_lkb(ls, ms->m_remid, &lkb);
3564        if (error) {
3565                log_debug(ls, "receive_convert_reply from %d no lkb %x",
3566                          ms->m_header.h_nodeid, ms->m_remid);
3567                return;
3568        }
3569
3570        _receive_convert_reply(lkb, ms);
3571        dlm_put_lkb(lkb);
3572}
3573
3574static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3575{
3576        struct dlm_rsb *r = lkb->lkb_resource;
3577        int error;
3578
3579        hold_rsb(r);
3580        lock_rsb(r);
3581
3582        error = validate_message(lkb, ms);
3583        if (error)
3584                goto out;
3585
3586        /* stub reply can happen with waiters_mutex held */
3587        error = remove_from_waiters_ms(lkb, ms);
3588        if (error)
3589                goto out;
3590
3591        /* this is the value returned from do_unlock() on the master */
3592
3593        switch (ms->m_result) {
3594        case -DLM_EUNLOCK:
3595                receive_flags_reply(lkb, ms);
3596                remove_lock_pc(r, lkb);
3597                queue_cast(r, lkb, -DLM_EUNLOCK);
3598                break;
3599        case -ENOENT:
3600                break;
3601        default:
3602                log_error(r->res_ls, "receive_unlock_reply %x error %d",
3603                          lkb->lkb_id, ms->m_result);
3604        }
3605 out:
3606        unlock_rsb(r);
3607        put_rsb(r);
3608}
3609
3610static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3611{
3612        struct dlm_lkb *lkb;
3613        int error;
3614
3615        error = find_lkb(ls, ms->m_remid, &lkb);
3616        if (error) {
3617                log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3618                          ms->m_header.h_nodeid, ms->m_remid);
3619                return;
3620        }
3621
3622        _receive_unlock_reply(lkb, ms);
3623        dlm_put_lkb(lkb);
3624}
3625
3626static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3627{
3628        struct dlm_rsb *r = lkb->lkb_resource;
3629        int error;
3630
3631        hold_rsb(r);
3632        lock_rsb(r);
3633
3634        error = validate_message(lkb, ms);
3635        if (error)
3636                goto out;
3637
3638        /* stub reply can happen with waiters_mutex held */
3639        error = remove_from_waiters_ms(lkb, ms);
3640        if (error)
3641                goto out;
3642
3643        /* this is the value returned from do_cancel() on the master */
3644
3645        switch (ms->m_result) {
3646        case -DLM_ECANCEL:
3647                receive_flags_reply(lkb, ms);
3648                revert_lock_pc(r, lkb);
3649                queue_cast(r, lkb, -DLM_ECANCEL);
3650                break;
3651        case 0:
3652                break;
3653        default:
3654                log_error(r->res_ls, "receive_cancel_reply %x error %d",
3655                          lkb->lkb_id, ms->m_result);
3656        }
3657 out:
3658        unlock_rsb(r);
3659        put_rsb(r);
3660}
3661
3662static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3663{
3664        struct dlm_lkb *lkb;
3665        int error;
3666
3667        error = find_lkb(ls, ms->m_remid, &lkb);
3668        if (error) {
3669                log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3670                          ms->m_header.h_nodeid, ms->m_remid);
3671                return;
3672        }
3673
3674        _receive_cancel_reply(lkb, ms);
3675        dlm_put_lkb(lkb);
3676}
3677
3678static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3679{
3680        struct dlm_lkb *lkb;
3681        struct dlm_rsb *r;
3682        int error, ret_nodeid;
3683
3684        error = find_lkb(ls, ms->m_lkid, &lkb);
3685        if (error) {
3686                log_error(ls, "receive_lookup_reply no lkb");
3687                return;
3688        }
3689
3690        /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3691           FIXME: will a non-zero error ever be returned? */
3692
3693        r = lkb->lkb_resource;
3694        hold_rsb(r);
3695        lock_rsb(r);
3696
3697        error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3698        if (error)
3699                goto out;
3700
3701        ret_nodeid = ms->m_nodeid;
3702        if (ret_nodeid == dlm_our_nodeid()) {
3703                r->res_nodeid = 0;
3704                ret_nodeid = 0;
3705                r->res_first_lkid = 0;
3706        } else {
3707                /* set_master() will copy res_nodeid to lkb_nodeid */
3708                r->res_nodeid = ret_nodeid;
3709        }
3710
3711        if (is_overlap(lkb)) {
3712                log_debug(ls, "receive_lookup_reply %x unlock %x",
3713                          lkb->lkb_id, lkb->lkb_flags);
3714                queue_cast_overlap(r, lkb);
3715                unhold_lkb(lkb); /* undoes create_lkb() */
3716                goto out_list;
3717        }
3718
3719        _request_lock(r, lkb);
3720
3721 out_list:
3722        if (!ret_nodeid)
3723                process_lookup_list(r);
3724 out:
3725        unlock_rsb(r);
3726        put_rsb(r);
3727        dlm_put_lkb(lkb);
3728}
3729
3730static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3731{
3732        if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3733                log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3734                          ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3735                          ms->m_remid, ms->m_result);
3736                return;
3737        }
3738
3739        switch (ms->m_type) {
3740
3741        /* messages sent to a master node */
3742
3743        case DLM_MSG_REQUEST:
3744                receive_request(ls, ms);
3745                break;
3746
3747        case DLM_MSG_CONVERT:
3748                receive_convert(ls, ms);
3749                break;
3750
3751        case DLM_MSG_UNLOCK:
3752                receive_unlock(ls, ms);
3753                break;
3754
3755        case DLM_MSG_CANCEL:
3756                receive_cancel(ls, ms);
3757                break;
3758
3759        /* messages sent from a master node (replies to above) */
3760
3761        case DLM_MSG_REQUEST_REPLY:
3762                receive_request_reply(ls, ms);
3763                break;
3764
3765        case DLM_MSG_CONVERT_REPLY:
3766                receive_convert_reply(ls, ms);
3767                break;
3768
3769        case DLM_MSG_UNLOCK_REPLY:
3770                receive_unlock_reply(ls, ms);
3771                break;
3772
3773        case DLM_MSG_CANCEL_REPLY:
3774                receive_cancel_reply(ls, ms);
3775                break;
3776
3777        /* messages sent from a master node (only two types of async msg) */
3778
3779        case DLM_MSG_GRANT:
3780                receive_grant(ls, ms);
3781                break;
3782
3783        case DLM_MSG_BAST:
3784                receive_bast(ls, ms);
3785                break;
3786
3787        /* messages sent to a dir node */
3788
3789        case DLM_MSG_LOOKUP:
3790                receive_lookup(ls, ms);
3791                break;
3792
3793        case DLM_MSG_REMOVE:
3794                receive_remove(ls, ms);
3795                break;
3796
3797        /* messages sent from a dir node (remove has no reply) */
3798
3799        case DLM_MSG_LOOKUP_REPLY:
3800                receive_lookup_reply(ls, ms);
3801                break;
3802
3803        /* other messages */
3804
3805        case DLM_MSG_PURGE:
3806                receive_purge(ls, ms);
3807                break;
3808
3809        default:
3810                log_error(ls, "unknown message type %d", ms->m_type);
3811        }
3812
3813        dlm_astd_wake();
3814}
3815
/* While the lockspace is in recovery mode (locking stopped), normal messages
   are saved on the requestqueue so they can be processed once recovery is
   done.  Outside of recovery we first wait for dlm_recoverd to drain the
   saved messages off the requestqueue and only then process the new one.
   That wait matters right after recovery completes, when we move from saving
   all messages on the requestqueue, to processing the saved messages, to
   processing new messages as they arrive. */

static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
				int nodeid)
{
	if (dlm_locking_stopped(ls)) {
		dlm_add_requestqueue(ls, nodeid, ms);
		return;
	}

	dlm_wait_requestqueue(ls);
	_receive_message(ls, ms);
}
3834
/* Called by dlm_recoverd for each message that was saved on the
   requestqueue; the message is dispatched directly through
   _receive_message(). */

void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
{
	_receive_message(ls, ms);
}
3842
3843/* This is called by the midcomms layer when something is received for
3844   the lockspace.  It could be either a MSG (normal message sent as part of
3845   standard locking activity) or an RCOM (recovery message sent as part of
3846   lockspace recovery). */
3847
3848void dlm_receive_buffer(union dlm_packet *p, int nodeid)
3849{
3850        struct dlm_header *hd = &p->header;
3851        struct dlm_ls *ls;
3852        int type = 0;
3853
3854        switch (hd->h_cmd) {
3855        case DLM_MSG:
3856                dlm_message_in(&p->message);
3857                type = p->message.m_type;
3858                break;
3859        case DLM_RCOM:
3860                dlm_rcom_in(&p->rcom);
3861                type = p->rcom.rc_type;
3862                break;
3863        default:
3864                log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3865                return;
3866        }
3867
3868        if (hd->h_nodeid != nodeid) {
3869                log_print("invalid h_nodeid %d from %d lockspace %x",
3870                          hd->h_nodeid, nodeid, hd->h_lockspace);
3871                return;
3872        }
3873
3874        ls = dlm_find_lockspace_global(hd->h_lockspace);
3875        if (!ls) {
3876                if (dlm_config.ci_log_debug)
3877                        log_print("invalid lockspace %x from %d cmd %d type %d",
3878                                  hd->h_lockspace, nodeid, hd->h_cmd, type);
3879
3880                if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3881                        dlm_send_ls_not_ready(nodeid, &p->rcom);
3882                return;
3883        }
3884
3885        /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3886           be inactive (in this ls) before transitioning to recovery mode */
3887
3888        down_read(&ls->ls_recv_active);
3889        if (hd->h_cmd == DLM_MSG)
3890                dlm_receive_message(ls, &p->message, nodeid);
3891        else
3892                dlm_receive_rcom(ls, &p->rcom, nodeid);
3893        up_read(&ls->ls_recv_active);
3894
3895        dlm_put_lockspace(ls);
3896}
3897
3898static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3899{
3900        if (middle_conversion(lkb)) {
3901                hold_lkb(lkb);
3902                ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3903                ls->ls_stub_ms.m_result = -EINPROGRESS;
3904                ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3905                ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
3906                _receive_convert_reply(lkb, &ls->ls_stub_ms);
3907
3908                /* Same special case as in receive_rcom_lock_args() */
3909                lkb->lkb_grmode = DLM_LOCK_IV;
3910                rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3911                unhold_lkb(lkb);
3912
3913        } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3914                lkb->lkb_flags |= DLM_IFL_RESEND;
3915        }
3916
3917        /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3918           conversions are async; there's no reply from the remote master */
3919}
3920
3921/* A waiting lkb needs recovery if the master node has failed, or
3922   the master node is changing (only when no directory is used) */
3923
3924static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3925{
3926        if (dlm_is_removed(ls, lkb->lkb_nodeid))
3927                return 1;
3928
3929        if (!dlm_no_directory(ls))
3930                return 0;
3931
3932        if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3933                return 1;
3934
3935        return 0;
3936}
3937
3938/* Recovery for locks that are waiting for replies from nodes that are now
3939   gone.  We can just complete unlocks and cancels by faking a reply from the
3940   dead node.  Requests and up-conversions we flag to be resent after
3941   recovery.  Down-conversions can just be completed with a fake reply like
3942   unlocks.  Conversions between PR and CW need special attention. */
3943
3944void dlm_recover_waiters_pre(struct dlm_ls *ls)
3945{
3946        struct dlm_lkb *lkb, *safe;
3947        int wait_type, stub_unlock_result, stub_cancel_result;
3948
3949        mutex_lock(&ls->ls_waiters_mutex);
3950
3951        list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3952                log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3953                          lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3954
3955                /* all outstanding lookups, regardless of destination  will be
3956                   resent after recovery is done */
3957
3958                if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3959                        lkb->lkb_flags |= DLM_IFL_RESEND;
3960                        continue;
3961                }
3962
3963                if (!waiter_needs_recovery(ls, lkb))
3964                        continue;
3965
3966                wait_type = lkb->lkb_wait_type;
3967                stub_unlock_result = -DLM_EUNLOCK;
3968                stub_cancel_result = -DLM_ECANCEL;
3969
3970                /* Main reply may have been received leaving a zero wait_type,
3971                   but a reply for the overlapping op may not have been
3972                   received.  In that case we need to fake the appropriate
3973                   reply for the overlap op. */
3974
3975                if (!wait_type) {
3976                        if (is_overlap_cancel(lkb)) {
3977                                wait_type = DLM_MSG_CANCEL;
3978                                if (lkb->lkb_grmode == DLM_LOCK_IV)
3979                                        stub_cancel_result = 0;
3980                        }
3981                        if (is_overlap_unlock(lkb)) {
3982                                wait_type = DLM_MSG_UNLOCK;
3983                                if (lkb->lkb_grmode == DLM_LOCK_IV)
3984                                        stub_unlock_result = -ENOENT;
3985                        }
3986
3987                        log_debug(ls, "rwpre overlap %x %x %d %d %d",
3988                                  lkb->lkb_id, lkb->lkb_flags, wait_type,
3989                                  stub_cancel_result, stub_unlock_result);
3990                }
3991
3992                switch (wait_type) {
3993
3994                case DLM_MSG_REQUEST:
3995                        lkb->lkb_flags |= DLM_IFL_RESEND;
3996                        break;
3997
3998                case DLM_MSG_CONVERT:
3999                        recover_convert_waiter(ls, lkb);
4000                        break;
4001
4002                case DLM_MSG_UNLOCK:
4003                        hold_lkb(lkb);
4004                        ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
4005                        ls->ls_stub_ms.m_result = stub_unlock_result;
4006                        ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4007                        ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4008                        _receive_unlock_reply(lkb, &ls->ls_stub_ms);
4009                        dlm_put_lkb(lkb);
4010                        break;
4011
4012                case DLM_MSG_CANCEL:
4013                        hold_lkb(lkb);
4014                        ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
4015                        ls->ls_stub_ms.m_result = stub_cancel_result;
4016                        ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4017                        ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4018                        _receive_cancel_reply(lkb, &ls->ls_stub_ms);
4019                        dlm_put_lkb(lkb);
4020                        break;
4021
4022                default:
4023                        log_error(ls, "invalid lkb wait_type %d %d",
4024                                  lkb->lkb_wait_type, wait_type);
4025                }
4026                schedule();
4027        }
4028        mutex_unlock(&ls->ls_waiters_mutex);
4029}
4030
4031static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4032{
4033        struct dlm_lkb *lkb;
4034        int found = 0;
4035
4036        mutex_lock(&ls->ls_waiters_mutex);
4037        list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4038                if (lkb->lkb_flags & DLM_IFL_RESEND) {
4039                        hold_lkb(lkb);
4040                        found = 1;
4041                        break;
4042                }
4043        }
4044        mutex_unlock(&ls->ls_waiters_mutex);
4045
4046        if (!found)
4047                lkb = NULL;
4048        return lkb;
4049}
4050
4051/* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4052   master or dir-node for r.  Processing the lkb may result in it being placed
4053   back on waiters. */
4054
4055/* We do this after normal locking has been enabled and any saved messages
4056   (in requestqueue) have been processed.  We should be confident that at
4057   this point we won't get or process a reply to any of these waiting
4058   operations.  But, new ops may be coming in on the rsbs/locks here from
4059   userspace or remotely. */
4060
4061/* there may have been an overlap unlock/cancel prior to recovery or after
4062   recovery.  if before, the lkb may still have a pos wait_count; if after, the
4063   overlap flag would just have been set and nothing new sent.  we can be
4064   confident here than any replies to either the initial op or overlap ops
4065   prior to recovery have been received. */
4066
4067int dlm_recover_waiters_post(struct dlm_ls *ls)
4068{
4069        struct dlm_lkb *lkb;
4070        struct dlm_rsb *r;
4071        int error = 0, mstype, err, oc, ou;
4072
4073        while (1) {
4074                if (dlm_locking_stopped(ls)) {
4075                        log_debug(ls, "recover_waiters_post aborted");
4076                        error = -EINTR;
4077                        break;
4078                }
4079
4080                lkb = find_resend_waiter(ls);
4081                if (!lkb)
4082                        break;
4083
4084                r = lkb->lkb_resource;
4085                hold_rsb(r);
4086                lock_rsb(r);
4087
4088                mstype = lkb->lkb_wait_type;
4089                oc = is_overlap_cancel(lkb);
4090                ou = is_overlap_unlock(lkb);
4091                err = 0;
4092
4093                log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
4094                          lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
4095
4096                /* At this point we assume that we won't get a reply to any
4097                   previous op or overlap op on this lock.  First, do a big
4098                   remove_from_waiters() for all previous ops. */
4099
4100                lkb->lkb_flags &= ~DLM_IFL_RESEND;
4101                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4102                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4103                lkb->lkb_wait_type = 0;
4104                lkb->lkb_wait_count = 0;
4105                mutex_lock(&ls->ls_waiters_mutex);
4106                list_del_init(&lkb->lkb_wait_reply);
4107                mutex_unlock(&ls->ls_waiters_mutex);
4108                unhold_lkb(lkb); /* for waiters list */
4109
4110                if (oc || ou) {
4111                        /* do an unlock or cancel instead of resending */
4112                        switch (mstype) {
4113                        case DLM_MSG_LOOKUP:
4114                        case DLM_MSG_REQUEST:
4115                                queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4116                                                        -DLM_ECANCEL);
4117                                unhold_lkb(lkb); /* undoes create_lkb() */
4118                                break;
4119                        case DLM_MSG_CONVERT:
4120                                if (oc) {
4121                                        queue_cast(r, lkb, -DLM_ECANCEL);
4122                                } else {
4123                                        lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4124                                        _unlock_lock(r, lkb);
4125                                }
4126                                break;
4127                        default:
4128                                err = 1;
4129                        }
4130                } else {
4131                        switch (mstype) {
4132                        case DLM_MSG_LOOKUP:
4133                        case DLM_MSG_REQUEST:
4134                                _request_lock(r, lkb);
4135                                if (is_master(r))
4136                                        confirm_master(r, 0);
4137                                break;
4138                        case DLM_MSG_CONVERT:
4139                                _convert_lock(r, lkb);
4140                                break;
4141                        default:
4142                                err = 1;
4143                        }
4144                }
4145
4146                if (err)
4147                        log_error(ls, "recover_waiters_post %x %d %x %d %d",
4148                                    lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4149                unlock_rsb(r);
4150                put_rsb(r);
4151                dlm_put_lkb(lkb);
4152        }
4153
4154        return error;
4155}
4156
4157static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4158                        int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4159{
4160        struct dlm_ls *ls = r->res_ls;
4161        struct dlm_lkb *lkb, *safe;
4162
4163        list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4164                if (test(ls, lkb)) {
4165                        rsb_set_flag(r, RSB_LOCKS_PURGED);
4166                        del_lkb(r, lkb);
4167                        /* this put should free the lkb */
4168                        if (!dlm_put_lkb(lkb))
4169                                log_error(ls, "purged lkb not released");
4170                }
4171        }
4172}
4173
4174static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4175{
4176        return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4177}
4178
/* purge_queue() predicate: selects every master-copy lkb.  The ls argument
   is unused; it is only present to match the predicate signature. */
static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	int mstcpy = is_master_copy(lkb);

	return mstcpy;
}
4183
4184static void purge_dead_locks(struct dlm_rsb *r)
4185{
4186        purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4187        purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4188        purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4189}
4190
4191void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4192{
4193        purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4194        purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4195        purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4196}
4197
4198/* Get rid of locks held by nodes that are gone. */
4199
4200int dlm_purge_locks(struct dlm_ls *ls)
4201{
4202        struct dlm_rsb *r;
4203
4204        log_debug(ls, "dlm_purge_locks");
4205
4206        down_write(&ls->ls_root_sem);
4207        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4208                hold_rsb(r);
4209                lock_rsb(r);
4210                if (is_master(r))
4211                        purge_dead_locks(r);
4212                unlock_rsb(r);
4213                unhold_rsb(r);
4214
4215                schedule();
4216        }
4217        up_write(&ls->ls_root_sem);
4218
4219        return 0;
4220}
4221
4222static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4223{
4224        struct dlm_rsb *r, *r_ret = NULL;
4225
4226        read_lock(&ls->ls_rsbtbl[bucket].lock);
4227        list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4228                if (!rsb_flag(r, RSB_LOCKS_PURGED))
4229                        continue;
4230                hold_rsb(r);
4231                rsb_clear_flag(r, RSB_LOCKS_PURGED);
4232                r_ret = r;
4233                break;
4234        }
4235        read_unlock(&ls->ls_rsbtbl[bucket].lock);
4236        return r_ret;
4237}
4238
4239void dlm_grant_after_purge(struct dlm_ls *ls)
4240{
4241        struct dlm_rsb *r;
4242        int bucket = 0;
4243
4244        while (1) {
4245                r = find_purged_rsb(ls, bucket);
4246                if (!r) {
4247                        if (bucket == ls->ls_rsbtbl_size - 1)
4248                                break;
4249                        bucket++;
4250                        continue;
4251                }
4252                lock_rsb(r);
4253                if (is_master(r)) {
4254                        grant_pending_locks(r);
4255                        confirm_master(r, 0);
4256                }
4257                unlock_rsb(r);
4258                put_rsb(r);
4259                schedule();
4260        }
4261}
4262
4263static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4264                                         uint32_t remid)
4265{
4266        struct dlm_lkb *lkb;
4267
4268        list_for_each_entry(lkb, head, lkb_statequeue) {
4269                if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4270                        return lkb;
4271        }
4272        return NULL;
4273}
4274
4275static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4276                                    uint32_t remid)
4277{
4278        struct dlm_lkb *lkb;
4279
4280        lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4281        if (lkb)
4282                return lkb;
4283        lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4284        if (lkb)
4285                return lkb;
4286        lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4287        if (lkb)
4288                return lkb;
4289        return NULL;
4290}
4291
4292/* needs at least dlm_rcom + rcom_lock */
4293static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4294                                  struct dlm_rsb *r, struct dlm_rcom *rc)
4295{
4296        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4297
4298        lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4299        lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4300        lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4301        lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4302        lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4303        lkb->lkb_flags |= DLM_IFL_MSTCPY;
4304        lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4305        lkb->lkb_rqmode = rl->rl_rqmode;
4306        lkb->lkb_grmode = rl->rl_grmode;
4307        /* don't set lkb_status because add_lkb wants to itself */
4308
4309        lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL;
4310        lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL;
4311
4312        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4313                int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4314                         sizeof(struct rcom_lock);
4315                if (lvblen > ls->ls_lvblen)
4316                        return -EINVAL;
4317                lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4318                if (!lkb->lkb_lvbptr)
4319                        return -ENOMEM;
4320                memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4321        }
4322
4323        /* Conversions between PR and CW (middle modes) need special handling.
4324           The real granted mode of these converting locks cannot be determined
4325           until all locks have been rebuilt on the rsb (recover_conversion) */
4326
4327        if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4328            middle_conversion(lkb)) {
4329                rl->rl_status = DLM_LKSTS_CONVERT;
4330                lkb->lkb_grmode = DLM_LOCK_IV;
4331                rsb_set_flag(r, RSB_RECOVER_CONVERT);
4332        }
4333
4334        return 0;
4335}
4336
4337/* This lkb may have been recovered in a previous aborted recovery so we need
4338   to check if the rsb already has an lkb with the given remote nodeid/lkid.
4339   If so we just send back a standard reply.  If not, we create a new lkb with
4340   the given values and send back our lkid.  We send back our lkid by sending
4341   back the rcom_lock struct we got but with the remid field filled in. */
4342
4343/* needs at least dlm_rcom + rcom_lock */
4344int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4345{
4346        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4347        struct dlm_rsb *r;
4348        struct dlm_lkb *lkb;
4349        int error;
4350
4351        if (rl->rl_parent_lkid) {
4352                error = -EOPNOTSUPP;
4353                goto out;
4354        }
4355
4356        error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4357                         R_MASTER, &r);
4358        if (error)
4359                goto out;
4360
4361        lock_rsb(r);
4362
4363        lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4364        if (lkb) {
4365                error = -EEXIST;
4366                goto out_remid;
4367        }
4368
4369        error = create_lkb(ls, &lkb);
4370        if (error)
4371                goto out_unlock;
4372
4373        error = receive_rcom_lock_args(ls, lkb, r, rc);
4374        if (error) {
4375                __put_lkb(ls, lkb);
4376                goto out_unlock;
4377        }
4378
4379        attach_lkb(r, lkb);
4380        add_lkb(r, lkb, rl->rl_status);
4381        error = 0;
4382
4383 out_remid:
4384        /* this is the new value returned to the lock holder for
4385           saving in its process-copy lkb */
4386        rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4387
4388 out_unlock:
4389        unlock_rsb(r);
4390        put_rsb(r);
4391 out:
4392        if (error)
4393                log_debug(ls, "recover_master_copy %d %x", error,
4394                          le32_to_cpu(rl->rl_lkid));
4395        rl->rl_result = cpu_to_le32(error);
4396        return error;
4397}
4398
4399/* needs at least dlm_rcom + rcom_lock */
4400int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4401{
4402        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4403        struct dlm_rsb *r;
4404        struct dlm_lkb *lkb;
4405        int error;
4406
4407        error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4408        if (error) {
4409                log_error(ls, "recover_process_copy no lkid %x",
4410                                le32_to_cpu(rl->rl_lkid));
4411                return error;
4412        }
4413
4414        DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4415
4416        error = le32_to_cpu(rl->rl_result);
4417
4418        r = lkb->lkb_resource;
4419        hold_rsb(r);
4420        lock_rsb(r);
4421
4422        switch (error) {
4423        case -EBADR:
4424                /* There's a chance the new master received our lock before
4425                   dlm_recover_master_reply(), this wouldn't happen if we did
4426                   a barrier between recover_masters and recover_locks. */
4427                log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4428                          (unsigned long)r, r->res_name);
4429                dlm_send_rcom_lock(r, lkb);
4430                goto out;
4431        case -EEXIST:
4432                log_debug(ls, "master copy exists %x", lkb->lkb_id);
4433                /* fall through */
4434        case 0:
4435                lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4436                break;
4437        default:
4438                log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4439                          error, lkb->lkb_id);
4440        }
4441
4442        /* an ack for dlm_recover_locks() which waits for replies from
4443           all the locks it sends to new masters */
4444        dlm_recovered_lock(r);
4445 out:
4446        unlock_rsb(r);
4447        put_rsb(r);
4448        dlm_put_lkb(lkb);
4449
4450        return 0;
4451}
4452
4453int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4454                     int mode, uint32_t flags, void *name, unsigned int namelen,
4455                     unsigned long timeout_cs)
4456{
4457        struct dlm_lkb *lkb;
4458        struct dlm_args args;
4459        int error;
4460
4461        dlm_lock_recovery(ls);
4462
4463        error = create_lkb(ls, &lkb);
4464        if (error) {
4465                kfree(ua);
4466                goto out;
4467        }
4468
4469        if (flags & DLM_LKF_VALBLK) {
4470                ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4471                if (!ua->lksb.sb_lvbptr) {
4472                        kfree(ua);
4473                        __put_lkb(ls, lkb);
4474                        error = -ENOMEM;
4475                        goto out;
4476                }
4477        }
4478
4479        /* After ua is attached to lkb it will be freed by dlm_free_lkb().
4480           When DLM_IFL_USER is set, the dlm knows that this is a userspace
4481           lock and that lkb_astparam is the dlm_user_args structure. */
4482
4483        error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4484                              fake_astfn, ua, fake_bastfn, &args);
4485        lkb->lkb_flags |= DLM_IFL_USER;
4486        ua->old_mode = DLM_LOCK_IV;
4487
4488        if (error) {
4489                __put_lkb(ls, lkb);
4490                goto out;
4491        }
4492
4493        error = request_lock(ls, lkb, name, namelen, &args);
4494
4495        switch (error) {
4496        case 0:
4497                break;
4498        case -EINPROGRESS:
4499                error = 0;
4500                break;
4501        case -EAGAIN:
4502                error = 0;
4503                /* fall through */
4504        default:
4505                __put_lkb(ls, lkb);
4506                goto out;
4507        }
4508
4509        /* add this new lkb to the per-process list of locks */
4510        spin_lock(&ua->proc->locks_spin);
4511        hold_lkb(lkb);
4512        list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4513        spin_unlock(&ua->proc->locks_spin);
4514 out:
4515        dlm_unlock_recovery(ls);
4516        return error;
4517}
4518
4519int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4520                     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4521                     unsigned long timeout_cs)
4522{
4523        struct dlm_lkb *lkb;
4524        struct dlm_args args;
4525        struct dlm_user_args *ua;
4526        int error;
4527
4528        dlm_lock_recovery(ls);
4529
4530        error = find_lkb(ls, lkid, &lkb);
4531        if (error)
4532                goto out;
4533
4534        /* user can change the params on its lock when it converts it, or
4535           add an lvb that didn't exist before */
4536
4537        ua = lkb->lkb_ua;
4538
4539        if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4540                ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4541                if (!ua->lksb.sb_lvbptr) {
4542                        error = -ENOMEM;
4543                        goto out_put;
4544                }
4545        }
4546        if (lvb_in && ua->lksb.sb_lvbptr)
4547                memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4548
4549        ua->xid = ua_tmp->xid;
4550        ua->castparam = ua_tmp->castparam;
4551        ua->castaddr = ua_tmp->castaddr;
4552        ua->bastparam = ua_tmp->bastparam;
4553        ua->bastaddr = ua_tmp->bastaddr;
4554        ua->user_lksb = ua_tmp->user_lksb;
4555        ua->old_mode = lkb->lkb_grmode;
4556
4557        error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4558                              fake_astfn, ua, fake_bastfn, &args);
4559        if (error)
4560                goto out_put;
4561
4562        error = convert_lock(ls, lkb, &args);
4563
4564        if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4565                error = 0;
4566 out_put:
4567        dlm_put_lkb(lkb);
4568 out:
4569        dlm_unlock_recovery(ls);
4570        kfree(ua_tmp);
4571        return error;
4572}
4573
4574int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4575                    uint32_t flags, uint32_t lkid, char *lvb_in)
4576{
4577        struct dlm_lkb *lkb;
4578        struct dlm_args args;
4579        struct dlm_user_args *ua;
4580        int error;
4581
4582        dlm_lock_recovery(ls);
4583
4584        error = find_lkb(ls, lkid, &lkb);
4585        if (error)
4586                goto out;
4587
4588        ua = lkb->lkb_ua;
4589
4590        if (lvb_in && ua->lksb.sb_lvbptr)
4591                memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4592        if (ua_tmp->castparam)
4593                ua->castparam = ua_tmp->castparam;
4594        ua->user_lksb = ua_tmp->user_lksb;
4595
4596        error = set_unlock_args(flags, ua, &args);
4597        if (error)
4598                goto out_put;
4599
4600        error = unlock_lock(ls, lkb, &args);
4601
4602        if (error == -DLM_EUNLOCK)
4603                error = 0;
4604        /* from validate_unlock_args() */
4605        if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4606                error = 0;
4607        if (error)
4608                goto out_put;
4609
4610        spin_lock(&ua->proc->locks_spin);
4611        /* dlm_user_add_ast() may have already taken lkb off the proc list */
4612        if (!list_empty(&lkb->lkb_ownqueue))
4613                list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4614        spin_unlock(&ua->proc->locks_spin);
4615 out_put:
4616        dlm_put_lkb(lkb);
4617 out:
4618        dlm_unlock_recovery(ls);
4619        kfree(ua_tmp);
4620        return error;
4621}
4622
4623int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4624                    uint32_t flags, uint32_t lkid)
4625{
4626        struct dlm_lkb *lkb;
4627        struct dlm_args args;
4628        struct dlm_user_args *ua;
4629        int error;
4630
4631        dlm_lock_recovery(ls);
4632
4633        error = find_lkb(ls, lkid, &lkb);
4634        if (error)
4635                goto out;
4636
4637        ua = lkb->lkb_ua;
4638        if (ua_tmp->castparam)
4639                ua->castparam = ua_tmp->castparam;
4640        ua->user_lksb = ua_tmp->user_lksb;
4641
4642        error = set_unlock_args(flags, ua, &args);
4643        if (error)
4644                goto out_put;
4645
4646        error = cancel_lock(ls, lkb, &args);
4647
4648        if (error == -DLM_ECANCEL)
4649                error = 0;
4650        /* from validate_unlock_args() */
4651        if (error == -EBUSY)
4652                error = 0;
4653 out_put:
4654        dlm_put_lkb(lkb);
4655 out:
4656        dlm_unlock_recovery(ls);
4657        kfree(ua_tmp);
4658        return error;
4659}
4660
4661int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4662{
4663        struct dlm_lkb *lkb;
4664        struct dlm_args args;
4665        struct dlm_user_args *ua;
4666        struct dlm_rsb *r;
4667        int error;
4668
4669        dlm_lock_recovery(ls);
4670
4671        error = find_lkb(ls, lkid, &lkb);
4672        if (error)
4673                goto out;
4674
4675        ua = lkb->lkb_ua;
4676
4677        error = set_unlock_args(flags, ua, &args);
4678        if (error)
4679                goto out_put;
4680
4681        /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4682
4683        r = lkb->lkb_resource;
4684        hold_rsb(r);
4685        lock_rsb(r);
4686
4687        error = validate_unlock_args(lkb, &args);
4688        if (error)
4689                goto out_r;
4690        lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4691
4692        error = _cancel_lock(r, lkb);
4693 out_r:
4694        unlock_rsb(r);
4695        put_rsb(r);
4696
4697        if (error == -DLM_ECANCEL)
4698                error = 0;
4699        /* from validate_unlock_args() */
4700        if (error == -EBUSY)
4701                error = 0;
4702 out_put:
4703        dlm_put_lkb(lkb);
4704 out:
4705        dlm_unlock_recovery(ls);
4706        return error;
4707}
4708
4709/* lkb's that are removed from the waiters list by revert are just left on the
4710   orphans list with the granted orphan locks, to be freed by purge */
4711
4712static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4713{
4714        struct dlm_args args;
4715        int error;
4716
4717        hold_lkb(lkb);
4718        mutex_lock(&ls->ls_orphans_mutex);
4719        list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4720        mutex_unlock(&ls->ls_orphans_mutex);
4721
4722        set_unlock_args(0, lkb->lkb_ua, &args);
4723
4724        error = cancel_lock(ls, lkb, &args);
4725        if (error == -DLM_ECANCEL)
4726                error = 0;
4727        return error;
4728}
4729
4730/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4731   Regardless of what rsb queue the lock is on, it's removed and freed. */
4732
4733static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4734{
4735        struct dlm_args args;
4736        int error;
4737
4738        set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4739
4740        error = unlock_lock(ls, lkb, &args);
4741        if (error == -DLM_EUNLOCK)
4742                error = 0;
4743        return error;
4744}
4745
4746/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4747   (which does lock_rsb) due to deadlock with receiving a message that does
4748   lock_rsb followed by dlm_user_add_ast() */
4749
4750static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4751                                     struct dlm_user_proc *proc)
4752{
4753        struct dlm_lkb *lkb = NULL;
4754
4755        mutex_lock(&ls->ls_clear_proc_locks);
4756        if (list_empty(&proc->locks))
4757                goto out;
4758
4759        lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4760        list_del_init(&lkb->lkb_ownqueue);
4761
4762        if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4763                lkb->lkb_flags |= DLM_IFL_ORPHAN;
4764        else
4765                lkb->lkb_flags |= DLM_IFL_DEAD;
4766 out:
4767        mutex_unlock(&ls->ls_clear_proc_locks);
4768        return lkb;
4769}
4770
4771/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4772   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4773   which we clear here. */
4774
4775/* proc CLOSING flag is set so no more device_reads should look at proc->asts
4776   list, and no more device_writes should add lkb's to proc->locks list; so we
4777   shouldn't need to take asts_spin or locks_spin here.  this assumes that
4778   device reads/writes/closes are serialized -- FIXME: we may need to serialize
4779   them ourself. */
4780
4781void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4782{
4783        struct dlm_lkb *lkb, *safe;
4784
4785        dlm_lock_recovery(ls);
4786
4787        while (1) {
4788                lkb = del_proc_lock(ls, proc);
4789                if (!lkb)
4790                        break;
4791                del_timeout(lkb);
4792                if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4793                        orphan_proc_lock(ls, lkb);
4794                else
4795                        unlock_proc_lock(ls, lkb);
4796
4797                /* this removes the reference for the proc->locks list
4798                   added by dlm_user_request, it may result in the lkb
4799                   being freed */
4800
4801                dlm_put_lkb(lkb);
4802        }
4803
4804        mutex_lock(&ls->ls_clear_proc_locks);
4805
4806        /* in-progress unlocks */
4807        list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4808                list_del_init(&lkb->lkb_ownqueue);
4809                lkb->lkb_flags |= DLM_IFL_DEAD;
4810                dlm_put_lkb(lkb);
4811        }
4812
4813        list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4814                lkb->lkb_ast_type = 0;
4815                list_del(&lkb->lkb_astqueue);
4816                dlm_put_lkb(lkb);
4817        }
4818
4819        mutex_unlock(&ls->ls_clear_proc_locks);
4820        dlm_unlock_recovery(ls);
4821}
4822
4823static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4824{
4825        struct dlm_lkb *lkb, *safe;
4826
4827        while (1) {
4828                lkb = NULL;
4829                spin_lock(&proc->locks_spin);
4830                if (!list_empty(&proc->locks)) {
4831                        lkb = list_entry(proc->locks.next, struct dlm_lkb,
4832                                         lkb_ownqueue);
4833                        list_del_init(&lkb->lkb_ownqueue);
4834                }
4835                spin_unlock(&proc->locks_spin);
4836
4837                if (!lkb)
4838                        break;
4839
4840                lkb->lkb_flags |= DLM_IFL_DEAD;
4841                unlock_proc_lock(ls, lkb);
4842                dlm_put_lkb(lkb); /* ref from proc->locks list */
4843        }
4844
4845        spin_lock(&proc->locks_spin);
4846        list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4847                list_del_init(&lkb->lkb_ownqueue);
4848                lkb->lkb_flags |= DLM_IFL_DEAD;
4849                dlm_put_lkb(lkb);
4850        }
4851        spin_unlock(&proc->locks_spin);
4852
4853        spin_lock(&proc->asts_spin);
4854        list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4855                list_del(&lkb->lkb_astqueue);
4856                dlm_put_lkb(lkb);
4857        }
4858        spin_unlock(&proc->asts_spin);
4859}
4860
4861/* pid of 0 means purge all orphans */
4862
4863static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4864{
4865        struct dlm_lkb *lkb, *safe;
4866
4867        mutex_lock(&ls->ls_orphans_mutex);
4868        list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4869                if (pid && lkb->lkb_ownpid != pid)
4870                        continue;
4871                unlock_proc_lock(ls, lkb);
4872                list_del_init(&lkb->lkb_ownqueue);
4873                dlm_put_lkb(lkb);
4874        }
4875        mutex_unlock(&ls->ls_orphans_mutex);
4876}
4877
4878static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4879{
4880        struct dlm_message *ms;
4881        struct dlm_mhandle *mh;
4882        int error;
4883
4884        error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4885                                DLM_MSG_PURGE, &ms, &mh);
4886        if (error)
4887                return error;
4888        ms->m_nodeid = nodeid;
4889        ms->m_pid = pid;
4890
4891        return send_message(mh, ms);
4892}
4893
4894int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4895                   int nodeid, int pid)
4896{
4897        int error = 0;
4898
4899        if (nodeid != dlm_our_nodeid()) {
4900                error = send_purge(ls, nodeid, pid);
4901        } else {
4902                dlm_lock_recovery(ls);
4903                if (pid == current->pid)
4904                        purge_proc_locks(ls, proc);
4905                else
4906                        do_purge(ls, nodeid, pid);
4907                dlm_unlock_recovery(ls);
4908        }
4909        return error;
4910}
4911