author	Steven Whitehouse <swhiteho@redhat.com>	2008-05-21 12:21:42 -0400
committer	Steven Whitehouse <swhiteho@redhat.com>	2008-06-27 04:39:25 -0400
commit	f3c9d38a26be32abf9b8897e9e0afc7166c712dd (patch)
tree	01b892d0d01720a396589c1f537dd0997128a060
parent	6802e3400ff4549525930ee744030c36fce9cc73 (diff)
[GFS2] Fix ordering bug in lock_dlm
This looks like a lot of change, but in fact it's not. Mostly it's things moving from one file to another. The real change is that instead of queuing lock completions and callbacks from the DLM, we now pass them directly to GFS2.

This gives us a net loss of two list heads per glock (a fair saving in memory), a reduction in the latency of delivering the messages to GFS2, and one thread fewer as well. There was also a bug where callbacks and completions could be delivered in the wrong order due to the unnecessary queuing; that is fixed by this patch.

Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
Cc: Bob Peterson <rpeterso@redhat.com>
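In outline, the fix is a move from deferred to direct delivery. A condensed before/after sketch, boiled down from the lock.c diff below (error handling and the body of process_complete() omitted):

	/* Before: the DLM ast handler only queued the lock on the
	 * lockspace's "complete" list; a lock_dlm thread dequeued it later
	 * and called process_complete().  Blocking callbacks took a similar
	 * detour via a separate "blocking" list, so the two kinds of
	 * message could overtake one another. */
	static inline void gdlm_ast(void *astarg)
	{
		queue_complete(astarg);
	}

	/* After: the handler runs the processing inline, so completions
	 * reach GFS2 in exactly the order DLM issues them, and the clist
	 * and blist list heads (and one thread) go away. */
	static void gdlm_ast(void *astarg)
	{
		struct gdlm_lock *lp = astarg;
		clear_bit(LFL_ACTIVE, &lp->flags);
		process_complete(lp);
	}

gdlm_bast() likewise calls process_blocking() directly instead of queuing the lock on the blocking list.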
-rw-r--r--	fs/gfs2/locking/dlm/lock.c	353
-rw-r--r--	fs/gfs2/locking/dlm/lock_dlm.h	12
-rw-r--r--	fs/gfs2/locking/dlm/mount.c	4
-rw-r--r--	fs/gfs2/locking/dlm/thread.c	324
4 files changed, 306 insertions(+), 387 deletions(-)
diff --git a/fs/gfs2/locking/dlm/lock.c b/fs/gfs2/locking/dlm/lock.c
index fed9a67be0f1..871ffc9578f2 100644
--- a/fs/gfs2/locking/dlm/lock.c
+++ b/fs/gfs2/locking/dlm/lock.c
@@ -11,46 +11,63 @@
 
 static char junk_lvb[GDLM_LVB_SIZE];
 
-static void queue_complete(struct gdlm_lock *lp)
+
+/* convert dlm lock-mode to gfs lock-state */
+
+static s16 gdlm_make_lmstate(s16 dlmmode)
 {
-	struct gdlm_ls *ls = lp->ls;
+	switch (dlmmode) {
+	case DLM_LOCK_IV:
+	case DLM_LOCK_NL:
+		return LM_ST_UNLOCKED;
+	case DLM_LOCK_EX:
+		return LM_ST_EXCLUSIVE;
+	case DLM_LOCK_CW:
+		return LM_ST_DEFERRED;
+	case DLM_LOCK_PR:
+		return LM_ST_SHARED;
+	}
+	gdlm_assert(0, "unknown DLM mode %d", dlmmode);
+	return -1;
+}
 
-	clear_bit(LFL_ACTIVE, &lp->flags);
+/* A lock placed on this queue is re-submitted to DLM as soon as the lock_dlm
+   thread gets to it. */
+
+static void queue_submit(struct gdlm_lock *lp)
+{
+	struct gdlm_ls *ls = lp->ls;
 
 	spin_lock(&ls->async_lock);
-	list_add_tail(&lp->clist, &ls->complete);
+	list_add_tail(&lp->delay_list, &ls->submit);
 	spin_unlock(&ls->async_lock);
 	wake_up(&ls->thread_wait);
 }
 
-static inline void gdlm_ast(void *astarg)
+static void wake_up_ast(struct gdlm_lock *lp)
 {
-	queue_complete(astarg);
+	clear_bit(LFL_AST_WAIT, &lp->flags);
+	smp_mb__after_clear_bit();
+	wake_up_bit(&lp->flags, LFL_AST_WAIT);
 }
 
-static inline void gdlm_bast(void *astarg, int mode)
+static void gdlm_delete_lp(struct gdlm_lock *lp)
 {
-	struct gdlm_lock *lp = astarg;
 	struct gdlm_ls *ls = lp->ls;
 
-	if (!mode) {
-		printk(KERN_INFO "lock_dlm: bast mode zero %x,%llx\n",
-		       lp->lockname.ln_type,
-		       (unsigned long long)lp->lockname.ln_number);
-		return;
-	}
-
 	spin_lock(&ls->async_lock);
-	if (!lp->bast_mode) {
-		list_add_tail(&lp->blist, &ls->blocking);
-		lp->bast_mode = mode;
-	} else if (lp->bast_mode < mode)
-		lp->bast_mode = mode;
+	if (!list_empty(&lp->delay_list))
+		list_del_init(&lp->delay_list);
+	gdlm_assert(!list_empty(&lp->all_list), "%x,%llx", lp->lockname.ln_type,
+		    (unsigned long long)lp->lockname.ln_number);
+	list_del_init(&lp->all_list);
+	ls->all_locks_count--;
 	spin_unlock(&ls->async_lock);
-	wake_up(&ls->thread_wait);
+
+	kfree(lp);
 }
 
-void gdlm_queue_delayed(struct gdlm_lock *lp)
+static void gdlm_queue_delayed(struct gdlm_lock *lp)
 {
 	struct gdlm_ls *ls = lp->ls;
 
@@ -59,6 +76,249 @@ void gdlm_queue_delayed(struct gdlm_lock *lp)
 	spin_unlock(&ls->async_lock);
 }
 
+static void process_complete(struct gdlm_lock *lp)
+{
+	struct gdlm_ls *ls = lp->ls;
+	struct lm_async_cb acb;
+	s16 prev_mode = lp->cur;
+
+	memset(&acb, 0, sizeof(acb));
+
+	if (lp->lksb.sb_status == -DLM_ECANCEL) {
+		log_info("complete dlm cancel %x,%llx flags %lx",
+			 lp->lockname.ln_type,
+			 (unsigned long long)lp->lockname.ln_number,
+			 lp->flags);
+
+		lp->req = lp->cur;
+		acb.lc_ret |= LM_OUT_CANCELED;
+		if (lp->cur == DLM_LOCK_IV)
+			lp->lksb.sb_lkid = 0;
+		goto out;
+	}
+
+	if (test_and_clear_bit(LFL_DLM_UNLOCK, &lp->flags)) {
+		if (lp->lksb.sb_status != -DLM_EUNLOCK) {
+			log_info("unlock sb_status %d %x,%llx flags %lx",
+				 lp->lksb.sb_status, lp->lockname.ln_type,
+				 (unsigned long long)lp->lockname.ln_number,
+				 lp->flags);
+			return;
+		}
+
+		lp->cur = DLM_LOCK_IV;
+		lp->req = DLM_LOCK_IV;
+		lp->lksb.sb_lkid = 0;
+
+		if (test_and_clear_bit(LFL_UNLOCK_DELETE, &lp->flags)) {
+			gdlm_delete_lp(lp);
+			return;
+		}
+		goto out;
+	}
+
+	if (lp->lksb.sb_flags & DLM_SBF_VALNOTVALID)
+		memset(lp->lksb.sb_lvbptr, 0, GDLM_LVB_SIZE);
+
+	if (lp->lksb.sb_flags & DLM_SBF_ALTMODE) {
+		if (lp->req == DLM_LOCK_PR)
+			lp->req = DLM_LOCK_CW;
+		else if (lp->req == DLM_LOCK_CW)
+			lp->req = DLM_LOCK_PR;
+	}
+
+	/*
+	 * A canceled lock request. The lock was just taken off the delayed
+	 * list and was never even submitted to dlm.
+	 */
+
+	if (test_and_clear_bit(LFL_CANCEL, &lp->flags)) {
+		log_info("complete internal cancel %x,%llx",
+			 lp->lockname.ln_type,
+			 (unsigned long long)lp->lockname.ln_number);
+		lp->req = lp->cur;
+		acb.lc_ret |= LM_OUT_CANCELED;
+		goto out;
+	}
+
+	/*
+	 * An error occured.
+	 */
+
+	if (lp->lksb.sb_status) {
+		/* a "normal" error */
+		if ((lp->lksb.sb_status == -EAGAIN) &&
+		    (lp->lkf & DLM_LKF_NOQUEUE)) {
+			lp->req = lp->cur;
+			if (lp->cur == DLM_LOCK_IV)
+				lp->lksb.sb_lkid = 0;
+			goto out;
+		}
+
+		/* this could only happen with cancels I think */
+		log_info("ast sb_status %d %x,%llx flags %lx",
+			 lp->lksb.sb_status, lp->lockname.ln_type,
+			 (unsigned long long)lp->lockname.ln_number,
+			 lp->flags);
+		if (lp->lksb.sb_status == -EDEADLOCK &&
+		    lp->ls->fsflags & LM_MFLAG_CONV_NODROP) {
+			lp->req = lp->cur;
+			acb.lc_ret |= LM_OUT_CONV_DEADLK;
+			if (lp->cur == DLM_LOCK_IV)
+				lp->lksb.sb_lkid = 0;
+			goto out;
+		} else
+			return;
+	}
+
+	/*
+	 * This is an AST for an EX->EX conversion for sync_lvb from GFS.
+	 */
+
+	if (test_and_clear_bit(LFL_SYNC_LVB, &lp->flags)) {
+		wake_up_ast(lp);
+		return;
+	}
+
+	/*
+	 * A lock has been demoted to NL because it initially completed during
+	 * BLOCK_LOCKS. Now it must be requested in the originally requested
+	 * mode.
+	 */
+
+	if (test_and_clear_bit(LFL_REREQUEST, &lp->flags)) {
+		gdlm_assert(lp->req == DLM_LOCK_NL, "%x,%llx",
+			    lp->lockname.ln_type,
+			    (unsigned long long)lp->lockname.ln_number);
+		gdlm_assert(lp->prev_req > DLM_LOCK_NL, "%x,%llx",
+			    lp->lockname.ln_type,
+			    (unsigned long long)lp->lockname.ln_number);
+
+		lp->cur = DLM_LOCK_NL;
+		lp->req = lp->prev_req;
+		lp->prev_req = DLM_LOCK_IV;
+		lp->lkf &= ~DLM_LKF_CONVDEADLK;
+
+		set_bit(LFL_NOCACHE, &lp->flags);
+
+		if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) &&
+		    !test_bit(LFL_NOBLOCK, &lp->flags))
+			gdlm_queue_delayed(lp);
+		else
+			queue_submit(lp);
+		return;
+	}
+
+	/*
+	 * A request is granted during dlm recovery. It may be granted
+	 * because the locks of a failed node were cleared. In that case,
+	 * there may be inconsistent data beneath this lock and we must wait
+	 * for recovery to complete to use it. When gfs recovery is done this
+	 * granted lock will be converted to NL and then reacquired in this
+	 * granted state.
+	 */
+
+	if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) &&
+	    !test_bit(LFL_NOBLOCK, &lp->flags) &&
+	    lp->req != DLM_LOCK_NL) {
+
+		lp->cur = lp->req;
+		lp->prev_req = lp->req;
+		lp->req = DLM_LOCK_NL;
+		lp->lkf |= DLM_LKF_CONVERT;
+		lp->lkf &= ~DLM_LKF_CONVDEADLK;
+
+		log_debug("rereq %x,%llx id %x %d,%d",
+			  lp->lockname.ln_type,
+			  (unsigned long long)lp->lockname.ln_number,
+			  lp->lksb.sb_lkid, lp->cur, lp->req);
+
+		set_bit(LFL_REREQUEST, &lp->flags);
+		queue_submit(lp);
+		return;
+	}
+
+	/*
+	 * DLM demoted the lock to NL before it was granted so GFS must be
+	 * told it cannot cache data for this lock.
+	 */
+
+	if (lp->lksb.sb_flags & DLM_SBF_DEMOTED)
+		set_bit(LFL_NOCACHE, &lp->flags);
+
+out:
+	/*
+	 * This is an internal lock_dlm lock
+	 */
+
+	if (test_bit(LFL_INLOCK, &lp->flags)) {
+		clear_bit(LFL_NOBLOCK, &lp->flags);
+		lp->cur = lp->req;
+		wake_up_ast(lp);
+		return;
+	}
+
+	/*
+	 * Normal completion of a lock request. Tell GFS it now has the lock.
+	 */
+
+	clear_bit(LFL_NOBLOCK, &lp->flags);
+	lp->cur = lp->req;
+
+	acb.lc_name = lp->lockname;
+	acb.lc_ret |= gdlm_make_lmstate(lp->cur);
+
+	if (!test_and_clear_bit(LFL_NOCACHE, &lp->flags) &&
+	    (lp->cur > DLM_LOCK_NL) && (prev_mode > DLM_LOCK_NL))
+		acb.lc_ret |= LM_OUT_CACHEABLE;
+
+	ls->fscb(ls->sdp, LM_CB_ASYNC, &acb);
+}
+
+static void gdlm_ast(void *astarg)
+{
+	struct gdlm_lock *lp = astarg;
+	clear_bit(LFL_ACTIVE, &lp->flags);
+	process_complete(lp);
+}
+
+static void process_blocking(struct gdlm_lock *lp, int bast_mode)
+{
+	struct gdlm_ls *ls = lp->ls;
+	unsigned int cb = 0;
+
+	switch (gdlm_make_lmstate(bast_mode)) {
+	case LM_ST_EXCLUSIVE:
+		cb = LM_CB_NEED_E;
+		break;
+	case LM_ST_DEFERRED:
+		cb = LM_CB_NEED_D;
+		break;
+	case LM_ST_SHARED:
+		cb = LM_CB_NEED_S;
+		break;
+	default:
+		gdlm_assert(0, "unknown bast mode %u", bast_mode);
+	}
+
+	ls->fscb(ls->sdp, cb, &lp->lockname);
+}
+
+
+static void gdlm_bast(void *astarg, int mode)
+{
+	struct gdlm_lock *lp = astarg;
+
+	if (!mode) {
+		printk(KERN_INFO "lock_dlm: bast mode zero %x,%llx\n",
+		       lp->lockname.ln_type,
+		       (unsigned long long)lp->lockname.ln_number);
+		return;
+	}
+
+	process_blocking(lp, mode);
+}
+
 /* convert gfs lock-state to dlm lock-mode */
 
 static s16 make_mode(s16 lmstate)
@@ -77,24 +337,6 @@ static s16 make_mode(s16 lmstate)
 	return -1;
 }
 
-/* convert dlm lock-mode to gfs lock-state */
-
-s16 gdlm_make_lmstate(s16 dlmmode)
-{
-	switch (dlmmode) {
-	case DLM_LOCK_IV:
-	case DLM_LOCK_NL:
-		return LM_ST_UNLOCKED;
-	case DLM_LOCK_EX:
-		return LM_ST_EXCLUSIVE;
-	case DLM_LOCK_CW:
-		return LM_ST_DEFERRED;
-	case DLM_LOCK_PR:
-		return LM_ST_SHARED;
-	}
-	gdlm_assert(0, "unknown DLM mode %d", dlmmode);
-	return -1;
-}
 
 /* verify agreement with GFS on the current lock state, NB: DLM_LOCK_NL and
    DLM_LOCK_IV are both considered LM_ST_UNLOCKED by GFS. */
@@ -173,10 +415,6 @@ static int gdlm_create_lp(struct gdlm_ls *ls, struct lm_lockname *name,
 	make_strname(name, &lp->strname);
 	lp->ls = ls;
 	lp->cur = DLM_LOCK_IV;
-	lp->lvb = NULL;
-	lp->hold_null = NULL;
-	INIT_LIST_HEAD(&lp->clist);
-	INIT_LIST_HEAD(&lp->blist);
 	INIT_LIST_HEAD(&lp->delay_list);
 
 	spin_lock(&ls->async_lock);
@@ -188,26 +426,6 @@ static int gdlm_create_lp(struct gdlm_ls *ls, struct lm_lockname *name,
 	return 0;
 }
 
-void gdlm_delete_lp(struct gdlm_lock *lp)
-{
-	struct gdlm_ls *ls = lp->ls;
-
-	spin_lock(&ls->async_lock);
-	if (!list_empty(&lp->clist))
-		list_del_init(&lp->clist);
-	if (!list_empty(&lp->blist))
-		list_del_init(&lp->blist);
-	if (!list_empty(&lp->delay_list))
-		list_del_init(&lp->delay_list);
-	gdlm_assert(!list_empty(&lp->all_list), "%x,%llx", lp->lockname.ln_type,
-		    (unsigned long long)lp->lockname.ln_number);
-	list_del_init(&lp->all_list);
-	ls->all_locks_count--;
-	spin_unlock(&ls->async_lock);
-
-	kfree(lp);
-}
-
 int gdlm_get_lock(void *lockspace, struct lm_lockname *name,
 		  void **lockp)
 {
@@ -261,7 +479,7 @@ unsigned int gdlm_do_lock(struct gdlm_lock *lp)
 
 	if ((error == -EAGAIN) && (lp->lkf & DLM_LKF_NOQUEUE)) {
 		lp->lksb.sb_status = -EAGAIN;
-		queue_complete(lp);
+		gdlm_ast(lp);
 		error = 0;
 	}
 
@@ -311,6 +529,9 @@ unsigned int gdlm_lock(void *lock, unsigned int cur_state,
 	if (req_state == LM_ST_UNLOCKED)
 		return gdlm_unlock(lock, cur_state);
 
+	if (req_state == LM_ST_UNLOCKED)
+		return gdlm_unlock(lock, cur_state);
+
 	clear_bit(LFL_DLM_CANCEL, &lp->flags);
 	if (flags & LM_FLAG_NOEXP)
 		set_bit(LFL_NOBLOCK, &lp->flags);
@@ -354,7 +575,7 @@ void gdlm_cancel(void *lock)
 	if (delay_list) {
 		set_bit(LFL_CANCEL, &lp->flags);
 		set_bit(LFL_ACTIVE, &lp->flags);
-		queue_complete(lp);
+		gdlm_ast(lp);
 		return;
 	}
 
diff --git a/fs/gfs2/locking/dlm/lock_dlm.h b/fs/gfs2/locking/dlm/lock_dlm.h
index a243cf69c54e..ad944c64eab1 100644
--- a/fs/gfs2/locking/dlm/lock_dlm.h
+++ b/fs/gfs2/locking/dlm/lock_dlm.h
@@ -72,15 +72,12 @@ struct gdlm_ls {
 	int			recover_jid_done;
 	int			recover_jid_status;
 	spinlock_t		async_lock;
-	struct list_head	complete;
-	struct list_head	blocking;
 	struct list_head	delayed;
 	struct list_head	submit;
 	struct list_head	all_locks;
 	u32			all_locks_count;
 	wait_queue_head_t	wait_control;
-	struct task_struct	*thread1;
-	struct task_struct	*thread2;
+	struct task_struct	*thread;
 	wait_queue_head_t	thread_wait;
 	unsigned long		drop_time;
 	int			drop_locks_count;
@@ -117,10 +114,6 @@ struct gdlm_lock {
 	u32			lkf;		/* dlm flags DLM_LKF_ */
 	unsigned long		flags;		/* lock_dlm flags LFL_ */
 
-	int			bast_mode;	/* protected by async_lock */
-
-	struct list_head	clist;		/* complete */
-	struct list_head	blist;		/* blocking */
 	struct list_head	delay_list;	/* delayed */
 	struct list_head	all_list;	/* all locks for the fs */
 	struct gdlm_lock	*hold_null;	/* NL lock for hold_lvb */
@@ -159,11 +152,8 @@ void gdlm_release_threads(struct gdlm_ls *);
 
 /* lock.c */
 
-s16 gdlm_make_lmstate(s16);
-void gdlm_queue_delayed(struct gdlm_lock *);
 void gdlm_submit_delayed(struct gdlm_ls *);
 int gdlm_release_all_locks(struct gdlm_ls *);
-void gdlm_delete_lp(struct gdlm_lock *);
 unsigned int gdlm_do_lock(struct gdlm_lock *);
 
 int gdlm_get_lock(void *, struct lm_lockname *, void **);
diff --git a/fs/gfs2/locking/dlm/mount.c b/fs/gfs2/locking/dlm/mount.c
index 470bdf650b50..0628520a445f 100644
--- a/fs/gfs2/locking/dlm/mount.c
+++ b/fs/gfs2/locking/dlm/mount.c
@@ -28,15 +28,11 @@ static struct gdlm_ls *init_gdlm(lm_callback_t cb, struct gfs2_sbd *sdp,
 	ls->sdp = sdp;
 	ls->fsflags = flags;
 	spin_lock_init(&ls->async_lock);
-	INIT_LIST_HEAD(&ls->complete);
-	INIT_LIST_HEAD(&ls->blocking);
 	INIT_LIST_HEAD(&ls->delayed);
 	INIT_LIST_HEAD(&ls->submit);
 	INIT_LIST_HEAD(&ls->all_locks);
 	init_waitqueue_head(&ls->thread_wait);
 	init_waitqueue_head(&ls->wait_control);
-	ls->thread1 = NULL;
-	ls->thread2 = NULL;
 	ls->drop_time = jiffies;
 	ls->jid = -1;
 
diff --git a/fs/gfs2/locking/dlm/thread.c b/fs/gfs2/locking/dlm/thread.c
index e53db6fd28ab..f30350abd62f 100644
--- a/fs/gfs2/locking/dlm/thread.c
+++ b/fs/gfs2/locking/dlm/thread.c
@@ -9,255 +9,12 @@
 
 #include "lock_dlm.h"
 
-/* A lock placed on this queue is re-submitted to DLM as soon as the lock_dlm
-   thread gets to it. */
-
-static void queue_submit(struct gdlm_lock *lp)
-{
-	struct gdlm_ls *ls = lp->ls;
-
-	spin_lock(&ls->async_lock);
-	list_add_tail(&lp->delay_list, &ls->submit);
-	spin_unlock(&ls->async_lock);
-	wake_up(&ls->thread_wait);
-}
-
-static void process_blocking(struct gdlm_lock *lp, int bast_mode)
-{
-	struct gdlm_ls *ls = lp->ls;
-	unsigned int cb = 0;
-
-	switch (gdlm_make_lmstate(bast_mode)) {
-	case LM_ST_EXCLUSIVE:
-		cb = LM_CB_NEED_E;
-		break;
-	case LM_ST_DEFERRED:
-		cb = LM_CB_NEED_D;
-		break;
-	case LM_ST_SHARED:
-		cb = LM_CB_NEED_S;
-		break;
-	default:
-		gdlm_assert(0, "unknown bast mode %u", lp->bast_mode);
-	}
-
-	ls->fscb(ls->sdp, cb, &lp->lockname);
-}
-
-static void wake_up_ast(struct gdlm_lock *lp)
-{
-	clear_bit(LFL_AST_WAIT, &lp->flags);
-	smp_mb__after_clear_bit();
-	wake_up_bit(&lp->flags, LFL_AST_WAIT);
-}
-
-static void process_complete(struct gdlm_lock *lp)
-{
-	struct gdlm_ls *ls = lp->ls;
-	struct lm_async_cb acb;
-	s16 prev_mode = lp->cur;
-
-	memset(&acb, 0, sizeof(acb));
-
-	if (lp->lksb.sb_status == -DLM_ECANCEL) {
-		log_info("complete dlm cancel %x,%llx flags %lx",
-			 lp->lockname.ln_type,
-			 (unsigned long long)lp->lockname.ln_number,
-			 lp->flags);
-
-		lp->req = lp->cur;
-		acb.lc_ret |= LM_OUT_CANCELED;
-		if (lp->cur == DLM_LOCK_IV)
-			lp->lksb.sb_lkid = 0;
-		goto out;
-	}
-
-	if (test_and_clear_bit(LFL_DLM_UNLOCK, &lp->flags)) {
-		if (lp->lksb.sb_status != -DLM_EUNLOCK) {
-			log_info("unlock sb_status %d %x,%llx flags %lx",
-				 lp->lksb.sb_status, lp->lockname.ln_type,
-				 (unsigned long long)lp->lockname.ln_number,
-				 lp->flags);
-			return;
-		}
-
-		lp->cur = DLM_LOCK_IV;
-		lp->req = DLM_LOCK_IV;
-		lp->lksb.sb_lkid = 0;
-
-		if (test_and_clear_bit(LFL_UNLOCK_DELETE, &lp->flags)) {
-			gdlm_delete_lp(lp);
-			return;
-		}
-		goto out;
-	}
-
-	if (lp->lksb.sb_flags & DLM_SBF_VALNOTVALID)
-		memset(lp->lksb.sb_lvbptr, 0, GDLM_LVB_SIZE);
-
-	if (lp->lksb.sb_flags & DLM_SBF_ALTMODE) {
-		if (lp->req == DLM_LOCK_PR)
-			lp->req = DLM_LOCK_CW;
-		else if (lp->req == DLM_LOCK_CW)
-			lp->req = DLM_LOCK_PR;
-	}
-
-	/*
-	 * A canceled lock request. The lock was just taken off the delayed
-	 * list and was never even submitted to dlm.
-	 */
-
-	if (test_and_clear_bit(LFL_CANCEL, &lp->flags)) {
-		log_info("complete internal cancel %x,%llx",
-			 lp->lockname.ln_type,
-			 (unsigned long long)lp->lockname.ln_number);
-		lp->req = lp->cur;
-		acb.lc_ret |= LM_OUT_CANCELED;
-		goto out;
-	}
-
-	/*
-	 * An error occured.
-	 */
-
-	if (lp->lksb.sb_status) {
-		/* a "normal" error */
-		if ((lp->lksb.sb_status == -EAGAIN) &&
-		    (lp->lkf & DLM_LKF_NOQUEUE)) {
-			lp->req = lp->cur;
-			if (lp->cur == DLM_LOCK_IV)
-				lp->lksb.sb_lkid = 0;
-			goto out;
-		}
-
-		/* this could only happen with cancels I think */
-		log_info("ast sb_status %d %x,%llx flags %lx",
-			 lp->lksb.sb_status, lp->lockname.ln_type,
-			 (unsigned long long)lp->lockname.ln_number,
-			 lp->flags);
-		if (lp->lksb.sb_status == -EDEADLOCK &&
-		    lp->ls->fsflags & LM_MFLAG_CONV_NODROP) {
-			lp->req = lp->cur;
-			acb.lc_ret |= LM_OUT_CONV_DEADLK;
-			if (lp->cur == DLM_LOCK_IV)
-				lp->lksb.sb_lkid = 0;
-			goto out;
-		} else
-			return;
-	}
-
-	/*
-	 * This is an AST for an EX->EX conversion for sync_lvb from GFS.
-	 */
-
-	if (test_and_clear_bit(LFL_SYNC_LVB, &lp->flags)) {
-		wake_up_ast(lp);
-		return;
-	}
-
-	/*
-	 * A lock has been demoted to NL because it initially completed during
-	 * BLOCK_LOCKS. Now it must be requested in the originally requested
-	 * mode.
-	 */
-
-	if (test_and_clear_bit(LFL_REREQUEST, &lp->flags)) {
-		gdlm_assert(lp->req == DLM_LOCK_NL, "%x,%llx",
-			    lp->lockname.ln_type,
-			    (unsigned long long)lp->lockname.ln_number);
-		gdlm_assert(lp->prev_req > DLM_LOCK_NL, "%x,%llx",
-			    lp->lockname.ln_type,
-			    (unsigned long long)lp->lockname.ln_number);
-
-		lp->cur = DLM_LOCK_NL;
-		lp->req = lp->prev_req;
-		lp->prev_req = DLM_LOCK_IV;
-		lp->lkf &= ~DLM_LKF_CONVDEADLK;
-
-		set_bit(LFL_NOCACHE, &lp->flags);
-
-		if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) &&
-		    !test_bit(LFL_NOBLOCK, &lp->flags))
-			gdlm_queue_delayed(lp);
-		else
-			queue_submit(lp);
-		return;
-	}
-
-	/*
-	 * A request is granted during dlm recovery. It may be granted
-	 * because the locks of a failed node were cleared. In that case,
-	 * there may be inconsistent data beneath this lock and we must wait
-	 * for recovery to complete to use it. When gfs recovery is done this
-	 * granted lock will be converted to NL and then reacquired in this
-	 * granted state.
-	 */
-
-	if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) &&
-	    !test_bit(LFL_NOBLOCK, &lp->flags) &&
-	    lp->req != DLM_LOCK_NL) {
-
-		lp->cur = lp->req;
-		lp->prev_req = lp->req;
-		lp->req = DLM_LOCK_NL;
-		lp->lkf |= DLM_LKF_CONVERT;
-		lp->lkf &= ~DLM_LKF_CONVDEADLK;
-
-		log_debug("rereq %x,%llx id %x %d,%d",
-			  lp->lockname.ln_type,
-			  (unsigned long long)lp->lockname.ln_number,
-			  lp->lksb.sb_lkid, lp->cur, lp->req);
-
-		set_bit(LFL_REREQUEST, &lp->flags);
-		queue_submit(lp);
-		return;
-	}
-
-	/*
-	 * DLM demoted the lock to NL before it was granted so GFS must be
-	 * told it cannot cache data for this lock.
-	 */
-
-	if (lp->lksb.sb_flags & DLM_SBF_DEMOTED)
-		set_bit(LFL_NOCACHE, &lp->flags);
-
-out:
-	/*
-	 * This is an internal lock_dlm lock
-	 */
-
-	if (test_bit(LFL_INLOCK, &lp->flags)) {
-		clear_bit(LFL_NOBLOCK, &lp->flags);
-		lp->cur = lp->req;
-		wake_up_ast(lp);
-		return;
-	}
-
-	/*
-	 * Normal completion of a lock request. Tell GFS it now has the lock.
-	 */
-
-	clear_bit(LFL_NOBLOCK, &lp->flags);
-	lp->cur = lp->req;
-
-	acb.lc_name = lp->lockname;
-	acb.lc_ret |= gdlm_make_lmstate(lp->cur);
-
-	if (!test_and_clear_bit(LFL_NOCACHE, &lp->flags) &&
-	    (lp->cur > DLM_LOCK_NL) && (prev_mode > DLM_LOCK_NL))
-		acb.lc_ret |= LM_OUT_CACHEABLE;
-
-	ls->fscb(ls->sdp, LM_CB_ASYNC, &acb);
-}
-
-static inline int no_work(struct gdlm_ls *ls, int blocking)
+static inline int no_work(struct gdlm_ls *ls)
 {
 	int ret;
 
 	spin_lock(&ls->async_lock);
-	ret = list_empty(&ls->complete) && list_empty(&ls->submit);
-	if (ret && blocking)
-		ret = list_empty(&ls->blocking);
+	ret = list_empty(&ls->submit);
 	spin_unlock(&ls->async_lock);
 
 	return ret;
@@ -276,100 +33,55 @@ static inline int check_drop(struct gdlm_ls *ls)
 	return 0;
 }
 
-static int gdlm_thread(void *data, int blist)
+static int gdlm_thread(void *data)
 {
 	struct gdlm_ls *ls = (struct gdlm_ls *) data;
 	struct gdlm_lock *lp = NULL;
-	uint8_t complete, blocking, submit, drop;
-
-	/* Only thread1 is allowed to do blocking callbacks since gfs
-	   may wait for a completion callback within a blocking cb. */
 
 	while (!kthread_should_stop()) {
 		wait_event_interruptible(ls->thread_wait,
-				!no_work(ls, blist) || kthread_should_stop());
-
-		complete = blocking = submit = drop = 0;
+				!no_work(ls) || kthread_should_stop());
 
 		spin_lock(&ls->async_lock);
 
-		if (blist && !list_empty(&ls->blocking)) {
-			lp = list_entry(ls->blocking.next, struct gdlm_lock,
-					blist);
-			list_del_init(&lp->blist);
-			blocking = lp->bast_mode;
-			lp->bast_mode = 0;
-		} else if (!list_empty(&ls->complete)) {
-			lp = list_entry(ls->complete.next, struct gdlm_lock,
-					clist);
-			list_del_init(&lp->clist);
-			complete = 1;
-		} else if (!list_empty(&ls->submit)) {
+		if (!list_empty(&ls->submit)) {
 			lp = list_entry(ls->submit.next, struct gdlm_lock,
 					delay_list);
 			list_del_init(&lp->delay_list);
-			submit = 1;
+			spin_unlock(&ls->async_lock);
+			gdlm_do_lock(lp);
+			spin_lock(&ls->async_lock);
 		}
-
-		drop = check_drop(ls);
-		spin_unlock(&ls->async_lock);
-
-		if (complete)
-			process_complete(lp);
-
-		else if (blocking)
-			process_blocking(lp, blocking);
-
-		else if (submit)
-			gdlm_do_lock(lp);
-
-		if (drop)
+		/* Does this ever happen these days? I hope not anyway */
+		if (check_drop(ls)) {
+			spin_unlock(&ls->async_lock);
 			ls->fscb(ls->sdp, LM_CB_DROPLOCKS, NULL);
-
-		schedule();
+			spin_lock(&ls->async_lock);
+		}
+		spin_unlock(&ls->async_lock);
 	}
 
 	return 0;
 }
 
-static int gdlm_thread1(void *data)
-{
-	return gdlm_thread(data, 1);
-}
-
-static int gdlm_thread2(void *data)
-{
-	return gdlm_thread(data, 0);
-}
-
 int gdlm_init_threads(struct gdlm_ls *ls)
 {
 	struct task_struct *p;
 	int error;
 
-	p = kthread_run(gdlm_thread1, ls, "lock_dlm1");
-	error = IS_ERR(p);
-	if (error) {
-		log_error("can't start lock_dlm1 thread %d", error);
-		return error;
-	}
-	ls->thread1 = p;
-
-	p = kthread_run(gdlm_thread2, ls, "lock_dlm2");
+	p = kthread_run(gdlm_thread, ls, "lock_dlm");
 	error = IS_ERR(p);
 	if (error) {
-		log_error("can't start lock_dlm2 thread %d", error);
-		kthread_stop(ls->thread1);
+		log_error("can't start lock_dlm thread %d", error);
 		return error;
 	}
-	ls->thread2 = p;
+	ls->thread = p;
 
 	return 0;
 }
 
 void gdlm_release_threads(struct gdlm_ls *ls)
 {
-	kthread_stop(ls->thread1);
-	kthread_stop(ls->thread2);
+	kthread_stop(ls->thread);
 }
 