aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fs/gfs2/locking/dlm/lock.c353
-rw-r--r--fs/gfs2/locking/dlm/lock_dlm.h12
-rw-r--r--fs/gfs2/locking/dlm/mount.c4
-rw-r--r--fs/gfs2/locking/dlm/thread.c324
4 files changed, 306 insertions, 387 deletions
diff --git a/fs/gfs2/locking/dlm/lock.c b/fs/gfs2/locking/dlm/lock.c
index fed9a67be0f1..871ffc9578f2 100644
--- a/fs/gfs2/locking/dlm/lock.c
+++ b/fs/gfs2/locking/dlm/lock.c
@@ -11,46 +11,63 @@
11 11
12static char junk_lvb[GDLM_LVB_SIZE]; 12static char junk_lvb[GDLM_LVB_SIZE];
13 13
14static void queue_complete(struct gdlm_lock *lp) 14
15/* convert dlm lock-mode to gfs lock-state */
16
17static s16 gdlm_make_lmstate(s16 dlmmode)
15{ 18{
16 struct gdlm_ls *ls = lp->ls; 19 switch (dlmmode) {
20 case DLM_LOCK_IV:
21 case DLM_LOCK_NL:
22 return LM_ST_UNLOCKED;
23 case DLM_LOCK_EX:
24 return LM_ST_EXCLUSIVE;
25 case DLM_LOCK_CW:
26 return LM_ST_DEFERRED;
27 case DLM_LOCK_PR:
28 return LM_ST_SHARED;
29 }
30 gdlm_assert(0, "unknown DLM mode %d", dlmmode);
31 return -1;
32}
17 33
18 clear_bit(LFL_ACTIVE, &lp->flags); 34/* A lock placed on this queue is re-submitted to DLM as soon as the lock_dlm
35 thread gets to it. */
36
37static void queue_submit(struct gdlm_lock *lp)
38{
39 struct gdlm_ls *ls = lp->ls;
19 40
20 spin_lock(&ls->async_lock); 41 spin_lock(&ls->async_lock);
21 list_add_tail(&lp->clist, &ls->complete); 42 list_add_tail(&lp->delay_list, &ls->submit);
22 spin_unlock(&ls->async_lock); 43 spin_unlock(&ls->async_lock);
23 wake_up(&ls->thread_wait); 44 wake_up(&ls->thread_wait);
24} 45}
25 46
26static inline void gdlm_ast(void *astarg) 47static void wake_up_ast(struct gdlm_lock *lp)
27{ 48{
28 queue_complete(astarg); 49 clear_bit(LFL_AST_WAIT, &lp->flags);
50 smp_mb__after_clear_bit();
51 wake_up_bit(&lp->flags, LFL_AST_WAIT);
29} 52}
30 53
31static inline void gdlm_bast(void *astarg, int mode) 54static void gdlm_delete_lp(struct gdlm_lock *lp)
32{ 55{
33 struct gdlm_lock *lp = astarg;
34 struct gdlm_ls *ls = lp->ls; 56 struct gdlm_ls *ls = lp->ls;
35 57
36 if (!mode) {
37 printk(KERN_INFO "lock_dlm: bast mode zero %x,%llx\n",
38 lp->lockname.ln_type,
39 (unsigned long long)lp->lockname.ln_number);
40 return;
41 }
42
43 spin_lock(&ls->async_lock); 58 spin_lock(&ls->async_lock);
44 if (!lp->bast_mode) { 59 if (!list_empty(&lp->delay_list))
45 list_add_tail(&lp->blist, &ls->blocking); 60 list_del_init(&lp->delay_list);
46 lp->bast_mode = mode; 61 gdlm_assert(!list_empty(&lp->all_list), "%x,%llx", lp->lockname.ln_type,
47 } else if (lp->bast_mode < mode) 62 (unsigned long long)lp->lockname.ln_number);
48 lp->bast_mode = mode; 63 list_del_init(&lp->all_list);
64 ls->all_locks_count--;
49 spin_unlock(&ls->async_lock); 65 spin_unlock(&ls->async_lock);
50 wake_up(&ls->thread_wait); 66
67 kfree(lp);
51} 68}
52 69
53void gdlm_queue_delayed(struct gdlm_lock *lp) 70static void gdlm_queue_delayed(struct gdlm_lock *lp)
54{ 71{
55 struct gdlm_ls *ls = lp->ls; 72 struct gdlm_ls *ls = lp->ls;
56 73
@@ -59,6 +76,249 @@ void gdlm_queue_delayed(struct gdlm_lock *lp)
59 spin_unlock(&ls->async_lock); 76 spin_unlock(&ls->async_lock);
60} 77}
61 78
79static void process_complete(struct gdlm_lock *lp)
80{
81 struct gdlm_ls *ls = lp->ls;
82 struct lm_async_cb acb;
83 s16 prev_mode = lp->cur;
84
85 memset(&acb, 0, sizeof(acb));
86
87 if (lp->lksb.sb_status == -DLM_ECANCEL) {
88 log_info("complete dlm cancel %x,%llx flags %lx",
89 lp->lockname.ln_type,
90 (unsigned long long)lp->lockname.ln_number,
91 lp->flags);
92
93 lp->req = lp->cur;
94 acb.lc_ret |= LM_OUT_CANCELED;
95 if (lp->cur == DLM_LOCK_IV)
96 lp->lksb.sb_lkid = 0;
97 goto out;
98 }
99
100 if (test_and_clear_bit(LFL_DLM_UNLOCK, &lp->flags)) {
101 if (lp->lksb.sb_status != -DLM_EUNLOCK) {
102 log_info("unlock sb_status %d %x,%llx flags %lx",
103 lp->lksb.sb_status, lp->lockname.ln_type,
104 (unsigned long long)lp->lockname.ln_number,
105 lp->flags);
106 return;
107 }
108
109 lp->cur = DLM_LOCK_IV;
110 lp->req = DLM_LOCK_IV;
111 lp->lksb.sb_lkid = 0;
112
113 if (test_and_clear_bit(LFL_UNLOCK_DELETE, &lp->flags)) {
114 gdlm_delete_lp(lp);
115 return;
116 }
117 goto out;
118 }
119
120 if (lp->lksb.sb_flags & DLM_SBF_VALNOTVALID)
121 memset(lp->lksb.sb_lvbptr, 0, GDLM_LVB_SIZE);
122
123 if (lp->lksb.sb_flags & DLM_SBF_ALTMODE) {
124 if (lp->req == DLM_LOCK_PR)
125 lp->req = DLM_LOCK_CW;
126 else if (lp->req == DLM_LOCK_CW)
127 lp->req = DLM_LOCK_PR;
128 }
129
130 /*
131 * A canceled lock request. The lock was just taken off the delayed
132 * list and was never even submitted to dlm.
133 */
134
135 if (test_and_clear_bit(LFL_CANCEL, &lp->flags)) {
136 log_info("complete internal cancel %x,%llx",
137 lp->lockname.ln_type,
138 (unsigned long long)lp->lockname.ln_number);
139 lp->req = lp->cur;
140 acb.lc_ret |= LM_OUT_CANCELED;
141 goto out;
142 }
143
144 /*
145 * An error occured.
146 */
147
148 if (lp->lksb.sb_status) {
149 /* a "normal" error */
150 if ((lp->lksb.sb_status == -EAGAIN) &&
151 (lp->lkf & DLM_LKF_NOQUEUE)) {
152 lp->req = lp->cur;
153 if (lp->cur == DLM_LOCK_IV)
154 lp->lksb.sb_lkid = 0;
155 goto out;
156 }
157
158 /* this could only happen with cancels I think */
159 log_info("ast sb_status %d %x,%llx flags %lx",
160 lp->lksb.sb_status, lp->lockname.ln_type,
161 (unsigned long long)lp->lockname.ln_number,
162 lp->flags);
163 if (lp->lksb.sb_status == -EDEADLOCK &&
164 lp->ls->fsflags & LM_MFLAG_CONV_NODROP) {
165 lp->req = lp->cur;
166 acb.lc_ret |= LM_OUT_CONV_DEADLK;
167 if (lp->cur == DLM_LOCK_IV)
168 lp->lksb.sb_lkid = 0;
169 goto out;
170 } else
171 return;
172 }
173
174 /*
175 * This is an AST for an EX->EX conversion for sync_lvb from GFS.
176 */
177
178 if (test_and_clear_bit(LFL_SYNC_LVB, &lp->flags)) {
179 wake_up_ast(lp);
180 return;
181 }
182
183 /*
184 * A lock has been demoted to NL because it initially completed during
185 * BLOCK_LOCKS. Now it must be requested in the originally requested
186 * mode.
187 */
188
189 if (test_and_clear_bit(LFL_REREQUEST, &lp->flags)) {
190 gdlm_assert(lp->req == DLM_LOCK_NL, "%x,%llx",
191 lp->lockname.ln_type,
192 (unsigned long long)lp->lockname.ln_number);
193 gdlm_assert(lp->prev_req > DLM_LOCK_NL, "%x,%llx",
194 lp->lockname.ln_type,
195 (unsigned long long)lp->lockname.ln_number);
196
197 lp->cur = DLM_LOCK_NL;
198 lp->req = lp->prev_req;
199 lp->prev_req = DLM_LOCK_IV;
200 lp->lkf &= ~DLM_LKF_CONVDEADLK;
201
202 set_bit(LFL_NOCACHE, &lp->flags);
203
204 if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) &&
205 !test_bit(LFL_NOBLOCK, &lp->flags))
206 gdlm_queue_delayed(lp);
207 else
208 queue_submit(lp);
209 return;
210 }
211
212 /*
213 * A request is granted during dlm recovery. It may be granted
214 * because the locks of a failed node were cleared. In that case,
215 * there may be inconsistent data beneath this lock and we must wait
216 * for recovery to complete to use it. When gfs recovery is done this
217 * granted lock will be converted to NL and then reacquired in this
218 * granted state.
219 */
220
221 if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) &&
222 !test_bit(LFL_NOBLOCK, &lp->flags) &&
223 lp->req != DLM_LOCK_NL) {
224
225 lp->cur = lp->req;
226 lp->prev_req = lp->req;
227 lp->req = DLM_LOCK_NL;
228 lp->lkf |= DLM_LKF_CONVERT;
229 lp->lkf &= ~DLM_LKF_CONVDEADLK;
230
231 log_debug("rereq %x,%llx id %x %d,%d",
232 lp->lockname.ln_type,
233 (unsigned long long)lp->lockname.ln_number,
234 lp->lksb.sb_lkid, lp->cur, lp->req);
235
236 set_bit(LFL_REREQUEST, &lp->flags);
237 queue_submit(lp);
238 return;
239 }
240
241 /*
242 * DLM demoted the lock to NL before it was granted so GFS must be
243 * told it cannot cache data for this lock.
244 */
245
246 if (lp->lksb.sb_flags & DLM_SBF_DEMOTED)
247 set_bit(LFL_NOCACHE, &lp->flags);
248
249out:
250 /*
251 * This is an internal lock_dlm lock
252 */
253
254 if (test_bit(LFL_INLOCK, &lp->flags)) {
255 clear_bit(LFL_NOBLOCK, &lp->flags);
256 lp->cur = lp->req;
257 wake_up_ast(lp);
258 return;
259 }
260
261 /*
262 * Normal completion of a lock request. Tell GFS it now has the lock.
263 */
264
265 clear_bit(LFL_NOBLOCK, &lp->flags);
266 lp->cur = lp->req;
267
268 acb.lc_name = lp->lockname;
269 acb.lc_ret |= gdlm_make_lmstate(lp->cur);
270
271 if (!test_and_clear_bit(LFL_NOCACHE, &lp->flags) &&
272 (lp->cur > DLM_LOCK_NL) && (prev_mode > DLM_LOCK_NL))
273 acb.lc_ret |= LM_OUT_CACHEABLE;
274
275 ls->fscb(ls->sdp, LM_CB_ASYNC, &acb);
276}
277
278static void gdlm_ast(void *astarg)
279{
280 struct gdlm_lock *lp = astarg;
281 clear_bit(LFL_ACTIVE, &lp->flags);
282 process_complete(lp);
283}
284
285static void process_blocking(struct gdlm_lock *lp, int bast_mode)
286{
287 struct gdlm_ls *ls = lp->ls;
288 unsigned int cb = 0;
289
290 switch (gdlm_make_lmstate(bast_mode)) {
291 case LM_ST_EXCLUSIVE:
292 cb = LM_CB_NEED_E;
293 break;
294 case LM_ST_DEFERRED:
295 cb = LM_CB_NEED_D;
296 break;
297 case LM_ST_SHARED:
298 cb = LM_CB_NEED_S;
299 break;
300 default:
301 gdlm_assert(0, "unknown bast mode %u", bast_mode);
302 }
303
304 ls->fscb(ls->sdp, cb, &lp->lockname);
305}
306
307
308static void gdlm_bast(void *astarg, int mode)
309{
310 struct gdlm_lock *lp = astarg;
311
312 if (!mode) {
313 printk(KERN_INFO "lock_dlm: bast mode zero %x,%llx\n",
314 lp->lockname.ln_type,
315 (unsigned long long)lp->lockname.ln_number);
316 return;
317 }
318
319 process_blocking(lp, mode);
320}
321
62/* convert gfs lock-state to dlm lock-mode */ 322/* convert gfs lock-state to dlm lock-mode */
63 323
64static s16 make_mode(s16 lmstate) 324static s16 make_mode(s16 lmstate)
@@ -77,24 +337,6 @@ static s16 make_mode(s16 lmstate)
77 return -1; 337 return -1;
78} 338}
79 339
80/* convert dlm lock-mode to gfs lock-state */
81
82s16 gdlm_make_lmstate(s16 dlmmode)
83{
84 switch (dlmmode) {
85 case DLM_LOCK_IV:
86 case DLM_LOCK_NL:
87 return LM_ST_UNLOCKED;
88 case DLM_LOCK_EX:
89 return LM_ST_EXCLUSIVE;
90 case DLM_LOCK_CW:
91 return LM_ST_DEFERRED;
92 case DLM_LOCK_PR:
93 return LM_ST_SHARED;
94 }
95 gdlm_assert(0, "unknown DLM mode %d", dlmmode);
96 return -1;
97}
98 340
99/* verify agreement with GFS on the current lock state, NB: DLM_LOCK_NL and 341/* verify agreement with GFS on the current lock state, NB: DLM_LOCK_NL and
100 DLM_LOCK_IV are both considered LM_ST_UNLOCKED by GFS. */ 342 DLM_LOCK_IV are both considered LM_ST_UNLOCKED by GFS. */
@@ -173,10 +415,6 @@ static int gdlm_create_lp(struct gdlm_ls *ls, struct lm_lockname *name,
173 make_strname(name, &lp->strname); 415 make_strname(name, &lp->strname);
174 lp->ls = ls; 416 lp->ls = ls;
175 lp->cur = DLM_LOCK_IV; 417 lp->cur = DLM_LOCK_IV;
176 lp->lvb = NULL;
177 lp->hold_null = NULL;
178 INIT_LIST_HEAD(&lp->clist);
179 INIT_LIST_HEAD(&lp->blist);
180 INIT_LIST_HEAD(&lp->delay_list); 418 INIT_LIST_HEAD(&lp->delay_list);
181 419
182 spin_lock(&ls->async_lock); 420 spin_lock(&ls->async_lock);
@@ -188,26 +426,6 @@ static int gdlm_create_lp(struct gdlm_ls *ls, struct lm_lockname *name,
188 return 0; 426 return 0;
189} 427}
190 428
191void gdlm_delete_lp(struct gdlm_lock *lp)
192{
193 struct gdlm_ls *ls = lp->ls;
194
195 spin_lock(&ls->async_lock);
196 if (!list_empty(&lp->clist))
197 list_del_init(&lp->clist);
198 if (!list_empty(&lp->blist))
199 list_del_init(&lp->blist);
200 if (!list_empty(&lp->delay_list))
201 list_del_init(&lp->delay_list);
202 gdlm_assert(!list_empty(&lp->all_list), "%x,%llx", lp->lockname.ln_type,
203 (unsigned long long)lp->lockname.ln_number);
204 list_del_init(&lp->all_list);
205 ls->all_locks_count--;
206 spin_unlock(&ls->async_lock);
207
208 kfree(lp);
209}
210
211int gdlm_get_lock(void *lockspace, struct lm_lockname *name, 429int gdlm_get_lock(void *lockspace, struct lm_lockname *name,
212 void **lockp) 430 void **lockp)
213{ 431{
@@ -261,7 +479,7 @@ unsigned int gdlm_do_lock(struct gdlm_lock *lp)
261 479
262 if ((error == -EAGAIN) && (lp->lkf & DLM_LKF_NOQUEUE)) { 480 if ((error == -EAGAIN) && (lp->lkf & DLM_LKF_NOQUEUE)) {
263 lp->lksb.sb_status = -EAGAIN; 481 lp->lksb.sb_status = -EAGAIN;
264 queue_complete(lp); 482 gdlm_ast(lp);
265 error = 0; 483 error = 0;
266 } 484 }
267 485
@@ -311,6 +529,9 @@ unsigned int gdlm_lock(void *lock, unsigned int cur_state,
311 if (req_state == LM_ST_UNLOCKED) 529 if (req_state == LM_ST_UNLOCKED)
312 return gdlm_unlock(lock, cur_state); 530 return gdlm_unlock(lock, cur_state);
313 531
532 if (req_state == LM_ST_UNLOCKED)
533 return gdlm_unlock(lock, cur_state);
534
314 clear_bit(LFL_DLM_CANCEL, &lp->flags); 535 clear_bit(LFL_DLM_CANCEL, &lp->flags);
315 if (flags & LM_FLAG_NOEXP) 536 if (flags & LM_FLAG_NOEXP)
316 set_bit(LFL_NOBLOCK, &lp->flags); 537 set_bit(LFL_NOBLOCK, &lp->flags);
@@ -354,7 +575,7 @@ void gdlm_cancel(void *lock)
354 if (delay_list) { 575 if (delay_list) {
355 set_bit(LFL_CANCEL, &lp->flags); 576 set_bit(LFL_CANCEL, &lp->flags);
356 set_bit(LFL_ACTIVE, &lp->flags); 577 set_bit(LFL_ACTIVE, &lp->flags);
357 queue_complete(lp); 578 gdlm_ast(lp);
358 return; 579 return;
359 } 580 }
360 581
diff --git a/fs/gfs2/locking/dlm/lock_dlm.h b/fs/gfs2/locking/dlm/lock_dlm.h
index a243cf69c54e..ad944c64eab1 100644
--- a/fs/gfs2/locking/dlm/lock_dlm.h
+++ b/fs/gfs2/locking/dlm/lock_dlm.h
@@ -72,15 +72,12 @@ struct gdlm_ls {
72 int recover_jid_done; 72 int recover_jid_done;
73 int recover_jid_status; 73 int recover_jid_status;
74 spinlock_t async_lock; 74 spinlock_t async_lock;
75 struct list_head complete;
76 struct list_head blocking;
77 struct list_head delayed; 75 struct list_head delayed;
78 struct list_head submit; 76 struct list_head submit;
79 struct list_head all_locks; 77 struct list_head all_locks;
80 u32 all_locks_count; 78 u32 all_locks_count;
81 wait_queue_head_t wait_control; 79 wait_queue_head_t wait_control;
82 struct task_struct *thread1; 80 struct task_struct *thread;
83 struct task_struct *thread2;
84 wait_queue_head_t thread_wait; 81 wait_queue_head_t thread_wait;
85 unsigned long drop_time; 82 unsigned long drop_time;
86 int drop_locks_count; 83 int drop_locks_count;
@@ -117,10 +114,6 @@ struct gdlm_lock {
117 u32 lkf; /* dlm flags DLM_LKF_ */ 114 u32 lkf; /* dlm flags DLM_LKF_ */
118 unsigned long flags; /* lock_dlm flags LFL_ */ 115 unsigned long flags; /* lock_dlm flags LFL_ */
119 116
120 int bast_mode; /* protected by async_lock */
121
122 struct list_head clist; /* complete */
123 struct list_head blist; /* blocking */
124 struct list_head delay_list; /* delayed */ 117 struct list_head delay_list; /* delayed */
125 struct list_head all_list; /* all locks for the fs */ 118 struct list_head all_list; /* all locks for the fs */
126 struct gdlm_lock *hold_null; /* NL lock for hold_lvb */ 119 struct gdlm_lock *hold_null; /* NL lock for hold_lvb */
@@ -159,11 +152,8 @@ void gdlm_release_threads(struct gdlm_ls *);
159 152
160/* lock.c */ 153/* lock.c */
161 154
162s16 gdlm_make_lmstate(s16);
163void gdlm_queue_delayed(struct gdlm_lock *);
164void gdlm_submit_delayed(struct gdlm_ls *); 155void gdlm_submit_delayed(struct gdlm_ls *);
165int gdlm_release_all_locks(struct gdlm_ls *); 156int gdlm_release_all_locks(struct gdlm_ls *);
166void gdlm_delete_lp(struct gdlm_lock *);
167unsigned int gdlm_do_lock(struct gdlm_lock *); 157unsigned int gdlm_do_lock(struct gdlm_lock *);
168 158
169int gdlm_get_lock(void *, struct lm_lockname *, void **); 159int gdlm_get_lock(void *, struct lm_lockname *, void **);
diff --git a/fs/gfs2/locking/dlm/mount.c b/fs/gfs2/locking/dlm/mount.c
index 470bdf650b50..0628520a445f 100644
--- a/fs/gfs2/locking/dlm/mount.c
+++ b/fs/gfs2/locking/dlm/mount.c
@@ -28,15 +28,11 @@ static struct gdlm_ls *init_gdlm(lm_callback_t cb, struct gfs2_sbd *sdp,
28 ls->sdp = sdp; 28 ls->sdp = sdp;
29 ls->fsflags = flags; 29 ls->fsflags = flags;
30 spin_lock_init(&ls->async_lock); 30 spin_lock_init(&ls->async_lock);
31 INIT_LIST_HEAD(&ls->complete);
32 INIT_LIST_HEAD(&ls->blocking);
33 INIT_LIST_HEAD(&ls->delayed); 31 INIT_LIST_HEAD(&ls->delayed);
34 INIT_LIST_HEAD(&ls->submit); 32 INIT_LIST_HEAD(&ls->submit);
35 INIT_LIST_HEAD(&ls->all_locks); 33 INIT_LIST_HEAD(&ls->all_locks);
36 init_waitqueue_head(&ls->thread_wait); 34 init_waitqueue_head(&ls->thread_wait);
37 init_waitqueue_head(&ls->wait_control); 35 init_waitqueue_head(&ls->wait_control);
38 ls->thread1 = NULL;
39 ls->thread2 = NULL;
40 ls->drop_time = jiffies; 36 ls->drop_time = jiffies;
41 ls->jid = -1; 37 ls->jid = -1;
42 38
diff --git a/fs/gfs2/locking/dlm/thread.c b/fs/gfs2/locking/dlm/thread.c
index e53db6fd28ab..f30350abd62f 100644
--- a/fs/gfs2/locking/dlm/thread.c
+++ b/fs/gfs2/locking/dlm/thread.c
@@ -9,255 +9,12 @@
9 9
10#include "lock_dlm.h" 10#include "lock_dlm.h"
11 11
12/* A lock placed on this queue is re-submitted to DLM as soon as the lock_dlm 12static inline int no_work(struct gdlm_ls *ls)
13 thread gets to it. */
14
15static void queue_submit(struct gdlm_lock *lp)
16{
17 struct gdlm_ls *ls = lp->ls;
18
19 spin_lock(&ls->async_lock);
20 list_add_tail(&lp->delay_list, &ls->submit);
21 spin_unlock(&ls->async_lock);
22 wake_up(&ls->thread_wait);
23}
24
25static void process_blocking(struct gdlm_lock *lp, int bast_mode)
26{
27 struct gdlm_ls *ls = lp->ls;
28 unsigned int cb = 0;
29
30 switch (gdlm_make_lmstate(bast_mode)) {
31 case LM_ST_EXCLUSIVE:
32 cb = LM_CB_NEED_E;
33 break;
34 case LM_ST_DEFERRED:
35 cb = LM_CB_NEED_D;
36 break;
37 case LM_ST_SHARED:
38 cb = LM_CB_NEED_S;
39 break;
40 default:
41 gdlm_assert(0, "unknown bast mode %u", lp->bast_mode);
42 }
43
44 ls->fscb(ls->sdp, cb, &lp->lockname);
45}
46
47static void wake_up_ast(struct gdlm_lock *lp)
48{
49 clear_bit(LFL_AST_WAIT, &lp->flags);
50 smp_mb__after_clear_bit();
51 wake_up_bit(&lp->flags, LFL_AST_WAIT);
52}
53
54static void process_complete(struct gdlm_lock *lp)
55{
56 struct gdlm_ls *ls = lp->ls;
57 struct lm_async_cb acb;
58 s16 prev_mode = lp->cur;
59
60 memset(&acb, 0, sizeof(acb));
61
62 if (lp->lksb.sb_status == -DLM_ECANCEL) {
63 log_info("complete dlm cancel %x,%llx flags %lx",
64 lp->lockname.ln_type,
65 (unsigned long long)lp->lockname.ln_number,
66 lp->flags);
67
68 lp->req = lp->cur;
69 acb.lc_ret |= LM_OUT_CANCELED;
70 if (lp->cur == DLM_LOCK_IV)
71 lp->lksb.sb_lkid = 0;
72 goto out;
73 }
74
75 if (test_and_clear_bit(LFL_DLM_UNLOCK, &lp->flags)) {
76 if (lp->lksb.sb_status != -DLM_EUNLOCK) {
77 log_info("unlock sb_status %d %x,%llx flags %lx",
78 lp->lksb.sb_status, lp->lockname.ln_type,
79 (unsigned long long)lp->lockname.ln_number,
80 lp->flags);
81 return;
82 }
83
84 lp->cur = DLM_LOCK_IV;
85 lp->req = DLM_LOCK_IV;
86 lp->lksb.sb_lkid = 0;
87
88 if (test_and_clear_bit(LFL_UNLOCK_DELETE, &lp->flags)) {
89 gdlm_delete_lp(lp);
90 return;
91 }
92 goto out;
93 }
94
95 if (lp->lksb.sb_flags & DLM_SBF_VALNOTVALID)
96 memset(lp->lksb.sb_lvbptr, 0, GDLM_LVB_SIZE);
97
98 if (lp->lksb.sb_flags & DLM_SBF_ALTMODE) {
99 if (lp->req == DLM_LOCK_PR)
100 lp->req = DLM_LOCK_CW;
101 else if (lp->req == DLM_LOCK_CW)
102 lp->req = DLM_LOCK_PR;
103 }
104
105 /*
106 * A canceled lock request. The lock was just taken off the delayed
107 * list and was never even submitted to dlm.
108 */
109
110 if (test_and_clear_bit(LFL_CANCEL, &lp->flags)) {
111 log_info("complete internal cancel %x,%llx",
112 lp->lockname.ln_type,
113 (unsigned long long)lp->lockname.ln_number);
114 lp->req = lp->cur;
115 acb.lc_ret |= LM_OUT_CANCELED;
116 goto out;
117 }
118
119 /*
120 * An error occured.
121 */
122
123 if (lp->lksb.sb_status) {
124 /* a "normal" error */
125 if ((lp->lksb.sb_status == -EAGAIN) &&
126 (lp->lkf & DLM_LKF_NOQUEUE)) {
127 lp->req = lp->cur;
128 if (lp->cur == DLM_LOCK_IV)
129 lp->lksb.sb_lkid = 0;
130 goto out;
131 }
132
133 /* this could only happen with cancels I think */
134 log_info("ast sb_status %d %x,%llx flags %lx",
135 lp->lksb.sb_status, lp->lockname.ln_type,
136 (unsigned long long)lp->lockname.ln_number,
137 lp->flags);
138 if (lp->lksb.sb_status == -EDEADLOCK &&
139 lp->ls->fsflags & LM_MFLAG_CONV_NODROP) {
140 lp->req = lp->cur;
141 acb.lc_ret |= LM_OUT_CONV_DEADLK;
142 if (lp->cur == DLM_LOCK_IV)
143 lp->lksb.sb_lkid = 0;
144 goto out;
145 } else
146 return;
147 }
148
149 /*
150 * This is an AST for an EX->EX conversion for sync_lvb from GFS.
151 */
152
153 if (test_and_clear_bit(LFL_SYNC_LVB, &lp->flags)) {
154 wake_up_ast(lp);
155 return;
156 }
157
158 /*
159 * A lock has been demoted to NL because it initially completed during
160 * BLOCK_LOCKS. Now it must be requested in the originally requested
161 * mode.
162 */
163
164 if (test_and_clear_bit(LFL_REREQUEST, &lp->flags)) {
165 gdlm_assert(lp->req == DLM_LOCK_NL, "%x,%llx",
166 lp->lockname.ln_type,
167 (unsigned long long)lp->lockname.ln_number);
168 gdlm_assert(lp->prev_req > DLM_LOCK_NL, "%x,%llx",
169 lp->lockname.ln_type,
170 (unsigned long long)lp->lockname.ln_number);
171
172 lp->cur = DLM_LOCK_NL;
173 lp->req = lp->prev_req;
174 lp->prev_req = DLM_LOCK_IV;
175 lp->lkf &= ~DLM_LKF_CONVDEADLK;
176
177 set_bit(LFL_NOCACHE, &lp->flags);
178
179 if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) &&
180 !test_bit(LFL_NOBLOCK, &lp->flags))
181 gdlm_queue_delayed(lp);
182 else
183 queue_submit(lp);
184 return;
185 }
186
187 /*
188 * A request is granted during dlm recovery. It may be granted
189 * because the locks of a failed node were cleared. In that case,
190 * there may be inconsistent data beneath this lock and we must wait
191 * for recovery to complete to use it. When gfs recovery is done this
192 * granted lock will be converted to NL and then reacquired in this
193 * granted state.
194 */
195
196 if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) &&
197 !test_bit(LFL_NOBLOCK, &lp->flags) &&
198 lp->req != DLM_LOCK_NL) {
199
200 lp->cur = lp->req;
201 lp->prev_req = lp->req;
202 lp->req = DLM_LOCK_NL;
203 lp->lkf |= DLM_LKF_CONVERT;
204 lp->lkf &= ~DLM_LKF_CONVDEADLK;
205
206 log_debug("rereq %x,%llx id %x %d,%d",
207 lp->lockname.ln_type,
208 (unsigned long long)lp->lockname.ln_number,
209 lp->lksb.sb_lkid, lp->cur, lp->req);
210
211 set_bit(LFL_REREQUEST, &lp->flags);
212 queue_submit(lp);
213 return;
214 }
215
216 /*
217 * DLM demoted the lock to NL before it was granted so GFS must be
218 * told it cannot cache data for this lock.
219 */
220
221 if (lp->lksb.sb_flags & DLM_SBF_DEMOTED)
222 set_bit(LFL_NOCACHE, &lp->flags);
223
224out:
225 /*
226 * This is an internal lock_dlm lock
227 */
228
229 if (test_bit(LFL_INLOCK, &lp->flags)) {
230 clear_bit(LFL_NOBLOCK, &lp->flags);
231 lp->cur = lp->req;
232 wake_up_ast(lp);
233 return;
234 }
235
236 /*
237 * Normal completion of a lock request. Tell GFS it now has the lock.
238 */
239
240 clear_bit(LFL_NOBLOCK, &lp->flags);
241 lp->cur = lp->req;
242
243 acb.lc_name = lp->lockname;
244 acb.lc_ret |= gdlm_make_lmstate(lp->cur);
245
246 if (!test_and_clear_bit(LFL_NOCACHE, &lp->flags) &&
247 (lp->cur > DLM_LOCK_NL) && (prev_mode > DLM_LOCK_NL))
248 acb.lc_ret |= LM_OUT_CACHEABLE;
249
250 ls->fscb(ls->sdp, LM_CB_ASYNC, &acb);
251}
252
253static inline int no_work(struct gdlm_ls *ls, int blocking)
254{ 13{
255 int ret; 14 int ret;
256 15
257 spin_lock(&ls->async_lock); 16 spin_lock(&ls->async_lock);
258 ret = list_empty(&ls->complete) && list_empty(&ls->submit); 17 ret = list_empty(&ls->submit);
259 if (ret && blocking)
260 ret = list_empty(&ls->blocking);
261 spin_unlock(&ls->async_lock); 18 spin_unlock(&ls->async_lock);
262 19
263 return ret; 20 return ret;
@@ -276,100 +33,55 @@ static inline int check_drop(struct gdlm_ls *ls)
276 return 0; 33 return 0;
277} 34}
278 35
279static int gdlm_thread(void *data, int blist) 36static int gdlm_thread(void *data)
280{ 37{
281 struct gdlm_ls *ls = (struct gdlm_ls *) data; 38 struct gdlm_ls *ls = (struct gdlm_ls *) data;
282 struct gdlm_lock *lp = NULL; 39 struct gdlm_lock *lp = NULL;
283 uint8_t complete, blocking, submit, drop;
284
285 /* Only thread1 is allowed to do blocking callbacks since gfs
286 may wait for a completion callback within a blocking cb. */
287 40
288 while (!kthread_should_stop()) { 41 while (!kthread_should_stop()) {
289 wait_event_interruptible(ls->thread_wait, 42 wait_event_interruptible(ls->thread_wait,
290 !no_work(ls, blist) || kthread_should_stop()); 43 !no_work(ls) || kthread_should_stop());
291
292 complete = blocking = submit = drop = 0;
293 44
294 spin_lock(&ls->async_lock); 45 spin_lock(&ls->async_lock);
295 46
296 if (blist && !list_empty(&ls->blocking)) { 47 if (!list_empty(&ls->submit)) {
297 lp = list_entry(ls->blocking.next, struct gdlm_lock,
298 blist);
299 list_del_init(&lp->blist);
300 blocking = lp->bast_mode;
301 lp->bast_mode = 0;
302 } else if (!list_empty(&ls->complete)) {
303 lp = list_entry(ls->complete.next, struct gdlm_lock,
304 clist);
305 list_del_init(&lp->clist);
306 complete = 1;
307 } else if (!list_empty(&ls->submit)) {
308 lp = list_entry(ls->submit.next, struct gdlm_lock, 48 lp = list_entry(ls->submit.next, struct gdlm_lock,
309 delay_list); 49 delay_list);
310 list_del_init(&lp->delay_list); 50 list_del_init(&lp->delay_list);
311 submit = 1; 51 spin_unlock(&ls->async_lock);
312 }
313
314 drop = check_drop(ls);
315 spin_unlock(&ls->async_lock);
316
317 if (complete)
318 process_complete(lp);
319
320 else if (blocking)
321 process_blocking(lp, blocking);
322
323 else if (submit)
324 gdlm_do_lock(lp); 52 gdlm_do_lock(lp);
325 53 spin_lock(&ls->async_lock);
326 if (drop) 54 }
55 /* Does this ever happen these days? I hope not anyway */
56 if (check_drop(ls)) {
57 spin_unlock(&ls->async_lock);
327 ls->fscb(ls->sdp, LM_CB_DROPLOCKS, NULL); 58 ls->fscb(ls->sdp, LM_CB_DROPLOCKS, NULL);
328 59 spin_lock(&ls->async_lock);
329 schedule(); 60 }
61 spin_unlock(&ls->async_lock);
330 } 62 }
331 63
332 return 0; 64 return 0;
333} 65}
334 66
335static int gdlm_thread1(void *data)
336{
337 return gdlm_thread(data, 1);
338}
339
340static int gdlm_thread2(void *data)
341{
342 return gdlm_thread(data, 0);
343}
344
345int gdlm_init_threads(struct gdlm_ls *ls) 67int gdlm_init_threads(struct gdlm_ls *ls)
346{ 68{
347 struct task_struct *p; 69 struct task_struct *p;
348 int error; 70 int error;
349 71
350 p = kthread_run(gdlm_thread1, ls, "lock_dlm1"); 72 p = kthread_run(gdlm_thread, ls, "lock_dlm");
351 error = IS_ERR(p);
352 if (error) {
353 log_error("can't start lock_dlm1 thread %d", error);
354 return error;
355 }
356 ls->thread1 = p;
357
358 p = kthread_run(gdlm_thread2, ls, "lock_dlm2");
359 error = IS_ERR(p); 73 error = IS_ERR(p);
360 if (error) { 74 if (error) {
361 log_error("can't start lock_dlm2 thread %d", error); 75 log_error("can't start lock_dlm thread %d", error);
362 kthread_stop(ls->thread1);
363 return error; 76 return error;
364 } 77 }
365 ls->thread2 = p; 78 ls->thread = p;
366 79
367 return 0; 80 return 0;
368} 81}
369 82
370void gdlm_release_threads(struct gdlm_ls *ls) 83void gdlm_release_threads(struct gdlm_ls *ls)
371{ 84{
372 kthread_stop(ls->thread1); 85 kthread_stop(ls->thread);
373 kthread_stop(ls->thread2);
374} 86}
375 87