Showing error 1046

User: Jiri Slaby
Error type: Leaving function in locked state
Error type description: Some lock is not unlocked on all paths of a function, so it is leaked
File location: fs/ocfs2/dlm/dlmthread.c
Line in file: 80
Project: Linux Kernel
Project version: 2.6.28
Tools: Undetermined 1
Entered: 2012-03-04 17:07:06 UTC


Source:

  1/* -*- mode: c; c-basic-offset: 8; -*-
  2 * vim: noexpandtab sw=8 ts=8 sts=0:
  3 *
  4 * dlmthread.c
  5 *
  6 * standalone DLM module
  7 *
  8 * Copyright (C) 2004 Oracle.  All rights reserved.
  9 *
 10 * This program is free software; you can redistribute it and/or
 11 * modify it under the terms of the GNU General Public
 12 * License as published by the Free Software Foundation; either
 13 * version 2 of the License, or (at your option) any later version.
 14 *
 15 * This program is distributed in the hope that it will be useful,
 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 18 * General Public License for more details.
 19 *
 20 * You should have received a copy of the GNU General Public
 21 * License along with this program; if not, write to the
 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 23 * Boston, MA 02111-1307, USA.
 24 *
 25 */
 26
 27
 28#include <linux/module.h>
 29#include <linux/fs.h>
 30#include <linux/types.h>
 31#include <linux/slab.h>
 32#include <linux/highmem.h>
 33#include <linux/utsname.h>
 34#include <linux/init.h>
 35#include <linux/sysctl.h>
 36#include <linux/random.h>
 37#include <linux/blkdev.h>
 38#include <linux/socket.h>
 39#include <linux/inet.h>
 40#include <linux/timer.h>
 41#include <linux/kthread.h>
 42#include <linux/delay.h>
 43
 44
 45#include "cluster/heartbeat.h"
 46#include "cluster/nodemanager.h"
 47#include "cluster/tcp.h"
 48
 49#include "dlmapi.h"
 50#include "dlmcommon.h"
 51#include "dlmdomain.h"
 52
 53#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_THREAD)
 54#include "cluster/masklog.h"
 55
/* Body of the worker kthread started by dlm_launch_thread(); defined
 * at the bottom of this file. */
static int dlm_thread(void *data);
/* Delivers everything on dlm->pending_asts and dlm->pending_basts. */
static void dlm_flush_asts(struct dlm_ctxt *dlm);

/* Non-zero when lock->ml.node differs from dlm->node_num, i.e. (going
 * by the name) the lock belongs to a node other than this one. */
#define dlm_lock_is_remote(dlm, lock)     ((lock)->ml.node != (dlm)->node_num)
 60
 61/* will exit holding res->spinlock, but may drop in function */
 62/* waits until flags are cleared on res->state */
 63void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags)
 64{
 65        DECLARE_WAITQUEUE(wait, current);
 66
 67        assert_spin_locked(&res->spinlock);
 68
 69        add_wait_queue(&res->wq, &wait);
 70repeat:
 71        set_current_state(TASK_UNINTERRUPTIBLE);
 72        if (res->state & flags) {
 73                spin_unlock(&res->spinlock);
 74                schedule();
 75                spin_lock(&res->spinlock);
 76                goto repeat;
 77        }
 78        remove_wait_queue(&res->wq, &wait);
 79        __set_current_state(TASK_RUNNING);
 80}
 81
 82int __dlm_lockres_has_locks(struct dlm_lock_resource *res)
 83{
 84        if (list_empty(&res->granted) &&
 85            list_empty(&res->converting) &&
 86            list_empty(&res->blocked))
 87                return 0;
 88        return 1;
 89}
 90
 91/* "unused": the lockres has no locks, is not on the dirty list,
 92 * has no inflight locks (in the gap between mastery and acquiring
 93 * the first lock), and has no bits in its refmap.
 94 * truly ready to be freed. */
 95int __dlm_lockres_unused(struct dlm_lock_resource *res)
 96{
 97        if (!__dlm_lockres_has_locks(res) &&
 98            (list_empty(&res->dirty) && !(res->state & DLM_LOCK_RES_DIRTY))) {
 99                /* try not to scan the bitmap unless the first two
100                 * conditions are already true */
101                int bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
102                if (bit >= O2NM_MAX_NODES) {
103                        /* since the bit for dlm->node_num is not
104                         * set, inflight_locks better be zero */
105                        BUG_ON(res->inflight_locks != 0);
106                        return 1;
107                }
108        }
109        return 0;
110}
111
112
113/* Call whenever you may have added or deleted something from one of
114 * the lockres queue's. This will figure out whether it belongs on the
115 * unused list or not and does the appropriate thing. */
116void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
117                              struct dlm_lock_resource *res)
118{
119        mlog_entry("%.*s\n", res->lockname.len, res->lockname.name);
120
121        assert_spin_locked(&dlm->spinlock);
122        assert_spin_locked(&res->spinlock);
123
124        if (__dlm_lockres_unused(res)){
125                if (list_empty(&res->purge)) {
126                        mlog(0, "putting lockres %.*s:%p onto purge list\n",
127                             res->lockname.len, res->lockname.name, res);
128
129                        res->last_used = jiffies;
130                        dlm_lockres_get(res);
131                        list_add_tail(&res->purge, &dlm->purge_list);
132                        dlm->purge_count++;
133                }
134        } else if (!list_empty(&res->purge)) {
135                mlog(0, "removing lockres %.*s:%p from purge list, owner=%u\n",
136                     res->lockname.len, res->lockname.name, res, res->owner);
137
138                list_del_init(&res->purge);
139                dlm_lockres_put(res);
140                dlm->purge_count--;
141        }
142}
143
/*
 * Locked wrapper around __dlm_lockres_calc_usage().
 *
 * Takes dlm->spinlock and then res->spinlock (the two locks the inner
 * function asserts), re-evaluates purge-list membership for @res and
 * releases the locks in the reverse order.  Because it acquires both
 * spinlocks itself, the caller must hold neither on entry.
 */
