aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm/rcom.c
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2012-07-26 17:03:42 -0400
committerLinus Torvalds <torvalds@linux-foundation.org>2012-07-26 17:03:42 -0400
commit548ed10228093f1036297a333d1c1064f4daefdc (patch)
tree211a19371f08f963c520f398e290e7c94e555139 /fs/dlm/rcom.c
parent98077a720584182fe594ccbf8a7e6ce2a00796b3 (diff)
parent96006ea6d4eea73466e90ef353bf34e507724e77 (diff)
Merge tag 'dlm-3.6' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm
Pull dlm updatesfrom David Teigland: "This set includes a major redesign of recording the master node for resources. The old dir hash table, which just held the master node for each resource, has been removed. The rsb hash table has always duplicated the master node value from the dir, and is now the single record of it. Having two full hash tables of all resources has always been a waste, especially since one just duplicated a single value from the other. Local requests will now often require one instead of two lengthy hash table searches. The other substantial change is made possible by the dirtbl removal, and fixes a long standing race between resource removal and lookup by reworking how removal is done. At the same time it improves the efficiency of removal by avoiding repeated searches through a hash bucket. The other commits include minor fixes and changes." * tag 'dlm-3.6' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm: dlm: fix missing dir remove dlm: fix conversion deadlock from recovery dlm: use wait_event_timeout dlm: fix race between remove and lookup dlm: use idr instead of list for recovered rsbs dlm: use rsbtbl as resource directory
Diffstat (limited to 'fs/dlm/rcom.c')
-rw-r--r--fs/dlm/rcom.c147
1 files changed, 112 insertions, 35 deletions
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 64d3e2b958c7..87f1a56eab32 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -23,8 +23,6 @@
23#include "memory.h" 23#include "memory.h"
24#include "lock.h" 24#include "lock.h"
25#include "util.h" 25#include "util.h"
26#include "member.h"
27
28 26
29static int rcom_response(struct dlm_ls *ls) 27static int rcom_response(struct dlm_ls *ls)
30{ 28{
@@ -275,19 +273,9 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
275 struct dlm_rcom *rc; 273 struct dlm_rcom *rc;
276 struct dlm_mhandle *mh; 274 struct dlm_mhandle *mh;
277 int error = 0; 275 int error = 0;
278 int max_size = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
279 276
280 ls->ls_recover_nodeid = nodeid; 277 ls->ls_recover_nodeid = nodeid;
281 278
282 if (nodeid == dlm_our_nodeid()) {
283 ls->ls_recover_buf->rc_header.h_length =
284 dlm_config.ci_buffer_size;
285 dlm_copy_master_names(ls, last_name, last_len,
286 ls->ls_recover_buf->rc_buf,
287 max_size, nodeid);
288 goto out;
289 }
290
291 error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh); 279 error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
292 if (error) 280 if (error)
293 goto out; 281 goto out;
@@ -337,7 +325,26 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
337 if (error) 325 if (error)
338 goto out; 326 goto out;
339 memcpy(rc->rc_buf, r->res_name, r->res_length); 327 memcpy(rc->rc_buf, r->res_name, r->res_length);
340 rc->rc_id = (unsigned long) r; 328 rc->rc_id = (unsigned long) r->res_id;
329
330 send_rcom(ls, mh, rc);
331 out:
332 return error;
333}
334
335int dlm_send_rcom_lookup_dump(struct dlm_rsb *r, int to_nodeid)
336{
337 struct dlm_rcom *rc;
338 struct dlm_mhandle *mh;
339 struct dlm_ls *ls = r->res_ls;
340 int error;
341
342 error = create_rcom(ls, to_nodeid, DLM_RCOM_LOOKUP, r->res_length,
343 &rc, &mh);
344 if (error)
345 goto out;
346 memcpy(rc->rc_buf, r->res_name, r->res_length);
347 rc->rc_id = 0xFFFFFFFF;
341 348
342 send_rcom(ls, mh, rc); 349 send_rcom(ls, mh, rc);
343 out: 350 out:
@@ -355,7 +362,14 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
355 if (error) 362 if (error)
356 return; 363 return;
357 364
358 error = dlm_dir_lookup(ls, nodeid, rc_in->rc_buf, len, &ret_nodeid); 365 if (rc_in->rc_id == 0xFFFFFFFF) {
366 log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
367 dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
368 return;
369 }
370
371 error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
372 DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
359 if (error) 373 if (error)
360 ret_nodeid = error; 374 ret_nodeid = error;
361 rc->rc_result = ret_nodeid; 375 rc->rc_result = ret_nodeid;
@@ -486,17 +500,76 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
486 return 0; 500 return 0;
487} 501}
488 502
503/*
504 * Ignore messages for stage Y before we set
505 * recover_status bit for stage X:
506 *
507 * recover_status = 0
508 *
509 * dlm_recover_members()
510 * - send nothing
511 * - recv nothing
512 * - ignore NAMES, NAMES_REPLY
513 * - ignore LOOKUP, LOOKUP_REPLY
514 * - ignore LOCK, LOCK_REPLY
515 *
516 * recover_status |= NODES
517 *
518 * dlm_recover_members_wait()
519 *
520 * dlm_recover_directory()
521 * - send NAMES
522 * - recv NAMES_REPLY
523 * - ignore LOOKUP, LOOKUP_REPLY
524 * - ignore LOCK, LOCK_REPLY
525 *
526 * recover_status |= DIR
527 *
528 * dlm_recover_directory_wait()
529 *
530 * dlm_recover_masters()
531 * - send LOOKUP
532 * - recv LOOKUP_REPLY
533 *
534 * dlm_recover_locks()
535 * - send LOCKS
536 * - recv LOCKS_REPLY
537 *
538 * recover_status |= LOCKS
539 *
540 * dlm_recover_locks_wait()
541 *
542 * recover_status |= DONE
543 */
544
489/* Called by dlm_recv; corresponds to dlm_receive_message() but special 545/* Called by dlm_recv; corresponds to dlm_receive_message() but special
490 recovery-only comms are sent through here. */ 546 recovery-only comms are sent through here. */
491 547
492void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) 548void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
493{ 549{
494 int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock); 550 int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
495 int stop, reply = 0, lock = 0; 551 int stop, reply = 0, names = 0, lookup = 0, lock = 0;
496 uint32_t status; 552 uint32_t status;
497 uint64_t seq; 553 uint64_t seq;
498 554
499 switch (rc->rc_type) { 555 switch (rc->rc_type) {
556 case DLM_RCOM_STATUS_REPLY:
557 reply = 1;
558 break;
559 case DLM_RCOM_NAMES:
560 names = 1;
561 break;
562 case DLM_RCOM_NAMES_REPLY:
563 names = 1;
564 reply = 1;
565 break;
566 case DLM_RCOM_LOOKUP:
567 lookup = 1;
568 break;
569 case DLM_RCOM_LOOKUP_REPLY:
570 lookup = 1;
571 reply = 1;
572 break;
500 case DLM_RCOM_LOCK: 573 case DLM_RCOM_LOCK:
501 lock = 1; 574 lock = 1;
502 break; 575 break;
@@ -504,10 +577,6 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
504 lock = 1; 577 lock = 1;
505 reply = 1; 578 reply = 1;
506 break; 579 break;
507 case DLM_RCOM_STATUS_REPLY:
508 case DLM_RCOM_NAMES_REPLY:
509 case DLM_RCOM_LOOKUP_REPLY:
510 reply = 1;
511 }; 580 };
512 581
513 spin_lock(&ls->ls_recover_lock); 582 spin_lock(&ls->ls_recover_lock);
@@ -516,19 +585,17 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
516 seq = ls->ls_recover_seq; 585 seq = ls->ls_recover_seq;
517 spin_unlock(&ls->ls_recover_lock); 586 spin_unlock(&ls->ls_recover_lock);
518 587
519 if ((stop && (rc->rc_type != DLM_RCOM_STATUS)) || 588 if (stop && (rc->rc_type != DLM_RCOM_STATUS))
520 (reply && (rc->rc_seq_reply != seq)) || 589 goto ignore;
521 (lock && !(status & DLM_RS_DIR))) { 590
522 log_limit(ls, "dlm_receive_rcom ignore msg %d " 591 if (reply && (rc->rc_seq_reply != seq))
523 "from %d %llu %llu recover seq %llu sts %x gen %u", 592 goto ignore;
524 rc->rc_type, 593
525 nodeid, 594 if (!(status & DLM_RS_NODES) && (names || lookup || lock))
526 (unsigned long long)rc->rc_seq, 595 goto ignore;
527 (unsigned long long)rc->rc_seq_reply, 596
528 (unsigned long long)seq, 597 if (!(status & DLM_RS_DIR) && (lookup || lock))
529 status, ls->ls_generation); 598 goto ignore;
530 goto out;
531 }
532 599
533 switch (rc->rc_type) { 600 switch (rc->rc_type) {
534 case DLM_RCOM_STATUS: 601 case DLM_RCOM_STATUS:
@@ -570,10 +637,20 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
570 default: 637 default:
571 log_error(ls, "receive_rcom bad type %d", rc->rc_type); 638 log_error(ls, "receive_rcom bad type %d", rc->rc_type);
572 } 639 }
573out: 640 return;
641
642ignore:
643 log_limit(ls, "dlm_receive_rcom ignore msg %d "
644 "from %d %llu %llu recover seq %llu sts %x gen %u",
645 rc->rc_type,
646 nodeid,
647 (unsigned long long)rc->rc_seq,
648 (unsigned long long)rc->rc_seq_reply,
649 (unsigned long long)seq,
650 status, ls->ls_generation);
574 return; 651 return;
575Eshort: 652Eshort:
576 log_error(ls, "recovery message %x from %d is too short", 653 log_error(ls, "recovery message %d from %d is too short",
577 rc->rc_type, nodeid); 654 rc->rc_type, nodeid);
578} 655}
579 656