/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmthread.c
 *
 * standalone DLM module
 *
 * Copyright (C) 2004 Oracle. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA.
 *
 */


#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/utsname.h>
#include <linux/init.h>
#include <linux/sysctl.h>
#include <linux/random.h>
#include <linux/blkdev.h>
#include <linux/socket.h>
#include <linux/inet.h>
#include <linux/timer.h>
#include <linux/kthread.h>
#include <linux/delay.h>


#include "cluster/heartbeat.h"
#include "cluster/nodemanager.h"
#include "cluster/tcp.h"

#include "dlmapi.h"
#include "dlmcommon.h"
#include "dlmdomain.h"

#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_THREAD)
#include "cluster/masklog.h"

static int dlm_thread(void *data);
static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
                                  struct dlm_lock_resource *lockres);

static void dlm_flush_asts(struct dlm_ctxt *dlm);

#define dlm_lock_is_remote(dlm, lock)   ((lock)->ml.node != (dlm)->node_num)

/* will exit holding res->spinlock, but may drop in function */
/* waits until flags are cleared on res->state */
void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags)
{
        DECLARE_WAITQUEUE(wait, current);

        assert_spin_locked(&res->spinlock);

        add_wait_queue(&res->wq, &wait);
repeat:
        set_current_state(TASK_UNINTERRUPTIBLE);
        if (res->state & flags) {
                spin_unlock(&res->spinlock);
                schedule();
                spin_lock(&res->spinlock);
                goto repeat;
        }
        remove_wait_queue(&res->wq, &wait);
        current->state = TASK_RUNNING;
}

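/* Returns 1 iff no locks remain on any of this lockres' queues
 * (granted, converting, blocked) and it is not linked on the
 * domain's dirty list, i.e. it is a candidate for purging. */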
int __dlm_lockres_unused(struct dlm_lock_resource *res)
{
        if (list_empty(&res->granted) &&
            list_empty(&res->converting) &&
            list_empty(&res->blocked) &&
            list_empty(&res->dirty))
                return 1;
        return 0;
}


/* Call whenever you may have added or deleted something from one of
 * the lockres queues. This will figure out whether it belongs on the
 * purge list or not and does the appropriate thing. */
void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
                              struct dlm_lock_resource *res)
{
        mlog_entry("%.*s\n", res->lockname.len, res->lockname.name);

        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&res->spinlock);

        if (__dlm_lockres_unused(res)) {
                if (list_empty(&res->purge)) {
                        mlog(0, "putting lockres %.*s on purge list\n",
                             res->lockname.len, res->lockname.name);

                        res->last_used = jiffies;
                        list_add_tail(&res->purge, &dlm->purge_list);
                        dlm->purge_count++;

                        /* if this node is not the owner, there is
                         * no way to keep track of who the owner could be.
                         * unhash it to avoid serious problems. */
                        if (res->owner != dlm->node_num) {
                                mlog(0, "%s:%.*s: doing immediate "
                                     "purge of lockres owned by %u\n",
                                     dlm->name, res->lockname.len,
                                     res->lockname.name, res->owner);

                                dlm_purge_lockres_now(dlm, res);
                        }
                }
        } else if (!list_empty(&res->purge)) {
                mlog(0, "removing lockres %.*s from purge list, "
                     "owner=%u\n", res->lockname.len, res->lockname.name,
                     res->owner);

                list_del_init(&res->purge);
                dlm->purge_count--;
        }
}

void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
                            struct dlm_lock_resource *res)
{
        mlog_entry("%.*s\n", res->lockname.len, res->lockname.name);
        spin_lock(&dlm->spinlock);
        spin_lock(&res->spinlock);

        __dlm_lockres_calc_usage(dlm, res);

        spin_unlock(&res->spinlock);
        spin_unlock(&dlm->spinlock);
}

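/* Drop an unused lockres from this node. If we are the master it must
 * first be migrated to another node; a non-master copy can simply be
 * removed from the purge list and unhashed. */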
/* TODO: Eventual API: Called with the dlm spinlock held, may drop it
 * to do migration, but will re-acquire before exit. */
void dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *lockres)
{
        int master;
        int ret;

        spin_lock(&lockres->spinlock);
        master = lockres->owner == dlm->node_num;
        spin_unlock(&lockres->spinlock);

        mlog(0, "purging lockres %.*s, master = %d\n", lockres->lockname.len,
             lockres->lockname.name, master);

        /* Non master is the easy case -- no migration required, just
         * quit. */
        if (!master)
                goto finish;

        /* Wheee! Migrate lockres here! */
        spin_unlock(&dlm->spinlock);
again:

        ret = dlm_migrate_lockres(dlm, lockres, O2NM_MAX_NODES);
        if (ret == -ENOTEMPTY) {
                mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
                     lockres->lockname.len, lockres->lockname.name);

                BUG();
        } else if (ret < 0) {
                mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n",
                     lockres->lockname.len, lockres->lockname.name);
                msleep(100);
                goto again;
        }

        spin_lock(&dlm->spinlock);

finish:
        if (!list_empty(&lockres->purge)) {
                list_del_init(&lockres->purge);
                dlm->purge_count--;
        }
        __dlm_unhash_lockres(lockres);
}

/* make an unused lockres go away immediately.
 * as soon as the dlm spinlock is dropped, this lockres
 * will not be found. kfree still happens on last put. */
static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
                                  struct dlm_lock_resource *lockres)
{
        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&lockres->spinlock);

        BUG_ON(!__dlm_lockres_unused(lockres));

        if (!list_empty(&lockres->purge)) {
                list_del_init(&lockres->purge);
                dlm->purge_count--;
        }
        __dlm_unhash_lockres(lockres);
}

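/* Walk the purge list and drop any lockres that is still unused and has
 * been idle for at least DLM_PURGE_INTERVAL_MS. When purge_now is set
 * (e.g. while the domain is shutting down), the age check is skipped
 * and every unused lockres on the list is purged. */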
static void dlm_run_purge_list(struct dlm_ctxt *dlm,
                               int purge_now)
{
        unsigned int run_max, unused;
        unsigned long purge_jiffies;
        struct dlm_lock_resource *lockres;

        spin_lock(&dlm->spinlock);
        run_max = dlm->purge_count;

        while (run_max && !list_empty(&dlm->purge_list)) {
                run_max--;

                lockres = list_entry(dlm->purge_list.next,
                                     struct dlm_lock_resource, purge);

                /* Status of the lockres *might* change so double
                 * check. If the lockres is unused, holding the dlm
                 * spinlock will prevent people from getting any more
                 * refs on it -- there's no need to keep the lockres
                 * spinlock. */
                spin_lock(&lockres->spinlock);
                unused = __dlm_lockres_unused(lockres);
                spin_unlock(&lockres->spinlock);

                if (!unused)
                        continue;

                purge_jiffies = lockres->last_used +
                        msecs_to_jiffies(DLM_PURGE_INTERVAL_MS);

                /* Make sure that we want to be processing this guy at
                 * this time. */
                if (!purge_now && time_after(purge_jiffies, jiffies)) {
                        /* Since resources are added to the purge list
                         * in tail order, we can stop at the first
                         * unpurgeable resource -- anything added after
                         * it will have a greater last_used value */
                        break;
                }

                list_del_init(&lockres->purge);
                dlm->purge_count--;

                /* This may drop and reacquire the dlm spinlock if it
                 * has to do migration. */
                mlog(0, "calling dlm_purge_lockres!\n");
                dlm_purge_lockres(dlm, lockres);
                mlog(0, "DONE calling dlm_purge_lockres!\n");

                /* Avoid adding any scheduling latencies */
                cond_resched_lock(&dlm->spinlock);
        }

        spin_unlock(&dlm->spinlock);
}

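/* Work through the converting and blocked queues of a locally mastered
 * lockres: grant the lock at the head of each queue whenever it is
 * compatible with every other granted/converting lock, queueing an AST
 * for each grant and a BAST toward each lock that stands in the way.
 * Called with res->spinlock held. */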
static void dlm_shuffle_lists(struct dlm_ctxt *dlm,
                              struct dlm_lock_resource *res)
{
        struct dlm_lock *lock, *target;
        struct list_head *iter;
        struct list_head *head;
        int can_grant = 1;

        //mlog(0, "res->lockname.len=%d\n", res->lockname.len);
        //mlog(0, "res->lockname.name=%p\n", res->lockname.name);
        //mlog(0, "shuffle res %.*s\n", res->lockname.len,
        //     res->lockname.name);

        /* because this function is called with the lockres
         * spinlock held, and because we know that it is not migrating/
         * recovering/in-progress, it is fine to reserve asts and
         * basts right before queueing them all throughout */
        assert_spin_locked(&res->spinlock);
        BUG_ON((res->state & (DLM_LOCK_RES_MIGRATING|
                              DLM_LOCK_RES_RECOVERING|
                              DLM_LOCK_RES_IN_PROGRESS)));

converting:
        if (list_empty(&res->converting))
                goto blocked;
        mlog(0, "res %.*s has locks on a convert queue\n", res->lockname.len,
             res->lockname.name);

        target = list_entry(res->converting.next, struct dlm_lock, list);
        if (target->ml.convert_type == LKM_IVMODE) {
                mlog(ML_ERROR, "%.*s: converting a lock with no "
                     "convert_type!\n", res->lockname.len, res->lockname.name);
                BUG();
        }
        head = &res->granted;
        list_for_each(iter, head) {
                lock = list_entry(iter, struct dlm_lock, list);
                if (lock == target)
                        continue;
                if (!dlm_lock_compatible(lock->ml.type,
                                         target->ml.convert_type)) {
                        can_grant = 0;
                        /* queue the BAST if not already */
                        if (lock->ml.highest_blocked == LKM_IVMODE) {
                                __dlm_lockres_reserve_ast(res);
                                dlm_queue_bast(dlm, lock);
                        }
                        /* update the highest_blocked if needed */
                        if (lock->ml.highest_blocked < target->ml.convert_type)
                                lock->ml.highest_blocked =
                                        target->ml.convert_type;
                }
        }
        head = &res->converting;
        list_for_each(iter, head) {
                lock = list_entry(iter, struct dlm_lock, list);
                if (lock == target)
                        continue;
                if (!dlm_lock_compatible(lock->ml.type,
                                         target->ml.convert_type)) {
                        can_grant = 0;
                        if (lock->ml.highest_blocked == LKM_IVMODE) {
                                __dlm_lockres_reserve_ast(res);
                                dlm_queue_bast(dlm, lock);
                        }
                        if (lock->ml.highest_blocked < target->ml.convert_type)
                                lock->ml.highest_blocked =
                                        target->ml.convert_type;
                }
        }

        /* we can convert the lock */
        if (can_grant) {
                spin_lock(&target->spinlock);
                BUG_ON(target->ml.highest_blocked != LKM_IVMODE);

                mlog(0, "calling ast for converting lock: %.*s, have: %d, "
                     "granting: %d, node: %u\n", res->lockname.len,
                     res->lockname.name, target->ml.type,
                     target->ml.convert_type, target->ml.node);

                target->ml.type = target->ml.convert_type;
                target->ml.convert_type = LKM_IVMODE;
                list_move_tail(&target->list, &res->granted);

                BUG_ON(!target->lksb);
                target->lksb->status = DLM_NORMAL;

                spin_unlock(&target->spinlock);

                __dlm_lockres_reserve_ast(res);
                dlm_queue_ast(dlm, target);
                /* go back and check for more */
                goto converting;
        }

blocked:
        if (list_empty(&res->blocked))
                goto leave;
        target = list_entry(res->blocked.next, struct dlm_lock, list);

        head = &res->granted;
        list_for_each(iter, head) {
                lock = list_entry(iter, struct dlm_lock, list);
                if (lock == target)
                        continue;
                if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
                        can_grant = 0;
                        if (lock->ml.highest_blocked == LKM_IVMODE) {
                                __dlm_lockres_reserve_ast(res);
                                dlm_queue_bast(dlm, lock);
                        }
                        if (lock->ml.highest_blocked < target->ml.type)
                                lock->ml.highest_blocked = target->ml.type;
                }
        }

        head = &res->converting;
        list_for_each(iter, head) {
                lock = list_entry(iter, struct dlm_lock, list);
                if (lock == target)
                        continue;
                if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
                        can_grant = 0;
                        if (lock->ml.highest_blocked == LKM_IVMODE) {
                                __dlm_lockres_reserve_ast(res);
                                dlm_queue_bast(dlm, lock);
                        }
                        if (lock->ml.highest_blocked < target->ml.type)
                                lock->ml.highest_blocked = target->ml.type;
                }
        }

        /* we can grant the blocked lock (only
         * possible if converting list empty) */
        if (can_grant) {
                spin_lock(&target->spinlock);
                BUG_ON(target->ml.highest_blocked != LKM_IVMODE);

                mlog(0, "calling ast for blocked lock: %.*s, granting: %d, "
                     "node: %u\n", res->lockname.len, res->lockname.name,
                     target->ml.type, target->ml.node);

                // target->ml.type is already correct
                list_move_tail(&target->list, &res->granted);

                BUG_ON(!target->lksb);
                target->lksb->status = DLM_NORMAL;

                spin_unlock(&target->spinlock);

                __dlm_lockres_reserve_ast(res);
                dlm_queue_ast(dlm, target);
                /* go back and check for more */
                goto converting;
        }

leave:
        return;
}

/* must have NO locks when calling this with res != NULL */
void dlm_kick_thread(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{
        mlog_entry("dlm=%p, res=%p\n", dlm, res);
        if (res) {
                spin_lock(&dlm->spinlock);
                spin_lock(&res->spinlock);
                __dlm_dirty_lockres(dlm, res);
                spin_unlock(&res->spinlock);
                spin_unlock(&dlm->spinlock);
        }
        wake_up(&dlm->dlm_thread_wq);
}

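/* Mark a locally mastered lockres as needing its queues shuffled by
 * dlm_thread: take a reference and put it on the domain's dirty list.
 * Caller must hold both dlm->spinlock and res->spinlock. */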
void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{
        mlog_entry("dlm=%p, res=%p\n", dlm, res);

        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&res->spinlock);

        /* don't shuffle secondary queues */
        if ((res->owner == dlm->node_num) &&
            !(res->state & DLM_LOCK_RES_DIRTY)) {
                /* ref for dirty_list */
                dlm_lockres_get(res);
                list_add_tail(&res->dirty, &dlm->dirty_list);
                res->state |= DLM_LOCK_RES_DIRTY;
        }
}


/* Launch the DLM thread for the mounted volume */
int dlm_launch_thread(struct dlm_ctxt *dlm)
{
        mlog(0, "starting dlm thread...\n");

        dlm->dlm_thread_task = kthread_run(dlm_thread, dlm, "dlm_thread");
        if (IS_ERR(dlm->dlm_thread_task)) {
                mlog_errno(PTR_ERR(dlm->dlm_thread_task));
                dlm->dlm_thread_task = NULL;
                return -EINVAL;
        }

        return 0;
}

void dlm_complete_thread(struct dlm_ctxt *dlm)
{
        if (dlm->dlm_thread_task) {
                mlog(ML_KTHREAD, "waiting for dlm thread to exit\n");
                kthread_stop(dlm->dlm_thread_task);
                dlm->dlm_thread_task = NULL;
        }
}

static int dlm_dirty_list_empty(struct dlm_ctxt *dlm)
{
        int empty;

        spin_lock(&dlm->spinlock);
        empty = list_empty(&dlm->dirty_list);
        spin_unlock(&dlm->spinlock);

        return empty;
}

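/* Deliver every pending AST and BAST, either by running the local
 * callback or by messaging the remote node that holds the lock, then
 * wake anyone waiting on dlm->ast_wq. */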
static void dlm_flush_asts(struct dlm_ctxt *dlm)
{
        int ret;
        struct dlm_lock *lock;
        struct dlm_lock_resource *res;
        u8 hi;

        spin_lock(&dlm->ast_lock);
        while (!list_empty(&dlm->pending_asts)) {
                lock = list_entry(dlm->pending_asts.next,
                                  struct dlm_lock, ast_list);
                /* get an extra ref on lock */
                dlm_lock_get(lock);
                res = lock->lockres;
                mlog(0, "delivering an ast for this lockres\n");

                BUG_ON(!lock->ast_pending);

                /* remove from list (including ref) */
                list_del_init(&lock->ast_list);
                dlm_lock_put(lock);
                spin_unlock(&dlm->ast_lock);

                if (lock->ml.node != dlm->node_num) {
                        ret = dlm_do_remote_ast(dlm, res, lock);
                        if (ret < 0)
                                mlog_errno(ret);
                } else
                        dlm_do_local_ast(dlm, res, lock);

                spin_lock(&dlm->ast_lock);

                /* possible that another ast was queued while
                 * we were delivering the last one */
                if (!list_empty(&lock->ast_list)) {
                        mlog(0, "aha another ast got queued while "
                             "we were finishing the last one. will "
                             "keep the ast_pending flag set.\n");
                } else
                        lock->ast_pending = 0;

                /* drop the extra ref.
                 * this may drop it completely. */
                dlm_lock_put(lock);
                dlm_lockres_release_ast(dlm, res);
        }

        while (!list_empty(&dlm->pending_basts)) {
                lock = list_entry(dlm->pending_basts.next,
                                  struct dlm_lock, bast_list);
                /* get an extra ref on lock */
                dlm_lock_get(lock);
                res = lock->lockres;

                BUG_ON(!lock->bast_pending);

                /* get the highest blocked lock, and reset */
                spin_lock(&lock->spinlock);
                BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE);
                hi = lock->ml.highest_blocked;
                lock->ml.highest_blocked = LKM_IVMODE;
                spin_unlock(&lock->spinlock);

                /* remove from list (including ref) */
                list_del_init(&lock->bast_list);
                dlm_lock_put(lock);
                spin_unlock(&dlm->ast_lock);

                mlog(0, "delivering a bast for this lockres "
                     "(blocked = %d)\n", hi);

                if (lock->ml.node != dlm->node_num) {
                        ret = dlm_send_proxy_bast(dlm, res, lock, hi);
                        if (ret < 0)
                                mlog_errno(ret);
                } else
                        dlm_do_local_bast(dlm, res, lock, hi);

                spin_lock(&dlm->ast_lock);

                /* possible that another bast was queued while
                 * we were delivering the last one */
                if (!list_empty(&lock->bast_list)) {
                        mlog(0, "aha another bast got queued while "
                             "we were finishing the last one. will "
                             "keep the bast_pending flag set.\n");
                } else
                        lock->bast_pending = 0;

                /* drop the extra ref.
                 * this may drop it completely. */
                dlm_lock_put(lock);
                dlm_lockres_release_ast(dlm, res);
        }
        wake_up(&dlm->ast_wq);
        spin_unlock(&dlm->ast_lock);
}

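/* How long the thread sleeps between scans when there is nothing to
 * do, and how many dirty lockres to shuffle in one pass before
 * yielding the CPU. */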
#define DLM_THREAD_TIMEOUT_MS (4 * 1000)
#define DLM_THREAD_MAX_DIRTY  100
#define DLM_THREAD_MAX_ASTS   10

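/* Main worker loop for a dlm domain: purge unused lock resources,
 * shuffle the queues of each lockres on the dirty list (queueing the
 * resulting ASTs/BASTs and flushing them out), then sleep until kicked
 * or the timeout expires. */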
static int dlm_thread(void *data)
{
        struct dlm_lock_resource *res;
        struct dlm_ctxt *dlm = data;
        unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS);

        mlog(0, "dlm thread running for %s...\n", dlm->name);

        while (!kthread_should_stop()) {
                int n = DLM_THREAD_MAX_DIRTY;

                /* dlm_shutting_down is very point-in-time, but that
                 * doesn't matter as we'll just loop back around if we
                 * get false on the leading edge of a state
                 * transition. */
                dlm_run_purge_list(dlm, dlm_shutting_down(dlm));

                /* We really don't want to hold dlm->spinlock while
                 * calling dlm_shuffle_lists on each lockres that
                 * needs to have its queues adjusted and AST/BASTs
                 * run. So let's pull each entry off the dirty_list
                 * and drop dlm->spinlock ASAP. Once off the list,
                 * res->spinlock needs to be taken again to protect
                 * the queues while calling dlm_shuffle_lists. */
                spin_lock(&dlm->spinlock);
                while (!list_empty(&dlm->dirty_list)) {
                        int delay = 0;
                        res = list_entry(dlm->dirty_list.next,
                                         struct dlm_lock_resource, dirty);

                        /* peel a lockres off, remove it from the list,
                         * unset the dirty flag and drop the dlm lock */
                        BUG_ON(!res);
                        dlm_lockres_get(res);

                        spin_lock(&res->spinlock);
                        res->state &= ~DLM_LOCK_RES_DIRTY;
                        list_del_init(&res->dirty);
                        spin_unlock(&res->spinlock);
                        spin_unlock(&dlm->spinlock);
                        /* Drop dirty_list ref */
                        dlm_lockres_put(res);

                        /* lockres can be re-dirtied/re-added to the
                         * dirty_list in this gap, but that is ok */

                        spin_lock(&res->spinlock);
                        if (res->owner != dlm->node_num) {
                                __dlm_print_one_lock_resource(res);
                                mlog(ML_ERROR, "inprog:%s, mig:%s, reco:%s, dirty:%s\n",
                                     res->state & DLM_LOCK_RES_IN_PROGRESS ? "yes" : "no",
                                     res->state & DLM_LOCK_RES_MIGRATING ? "yes" : "no",
                                     res->state & DLM_LOCK_RES_RECOVERING ? "yes" : "no",
                                     res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
                        }
                        BUG_ON(res->owner != dlm->node_num);

                        /* it is now ok to move lockreses in these states
                         * to the dirty list, assuming that they will only be
                         * dirty for a short while. */
                        if (res->state & (DLM_LOCK_RES_IN_PROGRESS |
                                          DLM_LOCK_RES_MIGRATING |
                                          DLM_LOCK_RES_RECOVERING)) {
                                /* move it to the tail and keep going */
                                spin_unlock(&res->spinlock);
                                mlog(0, "delaying list shuffling for in-"
                                     "progress lockres %.*s, state=%d\n",
                                     res->lockname.len, res->lockname.name,
                                     res->state);
                                delay = 1;
                                goto in_progress;
                        }

                        /* at this point the lockres is not migrating/
                         * recovering/in-progress. we have the lockres
                         * spinlock and do NOT have the dlm lock.
                         * safe to reserve/queue asts and run the lists. */

                        mlog(0, "calling dlm_shuffle_lists with dlm=%s, "
                             "res=%.*s\n", dlm->name,
                             res->lockname.len, res->lockname.name);

                        /* called while holding lockres lock */
                        dlm_shuffle_lists(dlm, res);
                        spin_unlock(&res->spinlock);

                        dlm_lockres_calc_usage(dlm, res);

in_progress:

                        spin_lock(&dlm->spinlock);
                        /* if the lock was in-progress, stick
                         * it on the back of the list */
                        if (delay) {
                                /* ref for dirty_list */
                                dlm_lockres_get(res);
                                spin_lock(&res->spinlock);
                                list_add_tail(&res->dirty, &dlm->dirty_list);
                                res->state |= DLM_LOCK_RES_DIRTY;
                                spin_unlock(&res->spinlock);
                        }
                        dlm_lockres_put(res);

                        /* unlikely, but we may need to give time to
                         * other tasks */
                        if (!--n) {
                                mlog(0, "throttling dlm_thread\n");
                                break;
                        }
                }

                spin_unlock(&dlm->spinlock);
                dlm_flush_asts(dlm);

                /* yield and continue right away if there is more work to do */
                if (!n) {
                        yield();
                        continue;
                }

                wait_event_interruptible_timeout(dlm->dlm_thread_wq,
                                                 !dlm_dirty_list_empty(dlm) ||
                                                 kthread_should_stop(),
                                                 timeout);
        }

        mlog(0, "quitting DLM thread\n");
        return 0;
}