void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
                            struct dlm_lock_resource *res)
{
        mlog_entry("%.*s\n", res->lockname.len, res->lockname.name);
        /* lock order: dlm->spinlock first, then res->spinlock */
        spin_lock(&dlm->spinlock);
        spin_lock(&res->spinlock);

        __dlm_lockres_calc_usage(dlm, res);

        spin_unlock(&res->spinlock);
        spin_unlock(&dlm->spinlock);
}
156
/*
 * Purge one lockres: take it off the purge list and unhash it.
 *
 * Locking: entered and left with dlm->spinlock held (the caller,
 * dlm_run_purge_list(), takes it).  When this node is not the owner of
 * @res, dlm->spinlock is dropped while the reference held on the master
 * is released via dlm_drop_lockres_ref(), and retaken afterwards.
 * res->spinlock is only taken internally, for short sections.
 *
 * Returns -ENOTEMPTY (without touching any list) if @res turned out not
 * to be unused, 0 otherwise.  The result of dlm_drop_lockres_ref() is
 * not propagated: a failure other than "host down" is a BUG().
 */
static int dlm_purge_lockres(struct dlm_ctxt *dlm,
                             struct dlm_lock_resource *res)
{
        int master;
        int ret = 0;

        /* re-check under res->spinlock; the state may have changed
         * since the caller looked */
        spin_lock(&res->spinlock);
        if (!__dlm_lockres_unused(res)) {
                spin_unlock(&res->spinlock);
                mlog(0, "%s:%.*s: tried to purge but not unused\n",
                     dlm->name, res->lockname.len, res->lockname.name);
                return -ENOTEMPTY;
        }
        master = (res->owner == dlm->node_num);
        /* flag stays set until the lockres has been unhashed below */
        if (!master)
                res->state |= DLM_LOCK_RES_DROPPING_REF;
        spin_unlock(&res->spinlock);

        mlog(0, "purging lockres %.*s, master = %d\n", res->lockname.len,
             res->lockname.name, master);

        if (!master) {
                /* drop spinlock...  retake below */
                spin_unlock(&dlm->spinlock);

                spin_lock(&res->spinlock);
                /* This ensures that clear refmap is sent after the set */
                __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
                spin_unlock(&res->spinlock);

                /* clear our bit from the master's refmap; the only
                 * error tolerated is the master being down */
                ret = dlm_drop_lockres_ref(dlm, res);
                if (ret < 0) {
                        mlog_errno(ret);
                        if (!dlm_is_host_down(ret))
                                BUG();
                }
                mlog(0, "%s:%.*s: dlm_deref_lockres returned %d\n",
                     dlm->name, res->lockname.len, res->lockname.name, ret);
                spin_lock(&dlm->spinlock);
        }

