aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fs/dlm/dlm_internal.h1
-rw-r--r--fs/dlm/lock.c136
-rw-r--r--fs/dlm/lock.h3
-rw-r--r--fs/dlm/lockspace.c1
-rw-r--r--fs/dlm/member.c41
-rw-r--r--fs/dlm/midcomms.c17
-rw-r--r--fs/dlm/rcom.c36
-rw-r--r--fs/dlm/rcom.h5
-rw-r--r--fs/dlm/recoverd.c11
-rw-r--r--fs/dlm/requestqueue.c58
-rw-r--r--fs/dlm/requestqueue.h4
11 files changed, 161 insertions, 152 deletions
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 74901e981e1..d2fc2384c3b 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -491,6 +491,7 @@ struct dlm_ls {
491 uint64_t ls_recover_seq; 491 uint64_t ls_recover_seq;
492 struct dlm_recover *ls_recover_args; 492 struct dlm_recover *ls_recover_args;
493 struct rw_semaphore ls_in_recovery; /* block local requests */ 493 struct rw_semaphore ls_in_recovery; /* block local requests */
494 struct rw_semaphore ls_recv_active; /* block dlm_recv */
494 struct list_head ls_requestqueue;/* queue remote requests */ 495 struct list_head ls_requestqueue;/* queue remote requests */
495 struct mutex ls_requestqueue_mutex; 496 struct mutex ls_requestqueue_mutex;
496 char *ls_recover_buf; 497 char *ls_recover_buf;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 031229f144f..3915b8e1414 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -3638,55 +3638,8 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3638 dlm_put_lkb(lkb); 3638 dlm_put_lkb(lkb);
3639} 3639}
3640 3640
3641int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery) 3641static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3642{ 3642{
3643 struct dlm_message *ms = (struct dlm_message *) hd;
3644 struct dlm_ls *ls;
3645 int error = 0;
3646
3647 if (!recovery)
3648 dlm_message_in(ms);
3649
3650 ls = dlm_find_lockspace_global(hd->h_lockspace);
3651 if (!ls) {
3652 log_print("drop message %d from %d for unknown lockspace %d",
3653 ms->m_type, nodeid, hd->h_lockspace);
3654 return -EINVAL;
3655 }
3656
3657 /* recovery may have just ended leaving a bunch of backed-up requests
3658 in the requestqueue; wait while dlm_recoverd clears them */
3659
3660 if (!recovery)
3661 dlm_wait_requestqueue(ls);
3662
3663 /* recovery may have just started while there were a bunch of
3664 in-flight requests -- save them in requestqueue to be processed
3665 after recovery. we can't let dlm_recvd block on the recovery
3666 lock. if dlm_recoverd is calling this function to clear the
3667 requestqueue, it needs to be interrupted (-EINTR) if another
3668 recovery operation is starting. */
3669
3670 while (1) {
3671 if (dlm_locking_stopped(ls)) {
3672 if (recovery) {
3673 error = -EINTR;
3674 goto out;
3675 }
3676 error = dlm_add_requestqueue(ls, nodeid, hd);
3677 if (error == -EAGAIN)
3678 continue;
3679 else {
3680 error = -EINTR;
3681 goto out;
3682 }
3683 }
3684
3685 if (dlm_lock_recovery_try(ls))
3686 break;
3687 schedule();
3688 }
3689
3690 switch (ms->m_type) { 3643 switch (ms->m_type) {
3691 3644
3692 /* messages sent to a master node */ 3645 /* messages sent to a master node */
@@ -3761,17 +3714,90 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3761 log_error(ls, "unknown message type %d", ms->m_type); 3714 log_error(ls, "unknown message type %d", ms->m_type);
3762 } 3715 }
3763 3716
3764 dlm_unlock_recovery(ls);
3765 out:
3766 dlm_put_lockspace(ls);
3767 dlm_astd_wake(); 3717 dlm_astd_wake();
3768 return error;
3769} 3718}
3770 3719
3720/* If the lockspace is in recovery mode (locking stopped), then normal
3721 messages are saved on the requestqueue for processing after recovery is
3722 done. When not in recovery mode, we wait for dlm_recoverd to drain saved
3723 messages off the requestqueue before we process new ones. This occurs right
3724 after recovery completes when we transition from saving all messages on
3725 requestqueue, to processing all the saved messages, to processing new
3726 messages as they arrive. */
3771 3727
3772/* 3728static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
3773 * Recovery related 3729 int nodeid)
3774 */ 3730{
3731 if (dlm_locking_stopped(ls)) {
3732 dlm_add_requestqueue(ls, nodeid, (struct dlm_header *) ms);
3733 } else {
3734 dlm_wait_requestqueue(ls);
3735 _receive_message(ls, ms);
3736 }
3737}
3738
3739/* This is called by dlm_recoverd to process messages that were saved on
3740 the requestqueue. */
3741
3742void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
3743{
3744 _receive_message(ls, ms);
3745}
3746
3747/* This is called by the midcomms layer when something is received for
3748 the lockspace. It could be either a MSG (normal message sent as part of
3749 standard locking activity) or an RCOM (recovery message sent as part of
3750 lockspace recovery). */
3751
3752void dlm_receive_buffer(struct dlm_header *hd, int nodeid)
3753{
3754 struct dlm_message *ms = (struct dlm_message *) hd;
3755 struct dlm_rcom *rc = (struct dlm_rcom *) hd;
3756 struct dlm_ls *ls;
3757 int type = 0;
3758
3759 switch (hd->h_cmd) {
3760 case DLM_MSG:
3761 dlm_message_in(ms);
3762 type = ms->m_type;
3763 break;
3764 case DLM_RCOM:
3765 dlm_rcom_in(rc);
3766 type = rc->rc_type;
3767 break;
3768 default:
3769 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3770 return;
3771 }
3772
3773 if (hd->h_nodeid != nodeid) {
3774 log_print("invalid h_nodeid %d from %d lockspace %x",
3775 hd->h_nodeid, nodeid, hd->h_lockspace);
3776 return;
3777 }
3778
3779 ls = dlm_find_lockspace_global(hd->h_lockspace);
3780 if (!ls) {
3781 log_print("invalid h_lockspace %x from %d cmd %d type %d",
3782 hd->h_lockspace, nodeid, hd->h_cmd, type);
3783
3784 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3785 dlm_send_ls_not_ready(nodeid, rc);
3786 return;
3787 }
3788
3789 /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3790 be inactive (in this ls) before transitioning to recovery mode */
3791
3792 down_read(&ls->ls_recv_active);
3793 if (hd->h_cmd == DLM_MSG)
3794 dlm_receive_message(ls, ms, nodeid);
3795 else
3796 dlm_receive_rcom(ls, rc, nodeid);
3797 up_read(&ls->ls_recv_active);
3798
3799 dlm_put_lockspace(ls);
3800}
3775 3801
3776static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb) 3802static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3777{ 3803{
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 1720313c22d..ada04680a1e 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -16,7 +16,8 @@
16void dlm_print_rsb(struct dlm_rsb *r); 16void dlm_print_rsb(struct dlm_rsb *r);
17void dlm_dump_rsb(struct dlm_rsb *r); 17void dlm_dump_rsb(struct dlm_rsb *r);
18void dlm_print_lkb(struct dlm_lkb *lkb); 18void dlm_print_lkb(struct dlm_lkb *lkb);
19int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery); 19void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms);
20void dlm_receive_buffer(struct dlm_header *hd, int nodeid);
20int dlm_modes_compat(int mode1, int mode2); 21int dlm_modes_compat(int mode1, int mode2);
21int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen, 22int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
22 unsigned int flags, struct dlm_rsb **r_ret); 23 unsigned int flags, struct dlm_rsb **r_ret);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 1dc72105ab1..628eaa669e6 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -519,6 +519,7 @@ static int new_lockspace(char *name, int namelen, void **lockspace,
519 ls->ls_recover_seq = 0; 519 ls->ls_recover_seq = 0;
520 ls->ls_recover_args = NULL; 520 ls->ls_recover_args = NULL;
521 init_rwsem(&ls->ls_in_recovery); 521 init_rwsem(&ls->ls_in_recovery);
522 init_rwsem(&ls->ls_recv_active);
522 INIT_LIST_HEAD(&ls->ls_requestqueue); 523 INIT_LIST_HEAD(&ls->ls_requestqueue);
523 mutex_init(&ls->ls_requestqueue_mutex); 524 mutex_init(&ls->ls_requestqueue_mutex);
524 mutex_init(&ls->ls_clear_proc_locks); 525 mutex_init(&ls->ls_clear_proc_locks);
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index d09977528f6..e9cdcab306e 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -18,10 +18,6 @@
18#include "rcom.h" 18#include "rcom.h"
19#include "config.h" 19#include "config.h"
20 20
21/*
22 * Following called by dlm_recoverd thread
23 */
24
25static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new) 21static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
26{ 22{
27 struct dlm_member *memb = NULL; 23 struct dlm_member *memb = NULL;
@@ -250,18 +246,30 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
250 return error; 246 return error;
251} 247}
252 248
253/* 249/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
254 * Following called from lockspace.c 250 dlm_ls_start() is called on any of them to start the new recovery. */
255 */
256 251
257int dlm_ls_stop(struct dlm_ls *ls) 252int dlm_ls_stop(struct dlm_ls *ls)
258{ 253{
259 int new; 254 int new;
260 255
261 /* 256 /*
262 * A stop cancels any recovery that's in progress (see RECOVERY_STOP, 257 * Prevent dlm_recv from being in the middle of something when we do
263 * dlm_recovery_stopped()) and prevents any new locks from being 258 * the stop. This includes ensuring dlm_recv isn't processing a
264 * processed (see RUNNING, dlm_locking_stopped()). 259 * recovery message (rcom), while dlm_recoverd is aborting and
260 * resetting things from an in-progress recovery. i.e. we want
261 * dlm_recoverd to abort its recovery without worrying about dlm_recv
262 * processing an rcom at the same time. Stopping dlm_recv also makes
263 * it easy for dlm_receive_message() to check locking stopped and add a
264 * message to the requestqueue without races.
265 */
266
267 down_write(&ls->ls_recv_active);
268
269 /*
270 * Abort any recovery that's in progress (see RECOVERY_STOP,
271 * dlm_recovery_stopped()) and tell any other threads running in the
272 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
265 */ 273 */
266 274
267 spin_lock(&ls->ls_recover_lock); 275 spin_lock(&ls->ls_recover_lock);
@@ -271,8 +279,14 @@ int dlm_ls_stop(struct dlm_ls *ls)
271 spin_unlock(&ls->ls_recover_lock); 279 spin_unlock(&ls->ls_recover_lock);
272 280
273 /* 281 /*
282 * Let dlm_recv run again, now any normal messages will be saved on the
283 * requestqueue for later.
284 */
285
286 up_write(&ls->ls_recv_active);
287
288 /*
274 * This in_recovery lock does two things: 289 * This in_recovery lock does two things:
275 *
276 * 1) Keeps this function from returning until all threads are out 290 * 1) Keeps this function from returning until all threads are out
277 * of locking routines and locking is truely stopped. 291 * of locking routines and locking is truely stopped.
278 * 2) Keeps any new requests from being processed until it's unlocked 292 * 2) Keeps any new requests from being processed until it's unlocked
@@ -284,9 +298,8 @@ int dlm_ls_stop(struct dlm_ls *ls)
284 298
285 /* 299 /*
286 * The recoverd suspend/resume makes sure that dlm_recoverd (if 300 * The recoverd suspend/resume makes sure that dlm_recoverd (if
287 * running) has noticed the clearing of RUNNING above and quit 301 * running) has noticed RECOVERY_STOP above and quit processing the
288 * processing the previous recovery. This will be true for all nodes 302 * previous recovery.
289 * before any nodes start the new recovery.
290 */ 303 */
291 304
292 dlm_recoverd_suspend(ls); 305 dlm_recoverd_suspend(ls);
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index a5126e0c68a..f8c69dda16a 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -2,7 +2,7 @@
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. 5** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved.
6** 6**
7** This copyrighted material is made available to anyone wishing to use, 7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions 8** modify, copy, or redistribute it subject to the terms and conditions
@@ -27,7 +27,6 @@
27#include "dlm_internal.h" 27#include "dlm_internal.h"
28#include "lowcomms.h" 28#include "lowcomms.h"
29#include "config.h" 29#include "config.h"
30#include "rcom.h"
31#include "lock.h" 30#include "lock.h"
32#include "midcomms.h" 31#include "midcomms.h"
33 32
@@ -117,19 +116,7 @@ int dlm_process_incoming_buffer(int nodeid, const void *base,
117 offset &= (limit - 1); 116 offset &= (limit - 1);
118 len -= msglen; 117 len -= msglen;
119 118
120 switch (msg->h_cmd) { 119 dlm_receive_buffer(msg, nodeid);
121 case DLM_MSG:
122 dlm_receive_message(msg, nodeid, 0);
123 break;
124
125 case DLM_RCOM:
126 dlm_receive_rcom(msg, nodeid);
127 break;
128
129 default:
130 log_print("unknown msg type %x from %u: %u %u %u %u",
131 msg->h_cmd, nodeid, msglen, len, offset, ret);
132 }
133 } 120 }
134 121
135 if (msg != (struct dlm_header *) __tmp) 122 if (msg != (struct dlm_header *) __tmp)
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 188b91c027e..ae2fd97fa4a 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -2,7 +2,7 @@
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2005 Red Hat, Inc. All rights reserved. 5** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
6** 6**
7** This copyrighted material is made available to anyone wishing to use, 7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions 8** modify, copy, or redistribute it subject to the terms and conditions
@@ -386,7 +386,10 @@ static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
386 dlm_recover_process_copy(ls, rc_in); 386 dlm_recover_process_copy(ls, rc_in);
387} 387}
388 388
389static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in) 389/* If the lockspace doesn't exist then still send a status message
390 back; it's possible that it just doesn't have its global_id yet. */
391
392int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
390{ 393{
391 struct dlm_rcom *rc; 394 struct dlm_rcom *rc;
392 struct rcom_config *rf; 395 struct rcom_config *rf;
@@ -446,28 +449,11 @@ static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
446 return rv; 449 return rv;
447} 450}
448 451
449/* Called by dlm_recvd; corresponds to dlm_receive_message() but special 452/* Called by dlm_recv; corresponds to dlm_receive_message() but special
450 recovery-only comms are sent through here. */ 453 recovery-only comms are sent through here. */
451 454
452void dlm_receive_rcom(struct dlm_header *hd, int nodeid) 455void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
453{ 456{
454 struct dlm_rcom *rc = (struct dlm_rcom *) hd;
455 struct dlm_ls *ls;
456
457 dlm_rcom_in(rc);
458
459 /* If the lockspace doesn't exist then still send a status message
460 back; it's possible that it just doesn't have its global_id yet. */
461
462 ls = dlm_find_lockspace_global(hd->h_lockspace);
463 if (!ls) {
464 log_print("lockspace %x from %d type %x not found",
465 hd->h_lockspace, nodeid, rc->rc_type);
466 if (rc->rc_type == DLM_RCOM_STATUS)
467 send_ls_not_ready(nodeid, rc);
468 return;
469 }
470
471 if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) { 457 if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) {
472 log_debug(ls, "ignoring recovery message %x from %d", 458 log_debug(ls, "ignoring recovery message %x from %d",
473 rc->rc_type, nodeid); 459 rc->rc_type, nodeid);
@@ -477,12 +463,6 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
477 if (is_old_reply(ls, rc)) 463 if (is_old_reply(ls, rc))
478 goto out; 464 goto out;
479 465
480 if (nodeid != rc->rc_header.h_nodeid) {
481 log_error(ls, "bad rcom nodeid %d from %d",
482 rc->rc_header.h_nodeid, nodeid);
483 goto out;
484 }
485
486 switch (rc->rc_type) { 466 switch (rc->rc_type) {
487 case DLM_RCOM_STATUS: 467 case DLM_RCOM_STATUS:
488 receive_rcom_status(ls, rc); 468 receive_rcom_status(ls, rc);
@@ -520,6 +500,6 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
520 DLM_ASSERT(0, printk("rc_type=%x\n", rc->rc_type);); 500 DLM_ASSERT(0, printk("rc_type=%x\n", rc->rc_type););
521 } 501 }
522 out: 502 out:
523 dlm_put_lockspace(ls); 503 return;
524} 504}
525 505
diff --git a/fs/dlm/rcom.h b/fs/dlm/rcom.h
index d7984321ff4..b09abd29ba3 100644
--- a/fs/dlm/rcom.h
+++ b/fs/dlm/rcom.h
@@ -2,7 +2,7 @@
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2005 Red Hat, Inc. All rights reserved. 5** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
6** 6**
7** This copyrighted material is made available to anyone wishing to use, 7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions 8** modify, copy, or redistribute it subject to the terms and conditions
@@ -18,7 +18,8 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid);
18int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len); 18int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len);
19int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid); 19int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid);
20int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); 20int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
21void dlm_receive_rcom(struct dlm_header *hd, int nodeid); 21void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid);
22int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in);
22 23
23#endif 24#endif
24 25
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 66575997861..4b89e20eebe 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -24,19 +24,28 @@
24 24
25 25
26/* If the start for which we're re-enabling locking (seq) has been superseded 26/* If the start for which we're re-enabling locking (seq) has been superseded
27 by a newer stop (ls_recover_seq), we need to leave locking disabled. */ 27 by a newer stop (ls_recover_seq), we need to leave locking disabled.
28
29 We suspend dlm_recv threads here to avoid the race where dlm_recv a) sees
30 locking stopped and b) adds a message to the requestqueue, but dlm_recoverd
31 enables locking and clears the requestqueue between a and b. */
28 32
29static int enable_locking(struct dlm_ls *ls, uint64_t seq) 33static int enable_locking(struct dlm_ls *ls, uint64_t seq)
30{ 34{
31 int error = -EINTR; 35 int error = -EINTR;
32 36
37 down_write(&ls->ls_recv_active);
38
33 spin_lock(&ls->ls_recover_lock); 39 spin_lock(&ls->ls_recover_lock);
34 if (ls->ls_recover_seq == seq) { 40 if (ls->ls_recover_seq == seq) {
35 set_bit(LSFL_RUNNING, &ls->ls_flags); 41 set_bit(LSFL_RUNNING, &ls->ls_flags);
42 /* unblocks processes waiting to enter the dlm */
36 up_write(&ls->ls_in_recovery); 43 up_write(&ls->ls_in_recovery);
37 error = 0; 44 error = 0;
38 } 45 }
39 spin_unlock(&ls->ls_recover_lock); 46 spin_unlock(&ls->ls_recover_lock);
47
48 up_write(&ls->ls_recv_active);
40 return error; 49 return error;
41} 50}
42 51
diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c
index 65008d79c96..0de04f17cce 100644
--- a/fs/dlm/requestqueue.c
+++ b/fs/dlm/requestqueue.c
@@ -1,7 +1,7 @@
1/****************************************************************************** 1/******************************************************************************
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) 2005 Red Hat, Inc. All rights reserved. 4** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
5** 5**
6** This copyrighted material is made available to anyone wishing to use, 6** This copyrighted material is made available to anyone wishing to use,
7** modify, copy, or redistribute it subject to the terms and conditions 7** modify, copy, or redistribute it subject to the terms and conditions
@@ -20,7 +20,7 @@
20struct rq_entry { 20struct rq_entry {
21 struct list_head list; 21 struct list_head list;
22 int nodeid; 22 int nodeid;
23 char request[1]; 23 char request[0];
24}; 24};
25 25
26/* 26/*
@@ -30,42 +30,39 @@ struct rq_entry {
30 * lockspace is enabled on some while still suspended on others. 30 * lockspace is enabled on some while still suspended on others.
31 */ 31 */
32 32
33int dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd) 33void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd)
34{ 34{
35 struct rq_entry *e; 35 struct rq_entry *e;
36 int length = hd->h_length; 36 int length = hd->h_length;
37 int rv = 0;
38 37
39 e = kmalloc(sizeof(struct rq_entry) + length, GFP_KERNEL); 38 e = kmalloc(sizeof(struct rq_entry) + length, GFP_KERNEL);
40 if (!e) { 39 if (!e) {
41 log_print("dlm_add_requestqueue: out of memory\n"); 40 log_print("dlm_add_requestqueue: out of memory len %d", length);
42 return 0; 41 return;
43 } 42 }
44 43
45 e->nodeid = nodeid; 44 e->nodeid = nodeid;
46 memcpy(e->request, hd, length); 45 memcpy(e->request, hd, length);
47 46
48 /* We need to check dlm_locking_stopped() after taking the mutex to
49 avoid a race where dlm_recoverd enables locking and runs
50 process_requestqueue between our earlier dlm_locking_stopped check
51 and this addition to the requestqueue. */
52
53 mutex_lock(&ls->ls_requestqueue_mutex); 47 mutex_lock(&ls->ls_requestqueue_mutex);
54 if (dlm_locking_stopped(ls)) 48 list_add_tail(&e->list, &ls->ls_requestqueue);
55 list_add_tail(&e->list, &ls->ls_requestqueue);
56 else {
57 log_debug(ls, "dlm_add_requestqueue skip from %d", nodeid);
58 kfree(e);
59 rv = -EAGAIN;
60 }
61 mutex_unlock(&ls->ls_requestqueue_mutex); 49 mutex_unlock(&ls->ls_requestqueue_mutex);
62 return rv;
63} 50}
64 51
52/*
53 * Called by dlm_recoverd to process normal messages saved while recovery was
54 * happening. Normal locking has been enabled before this is called. dlm_recv
55 * upon receiving a message, will wait for all saved messages to be drained
56 * here before processing the message it got. If a new dlm_ls_stop() arrives
57 * while we're processing these saved messages, it may block trying to suspend
58 * dlm_recv if dlm_recv is waiting for us in dlm_wait_requestqueue. In that
59 * case, we don't abort since locking_stopped is still 0. If dlm_recv is not
60 * waiting for us, then this processing may be aborted due to locking_stopped.
61 */
62
65int dlm_process_requestqueue(struct dlm_ls *ls) 63int dlm_process_requestqueue(struct dlm_ls *ls)
66{ 64{
67 struct rq_entry *e; 65 struct rq_entry *e;
68 struct dlm_header *hd;
69 int error = 0; 66 int error = 0;
70 67
71 mutex_lock(&ls->ls_requestqueue_mutex); 68 mutex_lock(&ls->ls_requestqueue_mutex);
@@ -79,14 +76,7 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
79 e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list); 76 e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
80 mutex_unlock(&ls->ls_requestqueue_mutex); 77 mutex_unlock(&ls->ls_requestqueue_mutex);
81 78
82 hd = (struct dlm_header *) e->request; 79 dlm_receive_message_saved(ls, (struct dlm_message *)e->request);
83 error = dlm_receive_message(hd, e->nodeid, 1);
84
85 if (error == -EINTR) {
86 /* entry is left on requestqueue */
87 log_debug(ls, "process_requestqueue abort eintr");
88 break;
89 }
90 80
91 mutex_lock(&ls->ls_requestqueue_mutex); 81 mutex_lock(&ls->ls_requestqueue_mutex);
92 list_del(&e->list); 82 list_del(&e->list);
@@ -106,10 +96,12 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
106 96
107/* 97/*
108 * After recovery is done, locking is resumed and dlm_recoverd takes all the 98 * After recovery is done, locking is resumed and dlm_recoverd takes all the
109 * saved requests and processes them as they would have been by dlm_recvd. At 99 * saved requests and processes them as they would have been by dlm_recv. At
110 * the same time, dlm_recvd will start receiving new requests from remote 100 * the same time, dlm_recv will start receiving new requests from remote nodes.
111 * nodes. We want to delay dlm_recvd processing new requests until 101 * We want to delay dlm_recv processing new requests until dlm_recoverd has
112 * dlm_recoverd has finished processing the old saved requests. 102 * finished processing the old saved requests. We don't check for locking
103 * stopped here because dlm_ls_stop won't stop locking until it's suspended us
104 * (dlm_recv).
113 */ 105 */
114 106
115void dlm_wait_requestqueue(struct dlm_ls *ls) 107void dlm_wait_requestqueue(struct dlm_ls *ls)
@@ -118,8 +110,6 @@ void dlm_wait_requestqueue(struct dlm_ls *ls)
118 mutex_lock(&ls->ls_requestqueue_mutex); 110 mutex_lock(&ls->ls_requestqueue_mutex);
119 if (list_empty(&ls->ls_requestqueue)) 111 if (list_empty(&ls->ls_requestqueue))
120 break; 112 break;
121 if (dlm_locking_stopped(ls))
122 break;
123 mutex_unlock(&ls->ls_requestqueue_mutex); 113 mutex_unlock(&ls->ls_requestqueue_mutex);
124 schedule(); 114 schedule();
125 } 115 }
diff --git a/fs/dlm/requestqueue.h b/fs/dlm/requestqueue.h
index 6a53ea03335..aba34fc05ee 100644
--- a/fs/dlm/requestqueue.h
+++ b/fs/dlm/requestqueue.h
@@ -1,7 +1,7 @@
1/****************************************************************************** 1/******************************************************************************
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) 2005 Red Hat, Inc. All rights reserved. 4** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
5** 5**
6** This copyrighted material is made available to anyone wishing to use, 6** This copyrighted material is made available to anyone wishing to use,
7** modify, copy, or redistribute it subject to the terms and conditions 7** modify, copy, or redistribute it subject to the terms and conditions
@@ -13,7 +13,7 @@
13#ifndef __REQUESTQUEUE_DOT_H__ 13#ifndef __REQUESTQUEUE_DOT_H__
14#define __REQUESTQUEUE_DOT_H__ 14#define __REQUESTQUEUE_DOT_H__
15 15
16int dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd); 16void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd);
17int dlm_process_requestqueue(struct dlm_ls *ls); 17int dlm_process_requestqueue(struct dlm_ls *ls);
18void dlm_wait_requestqueue(struct dlm_ls *ls); 18void dlm_wait_requestqueue(struct dlm_ls *ls);
19void dlm_purge_requestqueue(struct dlm_ls *ls); 19void dlm_purge_requestqueue(struct dlm_ls *ls);