about | summary | refs | log | tree | commit | diff | stats
path: root/fs/ocfs2/dlm
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm')
-rw-r--r--  fs/ocfs2/dlm/dlmcommon.h     30
-rw-r--r--  fs/ocfs2/dlm/dlmdebug.c      21
-rw-r--r--  fs/ocfs2/dlm/dlmdomain.c    401
-rw-r--r--  fs/ocfs2/dlm/dlmmaster.c     49
-rw-r--r--  fs/ocfs2/dlm/dlmrecovery.c   22
-rw-r--r--  fs/ocfs2/dlm/dlmthread.c    114
6 files changed, 551 insertions, 86 deletions
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h
index 4b6ae2c13b47..b36d0bf77a5a 100644
--- a/fs/ocfs2/dlm/dlmcommon.h
+++ b/fs/ocfs2/dlm/dlmcommon.h
@@ -445,7 +445,9 @@ enum {
445 DLM_LOCK_REQUEST_MSG, /* 515 */ 445 DLM_LOCK_REQUEST_MSG, /* 515 */
446 DLM_RECO_DATA_DONE_MSG, /* 516 */ 446 DLM_RECO_DATA_DONE_MSG, /* 516 */
447 DLM_BEGIN_RECO_MSG, /* 517 */ 447 DLM_BEGIN_RECO_MSG, /* 517 */
448 DLM_FINALIZE_RECO_MSG /* 518 */ 448 DLM_FINALIZE_RECO_MSG, /* 518 */
449 DLM_QUERY_REGION, /* 519 */
450 DLM_QUERY_NODEINFO, /* 520 */
449}; 451};
450 452
451struct dlm_reco_node_data 453struct dlm_reco_node_data
@@ -727,6 +729,31 @@ struct dlm_cancel_join
727 u8 domain[O2NM_MAX_NAME_LEN]; 729 u8 domain[O2NM_MAX_NAME_LEN];
728}; 730};
729 731
732struct dlm_query_region {
733 u8 qr_node;
734 u8 qr_numregions;
735 u8 qr_namelen;
736 u8 pad1;
737 u8 qr_domain[O2NM_MAX_NAME_LEN];
738 u8 qr_regions[O2HB_MAX_REGION_NAME_LEN * O2NM_MAX_REGIONS];
739};
740
741struct dlm_node_info {
742 u8 ni_nodenum;
743 u8 pad1;
744 u16 ni_ipv4_port;
745 u32 ni_ipv4_address;
746};
747
748struct dlm_query_nodeinfo {
749 u8 qn_nodenum;
750 u8 qn_numnodes;
751 u8 qn_namelen;
752 u8 pad1;
753 u8 qn_domain[O2NM_MAX_NAME_LEN];
754 struct dlm_node_info qn_nodes[O2NM_MAX_NODES];
755};
756
730struct dlm_exit_domain 757struct dlm_exit_domain
731{ 758{
732 u8 node_idx; 759 u8 node_idx;
@@ -1030,6 +1057,7 @@ int dlm_drop_lockres_ref(struct dlm_ctxt *dlm,
1030 struct dlm_lock_resource *res); 1057 struct dlm_lock_resource *res);
1031void dlm_clean_master_list(struct dlm_ctxt *dlm, 1058void dlm_clean_master_list(struct dlm_ctxt *dlm,
1032 u8 dead_node); 1059 u8 dead_node);
1060void dlm_force_free_mles(struct dlm_ctxt *dlm);
1033int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock); 1061int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock);
1034int __dlm_lockres_has_locks(struct dlm_lock_resource *res); 1062int __dlm_lockres_has_locks(struct dlm_lock_resource *res);
1035int __dlm_lockres_unused(struct dlm_lock_resource *res); 1063int __dlm_lockres_unused(struct dlm_lock_resource *res);
diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c
index 5efdd37dfe48..272ec8631a51 100644
--- a/fs/ocfs2/dlm/dlmdebug.c
+++ b/fs/ocfs2/dlm/dlmdebug.c
@@ -493,7 +493,7 @@ static int debug_mle_print(struct dlm_ctxt *dlm, struct debug_buffer *db)
493 struct hlist_head *bucket; 493 struct hlist_head *bucket;
494 struct hlist_node *list; 494 struct hlist_node *list;
495 int i, out = 0; 495 int i, out = 0;
496 unsigned long total = 0, longest = 0, bktcnt; 496 unsigned long total = 0, longest = 0, bucket_count = 0;
497 497
498 out += snprintf(db->buf + out, db->len - out, 498 out += snprintf(db->buf + out, db->len - out,
499 "Dumping MLEs for Domain: %s\n", dlm->name); 499 "Dumping MLEs for Domain: %s\n", dlm->name);
@@ -505,13 +505,13 @@ static int debug_mle_print(struct dlm_ctxt *dlm, struct debug_buffer *db)
505 mle = hlist_entry(list, struct dlm_master_list_entry, 505 mle = hlist_entry(list, struct dlm_master_list_entry,
506 master_hash_node); 506 master_hash_node);
507 ++total; 507 ++total;
508 ++bktcnt; 508 ++bucket_count;
509 if (db->len - out < 200) 509 if (db->len - out < 200)
510 continue; 510 continue;
511 out += dump_mle(mle, db->buf + out, db->len - out); 511 out += dump_mle(mle, db->buf + out, db->len - out);
512 } 512 }
513 longest = max(longest, bktcnt); 513 longest = max(longest, bucket_count);
514 bktcnt = 0; 514 bucket_count = 0;
515 } 515 }
516 spin_unlock(&dlm->master_lock); 516 spin_unlock(&dlm->master_lock);
517 517
@@ -636,8 +636,14 @@ static void *lockres_seq_start(struct seq_file *m, loff_t *pos)
636 spin_lock(&dlm->track_lock); 636 spin_lock(&dlm->track_lock);
637 if (oldres) 637 if (oldres)
638 track_list = &oldres->tracking; 638 track_list = &oldres->tracking;
639 else 639 else {
640 track_list = &dlm->tracking_list; 640 track_list = &dlm->tracking_list;
641 if (list_empty(track_list)) {
642 dl = NULL;
643 spin_unlock(&dlm->track_lock);
644 goto bail;
645 }
646 }
641 647
642 list_for_each_entry(res, track_list, tracking) { 648 list_for_each_entry(res, track_list, tracking) {
643 if (&res->tracking == &dlm->tracking_list) 649 if (&res->tracking == &dlm->tracking_list)
@@ -660,6 +666,7 @@ static void *lockres_seq_start(struct seq_file *m, loff_t *pos)
660 } else 666 } else
661 dl = NULL; 667 dl = NULL;
662 668
669bail:
663 /* passed to seq_show */ 670 /* passed to seq_show */
664 return dl; 671 return dl;
665} 672}
@@ -775,7 +782,9 @@ static int debug_state_print(struct dlm_ctxt *dlm, struct debug_buffer *db)
775 782
776 /* Domain: xxxxxxxxxx Key: 0xdfbac769 */ 783 /* Domain: xxxxxxxxxx Key: 0xdfbac769 */
777 out += snprintf(db->buf + out, db->len - out, 784 out += snprintf(db->buf + out, db->len - out,
778 "Domain: %s Key: 0x%08x\n", dlm->name, dlm->key); 785 "Domain: %s Key: 0x%08x Protocol: %d.%d\n",
786 dlm->name, dlm->key, dlm->dlm_locking_proto.pv_major,
787 dlm->dlm_locking_proto.pv_minor);
779 788
780 /* Thread Pid: xxx Node: xxx State: xxxxx */ 789 /* Thread Pid: xxx Node: xxx State: xxxxx */
781 out += snprintf(db->buf + out, db->len - out, 790 out += snprintf(db->buf + out, db->len - out,
diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c
index 153abb5abef0..58a93b953735 100644
--- a/fs/ocfs2/dlm/dlmdomain.c
+++ b/fs/ocfs2/dlm/dlmdomain.c
@@ -128,10 +128,14 @@ static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
128 * will have a negotiated version with the same major number and a minor 128 * will have a negotiated version with the same major number and a minor
129 * number equal or smaller. The dlm_ctxt->dlm_locking_proto field should 129 * number equal or smaller. The dlm_ctxt->dlm_locking_proto field should
130 * be used to determine what a running domain is actually using. 130 * be used to determine what a running domain is actually using.
131 *
132 * New in version 1.1:
133 * - Message DLM_QUERY_REGION added to support global heartbeat
134 * - Message DLM_QUERY_NODEINFO added to allow online node removes
131 */ 135 */
132static const struct dlm_protocol_version dlm_protocol = { 136static const struct dlm_protocol_version dlm_protocol = {
133 .pv_major = 1, 137 .pv_major = 1,
134 .pv_minor = 0, 138 .pv_minor = 1,
135}; 139};
136 140
137#define DLM_DOMAIN_BACKOFF_MS 200 141#define DLM_DOMAIN_BACKOFF_MS 200
@@ -142,6 +146,8 @@ static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
142 void **ret_data); 146 void **ret_data);
143static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data, 147static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
144 void **ret_data); 148 void **ret_data);
149static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
150 void *data, void **ret_data);
145static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data, 151static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
146 void **ret_data); 152 void **ret_data);
147static int dlm_protocol_compare(struct dlm_protocol_version *existing, 153static int dlm_protocol_compare(struct dlm_protocol_version *existing,
@@ -693,6 +699,7 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm)
693 699
694 dlm_mark_domain_leaving(dlm); 700 dlm_mark_domain_leaving(dlm);
695 dlm_leave_domain(dlm); 701 dlm_leave_domain(dlm);
702 dlm_force_free_mles(dlm);
696 dlm_complete_dlm_shutdown(dlm); 703 dlm_complete_dlm_shutdown(dlm);
697 } 704 }
698 dlm_put(dlm); 705 dlm_put(dlm);
@@ -920,6 +927,370 @@ static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
920 return 0; 927 return 0;
921} 928}
922 929
930static int dlm_match_regions(struct dlm_ctxt *dlm,
931 struct dlm_query_region *qr)
932{
933 char *local = NULL, *remote = qr->qr_regions;
934 char *l, *r;
935 int localnr, i, j, foundit;
936 int status = 0;
937
938 if (!o2hb_global_heartbeat_active()) {
939 if (qr->qr_numregions) {
940 mlog(ML_ERROR, "Domain %s: Joining node %d has global "
941 "heartbeat enabled but local node %d does not\n",
942 qr->qr_domain, qr->qr_node, dlm->node_num);
943 status = -EINVAL;
944 }
945 goto bail;
946 }
947
948 if (o2hb_global_heartbeat_active() && !qr->qr_numregions) {
949 mlog(ML_ERROR, "Domain %s: Local node %d has global "
950 "heartbeat enabled but joining node %d does not\n",
951 qr->qr_domain, dlm->node_num, qr->qr_node);
952 status = -EINVAL;
953 goto bail;
954 }
955
956 r = remote;
957 for (i = 0; i < qr->qr_numregions; ++i) {
958 mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, r);
959 r += O2HB_MAX_REGION_NAME_LEN;
960 }
961
962 local = kmalloc(sizeof(qr->qr_regions), GFP_KERNEL);
963 if (!local) {
964 status = -ENOMEM;
965 goto bail;
966 }
967
968 localnr = o2hb_get_all_regions(local, O2NM_MAX_REGIONS);
969
970 /* compare local regions with remote */
971 l = local;
972 for (i = 0; i < localnr; ++i) {
973 foundit = 0;
974 r = remote;
975 for (j = 0; j <= qr->qr_numregions; ++j) {
976 if (!memcmp(l, r, O2HB_MAX_REGION_NAME_LEN)) {
977 foundit = 1;
978 break;
979 }
980 r += O2HB_MAX_REGION_NAME_LEN;
981 }
982 if (!foundit) {
983 status = -EINVAL;
984 mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
985 "in local node %d but not in joining node %d\n",
986 qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, l,
987 dlm->node_num, qr->qr_node);
988 goto bail;
989 }
990 l += O2HB_MAX_REGION_NAME_LEN;
991 }
992
993 /* compare remote with local regions */
994 r = remote;
995 for (i = 0; i < qr->qr_numregions; ++i) {
996 foundit = 0;
997 l = local;
998 for (j = 0; j < localnr; ++j) {
999 if (!memcmp(r, l, O2HB_MAX_REGION_NAME_LEN)) {
1000 foundit = 1;
1001 break;
1002 }
1003 l += O2HB_MAX_REGION_NAME_LEN;
1004 }
1005 if (!foundit) {
1006 status = -EINVAL;
1007 mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
1008 "in joining node %d but not in local node %d\n",
1009 qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, r,
1010 qr->qr_node, dlm->node_num);
1011 goto bail;
1012 }
1013 r += O2HB_MAX_REGION_NAME_LEN;
1014 }
1015
1016bail:
1017 kfree(local);
1018
1019 return status;
1020}
1021
1022static int dlm_send_regions(struct dlm_ctxt *dlm, unsigned long *node_map)
1023{
1024 struct dlm_query_region *qr = NULL;
1025 int status, ret = 0, i;
1026 char *p;
1027
1028 if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
1029 goto bail;
1030
1031 qr = kzalloc(sizeof(struct dlm_query_region), GFP_KERNEL);
1032 if (!qr) {
1033 ret = -ENOMEM;
1034 mlog_errno(ret);
1035 goto bail;
1036 }
1037
1038 qr->qr_node = dlm->node_num;
1039 qr->qr_namelen = strlen(dlm->name);
1040 memcpy(qr->qr_domain, dlm->name, qr->qr_namelen);
1041 /* if local hb, the numregions will be zero */
1042 if (o2hb_global_heartbeat_active())
1043 qr->qr_numregions = o2hb_get_all_regions(qr->qr_regions,
1044 O2NM_MAX_REGIONS);
1045
1046 p = qr->qr_regions;
1047 for (i = 0; i < qr->qr_numregions; ++i, p += O2HB_MAX_REGION_NAME_LEN)
1048 mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, p);
1049
1050 i = -1;
1051 while ((i = find_next_bit(node_map, O2NM_MAX_NODES,
1052 i + 1)) < O2NM_MAX_NODES) {
1053 if (i == dlm->node_num)
1054 continue;
1055
1056 mlog(0, "Sending regions to node %d\n", i);
1057
1058 ret = o2net_send_message(DLM_QUERY_REGION, DLM_MOD_KEY, qr,
1059 sizeof(struct dlm_query_region),
1060 i, &status);
1061 if (ret >= 0)
1062 ret = status;
1063 if (ret) {
1064 mlog(ML_ERROR, "Region mismatch %d, node %d\n",
1065 ret, i);
1066 break;
1067 }
1068 }
1069
1070bail:
1071 kfree(qr);
1072 return ret;
1073}
1074
1075static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
1076 void *data, void **ret_data)
1077{
1078 struct dlm_query_region *qr;
1079 struct dlm_ctxt *dlm = NULL;
1080 int status = 0;
1081 int locked = 0;
1082
1083 qr = (struct dlm_query_region *) msg->buf;
1084
1085 mlog(0, "Node %u queries hb regions on domain %s\n", qr->qr_node,
1086 qr->qr_domain);
1087
1088 status = -EINVAL;
1089
1090 spin_lock(&dlm_domain_lock);
1091 dlm = __dlm_lookup_domain_full(qr->qr_domain, qr->qr_namelen);
1092 if (!dlm) {
1093 mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1094 "before join domain\n", qr->qr_node, qr->qr_domain);
1095 goto bail;
1096 }
1097
1098 spin_lock(&dlm->spinlock);
1099 locked = 1;
1100 if (dlm->joining_node != qr->qr_node) {
1101 mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1102 "but joining node is %d\n", qr->qr_node, qr->qr_domain,
1103 dlm->joining_node);
1104 goto bail;
1105 }
1106
1107 /* Support for global heartbeat was added in 1.1 */
1108 if (dlm->dlm_locking_proto.pv_major == 1 &&
1109 dlm->dlm_locking_proto.pv_minor == 0) {
1110 mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1111 "but active dlm protocol is %d.%d\n", qr->qr_node,
1112 qr->qr_domain, dlm->dlm_locking_proto.pv_major,
1113 dlm->dlm_locking_proto.pv_minor);
1114 goto bail;
1115 }
1116
1117 status = dlm_match_regions(dlm, qr);
1118
1119bail:
1120 if (locked)
1121 spin_unlock(&dlm->spinlock);
1122 spin_unlock(&dlm_domain_lock);
1123
1124 return status;
1125}
1126
1127static int dlm_match_nodes(struct dlm_ctxt *dlm, struct dlm_query_nodeinfo *qn)
1128{
1129 struct o2nm_node *local;
1130 struct dlm_node_info *remote;
1131 int i, j;
1132 int status = 0;
1133
1134 for (j = 0; j < qn->qn_numnodes; ++j)
1135 mlog(0, "Node %3d, %pI4:%u\n", qn->qn_nodes[j].ni_nodenum,
1136 &(qn->qn_nodes[j].ni_ipv4_address),
1137 ntohs(qn->qn_nodes[j].ni_ipv4_port));
1138
1139 for (i = 0; i < O2NM_MAX_NODES && !status; ++i) {
1140 local = o2nm_get_node_by_num(i);
1141 remote = NULL;
1142 for (j = 0; j < qn->qn_numnodes; ++j) {
1143 if (qn->qn_nodes[j].ni_nodenum == i) {
1144 remote = &(qn->qn_nodes[j]);
1145 break;
1146 }
1147 }
1148
1149 if (!local && !remote)
1150 continue;
1151
1152 if ((local && !remote) || (!local && remote))
1153 status = -EINVAL;
1154
1155 if (!status &&
1156 ((remote->ni_nodenum != local->nd_num) ||
1157 (remote->ni_ipv4_port != local->nd_ipv4_port) ||
1158 (remote->ni_ipv4_address != local->nd_ipv4_address)))
1159 status = -EINVAL;
1160
1161 if (status) {
1162 if (remote && !local)
1163 mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
1164 "registered in joining node %d but not in "
1165 "local node %d\n", qn->qn_domain,
1166 remote->ni_nodenum,
1167 &(remote->ni_ipv4_address),
1168 ntohs(remote->ni_ipv4_port),
1169 qn->qn_nodenum, dlm->node_num);
1170 if (local && !remote)
1171 mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
1172 "registered in local node %d but not in "
1173 "joining node %d\n", qn->qn_domain,
1174 local->nd_num, &(local->nd_ipv4_address),
1175 ntohs(local->nd_ipv4_port),
1176 dlm->node_num, qn->qn_nodenum);
1177 BUG_ON((!local && !remote));
1178 }
1179
1180 if (local)
1181 o2nm_node_put(local);
1182 }
1183
1184 return status;
1185}
1186
1187static int dlm_send_nodeinfo(struct dlm_ctxt *dlm, unsigned long *node_map)
1188{
1189 struct dlm_query_nodeinfo *qn = NULL;
1190 struct o2nm_node *node;
1191 int ret = 0, status, count, i;
1192
1193 if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
1194 goto bail;
1195
1196 qn = kzalloc(sizeof(struct dlm_query_nodeinfo), GFP_KERNEL);
1197 if (!qn) {
1198 ret = -ENOMEM;
1199 mlog_errno(ret);
1200 goto bail;
1201 }
1202
1203 for (i = 0, count = 0; i < O2NM_MAX_NODES; ++i) {
1204 node = o2nm_get_node_by_num(i);
1205 if (!node)
1206 continue;
1207 qn->qn_nodes[count].ni_nodenum = node->nd_num;
1208 qn->qn_nodes[count].ni_ipv4_port = node->nd_ipv4_port;
1209 qn->qn_nodes[count].ni_ipv4_address = node->nd_ipv4_address;
1210 mlog(0, "Node %3d, %pI4:%u\n", node->nd_num,
1211 &(node->nd_ipv4_address), ntohs(node->nd_ipv4_port));
1212 ++count;
1213 o2nm_node_put(node);
1214 }
1215
1216 qn->qn_nodenum = dlm->node_num;
1217 qn->qn_numnodes = count;
1218 qn->qn_namelen = strlen(dlm->name);
1219 memcpy(qn->qn_domain, dlm->name, qn->qn_namelen);
1220
1221 i = -1;
1222 while ((i = find_next_bit(node_map, O2NM_MAX_NODES,
1223 i + 1)) < O2NM_MAX_NODES) {
1224 if (i == dlm->node_num)
1225 continue;
1226
1227 mlog(0, "Sending nodeinfo to node %d\n", i);
1228
1229 ret = o2net_send_message(DLM_QUERY_NODEINFO, DLM_MOD_KEY,
1230 qn, sizeof(struct dlm_query_nodeinfo),
1231 i, &status);
1232 if (ret >= 0)
1233 ret = status;
1234 if (ret) {
1235 mlog(ML_ERROR, "node mismatch %d, node %d\n", ret, i);
1236 break;
1237 }
1238 }
1239
1240bail:
1241 kfree(qn);
1242 return ret;
1243}
1244
1245static int dlm_query_nodeinfo_handler(struct o2net_msg *msg, u32 len,
1246 void *data, void **ret_data)
1247{
1248 struct dlm_query_nodeinfo *qn;
1249 struct dlm_ctxt *dlm = NULL;
1250 int locked = 0, status = -EINVAL;
1251
1252 qn = (struct dlm_query_nodeinfo *) msg->buf;
1253
1254 mlog(0, "Node %u queries nodes on domain %s\n", qn->qn_nodenum,
1255 qn->qn_domain);
1256
1257 spin_lock(&dlm_domain_lock);
1258 dlm = __dlm_lookup_domain_full(qn->qn_domain, qn->qn_namelen);
1259 if (!dlm) {
1260 mlog(ML_ERROR, "Node %d queried nodes on domain %s before "
1261 "join domain\n", qn->qn_nodenum, qn->qn_domain);
1262 goto bail;
1263 }
1264
1265 spin_lock(&dlm->spinlock);
1266 locked = 1;
1267 if (dlm->joining_node != qn->qn_nodenum) {
1268 mlog(ML_ERROR, "Node %d queried nodes on domain %s but "
1269 "joining node is %d\n", qn->qn_nodenum, qn->qn_domain,
1270 dlm->joining_node);
1271 goto bail;
1272 }
1273
1274 /* Support for node query was added in 1.1 */
1275 if (dlm->dlm_locking_proto.pv_major == 1 &&
1276 dlm->dlm_locking_proto.pv_minor == 0) {
1277 mlog(ML_ERROR, "Node %d queried nodes on domain %s "
1278 "but active dlm protocol is %d.%d\n", qn->qn_nodenum,
1279 qn->qn_domain, dlm->dlm_locking_proto.pv_major,
1280 dlm->dlm_locking_proto.pv_minor);
1281 goto bail;
1282 }
1283
1284 status = dlm_match_nodes(dlm, qn);
1285
1286bail:
1287 if (locked)
1288 spin_unlock(&dlm->spinlock);
1289 spin_unlock(&dlm_domain_lock);
1290
1291 return status;
1292}
1293
923static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data, 1294static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
924 void **ret_data) 1295 void **ret_data)
925{ 1296{
@@ -1240,6 +1611,20 @@ static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
1240 set_bit(dlm->node_num, dlm->domain_map); 1611 set_bit(dlm->node_num, dlm->domain_map);
1241 spin_unlock(&dlm->spinlock); 1612 spin_unlock(&dlm->spinlock);
1242 1613
1614 /* Support for global heartbeat and node info was added in 1.1 */
1615 if (dlm_protocol.pv_major > 1 || dlm_protocol.pv_minor > 0) {
1616 status = dlm_send_nodeinfo(dlm, ctxt->yes_resp_map);
1617 if (status) {
1618 mlog_errno(status);
1619 goto bail;
1620 }
1621 status = dlm_send_regions(dlm, ctxt->yes_resp_map);
1622 if (status) {
1623 mlog_errno(status);
1624 goto bail;
1625 }
1626 }
1627
1243 dlm_send_join_asserts(dlm, ctxt->yes_resp_map); 1628 dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
1244 1629
1245 /* Joined state *must* be set before the joining node 1630 /* Joined state *must* be set before the joining node
@@ -1806,7 +2191,21 @@ static int dlm_register_net_handlers(void)
1806 sizeof(struct dlm_cancel_join), 2191 sizeof(struct dlm_cancel_join),
1807 dlm_cancel_join_handler, 2192 dlm_cancel_join_handler,
1808 NULL, NULL, &dlm_join_handlers); 2193 NULL, NULL, &dlm_join_handlers);
2194 if (status)
2195 goto bail;
2196
2197 status = o2net_register_handler(DLM_QUERY_REGION, DLM_MOD_KEY,
2198 sizeof(struct dlm_query_region),
2199 dlm_query_region_handler,
2200 NULL, NULL, &dlm_join_handlers);
1809 2201
2202 if (status)
2203 goto bail;
2204
2205 status = o2net_register_handler(DLM_QUERY_NODEINFO, DLM_MOD_KEY,
2206 sizeof(struct dlm_query_nodeinfo),
2207 dlm_query_nodeinfo_handler,
2208 NULL, NULL, &dlm_join_handlers);
1810bail: 2209bail:
1811 if (status < 0) 2210 if (status < 0)
1812 dlm_unregister_net_handlers(); 2211 dlm_unregister_net_handlers();
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 94b97fc6a88e..f564b0e5f80d 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -511,8 +511,6 @@ static void dlm_lockres_release(struct kref *kref)
511 511
512 atomic_dec(&dlm->res_cur_count); 512 atomic_dec(&dlm->res_cur_count);
513 513
514 dlm_put(dlm);
515
516 if (!hlist_unhashed(&res->hash_node) || 514 if (!hlist_unhashed(&res->hash_node) ||
517 !list_empty(&res->granted) || 515 !list_empty(&res->granted) ||
518 !list_empty(&res->converting) || 516 !list_empty(&res->converting) ||
@@ -585,8 +583,6 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
585 res->migration_pending = 0; 583 res->migration_pending = 0;
586 res->inflight_locks = 0; 584 res->inflight_locks = 0;
587 585
588 /* put in dlm_lockres_release */
589 dlm_grab(dlm);
590 res->dlm = dlm; 586 res->dlm = dlm;
591 587
592 kref_init(&res->refs); 588 kref_init(&res->refs);
@@ -3050,8 +3046,6 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
3050 /* check for pre-existing lock */ 3046 /* check for pre-existing lock */
3051 spin_lock(&dlm->spinlock); 3047 spin_lock(&dlm->spinlock);
3052 res = __dlm_lookup_lockres(dlm, name, namelen, hash); 3048 res = __dlm_lookup_lockres(dlm, name, namelen, hash);
3053 spin_lock(&dlm->master_lock);
3054
3055 if (res) { 3049 if (res) {
3056 spin_lock(&res->spinlock); 3050 spin_lock(&res->spinlock);
3057 if (res->state & DLM_LOCK_RES_RECOVERING) { 3051 if (res->state & DLM_LOCK_RES_RECOVERING) {
@@ -3069,14 +3063,15 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
3069 spin_unlock(&res->spinlock); 3063 spin_unlock(&res->spinlock);
3070 } 3064 }
3071 3065
3066 spin_lock(&dlm->master_lock);
3072 /* ignore status. only nonzero status would BUG. */ 3067 /* ignore status. only nonzero status would BUG. */
3073 ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, 3068 ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
3074 name, namelen, 3069 name, namelen,
3075 migrate->new_master, 3070 migrate->new_master,
3076 migrate->master); 3071 migrate->master);
3077 3072
3078unlock:
3079 spin_unlock(&dlm->master_lock); 3073 spin_unlock(&dlm->master_lock);
3074unlock:
3080 spin_unlock(&dlm->spinlock); 3075 spin_unlock(&dlm->spinlock);
3081 3076
3082 if (oldmle) { 3077 if (oldmle) {
@@ -3438,3 +3433,43 @@ void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
3438 wake_up(&res->wq); 3433 wake_up(&res->wq);
3439 wake_up(&dlm->migration_wq); 3434 wake_up(&dlm->migration_wq);
3440} 3435}
3436
3437void dlm_force_free_mles(struct dlm_ctxt *dlm)
3438{
3439 int i;
3440 struct hlist_head *bucket;
3441 struct dlm_master_list_entry *mle;
3442 struct hlist_node *tmp, *list;
3443
3444 /*
3445 * We notified all other nodes that we are exiting the domain and
3446 * marked the dlm state to DLM_CTXT_LEAVING. If any mles are still
3447 * around we force free them and wake any processes that are waiting
3448 * on the mles
3449 */
3450 spin_lock(&dlm->spinlock);
3451 spin_lock(&dlm->master_lock);
3452
3453 BUG_ON(dlm->dlm_state != DLM_CTXT_LEAVING);
3454 BUG_ON((find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 0) < O2NM_MAX_NODES));
3455
3456 for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3457 bucket = dlm_master_hash(dlm, i);
3458 hlist_for_each_safe(list, tmp, bucket) {
3459 mle = hlist_entry(list, struct dlm_master_list_entry,
3460 master_hash_node);
3461 if (mle->type != DLM_MLE_BLOCK) {
3462 mlog(ML_ERROR, "bad mle: %p\n", mle);
3463 dlm_print_one_mle(mle);
3464 }
3465 atomic_set(&mle->woken, 1);
3466 wake_up(&mle->wq);
3467
3468 __dlm_unlink_mle(dlm, mle);
3469 __dlm_mle_detach_hb_events(dlm, mle);
3470 __dlm_put_mle(mle);
3471 }
3472 }
3473 spin_unlock(&dlm->master_lock);
3474 spin_unlock(&dlm->spinlock);
3475}
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 9dfaac73b36d..aaaffbcbe916 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -1997,6 +1997,8 @@ void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm,
1997 struct list_head *queue; 1997 struct list_head *queue;
1998 struct dlm_lock *lock, *next; 1998 struct dlm_lock *lock, *next;
1999 1999
2000 assert_spin_locked(&dlm->spinlock);
2001 assert_spin_locked(&res->spinlock);
2000 res->state |= DLM_LOCK_RES_RECOVERING; 2002 res->state |= DLM_LOCK_RES_RECOVERING;
2001 if (!list_empty(&res->recovering)) { 2003 if (!list_empty(&res->recovering)) {
2002 mlog(0, 2004 mlog(0,
@@ -2326,19 +2328,15 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
2326 /* zero the lvb if necessary */ 2328 /* zero the lvb if necessary */
2327 dlm_revalidate_lvb(dlm, res, dead_node); 2329 dlm_revalidate_lvb(dlm, res, dead_node);
2328 if (res->owner == dead_node) { 2330 if (res->owner == dead_node) {
2329 if (res->state & DLM_LOCK_RES_DROPPING_REF) 2331 if (res->state & DLM_LOCK_RES_DROPPING_REF) {
2330 mlog(0, "%s:%.*s: owned by " 2332 mlog(ML_NOTICE, "Ignore %.*s for "
2331 "dead node %u, this node was " 2333 "recovery as it is being freed\n",
2332 "dropping its ref when it died. " 2334 res->lockname.len,
2333 "continue, dropping the flag.\n", 2335 res->lockname.name);
2334 dlm->name, res->lockname.len, 2336 } else
2335 res->lockname.name, dead_node); 2337 dlm_move_lockres_to_recovery_list(dlm,
2336 2338 res);
2337 /* the wake_up for this will happen when the
2338 * RECOVERING flag is dropped later */
2339 res->state &= ~DLM_LOCK_RES_DROPPING_REF;
2340 2339
2341 dlm_move_lockres_to_recovery_list(dlm, res);
2342 } else if (res->owner == dlm->node_num) { 2340 } else if (res->owner == dlm->node_num) {
2343 dlm_free_dead_locks(dlm, res, dead_node); 2341 dlm_free_dead_locks(dlm, res, dead_node);
2344 __dlm_lockres_calc_usage(dlm, res); 2342 __dlm_lockres_calc_usage(dlm, res);
diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c
index d4f73ca68fe5..2211acf33d9b 100644
--- a/fs/ocfs2/dlm/dlmthread.c
+++ b/fs/ocfs2/dlm/dlmthread.c
@@ -92,19 +92,27 @@ int __dlm_lockres_has_locks(struct dlm_lock_resource *res)
92 * truly ready to be freed. */ 92 * truly ready to be freed. */
93int __dlm_lockres_unused(struct dlm_lock_resource *res) 93int __dlm_lockres_unused(struct dlm_lock_resource *res)
94{ 94{
95 if (!__dlm_lockres_has_locks(res) && 95 int bit;
96 (list_empty(&res->dirty) && !(res->state & DLM_LOCK_RES_DIRTY))) { 96
97 /* try not to scan the bitmap unless the first two 97 if (__dlm_lockres_has_locks(res))
98 * conditions are already true */ 98 return 0;
99 int bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0); 99
100 if (bit >= O2NM_MAX_NODES) { 100 if (!list_empty(&res->dirty) || res->state & DLM_LOCK_RES_DIRTY)
101 /* since the bit for dlm->node_num is not 101 return 0;
102 * set, inflight_locks better be zero */ 102
103 BUG_ON(res->inflight_locks != 0); 103 if (res->state & DLM_LOCK_RES_RECOVERING)
104 return 1; 104 return 0;
105 } 105
106 } 106 bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
107 return 0; 107 if (bit < O2NM_MAX_NODES)
108 return 0;
109
110 /*
111 * since the bit for dlm->node_num is not set, inflight_locks better
112 * be zero
113 */
114 BUG_ON(res->inflight_locks != 0);
115 return 1;
108} 116}
109 117
110 118
@@ -152,45 +160,25 @@ void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
152 spin_unlock(&dlm->spinlock); 160 spin_unlock(&dlm->spinlock);
153} 161}
154 162
155static int dlm_purge_lockres(struct dlm_ctxt *dlm, 163static void dlm_purge_lockres(struct dlm_ctxt *dlm,
156 struct dlm_lock_resource *res) 164 struct dlm_lock_resource *res)
157{ 165{
158 int master; 166 int master;
159 int ret = 0; 167 int ret = 0;
160 168
161 spin_lock(&res->spinlock); 169 assert_spin_locked(&dlm->spinlock);
162 if (!__dlm_lockres_unused(res)) { 170 assert_spin_locked(&res->spinlock);
163 mlog(0, "%s:%.*s: tried to purge but not unused\n",
164 dlm->name, res->lockname.len, res->lockname.name);
165 __dlm_print_one_lock_resource(res);
166 spin_unlock(&res->spinlock);
167 BUG();
168 }
169
170 if (res->state & DLM_LOCK_RES_MIGRATING) {
171 mlog(0, "%s:%.*s: Delay dropref as this lockres is "
172 "being remastered\n", dlm->name, res->lockname.len,
173 res->lockname.name);
174 /* Re-add the lockres to the end of the purge list */
175 if (!list_empty(&res->purge)) {
176 list_del_init(&res->purge);
177 list_add_tail(&res->purge, &dlm->purge_list);
178 }
179 spin_unlock(&res->spinlock);
180 return 0;
181 }
182 171
183 master = (res->owner == dlm->node_num); 172 master = (res->owner == dlm->node_num);
184 173
185 if (!master)
186 res->state |= DLM_LOCK_RES_DROPPING_REF;
187 spin_unlock(&res->spinlock);
188 174
189 mlog(0, "purging lockres %.*s, master = %d\n", res->lockname.len, 175 mlog(0, "purging lockres %.*s, master = %d\n", res->lockname.len,
190 res->lockname.name, master); 176 res->lockname.name, master);
191 177
192 if (!master) { 178 if (!master) {
179 res->state |= DLM_LOCK_RES_DROPPING_REF;
193 /* drop spinlock... retake below */ 180 /* drop spinlock... retake below */
181 spin_unlock(&res->spinlock);
194 spin_unlock(&dlm->spinlock); 182 spin_unlock(&dlm->spinlock);
195 183
196 spin_lock(&res->spinlock); 184 spin_lock(&res->spinlock);
@@ -208,31 +196,35 @@ static int dlm_purge_lockres(struct dlm_ctxt *dlm,
208 mlog(0, "%s:%.*s: dlm_deref_lockres returned %d\n", 196 mlog(0, "%s:%.*s: dlm_deref_lockres returned %d\n",
209 dlm->name, res->lockname.len, res->lockname.name, ret); 197 dlm->name, res->lockname.len, res->lockname.name, ret);
210 spin_lock(&dlm->spinlock); 198 spin_lock(&dlm->spinlock);
199 spin_lock(&res->spinlock);
211 } 200 }
212 201
213 spin_lock(&res->spinlock);
214 if (!list_empty(&res->purge)) { 202 if (!list_empty(&res->purge)) {
215 mlog(0, "removing lockres %.*s:%p from purgelist, " 203 mlog(0, "removing lockres %.*s:%p from purgelist, "
216 "master = %d\n", res->lockname.len, res->lockname.name, 204 "master = %d\n", res->lockname.len, res->lockname.name,
217 res, master); 205 res, master);
218 list_del_init(&res->purge); 206 list_del_init(&res->purge);
219 spin_unlock(&res->spinlock);
220 dlm_lockres_put(res); 207 dlm_lockres_put(res);
221 dlm->purge_count--; 208 dlm->purge_count--;
222 } else 209 }
223 spin_unlock(&res->spinlock); 210
211 if (!__dlm_lockres_unused(res)) {
212 mlog(ML_ERROR, "found lockres %s:%.*s: in use after deref\n",
213 dlm->name, res->lockname.len, res->lockname.name);
214 __dlm_print_one_lock_resource(res);
215 BUG();
216 }
224 217
225 __dlm_unhash_lockres(res); 218 __dlm_unhash_lockres(res);
226 219
227 /* lockres is not in the hash now. drop the flag and wake up 220 /* lockres is not in the hash now. drop the flag and wake up
228 * any processes waiting in dlm_get_lock_resource. */ 221 * any processes waiting in dlm_get_lock_resource. */
229 if (!master) { 222 if (!master) {
230 spin_lock(&res->spinlock);
231 res->state &= ~DLM_LOCK_RES_DROPPING_REF; 223 res->state &= ~DLM_LOCK_RES_DROPPING_REF;
232 spin_unlock(&res->spinlock); 224 spin_unlock(&res->spinlock);
233 wake_up(&res->wq); 225 wake_up(&res->wq);
234 } 226 } else
235 return 0; 227 spin_unlock(&res->spinlock);
236} 228}
237 229
238static void dlm_run_purge_list(struct dlm_ctxt *dlm, 230static void dlm_run_purge_list(struct dlm_ctxt *dlm,
@@ -251,17 +243,7 @@ static void dlm_run_purge_list(struct dlm_ctxt *dlm,
251 lockres = list_entry(dlm->purge_list.next, 243 lockres = list_entry(dlm->purge_list.next,
252 struct dlm_lock_resource, purge); 244 struct dlm_lock_resource, purge);
253 245
254 /* Status of the lockres *might* change so double
255 * check. If the lockres is unused, holding the dlm
256 * spinlock will prevent people from getting and more
257 * refs on it -- there's no need to keep the lockres
258 * spinlock. */
259 spin_lock(&lockres->spinlock); 246 spin_lock(&lockres->spinlock);
260 unused = __dlm_lockres_unused(lockres);
261 spin_unlock(&lockres->spinlock);
262
263 if (!unused)
264 continue;
265 247
266 purge_jiffies = lockres->last_used + 248 purge_jiffies = lockres->last_used +
267 msecs_to_jiffies(DLM_PURGE_INTERVAL_MS); 249 msecs_to_jiffies(DLM_PURGE_INTERVAL_MS);
@@ -273,15 +255,29 @@ static void dlm_run_purge_list(struct dlm_ctxt *dlm,
273 * in tail order, we can stop at the first 255 * in tail order, we can stop at the first
274 * unpurgable resource -- anyone added after 256 * unpurgable resource -- anyone added after
275 * him will have a greater last_used value */ 257 * him will have a greater last_used value */
258 spin_unlock(&lockres->spinlock);
276 break; 259 break;
277 } 260 }
278 261
262 /* Status of the lockres *might* change so double
263 * check. If the lockres is unused, holding the dlm
264 * spinlock will prevent people from getting and more
265 * refs on it. */
266 unused = __dlm_lockres_unused(lockres);
267 if (!unused ||
268 (lockres->state & DLM_LOCK_RES_MIGRATING)) {
269 mlog(0, "lockres %s:%.*s: is in use or "
270 "being remastered, used %d, state %d\n",
271 dlm->name, lockres->lockname.len,
272 lockres->lockname.name, !unused, lockres->state);
273 list_move_tail(&dlm->purge_list, &lockres->purge);
274 spin_unlock(&lockres->spinlock);
275 continue;
276 }
277
279 dlm_lockres_get(lockres); 278 dlm_lockres_get(lockres);
280 279
281 /* This may drop and reacquire the dlm spinlock if it 280 dlm_purge_lockres(dlm, lockres);
282 * has to do migration. */
283 if (dlm_purge_lockres(dlm, lockres))
284 BUG();
285 281
286 dlm_lockres_put(lockres); 282 dlm_lockres_put(lockres);
287 283