        /* dlm->spinlock may have been dropped above, so the lockres may
         * already be off the purge list -- check before unlinking and
         * dropping the purge list's reference */
        if (!list_empty(&res->purge)) {
                mlog(0, "removing lockres %.*s:%p from purgelist, "
                     "master = %d\n", res->lockname.len, res->lockname.name,
                     res, master);
                list_del_init(&res->purge);
                dlm_lockres_put(res);
                dlm->purge_count--;
        }
        __dlm_unhash_lockres(res);

        /* lockres is not in the hash now.  drop the flag and wake up
         * any processes waiting in dlm_get_lock_resource. */
        if (!master) {
                spin_lock(&res->spinlock);
                res->state &= ~DLM_LOCK_RES_DROPPING_REF;
                spin_unlock(&res->spinlock);
                wake_up(&res->wq);
        }
        return 0;
}
219
220static void dlm_run_purge_list(struct dlm_ctxt *dlm,
221                               int purge_now)
222{
223        unsigned int run_max, unused;
224        unsigned long purge_jiffies;
225        struct dlm_lock_resource *lockres;
226
227        spin_lock(&dlm->spinlock);
228        run_max = dlm->purge_count;
229
230        while(run_max && !list_empty(&dlm->purge_list)) {
231                run_max--;
232
233                lockres = list_entry(dlm->purge_list.next,
234                                     struct dlm_lock_resource, purge);
235
236                /* Status of the lockres *might* change so double
237                 * check. If the lockres is unused, holding the dlm
238                 * spinlock will prevent people from getting and more
239                 * refs on it -- there's no need to keep the lockres
240                 * spinlock. */
241                spin_lock(&lockres->spinlock);
242                unused = __dlm_lockres_unused(lockres);
243                spin_unlock(&lockres->spinlock);
244
245                if (!unused)
246                        continue;
247
248                purge_jiffies = lockres->last_used +
249                        msecs_to_jiffies(DLM_PURGE_INTERVAL_MS);
250
251                /* Make sure that we want to be processing this guy at
252                 * this time. */
253                if (!purge_now && time_after(purge_jiffies, jiffies)) {
254                        /* Since resources are added to the purge list
255                         * in tail order, we can stop at the first
256                         * unpurgable resource -- anyone added after
257                         * him will have a greater last_used value */
258                        break;
259                }
260
261                dlm_lockres_get(lockres);
262
263                /* This may drop and reacquire the dlm spinlock if it
264                 * has to do migration. */
265                if (dlm_purge_lockres(dlm, lockres))
266                        BUG();
267
268                dlm_lockres_put(lockres);
269
270                /* Avoid adding any scheduling latencies */
271                cond_resched_lock(&dlm->spinlock);
272        }
273
274        spin_unlock(&dlm->spinlock);
275}
276
277static void dlm_shuffle_lists(struct dlm_ctxt *dlm,
278                              struct dlm_lock_resource *res)
279{
280        struct dlm_lock *lock, *target;
281        struct list_head *iter;
282        struct list_head *head;
283        int can_grant = 1;
284
285        //mlog(0, "res->lockname.len=%d\n", res->lockname.len);
286        //mlog(0, "res->lockname.name=%p\n", res->lockname.name);
287        //mlog(0, "shuffle res %.*s\n", res->lockname.len,
288        //          res->lockname.name);
289
290        /* because this function is called with the lockres
291         * spinlock, and because we know that it is not migrating/
292         * recovering/in-progress, it is fine to reserve asts and
293         * basts right before queueing them all throughout */
294        assert_spin_locked(&res->spinlock);
295        BUG_ON((res->state & (DLM_LOCK_RES_MIGRATING|
296                              DLM_LOCK_RES_RECOVERING|
297                              DLM_LOCK_RES_IN_PROGRESS)));
298
299converting:
300        if (list_empty(&res->converting))
301                goto blocked;
302        mlog(0, "res %.*s has locks on a convert queue\n", res->lockname.len,
303             res->lockname.name);
304
305        target = list_entry(res->converting.next, struct dlm_lock, list);
306        if (target->ml.convert_type == LKM_IVMODE) {
307                mlog(ML_ERROR, "%.*s: converting a lock with no "
308                     "convert_type!\n", res->lockname.len, res->lockname.name);
309                BUG();
310        }
311        head = &res->granted;
312        list_for_each(iter, head) {
313                lock = list_entry(iter, struct dlm_lock, list);
314                if (lock==target)
315                        continue;
316                if (!dlm_lock_compatible(lock->ml.type,
317                                         target->ml.convert_type)) {
318                        can_grant = 0;
319                        /* queue the BAST if not already */
320                        if (lock->ml.highest_blocked == LKM_IVMODE) {
321                                __dlm_lockres_reserve_ast(res);
322                                dlm_queue_bast(dlm, lock);
323                        }
324                        /* update the highest_blocked if needed */
325                        if (lock->ml.highest_blocked < target->ml.convert_type)
326                                lock->ml.highest_blocked =
327                                        target->ml.convert_type;
328                }
329        }
330        head = &res->converting;
331        list_for_each(iter, head) {
332                lock = list_entry(iter, struct dlm_lock, list);
333                if (lock==target)
334                        continue;
335                if (!dlm_lock_compatible(lock->ml.type,
336                                         target->ml.convert_type)) {
337                        can_grant = 0;
338                        if (lock->ml.highest_blocked == LKM_IVMODE) {
339                                __dlm_lockres_reserve_ast(res);
340                                dlm_queue_bast(dlm, lock);
341                        }
342                        if (lock->ml.highest_blocked < target->ml.convert_type)
343                                lock->ml.highest_blocked =
344                                        target->ml.convert_type;
345                }
346        }
347
348        /* we can convert the lock */
349        if (can_grant) {
350                spin_lock(&target->spinlock);
351                BUG_ON(target->ml.highest_blocked != LKM_IVMODE);
352
353                mlog(0, "calling ast for converting lock: %.*s, have: %d, "
354                     "granting: %d, node: %u\n", res->lockname.len,
355                     res->lockname.name, target->ml.type,
356                     target->ml.convert_type, target->ml.node);
357
358                target->ml.type = target->ml.convert_type;
359                target->ml.convert_type = LKM_IVMODE;
360                list_move_tail(&target->list, &res->granted);
361
362                BUG_ON(!target->lksb);
363                target->lksb->status = DLM_NORMAL;
364
365                spin_unlock(&target->spinlock);
366
367                __dlm_lockres_reserve_ast(res);
368                dlm_queue_ast(dlm, target);
369                /* go back and check for more */
370                goto converting;
371        }
372
373blocked:
374        if (list_empty(&res->blocked))
375                goto leave;
376        target = list_entry(res->blocked.next, struct dlm_lock, list);
377
378        head = &res->granted;
379        list_for_each(iter, head) {
380                lock = list_entry(iter, struct dlm_lock, list);
381                if (lock==target)
382                        continue;
383                if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
384                        can_grant = 0;
385                        if (lock->ml.highest_blocked == LKM_IVMODE) {
386                                __dlm_lockres_reserve_ast(res);
387                                dlm_queue_bast(dlm, lock);
388                        }
389                        if (lock->ml.highest_blocked < target->ml.type)
390                                lock->ml.highest_blocked = target->ml.type;
391                }
392        }
393
394        head = &res->converting;
395        list_for_each(iter, head) {
396                lock = list_entry(iter, struct dlm_lock, list);
397                if (lock==target)
398                        continue;
399                if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
400                        can_grant = 0;
401                        if (lock->ml.highest_blocked == LKM_IVMODE) {
402                                __dlm_lockres_reserve_ast(res);
403                                dlm_queue_bast(dlm, lock);
404                        }
405                        if (lock->ml.highest_blocked < target->ml.type)
406                                lock->ml.highest_blocked = target->ml.type;
407                }
408        }
409
410        /* we can grant the blocked lock (only
411         * possible if converting list empty) */
412        if (can_grant) {
413                spin_lock(&target->spinlock);
414                BUG_ON(target->ml.highest_blocked != LKM_IVMODE);
415
416                mlog(0, "calling ast for blocked lock: %.*s, granting: %d, "
417                     "node: %u\n", res->lockname.len, res->lockname.name,
418                     target->ml.type, target->ml.node);
419
420                // target->ml.type is already correct
421                list_move_tail(&target->list, &res->granted);
422
423                BUG_ON(!target->lksb);
424                target->lksb->status = DLM_NORMAL;
425
426                spin_unlock(&target->spinlock);
427
428                __dlm_lockres_reserve_ast(res);
429                dlm_queue_ast(dlm, target);
430                /* go back and check for more */
431                goto converting;
432        }
433
434leave:
435        return;
436}
437
/*
 * Wake the DLM thread, optionally marking @res dirty first.
 *
 * When @res is non-NULL the caller must hold NO locks: dlm->spinlock
 * and then res->spinlock are taken here around __dlm_dirty_lockres().
 * With @res == NULL only the wake-up on dlm->dlm_thread_wq is done.
 */
