aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm/dlmdomain.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm/dlmdomain.c')
-rw-r--r--fs/ocfs2/dlm/dlmdomain.c218
1 files changed, 218 insertions, 0 deletions
diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c
index 11a5c87fd7f7..49650756dfef 100644
--- a/fs/ocfs2/dlm/dlmdomain.c
+++ b/fs/ocfs2/dlm/dlmdomain.c
@@ -128,6 +128,9 @@ static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
128 * will have a negotiated version with the same major number and a minor 128 * will have a negotiated version with the same major number and a minor
129 * number equal or smaller. The dlm_ctxt->dlm_locking_proto field should 129 * number equal or smaller. The dlm_ctxt->dlm_locking_proto field should
130 * be used to determine what a running domain is actually using. 130 * be used to determine what a running domain is actually using.
131 *
132 * New in version 1.1:
133 * - Message DLM_QUERY_REGION added to support global heartbeat
131 */ 134 */
132static const struct dlm_protocol_version dlm_protocol = { 135static const struct dlm_protocol_version dlm_protocol = {
133 .pv_major = 1, 136 .pv_major = 1,
@@ -142,6 +145,8 @@ static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
142 void **ret_data); 145 void **ret_data);
143static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data, 146static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
144 void **ret_data); 147 void **ret_data);
148static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
149 void *data, void **ret_data);
145static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data, 150static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
146 void **ret_data); 151 void **ret_data);
147static int dlm_protocol_compare(struct dlm_protocol_version *existing, 152static int dlm_protocol_compare(struct dlm_protocol_version *existing,
@@ -921,6 +926,203 @@ static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
921 return 0; 926 return 0;
922} 927}
923 928
929static int dlm_match_regions(struct dlm_ctxt *dlm,
930 struct dlm_query_region *qr)
931{
932 char *local = NULL, *remote = qr->qr_regions;
933 char *l, *r;
934 int localnr, i, j, foundit;
935 int status = 0;
936
937 if (!o2hb_global_heartbeat_active()) {
938 if (qr->qr_numregions) {
939 mlog(ML_ERROR, "Domain %s: Joining node %d has global "
940 "heartbeat enabled but local node %d does not\n",
941 qr->qr_domain, qr->qr_node, dlm->node_num);
942 status = -EINVAL;
943 }
944 goto bail;
945 }
946
947 if (o2hb_global_heartbeat_active() && !qr->qr_numregions) {
948 mlog(ML_ERROR, "Domain %s: Local node %d has global "
949 "heartbeat enabled but joining node %d does not\n",
950 qr->qr_domain, dlm->node_num, qr->qr_node);
951 status = -EINVAL;
952 goto bail;
953 }
954
955 r = remote;
956 for (i = 0; i < qr->qr_numregions; ++i) {
957 mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, r);
958 r += O2HB_MAX_REGION_NAME_LEN;
959 }
960
961 local = kmalloc(sizeof(qr->qr_regions), GFP_KERNEL);
962 if (!local) {
963 status = -ENOMEM;
964 goto bail;
965 }
966
967 localnr = o2hb_get_all_regions(local, O2NM_MAX_REGIONS);
968
969 /* compare local regions with remote */
970 l = local;
971 for (i = 0; i < localnr; ++i) {
972 foundit = 0;
973 r = remote;
974 for (j = 0; j <= qr->qr_numregions; ++j) {
975 if (!memcmp(l, r, O2HB_MAX_REGION_NAME_LEN)) {
976 foundit = 1;
977 break;
978 }
979 r += O2HB_MAX_REGION_NAME_LEN;
980 }
981 if (!foundit) {
982 status = -EINVAL;
983 mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
984 "in local node %d but not in joining node %d\n",
985 qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, l,
986 dlm->node_num, qr->qr_node);
987 goto bail;
988 }
989 l += O2HB_MAX_REGION_NAME_LEN;
990 }
991
992 /* compare remote with local regions */
993 r = remote;
994 for (i = 0; i < qr->qr_numregions; ++i) {
995 foundit = 0;
996 l = local;
997 for (j = 0; j < localnr; ++j) {
998 if (!memcmp(r, l, O2HB_MAX_REGION_NAME_LEN)) {
999 foundit = 1;
1000 break;
1001 }
1002 l += O2HB_MAX_REGION_NAME_LEN;
1003 }
1004 if (!foundit) {
1005 status = -EINVAL;
1006 mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
1007 "in joining node %d but not in local node %d\n",
1008 qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, r,
1009 qr->qr_node, dlm->node_num);
1010 goto bail;
1011 }
1012 r += O2HB_MAX_REGION_NAME_LEN;
1013 }
1014
1015bail:
1016 kfree(local);
1017
1018 return status;
1019}
1020
1021static int dlm_send_regions(struct dlm_ctxt *dlm, unsigned long *node_map)
1022{
1023 struct dlm_query_region *qr = NULL;
1024 int status, ret = 0, i;
1025 char *p;
1026
1027 if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
1028 goto bail;
1029
1030 qr = kzalloc(sizeof(struct dlm_query_region), GFP_KERNEL);
1031 if (!qr) {
1032 ret = -ENOMEM;
1033 mlog_errno(ret);
1034 goto bail;
1035 }
1036
1037 qr->qr_node = dlm->node_num;
1038 qr->qr_namelen = strlen(dlm->name);
1039 memcpy(qr->qr_domain, dlm->name, qr->qr_namelen);
1040 /* if local hb, the numregions will be zero */
1041 if (o2hb_global_heartbeat_active())
1042 qr->qr_numregions = o2hb_get_all_regions(qr->qr_regions,
1043 O2NM_MAX_REGIONS);
1044
1045 p = qr->qr_regions;
1046 for (i = 0; i < qr->qr_numregions; ++i, p += O2HB_MAX_REGION_NAME_LEN)
1047 mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, p);
1048
1049 i = -1;
1050 while ((i = find_next_bit(node_map, O2NM_MAX_NODES,
1051 i + 1)) < O2NM_MAX_NODES) {
1052 if (i == dlm->node_num)
1053 continue;
1054
1055 mlog(0, "Sending regions to node %d\n", i);
1056
1057 ret = o2net_send_message(DLM_QUERY_REGION, DLM_MOD_KEY, qr,
1058 sizeof(struct dlm_query_region),
1059 i, &status);
1060 if (ret >= 0)
1061 ret = status;
1062 if (ret) {
1063 mlog(ML_ERROR, "Region mismatch %d, node %d\n",
1064 ret, i);
1065 break;
1066 }
1067 }
1068
1069bail:
1070 kfree(qr);
1071 return ret;
1072}
1073
1074static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
1075 void *data, void **ret_data)
1076{
1077 struct dlm_query_region *qr;
1078 struct dlm_ctxt *dlm = NULL;
1079 int status = 0;
1080 int locked = 0;
1081
1082 qr = (struct dlm_query_region *) msg->buf;
1083
1084 mlog(0, "Node %u queries hb regions on domain %s\n", qr->qr_node,
1085 qr->qr_domain);
1086
1087 status = -EINVAL;
1088
1089 spin_lock(&dlm_domain_lock);
1090 dlm = __dlm_lookup_domain_full(qr->qr_domain, qr->qr_namelen);
1091 if (!dlm) {
1092 mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1093 "before join domain\n", qr->qr_node, qr->qr_domain);
1094 goto bail;
1095 }
1096
1097 spin_lock(&dlm->spinlock);
1098 locked = 1;
1099 if (dlm->joining_node != qr->qr_node) {
1100 mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1101 "but joining node is %d\n", qr->qr_node, qr->qr_domain,
1102 dlm->joining_node);
1103 goto bail;
1104 }
1105
1106 /* Support for global heartbeat was added in 1.1 */
1107 if (dlm->dlm_locking_proto.pv_major == 1 &&
1108 dlm->dlm_locking_proto.pv_minor == 0) {
1109 mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1110 "but active dlm protocol is %d.%d\n", qr->qr_node,
1111 qr->qr_domain, dlm->dlm_locking_proto.pv_major,
1112 dlm->dlm_locking_proto.pv_minor);
1113 goto bail;
1114 }
1115
1116 status = dlm_match_regions(dlm, qr);
1117
1118bail:
1119 if (locked)
1120 spin_unlock(&dlm->spinlock);
1121 spin_unlock(&dlm_domain_lock);
1122
1123 return status;
1124}
1125
924static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data, 1126static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
925 void **ret_data) 1127 void **ret_data)
926{ 1128{
@@ -1241,6 +1443,15 @@ static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
1241 set_bit(dlm->node_num, dlm->domain_map); 1443 set_bit(dlm->node_num, dlm->domain_map);
1242 spin_unlock(&dlm->spinlock); 1444 spin_unlock(&dlm->spinlock);
1243 1445
1446 /* Support for global heartbeat was added in 1.1 */
1447 if (dlm_protocol.pv_major > 1 || dlm_protocol.pv_minor > 0) {
1448 status = dlm_send_regions(dlm, ctxt->yes_resp_map);
1449 if (status) {
1450 mlog_errno(status);
1451 goto bail;
1452 }
1453 }
1454
1244 dlm_send_join_asserts(dlm, ctxt->yes_resp_map); 1455 dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
1245 1456
1246 /* Joined state *must* be set before the joining node 1457 /* Joined state *must* be set before the joining node
@@ -1807,6 +2018,13 @@ static int dlm_register_net_handlers(void)
1807 sizeof(struct dlm_cancel_join), 2018 sizeof(struct dlm_cancel_join),
1808 dlm_cancel_join_handler, 2019 dlm_cancel_join_handler,
1809 NULL, NULL, &dlm_join_handlers); 2020 NULL, NULL, &dlm_join_handlers);
2021 if (status)
2022 goto bail;
2023
2024 status = o2net_register_handler(DLM_QUERY_REGION, DLM_MOD_KEY,
2025 sizeof(struct dlm_query_region),
2026 dlm_query_region_handler,
2027 NULL, NULL, &dlm_join_handlers);
1810 2028
1811bail: 2029bail:
1812 if (status < 0) 2030 if (status < 0)