aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm/dlmdomain.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm/dlmdomain.c')
-rw-r--r--fs/ocfs2/dlm/dlmdomain.c195
1 files changed, 169 insertions, 26 deletions
diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c
index 6954565b8ccb..638d2ebb892b 100644
--- a/fs/ocfs2/dlm/dlmdomain.c
+++ b/fs/ocfs2/dlm/dlmdomain.c
@@ -123,6 +123,17 @@ DEFINE_SPINLOCK(dlm_domain_lock);
123LIST_HEAD(dlm_domains); 123LIST_HEAD(dlm_domains);
124static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events); 124static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
125 125
126/*
127 * The supported protocol version for DLM communication. Running domains
128 * will have a negotiated version with the same major number and a minor
129 * number equal or smaller. The dlm_ctxt->dlm_locking_proto field should
130 * be used to determine what a running domain is actually using.
131 */
132static const struct dlm_protocol_version dlm_protocol = {
133 .pv_major = 1,
134 .pv_minor = 0,
135};
136
126#define DLM_DOMAIN_BACKOFF_MS 200 137#define DLM_DOMAIN_BACKOFF_MS 200
127 138
128static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data, 139static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
@@ -133,6 +144,8 @@ static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
133 void **ret_data); 144 void **ret_data);
134static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data, 145static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
135 void **ret_data); 146 void **ret_data);
147static int dlm_protocol_compare(struct dlm_protocol_version *existing,
148 struct dlm_protocol_version *request);
136 149
137static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm); 150static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
138 151
@@ -668,11 +681,45 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm)
668} 681}
669EXPORT_SYMBOL_GPL(dlm_unregister_domain); 682EXPORT_SYMBOL_GPL(dlm_unregister_domain);
670 683
684static int dlm_query_join_proto_check(char *proto_type, int node,
685 struct dlm_protocol_version *ours,
686 struct dlm_protocol_version *request)
687{
688 int rc;
689 struct dlm_protocol_version proto = *request;
690
691 if (!dlm_protocol_compare(ours, &proto)) {
692 mlog(0,
693 "node %u wanted to join with %s locking protocol "
694 "%u.%u, we respond with %u.%u\n",
695 node, proto_type,
696 request->pv_major,
697 request->pv_minor,
698 proto.pv_major, proto.pv_minor);
699 request->pv_minor = proto.pv_minor;
700 rc = 0;
701 } else {
702 mlog(ML_NOTICE,
703 "Node %u wanted to join with %s locking "
704 "protocol %u.%u, but we have %u.%u, disallowing\n",
705 node, proto_type,
706 request->pv_major,
707 request->pv_minor,
708 ours->pv_major,
709 ours->pv_minor);
710 rc = 1;
711 }
712
713 return rc;
714}
715
671static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data, 716static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
672 void **ret_data) 717 void **ret_data)
673{ 718{
674 struct dlm_query_join_request *query; 719 struct dlm_query_join_request *query;
675 enum dlm_query_join_response response; 720 union dlm_query_join_response response = {
721 .packet.code = JOIN_DISALLOW,
722 };
676 struct dlm_ctxt *dlm = NULL; 723 struct dlm_ctxt *dlm = NULL;
677 u8 nodenum; 724 u8 nodenum;
678 725
@@ -690,11 +737,11 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
690 mlog(0, "node %u is not in our live map yet\n", 737 mlog(0, "node %u is not in our live map yet\n",
691 query->node_idx); 738 query->node_idx);
692 739
693 response = JOIN_DISALLOW; 740 response.packet.code = JOIN_DISALLOW;
694 goto respond; 741 goto respond;
695 } 742 }
696 743
697 response = JOIN_OK_NO_MAP; 744 response.packet.code = JOIN_OK_NO_MAP;
698 745
699 spin_lock(&dlm_domain_lock); 746 spin_lock(&dlm_domain_lock);
700 dlm = __dlm_lookup_domain_full(query->domain, query->name_len); 747 dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
@@ -713,7 +760,7 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
713 mlog(0, "disallow join as node %u does not " 760 mlog(0, "disallow join as node %u does not "
714 "have node %u in its nodemap\n", 761 "have node %u in its nodemap\n",
715 query->node_idx, nodenum); 762 query->node_idx, nodenum);
716 response = JOIN_DISALLOW; 763 response.packet.code = JOIN_DISALLOW;
717 goto unlock_respond; 764 goto unlock_respond;
718 } 765 }
719 } 766 }
@@ -733,30 +780,48 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
733 /*If this is a brand new context and we 780 /*If this is a brand new context and we
734 * haven't started our join process yet, then 781 * haven't started our join process yet, then
735 * the other node won the race. */ 782 * the other node won the race. */
736 response = JOIN_OK_NO_MAP; 783 response.packet.code = JOIN_OK_NO_MAP;
737 } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) { 784 } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
738 /* Disallow parallel joins. */ 785 /* Disallow parallel joins. */
739 response = JOIN_DISALLOW; 786 response.packet.code = JOIN_DISALLOW;
740 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) { 787 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
741 mlog(0, "node %u trying to join, but recovery " 788 mlog(0, "node %u trying to join, but recovery "
742 "is ongoing.\n", bit); 789 "is ongoing.\n", bit);
743 response = JOIN_DISALLOW; 790 response.packet.code = JOIN_DISALLOW;
744 } else if (test_bit(bit, dlm->recovery_map)) { 791 } else if (test_bit(bit, dlm->recovery_map)) {
745 mlog(0, "node %u trying to join, but it " 792 mlog(0, "node %u trying to join, but it "
746 "still needs recovery.\n", bit); 793 "still needs recovery.\n", bit);
747 response = JOIN_DISALLOW; 794 response.packet.code = JOIN_DISALLOW;
748 } else if (test_bit(bit, dlm->domain_map)) { 795 } else if (test_bit(bit, dlm->domain_map)) {
749 mlog(0, "node %u trying to join, but it " 796 mlog(0, "node %u trying to join, but it "
750 "is still in the domain! needs recovery?\n", 797 "is still in the domain! needs recovery?\n",
751 bit); 798 bit);
752 response = JOIN_DISALLOW; 799 response.packet.code = JOIN_DISALLOW;
753 } else { 800 } else {
754 /* Alright we're fully a part of this domain 801 /* Alright we're fully a part of this domain
755 * so we keep some state as to who's joining 802 * so we keep some state as to who's joining
756 * and indicate to him that needs to be fixed 803 * and indicate to him that needs to be fixed
757 * up. */ 804 * up. */
758 response = JOIN_OK; 805
759 __dlm_set_joining_node(dlm, query->node_idx); 806 /* Make sure we speak compatible locking protocols. */
807 if (dlm_query_join_proto_check("DLM", bit,
808 &dlm->dlm_locking_proto,
809 &query->dlm_proto)) {
810 response.packet.code =
811 JOIN_PROTOCOL_MISMATCH;
812 } else if (dlm_query_join_proto_check("fs", bit,
813 &dlm->fs_locking_proto,
814 &query->fs_proto)) {
815 response.packet.code =
816 JOIN_PROTOCOL_MISMATCH;
817 } else {
818 response.packet.dlm_minor =
819 query->dlm_proto.pv_minor;
820 response.packet.fs_minor =
821 query->fs_proto.pv_minor;
822 response.packet.code = JOIN_OK;
823 __dlm_set_joining_node(dlm, query->node_idx);
824 }
760 } 825 }
761 826
762 spin_unlock(&dlm->spinlock); 827 spin_unlock(&dlm->spinlock);
@@ -765,9 +830,9 @@ unlock_respond:
765 spin_unlock(&dlm_domain_lock); 830 spin_unlock(&dlm_domain_lock);
766 831
767respond: 832respond:
768 mlog(0, "We respond with %u\n", response); 833 mlog(0, "We respond with %u\n", response.packet.code);
769 834
770 return response; 835 return response.intval;
771} 836}
772 837
773static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data, 838static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
@@ -899,10 +964,11 @@ static int dlm_send_join_cancels(struct dlm_ctxt *dlm,
899 964
900static int dlm_request_join(struct dlm_ctxt *dlm, 965static int dlm_request_join(struct dlm_ctxt *dlm,
901 int node, 966 int node,
902 enum dlm_query_join_response *response) 967 enum dlm_query_join_response_code *response)
903{ 968{
904 int status, retval; 969 int status;
905 struct dlm_query_join_request join_msg; 970 struct dlm_query_join_request join_msg;
971 union dlm_query_join_response join_resp;
906 972
907 mlog(0, "querying node %d\n", node); 973 mlog(0, "querying node %d\n", node);
908 974
@@ -910,12 +976,15 @@ static int dlm_request_join(struct dlm_ctxt *dlm,
910 join_msg.node_idx = dlm->node_num; 976 join_msg.node_idx = dlm->node_num;
911 join_msg.name_len = strlen(dlm->name); 977 join_msg.name_len = strlen(dlm->name);
912 memcpy(join_msg.domain, dlm->name, join_msg.name_len); 978 memcpy(join_msg.domain, dlm->name, join_msg.name_len);
979 join_msg.dlm_proto = dlm->dlm_locking_proto;
980 join_msg.fs_proto = dlm->fs_locking_proto;
913 981
914 /* copy live node map to join message */ 982 /* copy live node map to join message */
915 byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES); 983 byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES);
916 984
917 status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg, 985 status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
918 sizeof(join_msg), node, &retval); 986 sizeof(join_msg), node,
987 &join_resp.intval);
919 if (status < 0 && status != -ENOPROTOOPT) { 988 if (status < 0 && status != -ENOPROTOOPT) {
920 mlog_errno(status); 989 mlog_errno(status);
921 goto bail; 990 goto bail;
@@ -928,14 +997,41 @@ static int dlm_request_join(struct dlm_ctxt *dlm,
928 if (status == -ENOPROTOOPT) { 997 if (status == -ENOPROTOOPT) {
929 status = 0; 998 status = 0;
930 *response = JOIN_OK_NO_MAP; 999 *response = JOIN_OK_NO_MAP;
931 } else if (retval == JOIN_DISALLOW || 1000 } else if (join_resp.packet.code == JOIN_DISALLOW ||
932 retval == JOIN_OK || 1001 join_resp.packet.code == JOIN_OK_NO_MAP) {
933 retval == JOIN_OK_NO_MAP) { 1002 *response = join_resp.packet.code;
934 *response = retval; 1003 } else if (join_resp.packet.code == JOIN_PROTOCOL_MISMATCH) {
1004 mlog(ML_NOTICE,
1005 "This node requested DLM locking protocol %u.%u and "
1006 "filesystem locking protocol %u.%u. At least one of "
1007 "the protocol versions on node %d is not compatible, "
1008 "disconnecting\n",
1009 dlm->dlm_locking_proto.pv_major,
1010 dlm->dlm_locking_proto.pv_minor,
1011 dlm->fs_locking_proto.pv_major,
1012 dlm->fs_locking_proto.pv_minor,
1013 node);
1014 status = -EPROTO;
1015 *response = join_resp.packet.code;
1016 } else if (join_resp.packet.code == JOIN_OK) {
1017 *response = join_resp.packet.code;
1018 /* Use the same locking protocol as the remote node */
1019 dlm->dlm_locking_proto.pv_minor =
1020 join_resp.packet.dlm_minor;
1021 dlm->fs_locking_proto.pv_minor =
1022 join_resp.packet.fs_minor;
1023 mlog(0,
1024 "Node %d responds JOIN_OK with DLM locking protocol "
1025 "%u.%u and fs locking protocol %u.%u\n",
1026 node,
1027 dlm->dlm_locking_proto.pv_major,
1028 dlm->dlm_locking_proto.pv_minor,
1029 dlm->fs_locking_proto.pv_major,
1030 dlm->fs_locking_proto.pv_minor);
935 } else { 1031 } else {
936 status = -EINVAL; 1032 status = -EINVAL;
937 mlog(ML_ERROR, "invalid response %d from node %u\n", retval, 1033 mlog(ML_ERROR, "invalid response %d from node %u\n",
938 node); 1034 join_resp.packet.code, node);
939 } 1035 }
940 1036
941 mlog(0, "status %d, node %d response is %d\n", status, node, 1037 mlog(0, "status %d, node %d response is %d\n", status, node,
@@ -1008,7 +1104,7 @@ struct domain_join_ctxt {
1008 1104
1009static int dlm_should_restart_join(struct dlm_ctxt *dlm, 1105static int dlm_should_restart_join(struct dlm_ctxt *dlm,
1010 struct domain_join_ctxt *ctxt, 1106 struct domain_join_ctxt *ctxt,
1011 enum dlm_query_join_response response) 1107 enum dlm_query_join_response_code response)
1012{ 1108{
1013 int ret; 1109 int ret;
1014 1110
@@ -1034,7 +1130,7 @@ static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
1034{ 1130{
1035 int status = 0, tmpstat, node; 1131 int status = 0, tmpstat, node;
1036 struct domain_join_ctxt *ctxt; 1132 struct domain_join_ctxt *ctxt;
1037 enum dlm_query_join_response response = JOIN_DISALLOW; 1133 enum dlm_query_join_response_code response = JOIN_DISALLOW;
1038 1134
1039 mlog_entry("%p", dlm); 1135 mlog_entry("%p", dlm);
1040 1136
@@ -1450,10 +1546,38 @@ leave:
1450} 1546}
1451 1547
1452/* 1548/*
1453 * dlm_register_domain: one-time setup per "domain" 1549 * Compare a requested locking protocol version against the current one.
1550 *
1551 * If the major numbers are different, they are incompatible.
1552 * If the current minor is greater than the request, they are incompatible.
1553 * If the current minor is less than or equal to the request, they are
1554 * compatible, and the requester should run at the current minor version.
1555 */
1556static int dlm_protocol_compare(struct dlm_protocol_version *existing,
1557 struct dlm_protocol_version *request)
1558{
1559 if (existing->pv_major != request->pv_major)
1560 return 1;
1561
1562 if (existing->pv_minor > request->pv_minor)
1563 return 1;
1564
1565 if (existing->pv_minor < request->pv_minor)
1566 request->pv_minor = existing->pv_minor;
1567
1568 return 0;
1569}
1570
1571/*
1572 * dlm_register_domain: one-time setup per "domain".
1573 *
1574 * The filesystem passes in the requested locking version via proto.
1575 * If registration was successful, proto will contain the negotiated
1576 * locking protocol.
1454 */ 1577 */
1455struct dlm_ctxt * dlm_register_domain(const char *domain, 1578struct dlm_ctxt * dlm_register_domain(const char *domain,
1456 u32 key) 1579 u32 key,
1580 struct dlm_protocol_version *fs_proto)
1457{ 1581{
1458 int ret; 1582 int ret;
1459 struct dlm_ctxt *dlm = NULL; 1583 struct dlm_ctxt *dlm = NULL;
@@ -1496,6 +1620,15 @@ retry:
1496 goto retry; 1620 goto retry;
1497 } 1621 }
1498 1622
1623 if (dlm_protocol_compare(&dlm->fs_locking_proto, fs_proto)) {
1624 mlog(ML_ERROR,
1625 "Requested locking protocol version is not "
1626 "compatible with already registered domain "
1627 "\"%s\"\n", domain);
1628 ret = -EPROTO;
1629 goto leave;
1630 }
1631
1499 __dlm_get(dlm); 1632 __dlm_get(dlm);
1500 dlm->num_joins++; 1633 dlm->num_joins++;
1501 1634
@@ -1526,6 +1659,13 @@ retry:
1526 list_add_tail(&dlm->list, &dlm_domains); 1659 list_add_tail(&dlm->list, &dlm_domains);
1527 spin_unlock(&dlm_domain_lock); 1660 spin_unlock(&dlm_domain_lock);
1528 1661
1662 /*
1663 * Pass the locking protocol version into the join. If the join
1664 * succeeds, it will have the negotiated protocol set.
1665 */
1666 dlm->dlm_locking_proto = dlm_protocol;
1667 dlm->fs_locking_proto = *fs_proto;
1668
1529 ret = dlm_join_domain(dlm); 1669 ret = dlm_join_domain(dlm);
1530 if (ret) { 1670 if (ret) {
1531 mlog_errno(ret); 1671 mlog_errno(ret);
@@ -1533,6 +1673,9 @@ retry:
1533 goto leave; 1673 goto leave;
1534 } 1674 }
1535 1675
1676 /* Tell the caller what locking protocol we negotiated */
1677 *fs_proto = dlm->fs_locking_proto;
1678
1536 ret = 0; 1679 ret = 0;
1537leave: 1680leave:
1538 if (new_ctxt) 1681 if (new_ctxt)