void dlm_kick_thread(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{
        mlog_entry("dlm=%p, res=%p\n", dlm, res);
        if (res) {
                /* lock order: dlm->spinlock first, then res->spinlock */
                spin_lock(&dlm->spinlock);
                spin_lock(&res->spinlock);
                __dlm_dirty_lockres(dlm, res);
                spin_unlock(&res->spinlock);
                spin_unlock(&dlm->spinlock);
        }
        wake_up(&dlm->dlm_thread_wq);
}
451
452void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
453{
454        mlog_entry("dlm=%p, res=%p\n", dlm, res);
455
456        assert_spin_locked(&dlm->spinlock);
457        assert_spin_locked(&res->spinlock);
458
459        /* don't shuffle secondary queues */
460        if ((res->owner == dlm->node_num)) {
461                if (res->state & (DLM_LOCK_RES_MIGRATING |
462                                  DLM_LOCK_RES_BLOCK_DIRTY))
463                    return;
464
465                if (list_empty(&res->dirty)) {
466                        /* ref for dirty_list */
467                        dlm_lockres_get(res);
468                        list_add_tail(&res->dirty, &dlm->dirty_list);
469                        res->state |= DLM_LOCK_RES_DIRTY;
470                }
471        }
472}
473
474
475/* Launch the NM thread for the mounted volume */
476int dlm_launch_thread(struct dlm_ctxt *dlm)
477{
478        mlog(0, "starting dlm thread...\n");
479
480        dlm->dlm_thread_task = kthread_run(dlm_thread, dlm, "dlm_thread");
481        if (IS_ERR(dlm->dlm_thread_task)) {
482                mlog_errno(PTR_ERR(dlm->dlm_thread_task));
483                dlm->dlm_thread_task = NULL;
484                return -EINVAL;
485        }
486
487        return 0;
488}
489
490void dlm_complete_thread(struct dlm_ctxt *dlm)
491{
492        if (dlm->dlm_thread_task) {
493                mlog(ML_KTHREAD, "waiting for dlm thread to exit\n");
494                kthread_stop(dlm->dlm_thread_task);
495                dlm->dlm_thread_task = NULL;
496        }
497}
498
499static int dlm_dirty_list_empty(struct dlm_ctxt *dlm)
500{
501        int empty;
502
503        spin_lock(&dlm->spinlock);
504        empty = list_empty(&dlm->dirty_list);
505        spin_unlock(&dlm->spinlock);
506
507        return empty;
508}
509
510static void dlm_flush_asts(struct dlm_ctxt *dlm)
511{
512        int ret;
513        struct dlm_lock *lock;
514        struct dlm_lock_resource *res;
515        u8 hi;
516
517        spin_lock(&dlm->ast_lock);
518        while (!list_empty(&dlm->pending_asts)) {
519                lock = list_entry(dlm->pending_asts.next,
520                                  struct dlm_lock, ast_list);
521                /* get an extra ref on lock */
522                dlm_lock_get(lock);
523                res = lock->lockres;
524                mlog(0, "delivering an ast for this lockres\n");
525
526                BUG_ON(!lock->ast_pending);
527
528                /* remove from list (including ref) */
529                list_del_init(&lock->ast_list);
530                dlm_lock_put(lock);
531                spin_unlock(&dlm->ast_lock);
532
533                if (lock->ml.node != dlm->node_num) {
534                        ret = dlm_do_remote_ast(dlm, res, lock);
535                        if (ret < 0)
536                                mlog_errno(ret);
537                } else
538                        dlm_do_local_ast(dlm, res, lock);
539
540                spin_lock(&dlm->ast_lock);
541
542                /* possible that another ast was queued while
543                 * we were delivering the last one */
544                if (!list_empty(&lock->ast_list)) {
545                        mlog(0, "aha another ast got queued while "
546                             "we were finishing the last one.  will "
547                             "keep the ast_pending flag set.\n");
548                } else
549                        lock->ast_pending = 0;
550
551                /* drop the extra ref.
552                 * this may drop it completely. */
553                dlm_lock_put(lock);
554                dlm_lockres_release_ast(dlm, res);
555        }
556
557        while (!list_empty(&dlm->pending_basts)) {
558                lock = list_entry(dlm->pending_basts.next,
559                                  struct dlm_lock, bast_list);
560                /* get an extra ref on lock */
561                dlm_lock_get(lock);
562                res = lock->lockres;
563
564                BUG_ON(!lock->bast_pending);
565
566                /* get the highest blocked lock, and reset */
567                spin_lock(&lock->spinlock);
568                BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE);
569                hi = lock->ml.highest_blocked;
570                lock->ml.highest_blocked = LKM_IVMODE;
571                spin_unlock(&lock->spinlock);
572
573                /* remove from list (including ref) */
574                list_del_init(&lock->bast_list);
575                dlm_lock_put(lock);
576                spin_unlock(&dlm->ast_lock);
577
578                mlog(0, "delivering a bast for this lockres "
579                     "(blocked = %d\n", hi);
580
581                if (lock->ml.node != dlm->node_num) {
582                        ret = dlm_send_proxy_bast(dlm, res, lock, hi);
583                        if (ret < 0)
584                                mlog_errno(ret);
585                } else
586                        dlm_do_local_bast(dlm, res, lock, hi);
587
588                spin_lock(&dlm->ast_lock);
589
590                /* possible that another bast was queued while
591                 * we were delivering the last one */
592                if (!list_empty(&lock->bast_list)) {
593                        mlog(0, "aha another bast got queued while "
594                             "we were finishing the last one.  will "
595                             "keep the bast_pending flag set.\n");
596                } else
597                        lock->bast_pending = 0;
598
599                /* drop the extra ref.
600                 * this may drop it completely. */
601                dlm_lock_put(lock);
602                dlm_lockres_release_ast(dlm, res);
603        }
604        wake_up(&dlm->ast_wq);
605        spin_unlock(&dlm->ast_lock);
606}
607
608
/* longest time dlm_thread() sleeps between passes when idle, in ms */
#define DLM_THREAD_TIMEOUT_MS (4 * 1000)
/* most dirty lockreses dlm_thread() processes per pass before it
 * flushes ASTs and yields */
