aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm/dlmdomain.c
diff options
context:
space:
mode:
authorSunil Mushran <sunil.mushran@oracle.com>2010-10-09 13:26:23 -0400
committerSunil Mushran <sunil.mushran@oracle.com>2010-10-09 13:26:23 -0400
commitea2034416b54700e30371f2ad6517cbb94674083 (patch)
tree057585455d8357a63b5c35ebc69de73ac0f828ec /fs/ocfs2/dlm/dlmdomain.c
parentb3c85c4cdf77154acc940dd0f14d1fb99cbbaf75 (diff)
ocfs2/dlm: Add message DLM_QUERY_REGION
Adds new dlm message DLM_QUERY_REGION that sends the names of all active heartbeat regions. This message is only sent in the global heartbeat mode. If the regions in the joining node do not fully match the ones in the active nodes, the join domain request is rejected. Signed-off-by: Sunil Mushran <sunil.mushran@oracle.com>
Diffstat (limited to 'fs/ocfs2/dlm/dlmdomain.c')
-rw-r--r--fs/ocfs2/dlm/dlmdomain.c218
1 files changed, 218 insertions, 0 deletions
diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c
index 11a5c87fd7f7..49650756dfef 100644
--- a/fs/ocfs2/dlm/dlmdomain.c
+++ b/fs/ocfs2/dlm/dlmdomain.c
@@ -128,6 +128,9 @@ static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
128 * will have a negotiated version with the same major number and a minor 128 * will have a negotiated version with the same major number and a minor
129 * number equal or smaller. The dlm_ctxt->dlm_locking_proto field should 129 * number equal or smaller. The dlm_ctxt->dlm_locking_proto field should
130 * be used to determine what a running domain is actually using. 130 * be used to determine what a running domain is actually using.
131 *
132 * New in version 1.1:
133 * - Message DLM_QUERY_REGION added to support global heartbeat
131 */ 134 */
132static const struct dlm_protocol_version dlm_protocol = { 135static const struct dlm_protocol_version dlm_protocol = {
133 .pv_major = 1, 136 .pv_major = 1,
@@ -142,6 +145,8 @@ static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
142 void **ret_data); 145 void **ret_data);
143static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data, 146static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
144 void **ret_data); 147 void **ret_data);
148static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
149 void *data, void **ret_data);
145static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data, 150static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
146 void **ret_data); 151 void **ret_data);
147static int dlm_protocol_compare(struct dlm_protocol_version *existing, 152static int dlm_protocol_compare(struct dlm_protocol_version *existing,
@@ -921,6 +926,203 @@ static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
921 return 0; 926 return 0;
922} 927}
923 928
929static int dlm_match_regions(struct dlm_ctxt *dlm,
930 struct dlm_query_region *qr)
931{
932 char *local = NULL, *remote = qr->qr_regions;
933 char *l, *r;
934 int localnr, i, j, foundit;
935 int status = 0;
936
937 if (!o2hb_global_heartbeat_active()) {
938 if (qr->qr_numregions) {
939 mlog(ML_ERROR, "Domain %s: Joining node %d has global "
940 "heartbeat enabled but local node %d does not\n",
941 qr->qr_domain, qr->qr_node, dlm->node_num);
942 status = -EINVAL;
943 }
944 goto bail;
945 }
946
947 if (o2hb_global_heartbeat_active() && !qr->qr_numregions) {
948 mlog(ML_ERROR, "Domain %s: Local node %d has global "
949 "heartbeat enabled but joining node %d does not\n",
950 qr->qr_domain, dlm->node_num, qr->qr_node);
951 status = -EINVAL;
952 goto bail;
953 }
954
955 r = remote;
956 for (i = 0; i < qr->qr_numregions; ++i) {
957 mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, r);
958 r += O2HB_MAX_REGION_NAME_LEN;
959 }
960
961 local = kmalloc(sizeof(qr->qr_regions), GFP_KERNEL);
962 if (!local) {
963 status = -ENOMEM;
964 goto bail;
965 }
966
967 localnr = o2hb_get_all_regions(local, O2NM_MAX_REGIONS);
968
969 /* compare local regions with remote */
970 l = local;
971 for (i = 0; i < localnr; ++i) {
972 foundit = 0;
973 r = remote;
974 for (j = 0; j <= qr->qr_numregions; ++j) {
975 if (!memcmp(l, r, O2HB_MAX_REGION_NAME_LEN)) {
976 foundit = 1;
977 break;
978 }
979 r += O2HB_MAX_REGION_NAME_LEN;
980 }
981 if (!foundit) {
982 status = -EINVAL;
983 mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
984 "in local node %d but not in joining node %d\n",
985 qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, l,
986 dlm->node_num, qr->qr_node);
987 goto bail;
988 }
989 l += O2HB_MAX_REGION_NAME_LEN;
990 }
991
992 /* compare remote with local regions */
993 r = remote;
994 for (i = 0; i < qr->qr_numregions; ++i) {
995 foundit = 0;
996 l = local;
997 for (j = 0; j < localnr; ++j) {
998 if (!memcmp(r, l, O2HB_MAX_REGION_NAME_LEN)) {
999 foundit = 1;
1000 break;
1001 }
1002 l += O2HB_MAX_REGION_NAME_LEN;
1003 }
1004 if (!foundit) {
1005 status = -EINVAL;
1006 mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
1007 "in joining node %d but not in local node %d\n",
1008 qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, r,
1009 qr->qr_node, dlm->node_num);
1010 goto bail;
1011 }
1012 r += O2HB_MAX_REGION_NAME_LEN;
1013 }
1014
1015bail:
1016 kfree(local);
1017
1018 return status;
1019}
1020
1021static int dlm_send_regions(struct dlm_ctxt *dlm, unsigned long *node_map)
1022{
1023 struct dlm_query_region *qr = NULL;
1024 int status, ret = 0, i;
1025 char *p;
1026
1027 if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
1028 goto bail;
1029
1030 qr = kzalloc(sizeof(struct dlm_query_region), GFP_KERNEL);
1031 if (!qr) {
1032 ret = -ENOMEM;
1033 mlog_errno(ret);
1034 goto bail;
1035 }
1036
1037 qr->qr_node = dlm->node_num;
1038 qr->qr_namelen = strlen(dlm->name);
1039 memcpy(qr->qr_domain, dlm->name, qr->qr_namelen);
1040 /* if local hb, the numregions will be zero */
1041 if (o2hb_global_heartbeat_active())
1042 qr->qr_numregions = o2hb_get_all_regions(qr->qr_regions,
1043 O2NM_MAX_REGIONS);
1044
1045 p = qr->qr_regions;
1046 for (i = 0; i < qr->qr_numregions; ++i, p += O2HB_MAX_REGION_NAME_LEN)
1047 mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, p);
1048
1049 i = -1;
1050 while ((i = find_next_bit(node_map, O2NM_MAX_NODES,
1051 i + 1)) < O2NM_MAX_NODES) {
1052 if (i == dlm->node_num)
1053 continue;
1054
1055 mlog(0, "Sending regions to node %d\n", i);
1056
1057 ret = o2net_send_message(DLM_QUERY_REGION, DLM_MOD_KEY, qr,
1058 sizeof(struct dlm_query_region),
1059 i, &status);
1060 if (ret >= 0)
1061 ret = status;
1062 if (ret) {
1063 mlog(ML_ERROR, "Region mismatch %d, node %d\n",
1064 ret, i);
1065 break;
1066 }
1067 }
1068
1069bail:
1070 kfree(qr);
1071 return ret;
1072}
1073
1074static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
1075 void *data, void **ret_data)
1076{
1077 struct dlm_query_region *qr;
1078 struct dlm_ctxt *dlm = NULL;
1079 int status = 0;
1080 int locked = 0;
1081
1082 qr = (struct dlm_query_region *) msg->buf;
1083
1084 mlog(0, "Node %u queries hb regions on domain %s\n", qr->qr_node,
1085 qr->qr_domain);
1086
1087 status = -EINVAL;
1088
1089 spin_lock(&dlm_domain_lock);
1090 dlm = __dlm_lookup_domain_full(qr->qr_domain, qr->qr_namelen);
1091 if (!dlm) {
1092 mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1093 "before join domain\n", qr->qr_node, qr->qr_domain);
1094 goto bail;
1095 }
1096
1097 spin_lock(&dlm->spinlock);
1098 locked = 1;
1099 if (dlm->joining_node != qr->qr_node) {
1100 mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1101 "but joining node is %d\n", qr->qr_node, qr->qr_domain,
1102 dlm->joining_node);
1103 goto bail;
1104 }
1105
1106 /* Support for global heartbeat was added in 1.1 */
1107 if (dlm->dlm_locking_proto.pv_major == 1 &&
1108 dlm->dlm_locking_proto.pv_minor == 0) {
1109 mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1110 "but active dlm protocol is %d.%d\n", qr->qr_node,
1111 qr->qr_domain, dlm->dlm_locking_proto.pv_major,
1112 dlm->dlm_locking_proto.pv_minor);
1113 goto bail;
1114 }
1115
1116 status = dlm_match_regions(dlm, qr);
1117
1118bail:
1119 if (locked)
1120 spin_unlock(&dlm->spinlock);
1121 spin_unlock(&dlm_domain_lock);
1122
1123 return status;
1124}
1125
924static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data, 1126static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
925 void **ret_data) 1127 void **ret_data)
926{ 1128{
@@ -1241,6 +1443,15 @@ static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
1241 set_bit(dlm->node_num, dlm->domain_map); 1443 set_bit(dlm->node_num, dlm->domain_map);
1242 spin_unlock(&dlm->spinlock); 1444 spin_unlock(&dlm->spinlock);
1243 1445
1446 /* Support for global heartbeat was added in 1.1 */
1447 if (dlm_protocol.pv_major > 1 || dlm_protocol.pv_minor > 0) {
1448 status = dlm_send_regions(dlm, ctxt->yes_resp_map);
1449 if (status) {
1450 mlog_errno(status);
1451 goto bail;
1452 }
1453 }
1454
1244 dlm_send_join_asserts(dlm, ctxt->yes_resp_map); 1455 dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
1245 1456
1246 /* Joined state *must* be set before the joining node 1457 /* Joined state *must* be set before the joining node
@@ -1807,6 +2018,13 @@ static int dlm_register_net_handlers(void)
1807 sizeof(struct dlm_cancel_join), 2018 sizeof(struct dlm_cancel_join),
1808 dlm_cancel_join_handler, 2019 dlm_cancel_join_handler,
1809 NULL, NULL, &dlm_join_handlers); 2020 NULL, NULL, &dlm_join_handlers);
2021 if (status)
2022 goto bail;
2023
2024 status = o2net_register_handler(DLM_QUERY_REGION, DLM_MOD_KEY,
2025 sizeof(struct dlm_query_region),
2026 dlm_query_region_handler,
2027 NULL, NULL, &dlm_join_handlers);
1810 2028
1811bail: 2029bail:
1812 if (status < 0) 2030 if (status < 0)