aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm/requestqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/requestqueue.c')
-rw-r--r--fs/dlm/requestqueue.c58
1 files changed, 24 insertions, 34 deletions
diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c
index 65008d79c96d..0de04f17ccea 100644
--- a/fs/dlm/requestqueue.c
+++ b/fs/dlm/requestqueue.c
@@ -1,7 +1,7 @@
1/****************************************************************************** 1/******************************************************************************
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) 2005 Red Hat, Inc. All rights reserved. 4** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
5** 5**
6** This copyrighted material is made available to anyone wishing to use, 6** This copyrighted material is made available to anyone wishing to use,
7** modify, copy, or redistribute it subject to the terms and conditions 7** modify, copy, or redistribute it subject to the terms and conditions
@@ -20,7 +20,7 @@
20struct rq_entry { 20struct rq_entry {
21 struct list_head list; 21 struct list_head list;
22 int nodeid; 22 int nodeid;
23 char request[1]; 23 char request[0];
24}; 24};
25 25
26/* 26/*
@@ -30,42 +30,39 @@ struct rq_entry {
30 * lockspace is enabled on some while still suspended on others. 30 * lockspace is enabled on some while still suspended on others.
31 */ 31 */
32 32
33int dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd) 33void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd)
34{ 34{
35 struct rq_entry *e; 35 struct rq_entry *e;
36 int length = hd->h_length; 36 int length = hd->h_length;
37 int rv = 0;
38 37
39 e = kmalloc(sizeof(struct rq_entry) + length, GFP_KERNEL); 38 e = kmalloc(sizeof(struct rq_entry) + length, GFP_KERNEL);
40 if (!e) { 39 if (!e) {
41 log_print("dlm_add_requestqueue: out of memory\n"); 40 log_print("dlm_add_requestqueue: out of memory len %d", length);
42 return 0; 41 return;
43 } 42 }
44 43
45 e->nodeid = nodeid; 44 e->nodeid = nodeid;
46 memcpy(e->request, hd, length); 45 memcpy(e->request, hd, length);
47 46
48 /* We need to check dlm_locking_stopped() after taking the mutex to
49 avoid a race where dlm_recoverd enables locking and runs
50 process_requestqueue between our earlier dlm_locking_stopped check
51 and this addition to the requestqueue. */
52
53 mutex_lock(&ls->ls_requestqueue_mutex); 47 mutex_lock(&ls->ls_requestqueue_mutex);
54 if (dlm_locking_stopped(ls)) 48 list_add_tail(&e->list, &ls->ls_requestqueue);
55 list_add_tail(&e->list, &ls->ls_requestqueue);
56 else {
57 log_debug(ls, "dlm_add_requestqueue skip from %d", nodeid);
58 kfree(e);
59 rv = -EAGAIN;
60 }
61 mutex_unlock(&ls->ls_requestqueue_mutex); 49 mutex_unlock(&ls->ls_requestqueue_mutex);
62 return rv;
63} 50}
64 51
52/*
53 * Called by dlm_recoverd to process normal messages saved while recovery was
54 * happening. Normal locking has been enabled before this is called. dlm_recv
55 * upon receiving a message, will wait for all saved messages to be drained
56 * here before processing the message it got. If a new dlm_ls_stop() arrives
57 * while we're processing these saved messages, it may block trying to suspend
58 * dlm_recv if dlm_recv is waiting for us in dlm_wait_requestqueue. In that
59 * case, we don't abort since locking_stopped is still 0. If dlm_recv is not
60 * waiting for us, then this processing may be aborted due to locking_stopped.
61 */
62
65int dlm_process_requestqueue(struct dlm_ls *ls) 63int dlm_process_requestqueue(struct dlm_ls *ls)
66{ 64{
67 struct rq_entry *e; 65 struct rq_entry *e;
68 struct dlm_header *hd;
69 int error = 0; 66 int error = 0;
70 67
71 mutex_lock(&ls->ls_requestqueue_mutex); 68 mutex_lock(&ls->ls_requestqueue_mutex);
@@ -79,14 +76,7 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
79 e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list); 76 e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
80 mutex_unlock(&ls->ls_requestqueue_mutex); 77 mutex_unlock(&ls->ls_requestqueue_mutex);
81 78
82 hd = (struct dlm_header *) e->request; 79 dlm_receive_message_saved(ls, (struct dlm_message *)e->request);
83 error = dlm_receive_message(hd, e->nodeid, 1);
84
85 if (error == -EINTR) {
86 /* entry is left on requestqueue */
87 log_debug(ls, "process_requestqueue abort eintr");
88 break;
89 }
90 80
91 mutex_lock(&ls->ls_requestqueue_mutex); 81 mutex_lock(&ls->ls_requestqueue_mutex);
92 list_del(&e->list); 82 list_del(&e->list);
@@ -106,10 +96,12 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
106 96
107/* 97/*
108 * After recovery is done, locking is resumed and dlm_recoverd takes all the 98 * After recovery is done, locking is resumed and dlm_recoverd takes all the
109 * saved requests and processes them as they would have been by dlm_recvd. At 99 * saved requests and processes them as they would have been by dlm_recv. At
110 * the same time, dlm_recvd will start receiving new requests from remote 100 * the same time, dlm_recv will start receiving new requests from remote nodes.
111 * nodes. We want to delay dlm_recvd processing new requests until 101 * We want to delay dlm_recv processing new requests until dlm_recoverd has
112 * dlm_recoverd has finished processing the old saved requests. 102 * finished processing the old saved requests. We don't check for locking
103 * stopped here because dlm_ls_stop won't stop locking until it's suspended us
104 * (dlm_recv).
113 */ 105 */
114 106
115void dlm_wait_requestqueue(struct dlm_ls *ls) 107void dlm_wait_requestqueue(struct dlm_ls *ls)
@@ -118,8 +110,6 @@ void dlm_wait_requestqueue(struct dlm_ls *ls)
118 mutex_lock(&ls->ls_requestqueue_mutex); 110 mutex_lock(&ls->ls_requestqueue_mutex);
119 if (list_empty(&ls->ls_requestqueue)) 111 if (list_empty(&ls->ls_requestqueue))
120 break; 112 break;
121 if (dlm_locking_stopped(ls))
122 break;
123 mutex_unlock(&ls->ls_requestqueue_mutex); 113 mutex_unlock(&ls->ls_requestqueue_mutex);
124 schedule(); 114 schedule();
125 } 115 }