#define DLM_THREAD_MAX_DIRTY  100
/* NOTE(review): not referenced anywhere in this file as shown */
#define DLM_THREAD_MAX_ASTS   10
612
/*
 * Main loop of the DLM worker kthread started by dlm_launch_thread().
 * @data is the struct dlm_ctxt this thread works on.
 *
 * Each pass, until kthread_should_stop():
 *   1. run the purge list (purging immediately if dlm_shutting_down());
 *   2. pull up to DLM_THREAD_MAX_DIRTY lockreses off dlm->dirty_list
 *      and call dlm_shuffle_lists() on each, putting back those that
 *      are in-progress or recovering;
 *   3. flush the pending ASTs and BASTs;
 *   4. if the dirty budget was used up, yield and start over at once;
 *      otherwise sleep until the dirty list is non-empty, a stop is
 *      requested or DLM_THREAD_TIMEOUT_MS has elapsed.
 *
 * Always returns 0.
 */
static int dlm_thread(void *data)
{
        struct dlm_lock_resource *res;
        struct dlm_ctxt *dlm = data;
        unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS);

        mlog(0, "dlm thread running for %s...\n", dlm->name);

        while (!kthread_should_stop()) {
                /* remaining dirty-lockres budget for this pass */
                int n = DLM_THREAD_MAX_DIRTY;

                /* dlm_shutting_down is very point-in-time, but that
                 * doesn't matter as we'll just loop back around if we
                 * get false on the leading edge of a state
                 * transition. */
                dlm_run_purge_list(dlm, dlm_shutting_down(dlm));

                /* We really don't want to hold dlm->spinlock while
                 * calling dlm_shuffle_lists on each lockres that
                 * needs to have its queues adjusted and AST/BASTs
                 * run.  So let's pull each entry off the dirty_list
                 * and drop dlm->spinlock ASAP.  Once off the list,
                 * res->spinlock needs to be taken again to protect
                 * the queues while calling dlm_shuffle_lists.  */
                spin_lock(&dlm->spinlock);
                while (!list_empty(&dlm->dirty_list)) {
                        /* set when the lockres must go back on the
                         * dirty list instead of being shuffled */
                        int delay = 0;
                        res = list_entry(dlm->dirty_list.next,
                                         struct dlm_lock_resource, dirty);

                        /* peel a lockres off, remove it from the list,
                         * unset the dirty flag and drop the dlm lock */
                        BUG_ON(!res);
                        /* our own ref, held until the put after the
                         * in_progress label */
                        dlm_lockres_get(res);

                        spin_lock(&res->spinlock);
                        /* We clear the DLM_LOCK_RES_DIRTY state once we shuffle lists below */
                        list_del_init(&res->dirty);
                        spin_unlock(&res->spinlock);
                        spin_unlock(&dlm->spinlock);
                        /* Drop dirty_list ref */
                        dlm_lockres_put(res);

                        /* lockres can be re-dirtied/re-added to the
                         * dirty_list in this gap, but that is ok */

                        spin_lock(&res->spinlock);
                        /* only lockreses owned by this node may be on
                         * the dirty list: dump state, then BUG below */
                        if (res->owner != dlm->node_num) {
                                __dlm_print_one_lock_resource(res);
                                mlog(ML_ERROR, "inprog:%s, mig:%s, reco:%s, dirty:%s\n",
                                     res->state & DLM_LOCK_RES_IN_PROGRESS ? "yes" : "no",
                                     res->state & DLM_LOCK_RES_MIGRATING ? "yes" : "no",
                                     res->state & DLM_LOCK_RES_RECOVERING ? "yes" : "no",
                                     res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
                        }
                        BUG_ON(res->owner != dlm->node_num);

                        /* it is now ok to move lockreses in these states
                         * to the dirty list, assuming that they will only be
                         * dirty for a short while. */
                        BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
                        if (res->state & (DLM_LOCK_RES_IN_PROGRESS |
                                          DLM_LOCK_RES_RECOVERING)) {
                                /* move it to the tail and keep going;
                                 * the flag is set again when it is
                                 * re-dirtied below */
                                res->state &= ~DLM_LOCK_RES_DIRTY;
                                spin_unlock(&res->spinlock);
                                mlog(0, "delaying list shuffling for in-"
                                     "progress lockres %.*s, state=%d\n",
                                     res->lockname.len, res->lockname.name,
                                     res->state);
                                delay = 1;
                                goto in_progress;
                        }

                        /* at this point the lockres is not migrating/
                         * recovering/in-progress.  we have the lockres
                         * spinlock and do NOT have the dlm lock.
                         * safe to reserve/queue asts and run the lists. */

                        mlog(0, "calling dlm_shuffle_lists with dlm=%s, "
                             "res=%.*s\n", dlm->name,
                             res->lockname.len, res->lockname.name);

                        /* called while holding lockres lock */
                        dlm_shuffle_lists(dlm, res);
                        res->state &= ~DLM_LOCK_RES_DIRTY;
                        spin_unlock(&res->spinlock);

                        /* the queues may have changed: re-evaluate
                         * purge-list membership (takes both locks) */
                        dlm_lockres_calc_usage(dlm, res);

in_progress:

                        /* retaken here so the inner while condition is
                         * evaluated under dlm->spinlock */
                        spin_lock(&dlm->spinlock);
                        /* if the lock was in-progress, stick
                         * it on the back of the list */
                        if (delay) {
                                spin_lock(&res->spinlock);
                                __dlm_dirty_lockres(dlm, res);
                                spin_unlock(&res->spinlock);
                        }
                        /* drop the ref taken when the lockres was
                         * peeled off the list */
                        dlm_lockres_put(res);

                        /* unlikely, but we may need to give time to
                         * other tasks */
                        if (!--n) {
                                mlog(0, "throttling dlm_thread\n");
                                break;
                        }
                }

                spin_unlock(&dlm->spinlock);
                dlm_flush_asts(dlm);

                /* yield and continue right away if there is more work to do */
                if (!n) {
                        cond_resched();
                        continue;
                }

                /* the return value is deliberately unused: the outer
                 * loop re-checks everything itself */
                wait_event_interruptible_timeout(dlm->dlm_thread_wq,
                                                 !dlm_dirty_list_empty(dlm) ||
                                                 kthread_should_stop(),
                                                 timeout);
        }

        mlog(0, "quitting DLM thread\n");
        return 0;
}