Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r--  fs/dlm/lock.c  3610
1 file changed, 3610 insertions, 0 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
new file mode 100644
index 000000000000..81efb361f95d
--- /dev/null
+++ b/fs/dlm/lock.c
@@ -0,0 +1,3610 @@
1/******************************************************************************
2*******************************************************************************
3**
4** Copyright (C) 2005 Red Hat, Inc. All rights reserved.
5**
6** This copyrighted material is made available to anyone wishing to use,
7** modify, copy, or redistribute it subject to the terms and conditions
8** of the GNU General Public License v.2.
9**
10*******************************************************************************
11******************************************************************************/
12
13/* Central locking logic has four stages:
14
15 dlm_lock()
16 dlm_unlock()
17
18 request_lock(ls, lkb)
19 convert_lock(ls, lkb)
20 unlock_lock(ls, lkb)
21 cancel_lock(ls, lkb)
22
23 _request_lock(r, lkb)
24 _convert_lock(r, lkb)
25 _unlock_lock(r, lkb)
26 _cancel_lock(r, lkb)
27
28 do_request(r, lkb)
29 do_convert(r, lkb)
30 do_unlock(r, lkb)
31 do_cancel(r, lkb)
32
33 Stage 1 (lock, unlock) is mainly about checking input args and
34 splitting into one of the four main operations:
35
36 dlm_lock = request_lock
37 dlm_lock+CONVERT = convert_lock
38 dlm_unlock = unlock_lock
39 dlm_unlock+CANCEL = cancel_lock
40
41 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42 provided to the next stage.
43
44 Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45 When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
48 given rsb and lkb and queues callbacks.
49
50 For remote operations, send_xxxx() results in the corresponding do_xxxx()
51 function being executed on the remote node. The connecting send/receive
52 calls on local (L) and remote (R) nodes:
53
54 L: send_xxxx() -> R: receive_xxxx()
55 R: do_xxxx()
56 L: receive_xxxx_reply() <- R: send_xxxx_reply()
57*/
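/* A minimal usage sketch of the stage 1 entry points above
   (illustrative only; the example_* names are not part of the dlm
   API). It assumes a lockspace created elsewhere, e.g. via
   dlm_new_lockspace(), and an lksb that stays valid until the
   completion AST fires. dlm_lock() and dlm_unlock() are defined
   later in this file. */

static struct dlm_lksb example_lksb;

static void example_ast(void *astarg)
{
	/* completion ast: example_lksb.sb_status holds the result */
}

static void example_bast(void *astarg, int mode)
{
	/* blocking ast: another node has requested "mode" */
}

static int example_acquire(dlm_lockspace_t *ls)
{
	/* dlm_lock -> request_lock: request EX on resource "example" */
	return dlm_lock(ls, DLM_LOCK_EX, &example_lksb, 0, "example", 7,
			0, example_ast, NULL, example_bast, NULL);
}

static int example_release(dlm_lockspace_t *ls)
{
	/* dlm_unlock -> unlock_lock, once example_ast has reported 0 */
	return dlm_unlock(ls, example_lksb.sb_lkid, 0, &example_lksb,
			  NULL);
}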
58
59#include "dlm_internal.h"
60#include "memory.h"
61#include "lowcomms.h"
62#include "requestqueue.h"
63#include "util.h"
64#include "dir.h"
65#include "member.h"
66#include "lockspace.h"
67#include "ast.h"
68#include "lock.h"
69#include "rcom.h"
70#include "recover.h"
71#include "lvb_table.h"
72#include "config.h"
73
74static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
75static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
76static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
77static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
78static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
79static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
80static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
81static int send_remove(struct dlm_rsb *r);
82static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
83static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
84 struct dlm_message *ms);
85static int receive_extralen(struct dlm_message *ms);
86
87/*
88 * Lock compatibility matrix - thanks Steve
89 * UN = Unlocked state. Not really a state, used as a flag
90 * PD = Padding. Used to make the matrix a nice power of two in size
91 * Other states are the same as the VMS DLM.
92 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
93 */
94
95static const int __dlm_compat_matrix[8][8] = {
96 /* UN NL CR CW PR PW EX PD */
97 {1, 1, 1, 1, 1, 1, 1, 0}, /* UN */
98 {1, 1, 1, 1, 1, 1, 1, 0}, /* NL */
99 {1, 1, 1, 1, 1, 1, 0, 0}, /* CR */
100 {1, 1, 1, 1, 0, 0, 0, 0}, /* CW */
101 {1, 1, 1, 0, 1, 0, 0, 0}, /* PR */
102 {1, 1, 1, 0, 0, 0, 0, 0}, /* PW */
103 {1, 1, 0, 0, 0, 0, 0, 0}, /* EX */
104 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
105};
106
107/*
108 * This defines the direction of transfer of LVB data.
109 * Granted mode is the row; requested mode is the column.
110 * Usage: matrix[grmode+1][rqmode+1]
111 * 1 = LVB is returned to the caller
112 * 0 = LVB is written to the resource
113 * -1 = nothing happens to the LVB
114 */
115
116const int dlm_lvb_operations[8][8] = {
117 /* UN NL CR CW PR PW EX PD*/
118 { -1, 1, 1, 1, 1, 1, 1, -1 }, /* UN */
119 { -1, 1, 1, 1, 1, 1, 1, 0 }, /* NL */
120 { -1, -1, 1, 1, 1, 1, 1, 0 }, /* CR */
121 { -1, -1, -1, 1, 1, 1, 1, 0 }, /* CW */
122 { -1, -1, -1, -1, 1, 1, 1, 0 }, /* PR */
123 { -1, 0, 0, 0, 0, 0, 1, 0 }, /* PW */
124 { -1, 0, 0, 0, 0, 0, 0, 0 }, /* EX */
125 { -1, 0, 0, 0, 0, 0, 0, 0 } /* PD */
126};
127EXPORT_SYMBOL_GPL(dlm_lvb_operations);
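/* For example: an NL->EX conversion uses row NL, column EX, which is
   1, so the resource's LVB is copied back to the caller; an EX->NL
   conversion uses row EX, column NL, which is 0, so the caller's LVB
   is written to the resource; a brand new request (row UN) never
   writes the LVB. */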
128
129#define modes_compat(gr, rq) \
130 __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
131
132int dlm_modes_compat(int mode1, int mode2)
133{
134 return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
135}
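/* For example, per the matrix above: two PR locks can coexist but PR
   excludes EX, so dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR) returns
   1 while dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX) returns 0. */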
136
137/*
138 * Compatibility matrix for conversions with QUECVT set.
139 * Granted mode is the row; requested mode is the column.
140 * Usage: matrix[grmode+1][rqmode+1]
141 */
142
143static const int __quecvt_compat_matrix[8][8] = {
144 /* UN NL CR CW PR PW EX PD */
145 {0, 0, 0, 0, 0, 0, 0, 0}, /* UN */
146 {0, 0, 1, 1, 1, 1, 1, 0}, /* NL */
147 {0, 0, 0, 1, 1, 1, 1, 0}, /* CR */
148 {0, 0, 0, 0, 1, 1, 1, 0}, /* CW */
149 {0, 0, 0, 1, 0, 1, 1, 0}, /* PR */
150 {0, 0, 0, 0, 0, 0, 1, 0}, /* PW */
151 {0, 0, 0, 0, 0, 0, 0, 0}, /* EX */
152 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
153};
154
155static void dlm_print_lkb(struct dlm_lkb *lkb)
156{
157 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
158 " status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
159 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
160 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
161 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
162}
163
164void dlm_print_rsb(struct dlm_rsb *r)
165{
166 printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
167 r->res_nodeid, r->res_flags, r->res_first_lkid,
168 r->res_recover_locks_count, r->res_name);
169}
170
171/* Threads cannot use the lockspace while it's being recovered */
172
173static inline void lock_recovery(struct dlm_ls *ls)
174{
175 down_read(&ls->ls_in_recovery);
176}
177
178static inline void unlock_recovery(struct dlm_ls *ls)
179{
180 up_read(&ls->ls_in_recovery);
181}
182
183static inline int lock_recovery_try(struct dlm_ls *ls)
184{
185 return down_read_trylock(&ls->ls_in_recovery);
186}
187
188static inline int can_be_queued(struct dlm_lkb *lkb)
189{
190 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
191}
192
193static inline int force_blocking_asts(struct dlm_lkb *lkb)
194{
195 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
196}
197
198static inline int is_demoted(struct dlm_lkb *lkb)
199{
200 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
201}
202
203static inline int is_remote(struct dlm_rsb *r)
204{
205 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
206 return !!r->res_nodeid;
207}
208
209static inline int is_process_copy(struct dlm_lkb *lkb)
210{
211 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
212}
213
214static inline int is_master_copy(struct dlm_lkb *lkb)
215{
216 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
217 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
218 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? TRUE : FALSE;
219}
220
221static inline int middle_conversion(struct dlm_lkb *lkb)
222{
223 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
224 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
225 return TRUE;
226 return FALSE;
227}
228
229static inline int down_conversion(struct dlm_lkb *lkb)
230{
231 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
232}
233
234static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
235{
236 if (is_master_copy(lkb))
237 return;
238
239 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
240
241 lkb->lkb_lksb->sb_status = rv;
242 lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
243
244 dlm_add_ast(lkb, AST_COMP);
245}
246
247static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
248{
249 if (is_master_copy(lkb))
250 send_bast(r, lkb, rqmode);
251 else {
252 lkb->lkb_bastmode = rqmode;
253 dlm_add_ast(lkb, AST_BAST);
254 }
255}
256
257/*
258 * Basic operations on rsb's and lkb's
259 */
260
261static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
262{
263 struct dlm_rsb *r;
264
265 r = allocate_rsb(ls, len);
266 if (!r)
267 return NULL;
268
269 r->res_ls = ls;
270 r->res_length = len;
271 memcpy(r->res_name, name, len);
272 init_MUTEX(&r->res_sem);
273
274 INIT_LIST_HEAD(&r->res_lookup);
275 INIT_LIST_HEAD(&r->res_grantqueue);
276 INIT_LIST_HEAD(&r->res_convertqueue);
277 INIT_LIST_HEAD(&r->res_waitqueue);
278 INIT_LIST_HEAD(&r->res_root_list);
279 INIT_LIST_HEAD(&r->res_recover_list);
280
281 return r;
282}
283
284static int search_rsb_list(struct list_head *head, char *name, int len,
285 unsigned int flags, struct dlm_rsb **r_ret)
286{
287 struct dlm_rsb *r;
288 int error = 0;
289
290 list_for_each_entry(r, head, res_hashchain) {
291 if (len == r->res_length && !memcmp(name, r->res_name, len))
292 goto found;
293 }
294 return -ENOENT;
295
296 found:
297 if (r->res_nodeid && (flags & R_MASTER))
298 error = -ENOTBLK;
299 *r_ret = r;
300 return error;
301}
302
303static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
304 unsigned int flags, struct dlm_rsb **r_ret)
305{
306 struct dlm_rsb *r;
307 int error;
308
309 error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
310 if (!error) {
311 kref_get(&r->res_ref);
312 goto out;
313 }
314 error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
315 if (error)
316 goto out;
317
318 list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
319
320 if (dlm_no_directory(ls))
321 goto out;
322
323 if (r->res_nodeid == -1) {
324 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
325 r->res_first_lkid = 0;
326 } else if (r->res_nodeid > 0) {
327 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
328 r->res_first_lkid = 0;
329 } else {
330 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
331 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
332 }
333 out:
334 *r_ret = r;
335 return error;
336}
337
338static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
339 unsigned int flags, struct dlm_rsb **r_ret)
340{
341 int error;
342 write_lock(&ls->ls_rsbtbl[b].lock);
343 error = _search_rsb(ls, name, len, b, flags, r_ret);
344 write_unlock(&ls->ls_rsbtbl[b].lock);
345 return error;
346}
347
348/*
349 * Find rsb in rsbtbl and potentially create/add one
350 *
351 * Delaying the release of rsb's has a similar benefit to applications keeping
352 * NL locks on an rsb, but without the guarantee that the cached master value
353 * will still be valid when the rsb is reused. Apps aren't always smart enough
354 * to keep NL locks on an rsb that they may lock again shortly; this can lead
355 * to excessive master lookups and removals if we don't delay the release.
356 *
357 * Searching for an rsb means looking through both the normal list and toss
358 * list. When found on the toss list the rsb is moved to the normal list with
359 * ref count of 1; when found on normal list the ref count is incremented.
360 */
361
362static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
363 unsigned int flags, struct dlm_rsb **r_ret)
364{
365 struct dlm_rsb *r, *tmp;
366 uint32_t hash, bucket;
367 int error = 0;
368
369 if (dlm_no_directory(ls))
370 flags |= R_CREATE;
371
372 hash = jhash(name, namelen, 0);
373 bucket = hash & (ls->ls_rsbtbl_size - 1);
374
375 error = search_rsb(ls, name, namelen, bucket, flags, &r);
376 if (!error)
377 goto out;
378
379 if (error == -ENOENT && !(flags & R_CREATE))
380 goto out;
381
382 /* the rsb was found but wasn't a master copy */
383 if (error == -ENOTBLK)
384 goto out;
385
386 error = -ENOMEM;
387 r = create_rsb(ls, name, namelen);
388 if (!r)
389 goto out;
390
391 r->res_hash = hash;
392 r->res_bucket = bucket;
393 r->res_nodeid = -1;
394 kref_init(&r->res_ref);
395
396 /* With no directory, the master can be set immediately */
397 if (dlm_no_directory(ls)) {
398 int nodeid = dlm_dir_nodeid(r);
399 if (nodeid == dlm_our_nodeid())
400 nodeid = 0;
401 r->res_nodeid = nodeid;
402 }
403
404 write_lock(&ls->ls_rsbtbl[bucket].lock);
405 error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
406 if (!error) {
407 write_unlock(&ls->ls_rsbtbl[bucket].lock);
408 free_rsb(r);
409 r = tmp;
410 goto out;
411 }
412 list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
413 write_unlock(&ls->ls_rsbtbl[bucket].lock);
414 error = 0;
415 out:
416 *r_ret = r;
417 return error;
418}
419
420int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
421 unsigned int flags, struct dlm_rsb **r_ret)
422{
423 return find_rsb(ls, name, namelen, flags, r_ret);
424}
425
426/* This is only called to add a reference when the code already holds
427 a valid reference to the rsb, so there's no need for locking. */
428
429static inline void hold_rsb(struct dlm_rsb *r)
430{
431 kref_get(&r->res_ref);
432}
433
434void dlm_hold_rsb(struct dlm_rsb *r)
435{
436 hold_rsb(r);
437}
438
439static void toss_rsb(struct kref *kref)
440{
441 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
442 struct dlm_ls *ls = r->res_ls;
443
444 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
445 kref_init(&r->res_ref);
446 list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
447 r->res_toss_time = jiffies;
448 if (r->res_lvbptr) {
449 free_lvb(r->res_lvbptr);
450 r->res_lvbptr = NULL;
451 }
452}
453
454/* When all references to the rsb are gone it's transferred to
455 the tossed list for later disposal. */
456
457static void put_rsb(struct dlm_rsb *r)
458{
459 struct dlm_ls *ls = r->res_ls;
460 uint32_t bucket = r->res_bucket;
461
462 write_lock(&ls->ls_rsbtbl[bucket].lock);
463 kref_put(&r->res_ref, toss_rsb);
464 write_unlock(&ls->ls_rsbtbl[bucket].lock);
465}
466
467void dlm_put_rsb(struct dlm_rsb *r)
468{
469 put_rsb(r);
470}
471
472/* See comment for unhold_lkb */
473
474static void unhold_rsb(struct dlm_rsb *r)
475{
476 int rv;
477 rv = kref_put(&r->res_ref, toss_rsb);
478 DLM_ASSERT(!rv, dlm_print_rsb(r););
479}
480
481static void kill_rsb(struct kref *kref)
482{
483 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
484
485 /* All work is done after the return from kref_put() so we
486 can release the write_lock before the remove and free. */
487
488 DLM_ASSERT(list_empty(&r->res_lookup),);
489 DLM_ASSERT(list_empty(&r->res_grantqueue),);
490 DLM_ASSERT(list_empty(&r->res_convertqueue),);
491 DLM_ASSERT(list_empty(&r->res_waitqueue),);
492 DLM_ASSERT(list_empty(&r->res_root_list),);
493 DLM_ASSERT(list_empty(&r->res_recover_list),);
494}
495
496/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
497 The rsb must exist as long as any lkb's for it do. */
498
499static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
500{
501 hold_rsb(r);
502 lkb->lkb_resource = r;
503}
504
505static void detach_lkb(struct dlm_lkb *lkb)
506{
507 if (lkb->lkb_resource) {
508 put_rsb(lkb->lkb_resource);
509 lkb->lkb_resource = NULL;
510 }
511}
512
513static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
514{
515 struct dlm_lkb *lkb, *tmp;
516 uint32_t lkid = 0;
517 uint16_t bucket;
518
519 lkb = allocate_lkb(ls);
520 if (!lkb)
521 return -ENOMEM;
522
523 lkb->lkb_nodeid = -1;
524 lkb->lkb_grmode = DLM_LOCK_IV;
525 kref_init(&lkb->lkb_ref);
526
527 get_random_bytes(&bucket, sizeof(bucket));
528 bucket &= (ls->ls_lkbtbl_size - 1);
529
530 write_lock(&ls->ls_lkbtbl[bucket].lock);
531
532 /* counter can roll over so we must verify lkid is not in use */
533
534 while (lkid == 0) {
535 lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);
536
537 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
538 lkb_idtbl_list) {
539 if (tmp->lkb_id != lkid)
540 continue;
541 lkid = 0;
542 break;
543 }
544 }
545
546 lkb->lkb_id = lkid;
547 list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
548 write_unlock(&ls->ls_lkbtbl[bucket].lock);
549
550 *lkb_ret = lkb;
551 return 0;
552}
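/* The resulting lkid packs the table bucket into the low 16 bits and
   the per-bucket counter into the high 16 bits, e.g. bucket 0x0003
   with counter 0x0001 yields lkid 0x00010003; find_lkb() below
   recovers the bucket with (lkid & 0xFFFF). */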
553
554static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
555{
556 uint16_t bucket = lkid & 0xFFFF;
557 struct dlm_lkb *lkb;
558
559 list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
560 if (lkb->lkb_id == lkid)
561 return lkb;
562 }
563 return NULL;
564}
565
566static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
567{
568 struct dlm_lkb *lkb;
569 uint16_t bucket = lkid & 0xFFFF;
570
571 if (bucket >= ls->ls_lkbtbl_size)
572 return -EBADSLT;
573
574 read_lock(&ls->ls_lkbtbl[bucket].lock);
575 lkb = __find_lkb(ls, lkid);
576 if (lkb)
577 kref_get(&lkb->lkb_ref);
578 read_unlock(&ls->ls_lkbtbl[bucket].lock);
579
580 *lkb_ret = lkb;
581 return lkb ? 0 : -ENOENT;
582}
583
584static void kill_lkb(struct kref *kref)
585{
586 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
587
588 /* All work is done after the return from kref_put() so we
589 can release the write_lock before the detach_lkb */
590
591 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
592}
593
594static int put_lkb(struct dlm_lkb *lkb)
595{
596 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
597 uint16_t bucket = lkb->lkb_id & 0xFFFF;
598
599 write_lock(&ls->ls_lkbtbl[bucket].lock);
600 if (kref_put(&lkb->lkb_ref, kill_lkb)) {
601 list_del(&lkb->lkb_idtbl_list);
602 write_unlock(&ls->ls_lkbtbl[bucket].lock);
603
604 detach_lkb(lkb);
605
606 /* for local/process lkbs, lvbptr points to caller's lksb */
607 if (lkb->lkb_lvbptr && is_master_copy(lkb))
608 free_lvb(lkb->lkb_lvbptr);
609 if (lkb->lkb_range)
610 free_range(lkb->lkb_range);
611 free_lkb(lkb);
612 return 1;
613 } else {
614 write_unlock(&ls->ls_lkbtbl[bucket].lock);
615 return 0;
616 }
617}
618
619int dlm_put_lkb(struct dlm_lkb *lkb)
620{
621 return put_lkb(lkb);
622}
623
624/* This is only called to add a reference when the code already holds
625 a valid reference to the lkb, so there's no need for locking. */
626
627static inline void hold_lkb(struct dlm_lkb *lkb)
628{
629 kref_get(&lkb->lkb_ref);
630}
631
632/* This is called when we need to remove a reference and are certain
633 it's not the last ref. e.g. del_lkb is always called between a
634 find_lkb/put_lkb and is always the inverse of a previous add_lkb.
635 put_lkb would work fine, but would involve unnecessary locking */
636
637static inline void unhold_lkb(struct dlm_lkb *lkb)
638{
639 int rv;
640 rv = kref_put(&lkb->lkb_ref, kill_lkb);
641 DLM_ASSERT(!rv, dlm_print_lkb(lkb););
642}
643
644static void lkb_add_ordered(struct list_head *new, struct list_head *head,
645 int mode)
646{
647 struct dlm_lkb *lkb = NULL;
648
649 list_for_each_entry(lkb, head, lkb_statequeue)
650 if (lkb->lkb_rqmode < mode)
651 break;
652
653 if (!lkb)
654 list_add_tail(new, head);
655 else
656 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
657}
658
659/* add/remove lkb to rsb's grant/convert/wait queue */
660
661static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
662{
663 kref_get(&lkb->lkb_ref);
664
665 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
666
667 lkb->lkb_status = status;
668
669 switch (status) {
670 case DLM_LKSTS_WAITING:
671 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
672 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
673 else
674 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
675 break;
676 case DLM_LKSTS_GRANTED:
677 /* convention says granted locks kept in order of grmode */
678 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
679 lkb->lkb_grmode);
680 break;
681 case DLM_LKSTS_CONVERT:
682 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
683 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
684 else
685 list_add_tail(&lkb->lkb_statequeue,
686 &r->res_convertqueue);
687 break;
688 default:
689 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
690 }
691}
692
693static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
694{
695 lkb->lkb_status = 0;
696 list_del(&lkb->lkb_statequeue);
697 unhold_lkb(lkb);
698}
699
700static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
701{
702 hold_lkb(lkb);
703 del_lkb(r, lkb);
704 add_lkb(r, lkb, sts);
705 unhold_lkb(lkb);
706}
707
708/* add/remove lkb from global waiters list of lkb's waiting for
709 a reply from a remote node */
710
711static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
712{
713 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
714
715 down(&ls->ls_waiters_sem);
716 if (lkb->lkb_wait_type) {
717 log_print("add_to_waiters error %d", lkb->lkb_wait_type);
718 goto out;
719 }
720 lkb->lkb_wait_type = mstype;
721 kref_get(&lkb->lkb_ref);
722 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
723 out:
724 up(&ls->ls_waiters_sem);
725}
726
727static int _remove_from_waiters(struct dlm_lkb *lkb)
728{
729 int error = 0;
730
731 if (!lkb->lkb_wait_type) {
732 log_print("remove_from_waiters error");
733 error = -EINVAL;
734 goto out;
735 }
736 lkb->lkb_wait_type = 0;
737 list_del(&lkb->lkb_wait_reply);
738 unhold_lkb(lkb);
739 out:
740 return error;
741}
742
743static int remove_from_waiters(struct dlm_lkb *lkb)
744{
745 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
746 int error;
747
748 down(&ls->ls_waiters_sem);
749 error = _remove_from_waiters(lkb);
750 up(&ls->ls_waiters_sem);
751 return error;
752}
753
754static void dir_remove(struct dlm_rsb *r)
755{
756 int to_nodeid;
757
758 if (dlm_no_directory(r->res_ls))
759 return;
760
761 to_nodeid = dlm_dir_nodeid(r);
762 if (to_nodeid != dlm_our_nodeid())
763 send_remove(r);
764 else
765 dlm_dir_remove_entry(r->res_ls, to_nodeid,
766 r->res_name, r->res_length);
767}
768
769/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
770 found since they are in order of newest to oldest? */
771
772static int shrink_bucket(struct dlm_ls *ls, int b)
773{
774 struct dlm_rsb *r;
775 int count = 0, found;
776
777 for (;;) {
778 found = FALSE;
779 write_lock(&ls->ls_rsbtbl[b].lock);
780 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
781 res_hashchain) {
782 if (!time_after_eq(jiffies, r->res_toss_time +
783 dlm_config.toss_secs * HZ))
784 continue;
785 found = TRUE;
786 break;
787 }
788
789 if (!found) {
790 write_unlock(&ls->ls_rsbtbl[b].lock);
791 break;
792 }
793
794 if (kref_put(&r->res_ref, kill_rsb)) {
795 list_del(&r->res_hashchain);
796 write_unlock(&ls->ls_rsbtbl[b].lock);
797
798 if (is_master(r))
799 dir_remove(r);
800 free_rsb(r);
801 count++;
802 } else {
803 write_unlock(&ls->ls_rsbtbl[b].lock);
804 log_error(ls, "tossed rsb in use %s", r->res_name);
805 }
806 }
807
808 return count;
809}
810
811void dlm_scan_rsbs(struct dlm_ls *ls)
812{
813 int i;
814
815 if (dlm_locking_stopped(ls))
816 return;
817
818 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
819 shrink_bucket(ls, i);
820 cond_resched();
821 }
822}
823
824/* lkb is master or local copy */
825
826static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
827{
828 int b, len = r->res_ls->ls_lvblen;
829
830 /* b=1 lvb returned to caller
831 b=0 lvb written to rsb or invalidated
832 b=-1 do nothing */
833
834 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
835
836 if (b == 1) {
837 if (!lkb->lkb_lvbptr)
838 return;
839
840 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
841 return;
842
843 if (!r->res_lvbptr)
844 return;
845
846 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
847 lkb->lkb_lvbseq = r->res_lvbseq;
848
849 } else if (b == 0) {
850 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
851 rsb_set_flag(r, RSB_VALNOTVALID);
852 return;
853 }
854
855 if (!lkb->lkb_lvbptr)
856 return;
857
858 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
859 return;
860
861 if (!r->res_lvbptr)
862 r->res_lvbptr = allocate_lvb(r->res_ls);
863
864 if (!r->res_lvbptr)
865 return;
866
867 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
868 r->res_lvbseq++;
869 lkb->lkb_lvbseq = r->res_lvbseq;
870 rsb_clear_flag(r, RSB_VALNOTVALID);
871 }
872
873 if (rsb_flag(r, RSB_VALNOTVALID))
874 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
875}
876
877static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
878{
879 if (lkb->lkb_grmode < DLM_LOCK_PW)
880 return;
881
882 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
883 rsb_set_flag(r, RSB_VALNOTVALID);
884 return;
885 }
886
887 if (!lkb->lkb_lvbptr)
888 return;
889
890 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
891 return;
892
893 if (!r->res_lvbptr)
894 r->res_lvbptr = allocate_lvb(r->res_ls);
895
896 if (!r->res_lvbptr)
897 return;
898
899 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
900 r->res_lvbseq++;
901 rsb_clear_flag(r, RSB_VALNOTVALID);
902}
903
904/* lkb is process copy (pc) */
905
906static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
907 struct dlm_message *ms)
908{
909 int b;
910
911 if (!lkb->lkb_lvbptr)
912 return;
913
914 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
915 return;
916
917 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
918 if (b == 1) {
919 int len = receive_extralen(ms);
920 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
921 lkb->lkb_lvbseq = ms->m_lvbseq;
922 }
923}
924
925/* Manipulate lkb's on rsb's convert/granted/waiting queues
926 remove_lock -- used for unlock, removes lkb from granted
927 revert_lock -- used for cancel, moves lkb from convert to granted
928 grant_lock -- used for request and convert, adds lkb to granted or
929 moves lkb from convert or waiting to granted
930
931 Each of these is used for master or local copy lkb's. There is
932 also a _pc() variation used to make the corresponding change on
933 a process copy (pc) lkb. */
934
935static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
936{
937 del_lkb(r, lkb);
938 lkb->lkb_grmode = DLM_LOCK_IV;
939 /* this unhold undoes the original ref from create_lkb()
940 so this leads to the lkb being freed */
941 unhold_lkb(lkb);
942}
943
944static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
945{
946 set_lvb_unlock(r, lkb);
947 _remove_lock(r, lkb);
948}
949
950static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
951{
952 _remove_lock(r, lkb);
953}
954
955static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
956{
957 lkb->lkb_rqmode = DLM_LOCK_IV;
958
959 switch (lkb->lkb_status) {
960 case DLM_LKSTS_CONVERT:
961 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
962 break;
963 case DLM_LKSTS_WAITING:
964 del_lkb(r, lkb);
965 lkb->lkb_grmode = DLM_LOCK_IV;
966 /* this unhold undoes the original ref from create_lkb()
967 so this leads to the lkb being freed */
968 unhold_lkb(lkb);
969 break;
970 default:
971 log_print("invalid status for revert %d", lkb->lkb_status);
972 }
973}
974
975static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
976{
977 revert_lock(r, lkb);
978}
979
980static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
981{
982 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
983 lkb->lkb_grmode = lkb->lkb_rqmode;
984 if (lkb->lkb_status)
985 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
986 else
987 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
988 }
989
990 lkb->lkb_rqmode = DLM_LOCK_IV;
991
992 if (lkb->lkb_range) {
993 lkb->lkb_range[GR_RANGE_START] = lkb->lkb_range[RQ_RANGE_START];
994 lkb->lkb_range[GR_RANGE_END] = lkb->lkb_range[RQ_RANGE_END];
995 }
996}
997
998static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
999{
1000 set_lvb_lock(r, lkb);
1001 _grant_lock(r, lkb);
1002 lkb->lkb_highbast = 0;
1003}
1004
1005static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1006 struct dlm_message *ms)
1007{
1008 set_lvb_lock_pc(r, lkb, ms);
1009 _grant_lock(r, lkb);
1010}
1011
1012/* called by grant_pending_locks() which means an async grant message must
1013 be sent to the requesting node in addition to granting the lock if the
1014 lkb belongs to a remote node. */
1015
1016static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1017{
1018 grant_lock(r, lkb);
1019 if (is_master_copy(lkb))
1020 send_grant(r, lkb);
1021 else
1022 queue_cast(r, lkb, 0);
1023}
1024
1025static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1026{
1027 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1028 lkb_statequeue);
1029 if (lkb->lkb_id == first->lkb_id)
1030 return TRUE;
1031
1032 return FALSE;
1033}
1034
1035/* Return 1 if the locks' ranges overlap. If the lkb has no range then it is
1036 assumed to cover 0-ffffffff.ffffffff */
1037
1038static inline int ranges_overlap(struct dlm_lkb *lkb1, struct dlm_lkb *lkb2)
1039{
1040 if (!lkb1->lkb_range || !lkb2->lkb_range)
1041 return TRUE;
1042
1043 if (lkb1->lkb_range[RQ_RANGE_END] < lkb2->lkb_range[GR_RANGE_START] ||
1044 lkb1->lkb_range[RQ_RANGE_START] > lkb2->lkb_range[GR_RANGE_END])
1045 return FALSE;
1046
1047 return TRUE;
1048}
1049
1050/* Check if the given lkb conflicts with another lkb on the queue. */
1051
1052static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1053{
1054 struct dlm_lkb *this;
1055
1056 list_for_each_entry(this, head, lkb_statequeue) {
1057 if (this == lkb)
1058 continue;
1059 if (ranges_overlap(lkb, this) && !modes_compat(this, lkb))
1060 return TRUE;
1061 }
1062 return FALSE;
1063}
1064
1065/*
1066 * "A conversion deadlock arises with a pair of lock requests in the converting
1067 * queue for one resource. The granted mode of each lock blocks the requested
1068 * mode of the other lock."
1069 *
1070 * Part 2: if the granted mode of lkb is preventing the first lkb in the
1071 * convert queue from being granted, then demote lkb (set grmode to NL).
1072 * This second form requires that we check for conv-deadlk even when
1073 * now == 0 in _can_be_granted().
1074 *
1075 * Example:
1076 * Granted Queue: empty
1077 * Convert Queue: NL->EX (first lock)
1078 * PR->EX (second lock)
1079 *
1080 * The first lock can't be granted because of the granted mode of the second
1081 * lock and the second lock can't be granted because it's not first in the
1082 * list. We demote the granted mode of the second lock (the lkb passed to this
1083 * function).
1084 *
1085 * After the resolution, the "grant pending" function needs to go back and try
1086 * to grant locks on the convert queue again since the first lock can now be
1087 * granted.
1088 */
1089
1090static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1091{
1092 struct dlm_lkb *this, *first = NULL, *self = NULL;
1093
1094 list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1095 if (!first)
1096 first = this;
1097 if (this == lkb) {
1098 self = lkb;
1099 continue;
1100 }
1101
1102 if (!ranges_overlap(lkb, this))
1103 continue;
1104
1105 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
1106 return TRUE;
1107 }
1108
1109 /* if lkb is on the convert queue and is preventing the first
1110 from being granted, then there's deadlock and we demote lkb.
1111 multiple converting locks may need to do this before the first
1112 converting lock can be granted. */
1113
1114 if (self && self != first) {
1115 if (!modes_compat(lkb, first) &&
1116 !queue_conflict(&rsb->res_grantqueue, first))
1117 return TRUE;
1118 }
1119
1120 return FALSE;
1121}
1122
1123/*
1124 * Return 1 if the lock can be granted, 0 otherwise.
1125 * Also detect and resolve conversion deadlocks.
1126 *
1127 * lkb is the lock to be granted
1128 *
1129 * now is 1 if the function is being called in the context of the
1130 * immediate request, it is 0 if called later, after the lock has been
1131 * queued.
1132 *
1133 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1134 */
1135
1136static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1137{
1138 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1139
1140 /*
1141 * 6-10: Version 5.4 introduced an option to address the phenomenon of
1142 * a new request for a NL mode lock being blocked.
1143 *
1144 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1145 * request, then it would be granted. In essence, the use of this flag
1146 * tells the Lock Manager to expedite this request by not considering
1147 * what may be in the CONVERTING or WAITING queues... As of this
1148 * writing, the EXPEDITE flag can be used only with new requests for NL
1149 * mode locks. This flag is not valid for conversion requests.
1150 *
1151 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
1152 * conversion or used with a non-NL requested mode. We also know an
1153 * EXPEDITE request is always granted immediately, so now must always
1154 * be 1. The full condition to grant an expedite request: (now &&
1155 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1156 * therefore be shortened to just checking the flag.
1157 */
1158
1159 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1160 return TRUE;
1161
1162 /*
1163 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1164 * added to the remaining conditions.
1165 */
1166
1167 if (queue_conflict(&r->res_grantqueue, lkb))
1168 goto out;
1169
1170 /*
1171 * 6-3: By default, a conversion request is immediately granted if the
1172 * requested mode is compatible with the modes of all other granted
1173 * locks
1174 */
1175
1176 if (queue_conflict(&r->res_convertqueue, lkb))
1177 goto out;
1178
1179 /*
1180 * 6-5: But the default algorithm for deciding whether to grant or
1181 * queue conversion requests does not by itself guarantee that such
1182 * requests are serviced on a "first come first serve" basis. This, in
1183 * turn, can lead to a phenomenon known as "indefinite postponement".
1184 *
1185 * 6-7: This issue is dealt with by using the optional QUECVT flag with
1186 * the system service employed to request a lock conversion. This flag
1187 * forces certain conversion requests to be queued, even if they are
1188 * compatible with the granted modes of other locks on the same
1189 * resource. Thus, the use of this flag results in conversion requests
1190 * being ordered on a "first come first serve" basis.
1191 *
1192 * DCT: This condition is all about new conversions being able to occur
1193 * "in place" while the lock remains on the granted queue (assuming
1194 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
1195 * doesn't _have_ to go onto the convert queue where it's processed in
1196 * order. The "now" variable is necessary to distinguish converts
1197 * being received and processed for the first time now, because once a
1198 * convert is moved to the conversion queue the condition below applies
1199 * requiring fifo granting.
1200 */
1201
1202 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1203 return TRUE;
1204
1205 /*
1206 * When using range locks the NOORDER flag is set to avoid the standard
1207 * vms rules on grant order.
1208 */
1209
1210 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1211 return TRUE;
1212
1213 /*
1214 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1215 * granted until all other conversion requests ahead of it are granted
1216 * and/or canceled.
1217 */
1218
1219 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1220 return TRUE;
1221
1222 /*
1223 * 6-4: By default, a new request is immediately granted only if all
1224 * three of the following conditions are satisfied when the request is
1225 * issued:
1226 * - The queue of ungranted conversion requests for the resource is
1227 * empty.
1228 * - The queue of ungranted new requests for the resource is empty.
1229 * - The mode of the new request is compatible with the most
1230 * restrictive mode of all granted locks on the resource.
1231 */
1232
1233 if (now && !conv && list_empty(&r->res_convertqueue) &&
1234 list_empty(&r->res_waitqueue))
1235 return TRUE;
1236
1237 /*
1238 * 6-4: Once a lock request is in the queue of ungranted new requests,
1239 * it cannot be granted until the queue of ungranted conversion
1240 * requests is empty, all ungranted new requests ahead of it are
1241 * granted and/or canceled, and it is compatible with the granted mode
1242 * of the most restrictive lock granted on the resource.
1243 */
1244
1245 if (!now && !conv && list_empty(&r->res_convertqueue) &&
1246 first_in_list(lkb, &r->res_waitqueue))
1247 return TRUE;
1248
1249 out:
1250 /*
1251 * The following, enabled by CONVDEADLK, departs from VMS.
1252 */
1253
1254 if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1255 conversion_deadlock_detect(r, lkb)) {
1256 lkb->lkb_grmode = DLM_LOCK_NL;
1257 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1258 }
1259
1260 return FALSE;
1261}
1262
1263/*
1264 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1265 * simple way to provide a big optimization to applications that can use them.
1266 */
1267
1268static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1269{
1270 uint32_t flags = lkb->lkb_exflags;
1271 int rv;
1272 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1273
1274 rv = _can_be_granted(r, lkb, now);
1275 if (rv)
1276 goto out;
1277
1278 if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1279 goto out;
1280
1281 if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1282 alt = DLM_LOCK_PR;
1283 else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1284 alt = DLM_LOCK_CW;
1285
1286 if (alt) {
1287 lkb->lkb_rqmode = alt;
1288 rv = _can_be_granted(r, lkb, now);
1289 if (rv)
1290 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1291 else
1292 lkb->lkb_rqmode = rqmode;
1293 }
1294 out:
1295 return rv;
1296}
1297
1298static int grant_pending_convert(struct dlm_rsb *r, int high)
1299{
1300 struct dlm_lkb *lkb, *s;
1301 int hi, demoted, quit, grant_restart, demote_restart;
1302
1303 quit = 0;
1304 restart:
1305 grant_restart = 0;
1306 demote_restart = 0;
1307 hi = DLM_LOCK_IV;
1308
1309 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1310 demoted = is_demoted(lkb);
1311 if (can_be_granted(r, lkb, FALSE)) {
1312 grant_lock_pending(r, lkb);
1313 grant_restart = 1;
1314 } else {
1315 hi = max_t(int, lkb->lkb_rqmode, hi);
1316 if (!demoted && is_demoted(lkb))
1317 demote_restart = 1;
1318 }
1319 }
1320
1321 if (grant_restart)
1322 goto restart;
1323 if (demote_restart && !quit) {
1324 quit = 1;
1325 goto restart;
1326 }
1327
1328 return max_t(int, high, hi);
1329}
1330
1331static int grant_pending_wait(struct dlm_rsb *r, int high)
1332{
1333 struct dlm_lkb *lkb, *s;
1334
1335 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1336 if (can_be_granted(r, lkb, FALSE))
1337 grant_lock_pending(r, lkb);
1338 else
1339 high = max_t(int, lkb->lkb_rqmode, high);
1340 }
1341
1342 return high;
1343}
1344
1345static void grant_pending_locks(struct dlm_rsb *r)
1346{
1347 struct dlm_lkb *lkb, *s;
1348 int high = DLM_LOCK_IV;
1349
1350 DLM_ASSERT(is_master(r), dlm_print_rsb(r););
1351
1352 high = grant_pending_convert(r, high);
1353 high = grant_pending_wait(r, high);
1354
1355 if (high == DLM_LOCK_IV)
1356 return;
1357
1358 /*
1359 * If there are locks left on the wait/convert queue then send blocking
1360 * ASTs to granted locks based on the largest requested mode (high)
1361 * found above. This can generate spurious blocking ASTs for range
1362 * locks. FIXME: highbast < high comparison not valid for PR/CW.
1363 */
1364
1365 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1366 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1367 !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1368 queue_bast(r, lkb, high);
1369 lkb->lkb_highbast = high;
1370 }
1371 }
1372}
1373
1374static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1375 struct dlm_lkb *lkb)
1376{
1377 struct dlm_lkb *gr;
1378
1379 list_for_each_entry(gr, head, lkb_statequeue) {
1380 if (gr->lkb_bastaddr &&
1381 gr->lkb_highbast < lkb->lkb_rqmode &&
1382 ranges_overlap(lkb, gr) && !modes_compat(gr, lkb)) {
1383 queue_bast(r, gr, lkb->lkb_rqmode);
1384 gr->lkb_highbast = lkb->lkb_rqmode;
1385 }
1386 }
1387}
1388
1389static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1390{
1391 send_bast_queue(r, &r->res_grantqueue, lkb);
1392}
1393
1394static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1395{
1396 send_bast_queue(r, &r->res_grantqueue, lkb);
1397 send_bast_queue(r, &r->res_convertqueue, lkb);
1398}
1399
1400/* set_master(r, lkb) -- set the master nodeid of a resource
1401
1402 The purpose of this function is to set the nodeid field in the given
1403 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
1404 known, it can just be copied to the lkb and the function will return
1405 0. If the rsb's nodeid is _not_ known, it needs to be looked up
1406 before it can be copied to the lkb.
1407
1408 When the rsb nodeid is being looked up remotely, the initial lkb
1409 causing the lookup is kept on the ls_waiters list waiting for the
1410 lookup reply. Other lkb's waiting for the same rsb lookup are kept
1411 on the rsb's res_lookup list until the master is verified.
1412
1413 Return values:
1414 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1415 1: the rsb master is not available and the lkb has been placed on
1416 a wait queue
1417*/
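/* _request_lock() below shows the intended caller pattern for these
   return values: 1 means the lkb is parked (on ls_waiters or
   res_lookup) until the lookup reply arrives, so the caller converts
   it to 0 rather than treating it as an error. */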
1418
1419static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1420{
1421 struct dlm_ls *ls = r->res_ls;
1422 int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1423
1424 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1425 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1426 r->res_first_lkid = lkb->lkb_id;
1427 lkb->lkb_nodeid = r->res_nodeid;
1428 return 0;
1429 }
1430
1431 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1432 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1433 return 1;
1434 }
1435
1436 if (r->res_nodeid == 0) {
1437 lkb->lkb_nodeid = 0;
1438 return 0;
1439 }
1440
1441 if (r->res_nodeid > 0) {
1442 lkb->lkb_nodeid = r->res_nodeid;
1443 return 0;
1444 }
1445
1446 DLM_ASSERT(r->res_nodeid == -1, dlm_print_rsb(r););
1447
1448 dir_nodeid = dlm_dir_nodeid(r);
1449
1450 if (dir_nodeid != our_nodeid) {
1451 r->res_first_lkid = lkb->lkb_id;
1452 send_lookup(r, lkb);
1453 return 1;
1454 }
1455
1456 for (;;) {
1457 /* It's possible for dlm_scand to remove an old rsb for
1458 this same resource from the toss list, for us to create
1459 a new one, look up the master locally, and find it
1460 already exists just before dlm_scand does the
1461 dir_remove() on the previous rsb. */
1462
1463 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1464 r->res_length, &ret_nodeid);
1465 if (!error)
1466 break;
1467 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1468 schedule();
1469 }
1470
1471 if (ret_nodeid == our_nodeid) {
1472 r->res_first_lkid = 0;
1473 r->res_nodeid = 0;
1474 lkb->lkb_nodeid = 0;
1475 } else {
1476 r->res_first_lkid = lkb->lkb_id;
1477 r->res_nodeid = ret_nodeid;
1478 lkb->lkb_nodeid = ret_nodeid;
1479 }
1480 return 0;
1481}
1482
1483static void process_lookup_list(struct dlm_rsb *r)
1484{
1485 struct dlm_lkb *lkb, *safe;
1486
1487 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1488 list_del(&lkb->lkb_rsb_lookup);
1489 _request_lock(r, lkb);
1490 schedule();
1491 }
1492}
1493
1494/* confirm_master -- confirm (or deny) an rsb's master nodeid */
1495
1496static void confirm_master(struct dlm_rsb *r, int error)
1497{
1498 struct dlm_lkb *lkb;
1499
1500 if (!r->res_first_lkid)
1501 return;
1502
1503 switch (error) {
1504 case 0:
1505 case -EINPROGRESS:
1506 r->res_first_lkid = 0;
1507 process_lookup_list(r);
1508 break;
1509
1510 case -EAGAIN:
1511 /* the remote master didn't queue our NOQUEUE request;
1512 make a waiting lkb the first_lkid */
1513
1514 r->res_first_lkid = 0;
1515
1516 if (!list_empty(&r->res_lookup)) {
1517 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1518 lkb_rsb_lookup);
1519 list_del(&lkb->lkb_rsb_lookup);
1520 r->res_first_lkid = lkb->lkb_id;
1521 _request_lock(r, lkb);
1522 } else
1523 r->res_nodeid = -1;
1524 break;
1525
1526 default:
1527 log_error(r->res_ls, "confirm_master unknown error %d", error);
1528 }
1529}
1530
1531static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1532 int namelen, uint32_t parent_lkid, void *ast,
1533 void *astarg, void *bast, struct dlm_range *range,
1534 struct dlm_args *args)
1535{
1536 int rv = -EINVAL;
1537
1538 /* check for invalid arg usage */
1539
1540 if (mode < 0 || mode > DLM_LOCK_EX)
1541 goto out;
1542
1543 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1544 goto out;
1545
1546 if (flags & DLM_LKF_CANCEL)
1547 goto out;
1548
1549 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1550 goto out;
1551
1552 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1553 goto out;
1554
1555 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1556 goto out;
1557
1558 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1559 goto out;
1560
1561 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1562 goto out;
1563
1564 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1565 goto out;
1566
1567 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1568 goto out;
1569
1570 if (!ast || !lksb)
1571 goto out;
1572
1573 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1574 goto out;
1575
1576 /* parent/child locks not yet supported */
1577 if (parent_lkid)
1578 goto out;
1579
1580 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1581 goto out;
1582
1583 /* these args will be copied to the lkb in validate_lock_args,
1584 it cannot be done now because when converting locks, fields in
1585 an active lkb cannot be modified before locking the rsb */
1586
1587 args->flags = flags;
1588 args->astaddr = ast;
1589 args->astparam = (long) astarg;
1590 args->bastaddr = bast;
1591 args->mode = mode;
1592 args->lksb = lksb;
1593 args->range = range;
1594 rv = 0;
1595 out:
1596 return rv;
1597}
1598
1599static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1600{
1601 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1602 DLM_LKF_FORCEUNLOCK))
1603 return -EINVAL;
1604
1605 args->flags = flags;
1606 args->astparam = (long) astarg;
1607 return 0;
1608}
1609
1610static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1611 struct dlm_args *args)
1612{
1613 int rv = -EINVAL;
1614
1615 if (args->flags & DLM_LKF_CONVERT) {
1616 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1617 goto out;
1618
1619 if (args->flags & DLM_LKF_QUECVT &&
1620 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1621 goto out;
1622
1623 rv = -EBUSY;
1624 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1625 goto out;
1626
1627 if (lkb->lkb_wait_type)
1628 goto out;
1629 }
1630
1631 lkb->lkb_exflags = args->flags;
1632 lkb->lkb_sbflags = 0;
1633 lkb->lkb_astaddr = args->astaddr;
1634 lkb->lkb_astparam = args->astparam;
1635 lkb->lkb_bastaddr = args->bastaddr;
1636 lkb->lkb_rqmode = args->mode;
1637 lkb->lkb_lksb = args->lksb;
1638 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1639 lkb->lkb_ownpid = (int) current->pid;
1640
1641 rv = 0;
1642 if (!args->range)
1643 goto out;
1644
1645 if (!lkb->lkb_range) {
1646 rv = -ENOMEM;
1647 lkb->lkb_range = allocate_range(ls);
1648 if (!lkb->lkb_range)
1649 goto out;
1650 /* This is needed for conversions that contain ranges
1651 where the original lock didn't but it's harmless for
1652 new locks too. */
1653 lkb->lkb_range[GR_RANGE_START] = 0LL;
1654 lkb->lkb_range[GR_RANGE_END] = 0xffffffffffffffffULL;
1655 }
1656
1657 lkb->lkb_range[RQ_RANGE_START] = args->range->ra_start;
1658 lkb->lkb_range[RQ_RANGE_END] = args->range->ra_end;
1659 lkb->lkb_flags |= DLM_IFL_RANGE;
1660 rv = 0;
1661 out:
1662 return rv;
1663}
1664
1665static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1666{
1667 int rv = -EINVAL;
1668
1669 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1670 goto out;
1671
1672 if (args->flags & DLM_LKF_FORCEUNLOCK)
1673 goto out_ok;
1674
1675 if (args->flags & DLM_LKF_CANCEL &&
1676 lkb->lkb_status == DLM_LKSTS_GRANTED)
1677 goto out;
1678
1679 if (!(args->flags & DLM_LKF_CANCEL) &&
1680 lkb->lkb_status != DLM_LKSTS_GRANTED)
1681 goto out;
1682
1683 rv = -EBUSY;
1684 if (lkb->lkb_wait_type)
1685 goto out;
1686
1687 out_ok:
1688 lkb->lkb_exflags = args->flags;
1689 lkb->lkb_sbflags = 0;
1690 lkb->lkb_astparam = args->astparam;
1691
1692 rv = 0;
1693 out:
1694 return rv;
1695}
1696
1697/*
1698 * Four stage 4 varieties:
1699 * do_request(), do_convert(), do_unlock(), do_cancel()
1700 * These are called on the master node for the given lock and
1701 * from the central locking logic.
1702 */
1703
1704static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1705{
1706 int error = 0;
1707
1708 if (can_be_granted(r, lkb, TRUE)) {
1709 grant_lock(r, lkb);
1710 queue_cast(r, lkb, 0);
1711 goto out;
1712 }
1713
1714 if (can_be_queued(lkb)) {
1715 error = -EINPROGRESS;
1716 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1717 send_blocking_asts(r, lkb);
1718 goto out;
1719 }
1720
1721 error = -EAGAIN;
1722 if (force_blocking_asts(lkb))
1723 send_blocking_asts_all(r, lkb);
1724 queue_cast(r, lkb, -EAGAIN);
1725
1726 out:
1727 return error;
1728}
1729
1730static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1731{
1732 int error = 0;
1733
1734 /* changing an existing lock may allow others to be granted */
1735
1736 if (can_be_granted(r, lkb, TRUE)) {
1737 grant_lock(r, lkb);
1738 queue_cast(r, lkb, 0);
1739 grant_pending_locks(r);
1740 goto out;
1741 }
1742
1743 if (can_be_queued(lkb)) {
1744 if (is_demoted(lkb))
1745 grant_pending_locks(r);
1746 error = -EINPROGRESS;
1747 del_lkb(r, lkb);
1748 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
1749 send_blocking_asts(r, lkb);
1750 goto out;
1751 }
1752
1753 error = -EAGAIN;
1754 if (force_blocking_asts(lkb))
1755 send_blocking_asts_all(r, lkb);
1756 queue_cast(r, lkb, -EAGAIN);
1757
1758 out:
1759 return error;
1760}
1761
1762static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1763{
1764 remove_lock(r, lkb);
1765 queue_cast(r, lkb, -DLM_EUNLOCK);
1766 grant_pending_locks(r);
1767 return -DLM_EUNLOCK;
1768}
1769
1770static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1771{
1772 revert_lock(r, lkb);
1773 queue_cast(r, lkb, -DLM_ECANCEL);
1774 grant_pending_locks(r);
1775 return -DLM_ECANCEL;
1776}
1777
1778/*
1779 * Four stage 3 varieties:
1780 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
1781 */
1782
1783/* add a new lkb to a possibly new rsb, called by requesting process */
1784
1785static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1786{
1787 int error;
1788
1789 /* set_master: sets lkb nodeid from r */
1790
1791 error = set_master(r, lkb);
1792 if (error < 0)
1793 goto out;
1794 if (error) {
1795 error = 0;
1796 goto out;
1797 }
1798
1799 if (is_remote(r))
1800 /* receive_request() calls do_request() on remote node */
1801 error = send_request(r, lkb);
1802 else
1803 error = do_request(r, lkb);
1804 out:
1805 return error;
1806}
1807
1808/* change some property of an existing lkb, e.g. mode, range */
1809
1810static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1811{
1812 int error;
1813
1814 if (is_remote(r))
1815 /* receive_convert() calls do_convert() on remote node */
1816 error = send_convert(r, lkb);
1817 else
1818 error = do_convert(r, lkb);
1819
1820 return error;
1821}
1822
1823/* remove an existing lkb from the granted queue */
1824
1825static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1826{
1827 int error;
1828
1829 if (is_remote(r))
1830 /* receive_unlock() calls do_unlock() on remote node */
1831 error = send_unlock(r, lkb);
1832 else
1833 error = do_unlock(r, lkb);
1834
1835 return error;
1836}
1837
1838/* remove an existing lkb from the convert or wait queue */
1839
1840static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1841{
1842 int error;
1843
1844 if (is_remote(r))
1845 /* receive_cancel() calls do_cancel() on remote node */
1846 error = send_cancel(r, lkb);
1847 else
1848 error = do_cancel(r, lkb);
1849
1850 return error;
1851}
1852
1853/*
1854 * Four stage 2 varieties:
1855 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
1856 */
1857
1858static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
1859 int len, struct dlm_args *args)
1860{
1861 struct dlm_rsb *r;
1862 int error;
1863
1864 error = validate_lock_args(ls, lkb, args);
1865 if (error)
1866 goto out;
1867
1868 error = find_rsb(ls, name, len, R_CREATE, &r);
1869 if (error)
1870 goto out;
1871
1872 lock_rsb(r);
1873
1874 attach_lkb(r, lkb);
1875 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
1876
1877 error = _request_lock(r, lkb);
1878
1879 unlock_rsb(r);
1880 put_rsb(r);
1881
1882 out:
1883 return error;
1884}
1885
1886static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1887 struct dlm_args *args)
1888{
1889 struct dlm_rsb *r;
1890 int error;
1891
1892 r = lkb->lkb_resource;
1893
1894 hold_rsb(r);
1895 lock_rsb(r);
1896
1897 error = validate_lock_args(ls, lkb, args);
1898 if (error)
1899 goto out;
1900
1901 error = _convert_lock(r, lkb);
1902 out:
1903 unlock_rsb(r);
1904 put_rsb(r);
1905 return error;
1906}
1907
1908static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1909 struct dlm_args *args)
1910{
1911 struct dlm_rsb *r;
1912 int error;
1913
1914 r = lkb->lkb_resource;
1915
1916 hold_rsb(r);
1917 lock_rsb(r);
1918
1919 error = validate_unlock_args(lkb, args);
1920 if (error)
1921 goto out;
1922
1923 error = _unlock_lock(r, lkb);
1924 out:
1925 unlock_rsb(r);
1926 put_rsb(r);
1927 return error;
1928}
1929
1930static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1931 struct dlm_args *args)
1932{
1933 struct dlm_rsb *r;
1934 int error;
1935
1936 r = lkb->lkb_resource;
1937
1938 hold_rsb(r);
1939 lock_rsb(r);
1940
1941 error = validate_unlock_args(lkb, args);
1942 if (error)
1943 goto out;
1944
1945 error = _cancel_lock(r, lkb);
1946 out:
1947 unlock_rsb(r);
1948 put_rsb(r);
1949 return error;
1950}
1951
1952/*
1953 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
1954 */
1955
1956int dlm_lock(dlm_lockspace_t *lockspace,
1957 int mode,
1958 struct dlm_lksb *lksb,
1959 uint32_t flags,
1960 void *name,
1961 unsigned int namelen,
1962 uint32_t parent_lkid,
1963 void (*ast) (void *astarg),
1964 void *astarg,
1965 void (*bast) (void *astarg, int mode),
1966 struct dlm_range *range)
1967{
1968 struct dlm_ls *ls;
1969 struct dlm_lkb *lkb;
1970 struct dlm_args args;
1971 int error, convert = flags & DLM_LKF_CONVERT;
1972
1973 ls = dlm_find_lockspace_local(lockspace);
1974 if (!ls)
1975 return -EINVAL;
1976
1977 lock_recovery(ls);
1978
1979 if (convert)
1980 error = find_lkb(ls, lksb->sb_lkid, &lkb);
1981 else
1982 error = create_lkb(ls, &lkb);
1983
1984 if (error)
1985 goto out;
1986
1987 error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
1988 astarg, bast, range, &args);
1989 if (error)
1990 goto out_put;
1991
1992 if (convert)
1993 error = convert_lock(ls, lkb, &args);
1994 else
1995 error = request_lock(ls, lkb, name, namelen, &args);
1996
1997 if (error == -EINPROGRESS)
1998 error = 0;
1999 out_put:
2000 if (convert || error)
2001 put_lkb(lkb);
2002 if (error == -EAGAIN)
2003 error = 0;
2004 out:
2005 unlock_recovery(ls);
2006 dlm_put_lockspace(ls);
2007 return error;
2008}
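/* Note that dlm_lock() folds -EINPROGRESS and -EAGAIN into 0, so a
   DLM_LKF_NOQUEUE failure reaches the caller asynchronously:
   do_request() queued a completion ast with -EAGAIN in sb_status
   rather than reporting it through the return value. */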
2009
2010int dlm_unlock(dlm_lockspace_t *lockspace,
2011 uint32_t lkid,
2012 uint32_t flags,
2013 struct dlm_lksb *lksb,
2014 void *astarg)
2015{
2016 struct dlm_ls *ls;
2017 struct dlm_lkb *lkb;
2018 struct dlm_args args;
2019 int error;
2020
2021 ls = dlm_find_lockspace_local(lockspace);
2022 if (!ls)
2023 return -EINVAL;
2024
2025 lock_recovery(ls);
2026
2027 error = find_lkb(ls, lkid, &lkb);
2028 if (error)
2029 goto out;
2030
2031 error = set_unlock_args(flags, astarg, &args);
2032 if (error)
2033 goto out_put;
2034
2035 if (flags & DLM_LKF_CANCEL)
2036 error = cancel_lock(ls, lkb, &args);
2037 else
2038 error = unlock_lock(ls, lkb, &args);
2039
2040 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2041 error = 0;
2042 out_put:
2043 put_lkb(lkb);
2044 out:
2045 unlock_recovery(ls);
2046 dlm_put_lockspace(ls);
2047 return error;
2048}
2049
2050/*
2051 * send/receive routines for remote operations and replies
2052 *
2053 * send_args
2054 * send_common
2055 * send_request receive_request
2056 * send_convert receive_convert
2057 * send_unlock receive_unlock
2058 * send_cancel receive_cancel
2059 * send_grant receive_grant
2060 * send_bast receive_bast
2061 * send_lookup receive_lookup
2062 * send_remove receive_remove
2063 *
2064 * send_common_reply
2065 * receive_request_reply send_request_reply
2066 * receive_convert_reply send_convert_reply
2067 * receive_unlock_reply send_unlock_reply
2068 * receive_cancel_reply send_cancel_reply
2069 * receive_lookup_reply send_lookup_reply
2070 */
2071
2072static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2073 int to_nodeid, int mstype,
2074 struct dlm_message **ms_ret,
2075 struct dlm_mhandle **mh_ret)
2076{
2077 struct dlm_message *ms;
2078 struct dlm_mhandle *mh;
2079 char *mb;
2080 int mb_len = sizeof(struct dlm_message);
2081
2082 switch (mstype) {
2083 case DLM_MSG_REQUEST:
2084 case DLM_MSG_LOOKUP:
2085 case DLM_MSG_REMOVE:
2086 mb_len += r->res_length;
2087 break;
2088 case DLM_MSG_CONVERT:
2089 case DLM_MSG_UNLOCK:
2090 case DLM_MSG_REQUEST_REPLY:
2091 case DLM_MSG_CONVERT_REPLY:
2092 case DLM_MSG_GRANT:
2093 if (lkb && lkb->lkb_lvbptr)
2094 mb_len += r->res_ls->ls_lvblen;
2095 break;
2096 }
2097
2098 /* get_buffer gives us a message handle (mh) that we need to
2099 pass into lowcomms_commit and a message buffer (mb) that we
2100 write our data into */
2101
2102 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2103 if (!mh)
2104 return -ENOBUFS;
2105
2106 memset(mb, 0, mb_len);
2107
2108 ms = (struct dlm_message *) mb;
2109
2110 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2111 ms->m_header.h_lockspace = r->res_ls->ls_global_id;
2112 ms->m_header.h_nodeid = dlm_our_nodeid();
2113 ms->m_header.h_length = mb_len;
2114 ms->m_header.h_cmd = DLM_MSG;
2115
2116 ms->m_type = mstype;
2117
2118 *mh_ret = mh;
2119 *ms_ret = ms;
2120 return 0;
2121}
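/* Sizing example for create_message(): a DLM_MSG_REQUEST for a 5-byte
   resource name is allocated as sizeof(struct dlm_message) + 5, while
   a DLM_MSG_GRANT carrying an lvb gets sizeof(struct dlm_message) +
   ls_lvblen.  h_length records the total, which lets the receive side
   recover the size of the variable part (see receive_extralen()
   below). */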
2122
2123/* further lowcomms enhancements or alternate implementations may make
2124 the return value from this function useful at some point */
2125
2126static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2127{
2128 dlm_message_out(ms);
2129 dlm_lowcomms_commit_buffer(mh);
2130 return 0;
2131}
2132
2133static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2134 struct dlm_message *ms)
2135{
2136 ms->m_nodeid = lkb->lkb_nodeid;
2137 ms->m_pid = lkb->lkb_ownpid;
2138 ms->m_lkid = lkb->lkb_id;
2139 ms->m_remid = lkb->lkb_remid;
2140 ms->m_exflags = lkb->lkb_exflags;
2141 ms->m_sbflags = lkb->lkb_sbflags;
2142 ms->m_flags = lkb->lkb_flags;
2143 ms->m_lvbseq = lkb->lkb_lvbseq;
2144 ms->m_status = lkb->lkb_status;
2145 ms->m_grmode = lkb->lkb_grmode;
2146 ms->m_rqmode = lkb->lkb_rqmode;
2147 ms->m_hash = r->res_hash;
2148
2149 /* m_result and m_bastmode are set from function args,
2150 not from lkb fields */
2151
2152 if (lkb->lkb_bastaddr)
2153 ms->m_asts |= AST_BAST;
2154 if (lkb->lkb_astaddr)
2155 ms->m_asts |= AST_COMP;
2156
2157 if (lkb->lkb_range) {
2158 ms->m_range[0] = lkb->lkb_range[RQ_RANGE_START];
2159 ms->m_range[1] = lkb->lkb_range[RQ_RANGE_END];
2160 }
2161
2162 if (ms->m_type == DLM_MSG_REQUEST || ms->m_type == DLM_MSG_LOOKUP)
2163 memcpy(ms->m_extra, r->res_name, r->res_length);
2164
2165 else if (lkb->lkb_lvbptr)
2166 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2167
2168}
2169
2170static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2171{
2172 struct dlm_message *ms;
2173 struct dlm_mhandle *mh;
2174 int to_nodeid, error;
2175
2176 add_to_waiters(lkb, mstype);
2177
2178 to_nodeid = r->res_nodeid;
2179
2180 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2181 if (error)
2182 goto fail;
2183
2184 send_args(r, lkb, ms);
2185
2186 error = send_message(mh, ms);
2187 if (error)
2188 goto fail;
2189 return 0;
2190
2191 fail:
2192 remove_from_waiters(lkb);
2193 return error;
2194}
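/* Every message sent through send_common() leaves an entry on the
   waiters list that the matching receive_xxxx_reply() removes.  The
   fail path must undo that entry itself, otherwise recovery would
   later try to resend, or fake a reply for, a message that never left
   this node. */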
2195
2196static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2197{
2198 return send_common(r, lkb, DLM_MSG_REQUEST);
2199}
2200
2201static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2202{
2203 int error;
2204
2205 error = send_common(r, lkb, DLM_MSG_CONVERT);
2206
2207 /* down conversions go without a reply from the master */
2208 if (!error && down_conversion(lkb)) {
2209 remove_from_waiters(lkb);
2210 r->res_ls->ls_stub_ms.m_result = 0;
2211 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2212 }
2213
2214 return error;
2215}
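/* The stub reply above completes a down-conversion through the normal
   reply path without a round trip to the master:
   __receive_convert_reply() sees m_result == 0 and grants the
   conversion locally. */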
2216
2217/* FIXME: if this lkb is the only lock we hold on the rsb, then set
2218 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2219 that the master is still correct. */
2220
2221static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2222{
2223 return send_common(r, lkb, DLM_MSG_UNLOCK);
2224}
2225
2226static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2227{
2228 return send_common(r, lkb, DLM_MSG_CANCEL);
2229}
2230
2231static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2232{
2233 struct dlm_message *ms;
2234 struct dlm_mhandle *mh;
2235 int to_nodeid, error;
2236
2237 to_nodeid = lkb->lkb_nodeid;
2238
2239 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2240 if (error)
2241 goto out;
2242
2243 send_args(r, lkb, ms);
2244
2245 ms->m_result = 0;
2246
2247 error = send_message(mh, ms);
2248 out:
2249 return error;
2250}
2251
2252static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2253{
2254 struct dlm_message *ms;
2255 struct dlm_mhandle *mh;
2256 int to_nodeid, error;
2257
2258 to_nodeid = lkb->lkb_nodeid;
2259
2260 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2261 if (error)
2262 goto out;
2263
2264 send_args(r, lkb, ms);
2265
2266 ms->m_bastmode = mode;
2267
2268 error = send_message(mh, ms);
2269 out:
2270 return error;
2271}
2272
2273static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2274{
2275 struct dlm_message *ms;
2276 struct dlm_mhandle *mh;
2277 int to_nodeid, error;
2278
2279 add_to_waiters(lkb, DLM_MSG_LOOKUP);
2280
2281 to_nodeid = dlm_dir_nodeid(r);
2282
2283 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2284 if (error)
2285 goto fail;
2286
2287 send_args(r, lkb, ms);
2288
2289 error = send_message(mh, ms);
2290 if (error)
2291 goto fail;
2292 return 0;
2293
2294 fail:
2295 remove_from_waiters(lkb);
2296 return error;
2297}
2298
2299static int send_remove(struct dlm_rsb *r)
2300{
2301 struct dlm_message *ms;
2302 struct dlm_mhandle *mh;
2303 int to_nodeid, error;
2304
2305 to_nodeid = dlm_dir_nodeid(r);
2306
2307 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2308 if (error)
2309 goto out;
2310
2311 memcpy(ms->m_extra, r->res_name, r->res_length);
2312 ms->m_hash = r->res_hash;
2313
2314 error = send_message(mh, ms);
2315 out:
2316 return error;
2317}
2318
2319static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2320 int mstype, int rv)
2321{
2322 struct dlm_message *ms;
2323 struct dlm_mhandle *mh;
2324 int to_nodeid, error;
2325
2326 to_nodeid = lkb->lkb_nodeid;
2327
2328 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2329 if (error)
2330 goto out;
2331
2332 send_args(r, lkb, ms);
2333
2334 ms->m_result = rv;
2335
2336 error = send_message(mh, ms);
2337 out:
2338 return error;
2339}
2340
2341static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2342{
2343 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2344}
2345
2346static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2347{
2348 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2349}
2350
2351static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2352{
2353 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2354}
2355
2356static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2357{
2358 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2359}
2360
2361static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2362 int ret_nodeid, int rv)
2363{
2364 struct dlm_rsb *r = &ls->ls_stub_rsb;
2365 struct dlm_message *ms;
2366 struct dlm_mhandle *mh;
2367 int error, nodeid = ms_in->m_header.h_nodeid;
2368
2369 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2370 if (error)
2371 goto out;
2372
2373 ms->m_lkid = ms_in->m_lkid;
2374 ms->m_result = rv;
2375 ms->m_nodeid = ret_nodeid;
2376
2377 error = send_message(mh, ms);
2378 out:
2379 return error;
2380}
2381
2382/* which args we save from a received message depends heavily on the type
2383 of message, unlike the send side where we can safely send everything about
2384 the lkb for any type of message */
2385
2386static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2387{
2388 lkb->lkb_exflags = ms->m_exflags;
2389 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2390 (ms->m_flags & 0x0000FFFF);
2391}
2392
2393static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2394{
2395 lkb->lkb_sbflags = ms->m_sbflags;
2396 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2397 (ms->m_flags & 0x0000FFFF);
2398}
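/* Only the lower 16 bits of lkb_flags travel in messages; the upper
   bits are node-local state (e.g. DLM_IFL_MSTCPY, set in
   receive_request() below) and are preserved across these copies. */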
2399
2400static int receive_extralen(struct dlm_message *ms)
2401{
2402 return (ms->m_header.h_length - sizeof(struct dlm_message));
2403}
2404
2405static int receive_range(struct dlm_ls *ls, struct dlm_lkb *lkb,
2406 struct dlm_message *ms)
2407{
2408 if (lkb->lkb_flags & DLM_IFL_RANGE) {
2409 if (!lkb->lkb_range)
2410 lkb->lkb_range = allocate_range(ls);
2411 if (!lkb->lkb_range)
2412 return -ENOMEM;
2413 lkb->lkb_range[RQ_RANGE_START] = ms->m_range[0];
2414 lkb->lkb_range[RQ_RANGE_END] = ms->m_range[1];
2415 }
2416 return 0;
2417}
2418
2419static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2420 struct dlm_message *ms)
2421{
2422 int len;
2423
2424 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2425 if (!lkb->lkb_lvbptr)
2426 lkb->lkb_lvbptr = allocate_lvb(ls);
2427 if (!lkb->lkb_lvbptr)
2428 return -ENOMEM;
2429 len = receive_extralen(ms);
2430 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2431 }
2432 return 0;
2433}
2434
2435static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2436 struct dlm_message *ms)
2437{
2438 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2439 lkb->lkb_ownpid = ms->m_pid;
2440 lkb->lkb_remid = ms->m_lkid;
2441 lkb->lkb_grmode = DLM_LOCK_IV;
2442 lkb->lkb_rqmode = ms->m_rqmode;
2443 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2444 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2445
2446 DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2447
2448 if (receive_range(ls, lkb, ms))
2449 return -ENOMEM;
2450
2451 if (receive_lvb(ls, lkb, ms))
2452 return -ENOMEM;
2453
2454 return 0;
2455}
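/* In receive_request_args() the master copy's lkb_grmode starts out
   as DLM_LOCK_IV (no granted mode): it represents a brand new
   request, and do_request() decides whether it can be granted
   immediately. */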
2456
2457static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2458 struct dlm_message *ms)
2459{
2460 if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2461 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2462 lkb->lkb_nodeid, ms->m_header.h_nodeid,
2463 lkb->lkb_id, lkb->lkb_remid);
2464 return -EINVAL;
2465 }
2466
2467 if (!is_master_copy(lkb))
2468 return -EINVAL;
2469
2470 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2471 return -EBUSY;
2472
2473 if (receive_range(ls, lkb, ms))
2474 return -ENOMEM;
2475 if (lkb->lkb_range) {
2476 lkb->lkb_range[GR_RANGE_START] = 0LL;
2477 lkb->lkb_range[GR_RANGE_END] = 0xffffffffffffffffULL;
2478 }
2479
2480 if (receive_lvb(ls, lkb, ms))
2481 return -ENOMEM;
2482
2483 lkb->lkb_rqmode = ms->m_rqmode;
2484 lkb->lkb_lvbseq = ms->m_lvbseq;
2485
2486 return 0;
2487}
2488
2489static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2490 struct dlm_message *ms)
2491{
2492 if (!is_master_copy(lkb))
2493 return -EINVAL;
2494 if (receive_lvb(ls, lkb, ms))
2495 return -ENOMEM;
2496 return 0;
2497}
2498
2499/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2500 uses to send a reply and that the remote end uses to process the reply. */
2501
2502static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2503{
2504 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2505 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2506 lkb->lkb_remid = ms->m_lkid;
2507}
2508
2509static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2510{
2511 struct dlm_lkb *lkb;
2512 struct dlm_rsb *r;
2513 int error, namelen;
2514
2515 error = create_lkb(ls, &lkb);
2516 if (error)
2517 goto fail;
2518
2519 receive_flags(lkb, ms);
2520 lkb->lkb_flags |= DLM_IFL_MSTCPY;
2521 error = receive_request_args(ls, lkb, ms);
2522 if (error) {
2523 put_lkb(lkb);
2524 goto fail;
2525 }
2526
2527 namelen = receive_extralen(ms);
2528
2529 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2530 if (error) {
2531 put_lkb(lkb);
2532 goto fail;
2533 }
2534
2535 lock_rsb(r);
2536
2537 attach_lkb(r, lkb);
2538 error = do_request(r, lkb);
2539 send_request_reply(r, lkb, error);
2540
2541 unlock_rsb(r);
2542 put_rsb(r);
2543
2544 if (error == -EINPROGRESS)
2545 error = 0;
2546 if (error)
2547 put_lkb(lkb);
2548 return;
2549
2550 fail:
2551 setup_stub_lkb(ls, ms);
2552 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2553}
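/* receive_request() replies even when it fails early: the stub
   rsb/lkb reply carries the requester's own lkid back in m_remid, so
   the requesting node can find its lkb and clear its entry from the
   waiters list rather than waiting forever. */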
2554
2555static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2556{
2557 struct dlm_lkb *lkb;
2558 struct dlm_rsb *r;
2559 int error, reply = TRUE;
2560
2561 error = find_lkb(ls, ms->m_remid, &lkb);
2562 if (error)
2563 goto fail;
2564
2565 r = lkb->lkb_resource;
2566
2567 hold_rsb(r);
2568 lock_rsb(r);
2569
2570 receive_flags(lkb, ms);
2571 error = receive_convert_args(ls, lkb, ms);
2572 if (error)
2573 goto out;
2574 reply = !down_conversion(lkb);
2575
2576 error = do_convert(r, lkb);
2577 out:
2578 if (reply)
2579 send_convert_reply(r, lkb, error);
2580
2581 unlock_rsb(r);
2582 put_rsb(r);
2583 put_lkb(lkb);
2584 return;
2585
2586 fail:
2587 setup_stub_lkb(ls, ms);
2588 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2589}
2590
2591static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2592{
2593 struct dlm_lkb *lkb;
2594 struct dlm_rsb *r;
2595 int error;
2596
2597 error = find_lkb(ls, ms->m_remid, &lkb);
2598 if (error)
2599 goto fail;
2600
2601 r = lkb->lkb_resource;
2602
2603 hold_rsb(r);
2604 lock_rsb(r);
2605
2606 receive_flags(lkb, ms);
2607 error = receive_unlock_args(ls, lkb, ms);
2608 if (error)
2609 goto out;
2610
2611 error = do_unlock(r, lkb);
2612 out:
2613 send_unlock_reply(r, lkb, error);
2614
2615 unlock_rsb(r);
2616 put_rsb(r);
2617 put_lkb(lkb);
2618 return;
2619
2620 fail:
2621 setup_stub_lkb(ls, ms);
2622 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2623}
2624
2625static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2626{
2627 struct dlm_lkb *lkb;
2628 struct dlm_rsb *r;
2629 int error;
2630
2631 error = find_lkb(ls, ms->m_remid, &lkb);
2632 if (error)
2633 goto fail;
2634
2635 receive_flags(lkb, ms);
2636
2637 r = lkb->lkb_resource;
2638
2639 hold_rsb(r);
2640 lock_rsb(r);
2641
2642 error = do_cancel(r, lkb);
2643 send_cancel_reply(r, lkb, error);
2644
2645 unlock_rsb(r);
2646 put_rsb(r);
2647 put_lkb(lkb);
2648 return;
2649
2650 fail:
2651 setup_stub_lkb(ls, ms);
2652 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2653}
2654
2655static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2656{
2657 struct dlm_lkb *lkb;
2658 struct dlm_rsb *r;
2659 int error;
2660
2661 error = find_lkb(ls, ms->m_remid, &lkb);
2662 if (error) {
2663 log_error(ls, "receive_grant no lkb");
2664 return;
2665 }
2666 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2667
2668 r = lkb->lkb_resource;
2669
2670 hold_rsb(r);
2671 lock_rsb(r);
2672
2673 receive_flags_reply(lkb, ms);
2674 grant_lock_pc(r, lkb, ms);
2675 queue_cast(r, lkb, 0);
2676
2677 unlock_rsb(r);
2678 put_rsb(r);
2679 put_lkb(lkb);
2680}
2681
2682static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2683{
2684 struct dlm_lkb *lkb;
2685 struct dlm_rsb *r;
2686 int error;
2687
2688 error = find_lkb(ls, ms->m_remid, &lkb);
2689 if (error) {
2690 log_error(ls, "receive_bast no lkb");
2691 return;
2692 }
2693 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2694
2695 r = lkb->lkb_resource;
2696
2697 hold_rsb(r);
2698 lock_rsb(r);
2699
2700 queue_bast(r, lkb, ms->m_bastmode);
2701
2702 unlock_rsb(r);
2703 put_rsb(r);
2704 put_lkb(lkb);
2705}
2706
2707static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
2708{
2709 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
2710
2711 from_nodeid = ms->m_header.h_nodeid;
2712 our_nodeid = dlm_our_nodeid();
2713
2714 len = receive_extralen(ms);
2715
2716 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2717 if (dir_nodeid != our_nodeid) {
2718 log_error(ls, "lookup dir_nodeid %d from %d",
2719 dir_nodeid, from_nodeid);
2720 error = -EINVAL;
2721 ret_nodeid = -1;
2722 goto out;
2723 }
2724
2725 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
2726
2727 /* Optimization: we're master so treat lookup as a request */
2728 if (!error && ret_nodeid == our_nodeid) {
2729 receive_request(ls, ms);
2730 return;
2731 }
2732 out:
2733 send_lookup_reply(ls, ms, ret_nodeid, error);
2734}
2735
2736static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2737{
2738 int len, dir_nodeid, from_nodeid;
2739
2740 from_nodeid = ms->m_header.h_nodeid;
2741
2742 len = receive_extralen(ms);
2743
2744 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2745 if (dir_nodeid != dlm_our_nodeid()) {
2746 log_error(ls, "remove dir entry dir_nodeid %d from %d",
2747 dir_nodeid, from_nodeid);
2748 return;
2749 }
2750
2751 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2752}
2753
2754static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2755{
2756 struct dlm_lkb *lkb;
2757 struct dlm_rsb *r;
2758 int error, mstype;
2759
2760 error = find_lkb(ls, ms->m_remid, &lkb);
2761 if (error) {
2762 log_error(ls, "receive_request_reply no lkb");
2763 return;
2764 }
2765 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2766
2767 mstype = lkb->lkb_wait_type;
2768 error = remove_from_waiters(lkb);
2769 if (error) {
2770 log_error(ls, "receive_request_reply not on waiters");
2771 goto out;
2772 }
2773
2774 /* this is the value returned from do_request() on the master */
2775 error = ms->m_result;
2776
2777 r = lkb->lkb_resource;
2778 hold_rsb(r);
2779 lock_rsb(r);
2780
2781 /* Optimization: the dir node was also the master, so it took our
2782 lookup as a request and sent request reply instead of lookup reply */
2783 if (mstype == DLM_MSG_LOOKUP) {
2784 r->res_nodeid = ms->m_header.h_nodeid;
2785 lkb->lkb_nodeid = r->res_nodeid;
2786 }
2787
2788 switch (error) {
2789 case -EAGAIN:
2790 /* request would block (be queued) on remote master;
2791 the unhold undoes the original ref from create_lkb()
2792 so it leads to the lkb being freed */
2793 queue_cast(r, lkb, -EAGAIN);
2794 confirm_master(r, -EAGAIN);
2795 unhold_lkb(lkb);
2796 break;
2797
2798 case -EINPROGRESS:
2799 case 0:
2800 /* request was queued or granted on remote master */
2801 receive_flags_reply(lkb, ms);
2802 lkb->lkb_remid = ms->m_lkid;
2803 if (error)
2804 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2805 else {
2806 grant_lock_pc(r, lkb, ms);
2807 queue_cast(r, lkb, 0);
2808 }
2809 confirm_master(r, error);
2810 break;
2811
2812 case -ENOENT:
2813 case -ENOTBLK:
2814 /* find_rsb failed to find rsb or rsb wasn't master */
2815 r->res_nodeid = -1;
2816 lkb->lkb_nodeid = -1;
2817 _request_lock(r, lkb);
2818 break;
2819
2820 default:
2821 log_error(ls, "receive_request_reply error %d", error);
2822 }
2823
2824 unlock_rsb(r);
2825 put_rsb(r);
2826 out:
2827 put_lkb(lkb);
2828}
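/* In the -ENOENT/-ENOTBLK case above, clearing res_nodeid and
   lkb_nodeid marks the master as unknown, so the retried
   _request_lock() starts over with a fresh master lookup. */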
2829
2830static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2831 struct dlm_message *ms)
2832{
2833 int error = ms->m_result;
2834
2835 /* this is the value returned from do_convert() on the master */
2836
2837 switch (error) {
2838 case -EAGAIN:
2839 /* convert would block (be queued) on remote master */
2840 queue_cast(r, lkb, -EAGAIN);
2841 break;
2842
2843 case -EINPROGRESS:
2844 /* convert was queued on remote master */
2845 del_lkb(r, lkb);
2846 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2847 break;
2848
2849 case 0:
2850 /* convert was granted on remote master */
2851 receive_flags_reply(lkb, ms);
2852 grant_lock_pc(r, lkb, ms);
2853 queue_cast(r, lkb, 0);
2854 break;
2855
2856 default:
2857 log_error(r->res_ls, "receive_convert_reply error %d", error);
2858 }
2859}
2860
2861static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2862{
2863 struct dlm_rsb *r = lkb->lkb_resource;
2864
2865 hold_rsb(r);
2866 lock_rsb(r);
2867
2868 __receive_convert_reply(r, lkb, ms);
2869
2870 unlock_rsb(r);
2871 put_rsb(r);
2872}
2873
2874static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
2875{
2876 struct dlm_lkb *lkb;
2877 int error;
2878
2879 error = find_lkb(ls, ms->m_remid, &lkb);
2880 if (error) {
2881 log_error(ls, "receive_convert_reply no lkb");
2882 return;
2883 }
2884 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2885
2886 error = remove_from_waiters(lkb);
2887 if (error) {
2888 log_error(ls, "receive_convert_reply not on waiters");
2889 goto out;
2890 }
2891
2892 _receive_convert_reply(lkb, ms);
2893 out:
2894 put_lkb(lkb);
2895}
2896
2897static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2898{
2899 struct dlm_rsb *r = lkb->lkb_resource;
2900 int error = ms->m_result;
2901
2902 hold_rsb(r);
2903 lock_rsb(r);
2904
2905 /* this is the value returned from do_unlock() on the master */
2906
2907 switch (error) {
2908 case -DLM_EUNLOCK:
2909 receive_flags_reply(lkb, ms);
2910 remove_lock_pc(r, lkb);
2911 queue_cast(r, lkb, -DLM_EUNLOCK);
2912 break;
2913 default:
2914 log_error(r->res_ls, "receive_unlock_reply error %d", error);
2915 }
2916
2917 unlock_rsb(r);
2918 put_rsb(r);
2919}
2920
2921static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
2922{
2923 struct dlm_lkb *lkb;
2924 int error;
2925
2926 error = find_lkb(ls, ms->m_remid, &lkb);
2927 if (error) {
2928 log_error(ls, "receive_unlock_reply no lkb");
2929 return;
2930 }
2931 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2932
2933 error = remove_from_waiters(lkb);
2934 if (error) {
2935 log_error(ls, "receive_unlock_reply not on waiters");
2936 goto out;
2937 }
2938
2939 _receive_unlock_reply(lkb, ms);
2940 out:
2941 put_lkb(lkb);
2942}
2943
2944static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2945{
2946 struct dlm_rsb *r = lkb->lkb_resource;
2947 int error = ms->m_result;
2948
2949 hold_rsb(r);
2950 lock_rsb(r);
2951
2952 /* this is the value returned from do_cancel() on the master */
2953
2954 switch (error) {
2955 case -DLM_ECANCEL:
2956 receive_flags_reply(lkb, ms);
2957 revert_lock_pc(r, lkb);
2958 queue_cast(r, lkb, -DLM_ECANCEL);
2959 break;
2960 default:
2961 log_error(r->res_ls, "receive_cancel_reply error %d", error);
2962 }
2963
2964 unlock_rsb(r);
2965 put_rsb(r);
2966}
2967
2968static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
2969{
2970 struct dlm_lkb *lkb;
2971 int error;
2972
2973 error = find_lkb(ls, ms->m_remid, &lkb);
2974 if (error) {
2975 log_error(ls, "receive_cancel_reply no lkb");
2976 return;
2977 }
2978 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2979
2980 error = remove_from_waiters(lkb);
2981 if (error) {
2982 log_error(ls, "receive_cancel_reply not on waiters");
2983 goto out;
2984 }
2985
2986 _receive_cancel_reply(lkb, ms);
2987 out:
2988 put_lkb(lkb);
2989}
2990
2991static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
2992{
2993 struct dlm_lkb *lkb;
2994 struct dlm_rsb *r;
2995 int error, ret_nodeid;
2996
2997 error = find_lkb(ls, ms->m_lkid, &lkb);
2998 if (error) {
2999 log_error(ls, "receive_lookup_reply no lkb");
3000 return;
3001 }
3002
3003 error = remove_from_waiters(lkb);
3004 if (error) {
3005 log_error(ls, "receive_lookup_reply not on waiters");
3006 goto out;
3007 }
3008
3009 /* this is the value returned by dlm_dir_lookup on dir node
3010 FIXME: will a non-zero error ever be returned? */
3011 error = ms->m_result;
3012
3013 r = lkb->lkb_resource;
3014 hold_rsb(r);
3015 lock_rsb(r);
3016
3017 ret_nodeid = ms->m_nodeid;
3018 if (ret_nodeid == dlm_our_nodeid()) {
3019 r->res_nodeid = 0;
3020 ret_nodeid = 0;
3021 r->res_first_lkid = 0;
3022 } else {
3023 /* set_master() will copy res_nodeid to lkb_nodeid */
3024 r->res_nodeid = ret_nodeid;
3025 }
3026
3027 _request_lock(r, lkb);
3028
3029 if (!ret_nodeid)
3030 process_lookup_list(r);
3031
3032 unlock_rsb(r);
3033 put_rsb(r);
3034 out:
3035 put_lkb(lkb);
3036}
3037
3038int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3039{
3040 struct dlm_message *ms = (struct dlm_message *) hd;
3041 struct dlm_ls *ls;
3042 int error;
3043
3044 if (!recovery)
3045 dlm_message_in(ms);
3046
3047 ls = dlm_find_lockspace_global(hd->h_lockspace);
3048 if (!ls) {
3049 log_print("drop message %d from %d for unknown lockspace %d",
3050 ms->m_type, nodeid, hd->h_lockspace);
3051 return -EINVAL;
3052 }
3053
3054 /* recovery may have just ended leaving a bunch of backed-up requests
3055 in the requestqueue; wait while dlm_recoverd clears them */
3056
3057 if (!recovery)
3058 dlm_wait_requestqueue(ls);
3059
3060 /* recovery may have just started while there were a bunch of
3061 in-flight requests -- save them in requestqueue to be processed
3062 after recovery. we can't let dlm_recvd block on the recovery
3063 lock. if dlm_recoverd is calling this function to clear the
3064 requestqueue, it needs to be interrupted (-EINTR) if another
3065 recovery operation is starting. */
3066
3067 while (1) {
3068 if (dlm_locking_stopped(ls)) {
3069 if (!recovery)
3070 dlm_add_requestqueue(ls, nodeid, hd);
3071 error = -EINTR;
3072 goto out;
3073 }
3074
3075 if (lock_recovery_try(ls))
3076 break;
3077 schedule();
3078 }
3079
3080 switch (ms->m_type) {
3081
3082 /* messages sent to a master node */
3083
3084 case DLM_MSG_REQUEST:
3085 receive_request(ls, ms);
3086 break;
3087
3088 case DLM_MSG_CONVERT:
3089 receive_convert(ls, ms);
3090 break;
3091
3092 case DLM_MSG_UNLOCK:
3093 receive_unlock(ls, ms);
3094 break;
3095
3096 case DLM_MSG_CANCEL:
3097 receive_cancel(ls, ms);
3098 break;
3099
3100 /* messages sent from a master node (replies to above) */
3101
3102 case DLM_MSG_REQUEST_REPLY:
3103 receive_request_reply(ls, ms);
3104 break;
3105
3106 case DLM_MSG_CONVERT_REPLY:
3107 receive_convert_reply(ls, ms);
3108 break;
3109
3110 case DLM_MSG_UNLOCK_REPLY:
3111 receive_unlock_reply(ls, ms);
3112 break;
3113
3114 case DLM_MSG_CANCEL_REPLY:
3115 receive_cancel_reply(ls, ms);
3116 break;
3117
3118 /* messages sent from a master node (only two types of async msg) */
3119
3120 case DLM_MSG_GRANT:
3121 receive_grant(ls, ms);
3122 break;
3123
3124 case DLM_MSG_BAST:
3125 receive_bast(ls, ms);
3126 break;
3127
3128 /* messages sent to a dir node */
3129
3130 case DLM_MSG_LOOKUP:
3131 receive_lookup(ls, ms);
3132 break;
3133
3134 case DLM_MSG_REMOVE:
3135 receive_remove(ls, ms);
3136 break;
3137
3138 /* messages sent from a dir node (remove has no reply) */
3139
3140 case DLM_MSG_LOOKUP_REPLY:
3141 receive_lookup_reply(ls, ms);
3142 break;
3143
3144 default:
3145 log_error(ls, "unknown message type %d", ms->m_type);
3146 }
3147
3148 unlock_recovery(ls);
3149 out:
3150 dlm_put_lockspace(ls);
3151 dlm_astd_wake();
3152 return 0;
3153}
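/* dlm_astd_wake() runs on the way out in both the normal and -EINTR
   paths: most receive paths above queue casts or basts, and the ast
   daemon should deliver them without waiting for further messages. */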
3154
3155
3156/*
3157 * Recovery related
3158 */
3159
3160static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3161{
3162 if (middle_conversion(lkb)) {
3163 hold_lkb(lkb);
3164 ls->ls_stub_ms.m_result = -EINPROGRESS;
3165 _remove_from_waiters(lkb);
3166 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3167
3168 /* Same special case as in receive_rcom_lock_args() */
3169 lkb->lkb_grmode = DLM_LOCK_IV;
3170 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3171 unhold_lkb(lkb);
3172
3173 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3174 lkb->lkb_flags |= DLM_IFL_RESEND;
3175 }
3176
3177 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3178 conversions are async; there's no reply from the remote master */
3179}
3180
3181/* A waiting lkb needs recovery if the master node has failed, or
3182 the master node is changing (only when no directory is used) */
3183
3184static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3185{
3186 if (dlm_is_removed(ls, lkb->lkb_nodeid))
3187 return 1;
3188
3189 if (!dlm_no_directory(ls))
3190 return 0;
3191
3192 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3193 return 1;
3194
3195 return 0;
3196}
3197
3198/* Recovery for locks that are waiting for replies from nodes that are now
3199 gone. We can just complete unlocks and cancels by faking a reply from the
3200 dead node. Requests and up-conversions we flag to be resent after
3201 recovery. Down-conversions can just be completed with a fake reply like
3202 unlocks. Conversions between PR and CW need special attention. */
3203
3204void dlm_recover_waiters_pre(struct dlm_ls *ls)
3205{
3206 struct dlm_lkb *lkb, *safe;
3207
3208 down(&ls->ls_waiters_sem);
3209
3210 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3211 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3212 lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3213
3214		/* all outstanding lookups, regardless of destination, will be
3215		   resent after recovery is done */
3216
3217 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3218 lkb->lkb_flags |= DLM_IFL_RESEND;
3219 continue;
3220 }
3221
3222 if (!waiter_needs_recovery(ls, lkb))
3223 continue;
3224
3225 switch (lkb->lkb_wait_type) {
3226
3227 case DLM_MSG_REQUEST:
3228 lkb->lkb_flags |= DLM_IFL_RESEND;
3229 break;
3230
3231 case DLM_MSG_CONVERT:
3232 recover_convert_waiter(ls, lkb);
3233 break;
3234
3235 case DLM_MSG_UNLOCK:
3236 hold_lkb(lkb);
3237 ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3238 _remove_from_waiters(lkb);
3239 _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3240 put_lkb(lkb);
3241 break;
3242
3243 case DLM_MSG_CANCEL:
3244 hold_lkb(lkb);
3245 ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3246 _remove_from_waiters(lkb);
3247 _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3248 put_lkb(lkb);
3249 break;
3250
3251 default:
3252 log_error(ls, "invalid lkb wait_type %d",
3253 lkb->lkb_wait_type);
3254 }
3255 }
3256 up(&ls->ls_waiters_sem);
3257}
3258
3259static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
3260{
3261 struct dlm_lkb *lkb;
3262 int rv = 0;
3263
3264 down(&ls->ls_waiters_sem);
3265 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3266 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3267 rv = lkb->lkb_wait_type;
3268 _remove_from_waiters(lkb);
3269 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3270 break;
3271 }
3272 }
3273 up(&ls->ls_waiters_sem);
3274
3275 if (!rv)
3276 lkb = NULL;
3277 *lkb_ret = lkb;
3278 return rv;
3279}
3280
3281/* Deal with lookups and lkbs marked RESEND from _pre. We may now be the
3282 master or dir-node for r. Processing the lkb may result in it being placed
3283 back on waiters. */
3284
3285int dlm_recover_waiters_post(struct dlm_ls *ls)
3286{
3287 struct dlm_lkb *lkb;
3288 struct dlm_rsb *r;
3289 int error = 0, mstype;
3290
3291 while (1) {
3292 if (dlm_locking_stopped(ls)) {
3293 log_debug(ls, "recover_waiters_post aborted");
3294 error = -EINTR;
3295 break;
3296 }
3297
3298 mstype = remove_resend_waiter(ls, &lkb);
3299 if (!mstype)
3300 break;
3301
3302 r = lkb->lkb_resource;
3303
3304 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3305 lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3306
3307 switch (mstype) {
3308
3309 case DLM_MSG_LOOKUP:
3310 hold_rsb(r);
3311 lock_rsb(r);
3312 _request_lock(r, lkb);
3313 if (is_master(r))
3314 confirm_master(r, 0);
3315 unlock_rsb(r);
3316 put_rsb(r);
3317 break;
3318
3319 case DLM_MSG_REQUEST:
3320 hold_rsb(r);
3321 lock_rsb(r);
3322 _request_lock(r, lkb);
3323 unlock_rsb(r);
3324 put_rsb(r);
3325 break;
3326
3327 case DLM_MSG_CONVERT:
3328 hold_rsb(r);
3329 lock_rsb(r);
3330 _convert_lock(r, lkb);
3331 unlock_rsb(r);
3332 put_rsb(r);
3333 break;
3334
3335 default:
3336 log_error(ls, "recover_waiters_post type %d", mstype);
3337 }
3338 }
3339
3340 return error;
3341}
3342
3343static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3344 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3345{
3346 struct dlm_ls *ls = r->res_ls;
3347 struct dlm_lkb *lkb, *safe;
3348
3349 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3350 if (test(ls, lkb)) {
3351 del_lkb(r, lkb);
3352 /* this put should free the lkb */
3353 if (!put_lkb(lkb))
3354 log_error(ls, "purged lkb not released");
3355 }
3356 }
3357}
3358
3359static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3360{
3361 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3362}
3363
3364static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3365{
3366 return is_master_copy(lkb);
3367}
3368
3369static void purge_dead_locks(struct dlm_rsb *r)
3370{
3371 purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3372 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3373 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3374}
3375
3376void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3377{
3378 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3379 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3380 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3381}
3382
3383/* Get rid of locks held by nodes that are gone. */
3384
3385int dlm_purge_locks(struct dlm_ls *ls)
3386{
3387 struct dlm_rsb *r;
3388
3389 log_debug(ls, "dlm_purge_locks");
3390
3391 down_write(&ls->ls_root_sem);
3392 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3393 hold_rsb(r);
3394 lock_rsb(r);
3395 if (is_master(r))
3396 purge_dead_locks(r);
3397 unlock_rsb(r);
3398 unhold_rsb(r);
3399
3400 schedule();
3401 }
3402 up_write(&ls->ls_root_sem);
3403
3404 return 0;
3405}
3406
3407int dlm_grant_after_purge(struct dlm_ls *ls)
3408{
3409 struct dlm_rsb *r;
3410 int i;
3411
3412 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
3413 read_lock(&ls->ls_rsbtbl[i].lock);
3414 list_for_each_entry(r, &ls->ls_rsbtbl[i].list, res_hashchain) {
3415 hold_rsb(r);
3416 lock_rsb(r);
3417 if (is_master(r)) {
3418 grant_pending_locks(r);
3419 confirm_master(r, 0);
3420 }
3421 unlock_rsb(r);
3422 put_rsb(r);
3423 }
3424 read_unlock(&ls->ls_rsbtbl[i].lock);
3425 }
3426
3427 return 0;
3428}
3429
3430static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3431 uint32_t remid)
3432{
3433 struct dlm_lkb *lkb;
3434
3435 list_for_each_entry(lkb, head, lkb_statequeue) {
3436 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3437 return lkb;
3438 }
3439 return NULL;
3440}
3441
3442static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3443 uint32_t remid)
3444{
3445 struct dlm_lkb *lkb;
3446
3447 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3448 if (lkb)
3449 return lkb;
3450 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3451 if (lkb)
3452 return lkb;
3453 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3454 if (lkb)
3455 return lkb;
3456 return NULL;
3457}
3458
3459static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3460 struct dlm_rsb *r, struct dlm_rcom *rc)
3461{
3462 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3463 int lvblen;
3464
3465 lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3466 lkb->lkb_ownpid = rl->rl_ownpid;
3467 lkb->lkb_remid = rl->rl_lkid;
3468 lkb->lkb_exflags = rl->rl_exflags;
3469 lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3470 lkb->lkb_flags |= DLM_IFL_MSTCPY;
3471 lkb->lkb_lvbseq = rl->rl_lvbseq;
3472 lkb->lkb_rqmode = rl->rl_rqmode;
3473 lkb->lkb_grmode = rl->rl_grmode;
3474	/* don't set lkb_status because add_lkb wants to set it itself */
3475
3476 lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3477 lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3478
3479 if (lkb->lkb_flags & DLM_IFL_RANGE) {
3480 lkb->lkb_range = allocate_range(ls);
3481 if (!lkb->lkb_range)
3482 return -ENOMEM;
3483 memcpy(lkb->lkb_range, rl->rl_range, 4*sizeof(uint64_t));
3484 }
3485
3486 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3487 lkb->lkb_lvbptr = allocate_lvb(ls);
3488 if (!lkb->lkb_lvbptr)
3489 return -ENOMEM;
3490 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3491 sizeof(struct rcom_lock);
3492 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3493 }
3494
3495 /* Conversions between PR and CW (middle modes) need special handling.
3496 The real granted mode of these converting locks cannot be determined
3497 until all locks have been rebuilt on the rsb (recover_conversion) */
3498
3499 if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3500 rl->rl_status = DLM_LKSTS_CONVERT;
3501 lkb->lkb_grmode = DLM_LOCK_IV;
3502 rsb_set_flag(r, RSB_RECOVER_CONVERT);
3503 }
3504
3505 return 0;
3506}
3507
3508/* This lkb may have been recovered in a previous aborted recovery so we need
3509 to check if the rsb already has an lkb with the given remote nodeid/lkid.
3510 If so we just send back a standard reply. If not, we create a new lkb with
3511 the given values and send back our lkid. We send back our lkid by sending
3512 back the rcom_lock struct we got but with the remid field filled in. */
3513
3514int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3515{
3516 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3517 struct dlm_rsb *r;
3518 struct dlm_lkb *lkb;
3519 int error;
3520
3521 if (rl->rl_parent_lkid) {
3522 error = -EOPNOTSUPP;
3523 goto out;
3524 }
3525
3526 error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3527 if (error)
3528 goto out;
3529
3530 lock_rsb(r);
3531
3532 lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3533 if (lkb) {
3534 error = -EEXIST;
3535 goto out_remid;
3536 }
3537
3538 error = create_lkb(ls, &lkb);
3539 if (error)
3540 goto out_unlock;
3541
3542 error = receive_rcom_lock_args(ls, lkb, r, rc);
3543 if (error) {
3544 put_lkb(lkb);
3545 goto out_unlock;
3546 }
3547
3548 attach_lkb(r, lkb);
3549 add_lkb(r, lkb, rl->rl_status);
3550 error = 0;
3551
3552 out_remid:
3553 /* this is the new value returned to the lock holder for
3554 saving in its process-copy lkb */
3555 rl->rl_remid = lkb->lkb_id;
3556
3557 out_unlock:
3558 unlock_rsb(r);
3559 put_rsb(r);
3560 out:
3561 if (error)
3562 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3563 rl->rl_result = error;
3564 return error;
3565}
3566
3567int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3568{
3569 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3570 struct dlm_rsb *r;
3571 struct dlm_lkb *lkb;
3572 int error;
3573
3574 error = find_lkb(ls, rl->rl_lkid, &lkb);
3575 if (error) {
3576 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3577 return error;
3578 }
3579
3580 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3581
3582 error = rl->rl_result;
3583
3584 r = lkb->lkb_resource;
3585 hold_rsb(r);
3586 lock_rsb(r);
3587
3588 switch (error) {
3589 case -EEXIST:
3590 log_debug(ls, "master copy exists %x", lkb->lkb_id);
3591 /* fall through */
3592 case 0:
3593 lkb->lkb_remid = rl->rl_remid;
3594 break;
3595 default:
3596 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
3597 error, lkb->lkb_id);
3598 }
3599
3600	/* an ack for dlm_recover_locks(), which waits for a reply for
3601	   each lock it sends to a new master */
3602 dlm_recovered_lock(r);
3603
3604 unlock_rsb(r);
3605 put_rsb(r);
3606 put_lkb(lkb);
3607
3608 return 0;
3609}
3610