aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm/dlmdomain.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm/dlmdomain.c')
-rw-r--r--fs/ocfs2/dlm/dlmdomain.c253
1 files changed, 194 insertions, 59 deletions
diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c
index f0b25f2dd205..6087c4749fee 100644
--- a/fs/ocfs2/dlm/dlmdomain.c
+++ b/fs/ocfs2/dlm/dlmdomain.c
@@ -48,6 +48,36 @@
48#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN) 48#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
49#include "cluster/masklog.h" 49#include "cluster/masklog.h"
50 50
51/*
52 * ocfs2 node maps are arrays of long int, which cannot be sent freely
53 * across the wire because of endianness issues. To work around this, we
54 * convert the long ints to byte arrays. The following 3 routines are helper
55 * functions to set/test/copy bits within those arrays of bytes.
56 */
57static inline void byte_set_bit(u8 nr, u8 map[])
58{
59 map[nr >> 3] |= (1UL << (nr & 7));
60}
61
62static inline int byte_test_bit(u8 nr, u8 map[])
63{
64 return ((1UL << (nr & 7)) & (map[nr >> 3])) != 0;
65}
66
67static inline void byte_copymap(u8 dmap[], unsigned long smap[],
68 unsigned int sz)
69{
70 unsigned int nn;
71
72 if (!sz)
73 return;
74
75 memset(dmap, 0, ((sz + 7) >> 3));
76 for (nn = 0 ; nn < sz; nn++)
77 if (test_bit(nn, smap))
78 byte_set_bit(nn, dmap);
79}
80
51static void dlm_free_pagevec(void **vec, int pages) 81static void dlm_free_pagevec(void **vec, int pages)
52{ 82{
53 while (pages--) 83 while (pages--)
@@ -95,10 +125,14 @@ static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
95 125
96#define DLM_DOMAIN_BACKOFF_MS 200 126#define DLM_DOMAIN_BACKOFF_MS 200
97 127
98static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data); 128static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
99static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data); 129 void **ret_data);
100static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data); 130static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
101static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data); 131 void **ret_data);
132static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
133 void **ret_data);
134static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
135 void **ret_data);
102 136
103static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm); 137static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
104 138
@@ -125,10 +159,10 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm,
125 hlist_add_head(&res->hash_node, bucket); 159 hlist_add_head(&res->hash_node, bucket);
126} 160}
127 161
128struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, 162struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
129 const char *name, 163 const char *name,
130 unsigned int len, 164 unsigned int len,
131 unsigned int hash) 165 unsigned int hash)
132{ 166{
133 struct hlist_head *bucket; 167 struct hlist_head *bucket;
134 struct hlist_node *list; 168 struct hlist_node *list;
@@ -154,6 +188,37 @@ struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
154 return NULL; 188 return NULL;
155} 189}
156 190
191/* intended to be called by functions which do not care about lock
192 * resources which are being purged (most net _handler functions).
193 * this will return NULL for any lock resource which is found but
194 * currently in the process of dropping its mastery reference.
195 * use __dlm_lookup_lockres_full when you need the lock resource
196 * regardless (e.g. dlm_get_lock_resource) */
197struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
198 const char *name,
199 unsigned int len,
200 unsigned int hash)
201{
202 struct dlm_lock_resource *res = NULL;
203
204 mlog_entry("%.*s\n", len, name);
205
206 assert_spin_locked(&dlm->spinlock);
207
208 res = __dlm_lookup_lockres_full(dlm, name, len, hash);
209 if (res) {
210 spin_lock(&res->spinlock);
211 if (res->state & DLM_LOCK_RES_DROPPING_REF) {
212 spin_unlock(&res->spinlock);
213 dlm_lockres_put(res);
214 return NULL;
215 }
216 spin_unlock(&res->spinlock);
217 }
218
219 return res;
220}
221
157struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm, 222struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
158 const char *name, 223 const char *name,
159 unsigned int len) 224 unsigned int len)
@@ -330,43 +395,60 @@ static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
330 wake_up(&dlm_domain_events); 395 wake_up(&dlm_domain_events);
331} 396}
332 397
333static void dlm_migrate_all_locks(struct dlm_ctxt *dlm) 398static int dlm_migrate_all_locks(struct dlm_ctxt *dlm)
334{ 399{
335 int i; 400 int i, num, n, ret = 0;
336 struct dlm_lock_resource *res; 401 struct dlm_lock_resource *res;
402 struct hlist_node *iter;
403 struct hlist_head *bucket;
404 int dropped;
337 405
338 mlog(0, "Migrating locks from domain %s\n", dlm->name); 406 mlog(0, "Migrating locks from domain %s\n", dlm->name);
339restart: 407
408 num = 0;
340 spin_lock(&dlm->spinlock); 409 spin_lock(&dlm->spinlock);
341 for (i = 0; i < DLM_HASH_BUCKETS; i++) { 410 for (i = 0; i < DLM_HASH_BUCKETS; i++) {
342 while (!hlist_empty(dlm_lockres_hash(dlm, i))) { 411redo_bucket:
343 res = hlist_entry(dlm_lockres_hash(dlm, i)->first, 412 n = 0;
344 struct dlm_lock_resource, hash_node); 413 bucket = dlm_lockres_hash(dlm, i);
345 /* need reference when manually grabbing lockres */ 414 iter = bucket->first;
415 while (iter) {
416 n++;
417 res = hlist_entry(iter, struct dlm_lock_resource,
418 hash_node);
346 dlm_lockres_get(res); 419 dlm_lockres_get(res);
347 /* this should unhash the lockres 420 /* migrate, if necessary. this will drop the dlm
348 * and exit with dlm->spinlock */ 421 * spinlock and retake it if it does migration. */
349 mlog(0, "purging res=%p\n", res); 422 dropped = dlm_empty_lockres(dlm, res);
350 if (dlm_lockres_is_dirty(dlm, res)) { 423
351 /* HACK! this should absolutely go. 424 spin_lock(&res->spinlock);
352 * need to figure out why some empty 425 __dlm_lockres_calc_usage(dlm, res);
353 * lockreses are still marked dirty */ 426 iter = res->hash_node.next;
354 mlog(ML_ERROR, "lockres %.*s dirty!\n", 427 spin_unlock(&res->spinlock);
355 res->lockname.len, res->lockname.name); 428
356
357 spin_unlock(&dlm->spinlock);
358 dlm_kick_thread(dlm, res);
359 wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
360 dlm_lockres_put(res);
361 goto restart;
362 }
363 dlm_purge_lockres(dlm, res);
364 dlm_lockres_put(res); 429 dlm_lockres_put(res);
430
431 cond_resched_lock(&dlm->spinlock);
432
433 if (dropped)
434 goto redo_bucket;
365 } 435 }
436 num += n;
437 mlog(0, "%s: touched %d lockreses in bucket %d "
438 "(tot=%d)\n", dlm->name, n, i, num);
366 } 439 }
367 spin_unlock(&dlm->spinlock); 440 spin_unlock(&dlm->spinlock);
368 441 wake_up(&dlm->dlm_thread_wq);
442
443 /* let the dlm thread take care of purging; return -EAGAIN so the
444 * caller keeps rescanning until nothing remains in the hash */
445 if (num) {
446 mlog(0, "%s: %d lock resources in hash last pass\n",
447 dlm->name, num);
448 ret = -EAGAIN;
449 }
369 mlog(0, "DONE Migrating locks from domain %s\n", dlm->name); 450 mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
451 return ret;
370} 452}
371 453
372static int dlm_no_joining_node(struct dlm_ctxt *dlm) 454static int dlm_no_joining_node(struct dlm_ctxt *dlm)
@@ -418,7 +500,8 @@ static void __dlm_print_nodes(struct dlm_ctxt *dlm)
418 printk("\n"); 500 printk("\n");
419} 501}
420 502
421static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data) 503static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
504 void **ret_data)
422{ 505{
423 struct dlm_ctxt *dlm = data; 506 struct dlm_ctxt *dlm = data;
424 unsigned int node; 507 unsigned int node;
@@ -571,7 +654,9 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm)
571 /* We changed dlm state, notify the thread */ 654 /* We changed dlm state, notify the thread */
572 dlm_kick_thread(dlm, NULL); 655 dlm_kick_thread(dlm, NULL);
573 656
574 dlm_migrate_all_locks(dlm); 657 while (dlm_migrate_all_locks(dlm)) {
658 mlog(0, "%s: more migration to do\n", dlm->name);
659 }
575 dlm_mark_domain_leaving(dlm); 660 dlm_mark_domain_leaving(dlm);
576 dlm_leave_domain(dlm); 661 dlm_leave_domain(dlm);
577 dlm_complete_dlm_shutdown(dlm); 662 dlm_complete_dlm_shutdown(dlm);
@@ -580,11 +665,13 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm)
580} 665}
581EXPORT_SYMBOL_GPL(dlm_unregister_domain); 666EXPORT_SYMBOL_GPL(dlm_unregister_domain);
582 667
583static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data) 668static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
669 void **ret_data)
584{ 670{
585 struct dlm_query_join_request *query; 671 struct dlm_query_join_request *query;
586 enum dlm_query_join_response response; 672 enum dlm_query_join_response response;
587 struct dlm_ctxt *dlm = NULL; 673 struct dlm_ctxt *dlm = NULL;
674 u8 nodenum;
588 675
589 query = (struct dlm_query_join_request *) msg->buf; 676 query = (struct dlm_query_join_request *) msg->buf;
590 677
@@ -608,6 +695,28 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
608 695
609 spin_lock(&dlm_domain_lock); 696 spin_lock(&dlm_domain_lock);
610 dlm = __dlm_lookup_domain_full(query->domain, query->name_len); 697 dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
698 if (!dlm)
699 goto unlock_respond;
700
701 /*
702 * There is a small window where the joining node may not see node(s)
703 * that just left but are still part of the cluster. Disallow the join
704 * request if the joining node's map lacks any node in our domain map.
705 */
706 nodenum=0;
707 while (nodenum < O2NM_MAX_NODES) {
708 if (test_bit(nodenum, dlm->domain_map)) {
709 if (!byte_test_bit(nodenum, query->node_map)) {
710 mlog(0, "disallow join as node %u does not "
711 "have node %u in its nodemap\n",
712 query->node_idx, nodenum);
713 response = JOIN_DISALLOW;
714 goto unlock_respond;
715 }
716 }
717 nodenum++;
718 }
719
611 /* Once the dlm ctxt is marked as leaving then we don't want 720 /* Once the dlm ctxt is marked as leaving then we don't want
612 * to be put in someone's domain map. 721 * to be put in someone's domain map.
613 * Also, explicitly disallow joining at certain troublesome 722 * Also, explicitly disallow joining at certain troublesome
@@ -626,15 +735,15 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
626 /* Disallow parallel joins. */ 735 /* Disallow parallel joins. */
627 response = JOIN_DISALLOW; 736 response = JOIN_DISALLOW;
628 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) { 737 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
629 mlog(ML_NOTICE, "node %u trying to join, but recovery " 738 mlog(0, "node %u trying to join, but recovery "
630 "is ongoing.\n", bit); 739 "is ongoing.\n", bit);
631 response = JOIN_DISALLOW; 740 response = JOIN_DISALLOW;
632 } else if (test_bit(bit, dlm->recovery_map)) { 741 } else if (test_bit(bit, dlm->recovery_map)) {
633 mlog(ML_NOTICE, "node %u trying to join, but it " 742 mlog(0, "node %u trying to join, but it "
634 "still needs recovery.\n", bit); 743 "still needs recovery.\n", bit);
635 response = JOIN_DISALLOW; 744 response = JOIN_DISALLOW;
636 } else if (test_bit(bit, dlm->domain_map)) { 745 } else if (test_bit(bit, dlm->domain_map)) {
637 mlog(ML_NOTICE, "node %u trying to join, but it " 746 mlog(0, "node %u trying to join, but it "
638 "is still in the domain! needs recovery?\n", 747 "is still in the domain! needs recovery?\n",
639 bit); 748 bit);
640 response = JOIN_DISALLOW; 749 response = JOIN_DISALLOW;
@@ -649,6 +758,7 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
649 758
650 spin_unlock(&dlm->spinlock); 759 spin_unlock(&dlm->spinlock);
651 } 760 }
761unlock_respond:
652 spin_unlock(&dlm_domain_lock); 762 spin_unlock(&dlm_domain_lock);
653 763
654respond: 764respond:
@@ -657,7 +767,8 @@ respond:
657 return response; 767 return response;
658} 768}
659 769
660static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data) 770static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
771 void **ret_data)
661{ 772{
662 struct dlm_assert_joined *assert; 773 struct dlm_assert_joined *assert;
663 struct dlm_ctxt *dlm = NULL; 774 struct dlm_ctxt *dlm = NULL;
@@ -694,7 +805,8 @@ static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data)
694 return 0; 805 return 0;
695} 806}
696 807
697static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data) 808static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
809 void **ret_data)
698{ 810{
699 struct dlm_cancel_join *cancel; 811 struct dlm_cancel_join *cancel;
700 struct dlm_ctxt *dlm = NULL; 812 struct dlm_ctxt *dlm = NULL;
@@ -796,6 +908,9 @@ static int dlm_request_join(struct dlm_ctxt *dlm,
796 join_msg.name_len = strlen(dlm->name); 908 join_msg.name_len = strlen(dlm->name);
797 memcpy(join_msg.domain, dlm->name, join_msg.name_len); 909 memcpy(join_msg.domain, dlm->name, join_msg.name_len);
798 910
911 /* copy live node map to join message */
912 byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES);
913
799 status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg, 914 status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
800 sizeof(join_msg), node, &retval); 915 sizeof(join_msg), node, &retval);
801 if (status < 0 && status != -ENOPROTOOPT) { 916 if (status < 0 && status != -ENOPROTOOPT) {
@@ -1036,98 +1151,106 @@ static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
1036 status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key, 1151 status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
1037 sizeof(struct dlm_master_request), 1152 sizeof(struct dlm_master_request),
1038 dlm_master_request_handler, 1153 dlm_master_request_handler,
1039 dlm, &dlm->dlm_domain_handlers); 1154 dlm, NULL, &dlm->dlm_domain_handlers);
1040 if (status) 1155 if (status)
1041 goto bail; 1156 goto bail;
1042 1157
1043 status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key, 1158 status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
1044 sizeof(struct dlm_assert_master), 1159 sizeof(struct dlm_assert_master),
1045 dlm_assert_master_handler, 1160 dlm_assert_master_handler,
1046 dlm, &dlm->dlm_domain_handlers); 1161 dlm, dlm_assert_master_post_handler,
1162 &dlm->dlm_domain_handlers);
1047 if (status) 1163 if (status)
1048 goto bail; 1164 goto bail;
1049 1165
1050 status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key, 1166 status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
1051 sizeof(struct dlm_create_lock), 1167 sizeof(struct dlm_create_lock),
1052 dlm_create_lock_handler, 1168 dlm_create_lock_handler,
1053 dlm, &dlm->dlm_domain_handlers); 1169 dlm, NULL, &dlm->dlm_domain_handlers);
1054 if (status) 1170 if (status)
1055 goto bail; 1171 goto bail;
1056 1172
1057 status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key, 1173 status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key,
1058 DLM_CONVERT_LOCK_MAX_LEN, 1174 DLM_CONVERT_LOCK_MAX_LEN,
1059 dlm_convert_lock_handler, 1175 dlm_convert_lock_handler,
1060 dlm, &dlm->dlm_domain_handlers); 1176 dlm, NULL, &dlm->dlm_domain_handlers);
1061 if (status) 1177 if (status)
1062 goto bail; 1178 goto bail;
1063 1179
1064 status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key, 1180 status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key,
1065 DLM_UNLOCK_LOCK_MAX_LEN, 1181 DLM_UNLOCK_LOCK_MAX_LEN,
1066 dlm_unlock_lock_handler, 1182 dlm_unlock_lock_handler,
1067 dlm, &dlm->dlm_domain_handlers); 1183 dlm, NULL, &dlm->dlm_domain_handlers);
1068 if (status) 1184 if (status)
1069 goto bail; 1185 goto bail;
1070 1186
1071 status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key, 1187 status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key,
1072 DLM_PROXY_AST_MAX_LEN, 1188 DLM_PROXY_AST_MAX_LEN,
1073 dlm_proxy_ast_handler, 1189 dlm_proxy_ast_handler,
1074 dlm, &dlm->dlm_domain_handlers); 1190 dlm, NULL, &dlm->dlm_domain_handlers);
1075 if (status) 1191 if (status)
1076 goto bail; 1192 goto bail;
1077 1193
1078 status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key, 1194 status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key,
1079 sizeof(struct dlm_exit_domain), 1195 sizeof(struct dlm_exit_domain),
1080 dlm_exit_domain_handler, 1196 dlm_exit_domain_handler,
1081 dlm, &dlm->dlm_domain_handlers); 1197 dlm, NULL, &dlm->dlm_domain_handlers);
1198 if (status)
1199 goto bail;
1200
1201 status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key,
1202 sizeof(struct dlm_deref_lockres),
1203 dlm_deref_lockres_handler,
1204 dlm, NULL, &dlm->dlm_domain_handlers);
1082 if (status) 1205 if (status)
1083 goto bail; 1206 goto bail;
1084 1207
1085 status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key, 1208 status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
1086 sizeof(struct dlm_migrate_request), 1209 sizeof(struct dlm_migrate_request),
1087 dlm_migrate_request_handler, 1210 dlm_migrate_request_handler,
1088 dlm, &dlm->dlm_domain_handlers); 1211 dlm, NULL, &dlm->dlm_domain_handlers);
1089 if (status) 1212 if (status)
1090 goto bail; 1213 goto bail;
1091 1214
1092 status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key, 1215 status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key,
1093 DLM_MIG_LOCKRES_MAX_LEN, 1216 DLM_MIG_LOCKRES_MAX_LEN,
1094 dlm_mig_lockres_handler, 1217 dlm_mig_lockres_handler,
1095 dlm, &dlm->dlm_domain_handlers); 1218 dlm, NULL, &dlm->dlm_domain_handlers);
1096 if (status) 1219 if (status)
1097 goto bail; 1220 goto bail;
1098 1221
1099 status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key, 1222 status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
1100 sizeof(struct dlm_master_requery), 1223 sizeof(struct dlm_master_requery),
1101 dlm_master_requery_handler, 1224 dlm_master_requery_handler,
1102 dlm, &dlm->dlm_domain_handlers); 1225 dlm, NULL, &dlm->dlm_domain_handlers);
1103 if (status) 1226 if (status)
1104 goto bail; 1227 goto bail;
1105 1228
1106 status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key, 1229 status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
1107 sizeof(struct dlm_lock_request), 1230 sizeof(struct dlm_lock_request),
1108 dlm_request_all_locks_handler, 1231 dlm_request_all_locks_handler,
1109 dlm, &dlm->dlm_domain_handlers); 1232 dlm, NULL, &dlm->dlm_domain_handlers);
1110 if (status) 1233 if (status)
1111 goto bail; 1234 goto bail;
1112 1235
1113 status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key, 1236 status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
1114 sizeof(struct dlm_reco_data_done), 1237 sizeof(struct dlm_reco_data_done),
1115 dlm_reco_data_done_handler, 1238 dlm_reco_data_done_handler,
1116 dlm, &dlm->dlm_domain_handlers); 1239 dlm, NULL, &dlm->dlm_domain_handlers);
1117 if (status) 1240 if (status)
1118 goto bail; 1241 goto bail;
1119 1242
1120 status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key, 1243 status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
1121 sizeof(struct dlm_begin_reco), 1244 sizeof(struct dlm_begin_reco),
1122 dlm_begin_reco_handler, 1245 dlm_begin_reco_handler,
1123 dlm, &dlm->dlm_domain_handlers); 1246 dlm, NULL, &dlm->dlm_domain_handlers);
1124 if (status) 1247 if (status)
1125 goto bail; 1248 goto bail;
1126 1249
1127 status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key, 1250 status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
1128 sizeof(struct dlm_finalize_reco), 1251 sizeof(struct dlm_finalize_reco),
1129 dlm_finalize_reco_handler, 1252 dlm_finalize_reco_handler,
1130 dlm, &dlm->dlm_domain_handlers); 1253 dlm, NULL, &dlm->dlm_domain_handlers);
1131 if (status) 1254 if (status)
1132 goto bail; 1255 goto bail;
1133 1256
@@ -1141,6 +1264,8 @@ bail:
1141static int dlm_join_domain(struct dlm_ctxt *dlm) 1264static int dlm_join_domain(struct dlm_ctxt *dlm)
1142{ 1265{
1143 int status; 1266 int status;
1267 unsigned int backoff;
1268 unsigned int total_backoff = 0;
1144 1269
1145 BUG_ON(!dlm); 1270 BUG_ON(!dlm);
1146 1271
@@ -1172,18 +1297,27 @@ static int dlm_join_domain(struct dlm_ctxt *dlm)
1172 } 1297 }
1173 1298
1174 do { 1299 do {
1175 unsigned int backoff;
1176 status = dlm_try_to_join_domain(dlm); 1300 status = dlm_try_to_join_domain(dlm);
1177 1301
1178 /* If we're racing another node to the join, then we 1302 /* If we're racing another node to the join, then we
1179 * need to back off temporarily and let them 1303 * need to back off temporarily and let them
1180 * complete. */ 1304 * complete. */
1305#define DLM_JOIN_TIMEOUT_MSECS 90000
1181 if (status == -EAGAIN) { 1306 if (status == -EAGAIN) {
1182 if (signal_pending(current)) { 1307 if (signal_pending(current)) {
1183 status = -ERESTARTSYS; 1308 status = -ERESTARTSYS;
1184 goto bail; 1309 goto bail;
1185 } 1310 }
1186 1311
1312 if (total_backoff >
1313 msecs_to_jiffies(DLM_JOIN_TIMEOUT_MSECS)) {
1314 status = -ERESTARTSYS;
1315 mlog(ML_NOTICE, "Timed out joining dlm domain "
1316 "%s after %u msecs\n", dlm->name,
1317 jiffies_to_msecs(total_backoff));
1318 goto bail;
1319 }
1320
1187 /* 1321 /*
1188 * <chip> After you! 1322 * <chip> After you!
1189 * <dale> No, after you! 1323 * <dale> No, after you!
@@ -1193,6 +1327,7 @@ static int dlm_join_domain(struct dlm_ctxt *dlm)
1193 */ 1327 */
1194 backoff = (unsigned int)(jiffies & 0x3); 1328 backoff = (unsigned int)(jiffies & 0x3);
1195 backoff *= DLM_DOMAIN_BACKOFF_MS; 1329 backoff *= DLM_DOMAIN_BACKOFF_MS;
1330 total_backoff += backoff;
1196 mlog(0, "backoff %d\n", backoff); 1331 mlog(0, "backoff %d\n", backoff);
1197 msleep(backoff); 1332 msleep(backoff);
1198 } 1333 }
@@ -1421,21 +1556,21 @@ static int dlm_register_net_handlers(void)
1421 status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, 1556 status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
1422 sizeof(struct dlm_query_join_request), 1557 sizeof(struct dlm_query_join_request),
1423 dlm_query_join_handler, 1558 dlm_query_join_handler,
1424 NULL, &dlm_join_handlers); 1559 NULL, NULL, &dlm_join_handlers);
1425 if (status) 1560 if (status)
1426 goto bail; 1561 goto bail;
1427 1562
1428 status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY, 1563 status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1429 sizeof(struct dlm_assert_joined), 1564 sizeof(struct dlm_assert_joined),
1430 dlm_assert_joined_handler, 1565 dlm_assert_joined_handler,
1431 NULL, &dlm_join_handlers); 1566 NULL, NULL, &dlm_join_handlers);
1432 if (status) 1567 if (status)
1433 goto bail; 1568 goto bail;
1434 1569
1435 status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY, 1570 status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1436 sizeof(struct dlm_cancel_join), 1571 sizeof(struct dlm_cancel_join),
1437 dlm_cancel_join_handler, 1572 dlm_cancel_join_handler,
1438 NULL, &dlm_join_handlers); 1573 NULL, NULL, &dlm_join_handlers);
1439 1574
1440bail: 1575bail:
1441 if (status < 0) 1576 if (status < 0)