aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm
diff options
context:
space:
mode:
authorJoel Becker <Joel.Becker@oracle.com>2008-01-25 20:02:21 -0500
committerMark Fasheh <mark.fasheh@oracle.com>2008-02-06 19:11:29 -0500
commitd24fbcda0c4988322949df3d759f1cfb32b32953 (patch)
treed8454796d58649126005001472e9dcee8bd557ca /fs/ocfs2/dlm
parent3e6bdf473f489664dac4d7511d26c7ac3dfdc748 (diff)
ocfs2: Negotiate locking protocol versions.
Currently, when ocfs2 nodes connect via TCP, they advertise their compatibility level. If the versions do not match, two nodes cannot speak to each other and they disconnect. As a result, this provides no forward or backwards compatibility. This patch implements a simple protocol negotiation at the dlm level by introducing a major/minor version number scheme for entities that communicate. Specifically, o2dlm has a major/minor version for interaction with o2dlm on other nodes, and ocfs2 itself has a major/minor version for interacting with the filesystem on other nodes. This will allow rolling upgrades of ocfs2 clusters when changes to the locking or network protocols can be done in a backwards compatible manner. In those cases, only the minor number is changed and the negotatied protocol minor is returned from dlm join. In the far less likely event that a required protocol change makes backwards compatibility impossible, we simply bump the major number. Signed-off-by: Joel Becker <joel.becker@oracle.com> Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
Diffstat (limited to 'fs/ocfs2/dlm')
-rw-r--r--fs/ocfs2/dlm/dlmapi.h7
-rw-r--r--fs/ocfs2/dlm/dlmcommon.h24
-rw-r--r--fs/ocfs2/dlm/dlmdomain.c195
-rw-r--r--fs/ocfs2/dlm/dlmfs.c15
-rw-r--r--fs/ocfs2/dlm/userdlm.c5
-rw-r--r--fs/ocfs2/dlm/userdlm.h3
6 files changed, 217 insertions, 32 deletions
diff --git a/fs/ocfs2/dlm/dlmapi.h b/fs/ocfs2/dlm/dlmapi.h
index cfd5cb65cab0..b5786a787fab 100644
--- a/fs/ocfs2/dlm/dlmapi.h
+++ b/fs/ocfs2/dlm/dlmapi.h
@@ -193,7 +193,12 @@ enum dlm_status dlmunlock(struct dlm_ctxt *dlm,
193 dlm_astunlockfunc_t *unlockast, 193 dlm_astunlockfunc_t *unlockast,
194 void *data); 194 void *data);
195 195
196struct dlm_ctxt * dlm_register_domain(const char *domain, u32 key); 196struct dlm_protocol_version {
197 u8 pv_major;
198 u8 pv_minor;
199};
200struct dlm_ctxt * dlm_register_domain(const char *domain, u32 key,
201 struct dlm_protocol_version *fs_proto);
197 202
198void dlm_unregister_domain(struct dlm_ctxt *dlm); 203void dlm_unregister_domain(struct dlm_ctxt *dlm);
199 204
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h
index e90b92f9ece1..9843ee17ea27 100644
--- a/fs/ocfs2/dlm/dlmcommon.h
+++ b/fs/ocfs2/dlm/dlmcommon.h
@@ -142,6 +142,12 @@ struct dlm_ctxt
142 spinlock_t work_lock; 142 spinlock_t work_lock;
143 struct list_head dlm_domain_handlers; 143 struct list_head dlm_domain_handlers;
144 struct list_head dlm_eviction_callbacks; 144 struct list_head dlm_eviction_callbacks;
145
146 /* The filesystem specifies this at domain registration. We
147 * cache it here to know what to tell other nodes. */
148 struct dlm_protocol_version fs_locking_proto;
149 /* This is the inter-dlm communication version */
150 struct dlm_protocol_version dlm_locking_proto;
145}; 151};
146 152
147static inline struct hlist_head *dlm_lockres_hash(struct dlm_ctxt *dlm, unsigned i) 153static inline struct hlist_head *dlm_lockres_hash(struct dlm_ctxt *dlm, unsigned i)
@@ -589,10 +595,24 @@ struct dlm_proxy_ast
589#define DLM_PROXY_AST_MAX_LEN (sizeof(struct dlm_proxy_ast)+DLM_LVB_LEN) 595#define DLM_PROXY_AST_MAX_LEN (sizeof(struct dlm_proxy_ast)+DLM_LVB_LEN)
590 596
591#define DLM_MOD_KEY (0x666c6172) 597#define DLM_MOD_KEY (0x666c6172)
592enum dlm_query_join_response { 598enum dlm_query_join_response_code {
593 JOIN_DISALLOW = 0, 599 JOIN_DISALLOW = 0,
594 JOIN_OK, 600 JOIN_OK,
595 JOIN_OK_NO_MAP, 601 JOIN_OK_NO_MAP,
602 JOIN_PROTOCOL_MISMATCH,
603};
604
605union dlm_query_join_response {
606 u32 intval;
607 struct {
608 u8 code; /* Response code. dlm_minor and fs_minor
609 are only valid if this is JOIN_OK */
610 u8 dlm_minor; /* The minor version of the protocol the
611 dlm is speaking. */
612 u8 fs_minor; /* The minor version of the protocol the
613 filesystem is speaking. */
614 u8 reserved;
615 } packet;
596}; 616};
597 617
598struct dlm_lock_request 618struct dlm_lock_request
@@ -633,6 +653,8 @@ struct dlm_query_join_request
633 u8 node_idx; 653 u8 node_idx;
634 u8 pad1[2]; 654 u8 pad1[2];
635 u8 name_len; 655 u8 name_len;
656 struct dlm_protocol_version dlm_proto;
657 struct dlm_protocol_version fs_proto;
636 u8 domain[O2NM_MAX_NAME_LEN]; 658 u8 domain[O2NM_MAX_NAME_LEN];
637 u8 node_map[BITS_TO_BYTES(O2NM_MAX_NODES)]; 659 u8 node_map[BITS_TO_BYTES(O2NM_MAX_NODES)];
638}; 660};
diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c
index 6954565b8ccb..638d2ebb892b 100644
--- a/fs/ocfs2/dlm/dlmdomain.c
+++ b/fs/ocfs2/dlm/dlmdomain.c
@@ -123,6 +123,17 @@ DEFINE_SPINLOCK(dlm_domain_lock);
123LIST_HEAD(dlm_domains); 123LIST_HEAD(dlm_domains);
124static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events); 124static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
125 125
126/*
127 * The supported protocol version for DLM communication. Running domains
128 * will have a negotiated version with the same major number and a minor
129 * number equal or smaller. The dlm_ctxt->dlm_locking_proto field should
130 * be used to determine what a running domain is actually using.
131 */
132static const struct dlm_protocol_version dlm_protocol = {
133 .pv_major = 1,
134 .pv_minor = 0,
135};
136
126#define DLM_DOMAIN_BACKOFF_MS 200 137#define DLM_DOMAIN_BACKOFF_MS 200
127 138
128static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data, 139static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
@@ -133,6 +144,8 @@ static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
133 void **ret_data); 144 void **ret_data);
134static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data, 145static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
135 void **ret_data); 146 void **ret_data);
147static int dlm_protocol_compare(struct dlm_protocol_version *existing,
148 struct dlm_protocol_version *request);
136 149
137static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm); 150static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
138 151
@@ -668,11 +681,45 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm)
668} 681}
669EXPORT_SYMBOL_GPL(dlm_unregister_domain); 682EXPORT_SYMBOL_GPL(dlm_unregister_domain);
670 683
684static int dlm_query_join_proto_check(char *proto_type, int node,
685 struct dlm_protocol_version *ours,
686 struct dlm_protocol_version *request)
687{
688 int rc;
689 struct dlm_protocol_version proto = *request;
690
691 if (!dlm_protocol_compare(ours, &proto)) {
692 mlog(0,
693 "node %u wanted to join with %s locking protocol "
694 "%u.%u, we respond with %u.%u\n",
695 node, proto_type,
696 request->pv_major,
697 request->pv_minor,
698 proto.pv_major, proto.pv_minor);
699 request->pv_minor = proto.pv_minor;
700 rc = 0;
701 } else {
702 mlog(ML_NOTICE,
703 "Node %u wanted to join with %s locking "
704 "protocol %u.%u, but we have %u.%u, disallowing\n",
705 node, proto_type,
706 request->pv_major,
707 request->pv_minor,
708 ours->pv_major,
709 ours->pv_minor);
710 rc = 1;
711 }
712
713 return rc;
714}
715
671static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data, 716static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
672 void **ret_data) 717 void **ret_data)
673{ 718{
674 struct dlm_query_join_request *query; 719 struct dlm_query_join_request *query;
675 enum dlm_query_join_response response; 720 union dlm_query_join_response response = {
721 .packet.code = JOIN_DISALLOW,
722 };
676 struct dlm_ctxt *dlm = NULL; 723 struct dlm_ctxt *dlm = NULL;
677 u8 nodenum; 724 u8 nodenum;
678 725
@@ -690,11 +737,11 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
690 mlog(0, "node %u is not in our live map yet\n", 737 mlog(0, "node %u is not in our live map yet\n",
691 query->node_idx); 738 query->node_idx);
692 739
693 response = JOIN_DISALLOW; 740 response.packet.code = JOIN_DISALLOW;
694 goto respond; 741 goto respond;
695 } 742 }
696 743
697 response = JOIN_OK_NO_MAP; 744 response.packet.code = JOIN_OK_NO_MAP;
698 745
699 spin_lock(&dlm_domain_lock); 746 spin_lock(&dlm_domain_lock);
700 dlm = __dlm_lookup_domain_full(query->domain, query->name_len); 747 dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
@@ -713,7 +760,7 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
713 mlog(0, "disallow join as node %u does not " 760 mlog(0, "disallow join as node %u does not "
714 "have node %u in its nodemap\n", 761 "have node %u in its nodemap\n",
715 query->node_idx, nodenum); 762 query->node_idx, nodenum);
716 response = JOIN_DISALLOW; 763 response.packet.code = JOIN_DISALLOW;
717 goto unlock_respond; 764 goto unlock_respond;
718 } 765 }
719 } 766 }
@@ -733,30 +780,48 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
733 /*If this is a brand new context and we 780 /*If this is a brand new context and we
734 * haven't started our join process yet, then 781 * haven't started our join process yet, then
735 * the other node won the race. */ 782 * the other node won the race. */
736 response = JOIN_OK_NO_MAP; 783 response.packet.code = JOIN_OK_NO_MAP;
737 } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) { 784 } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
738 /* Disallow parallel joins. */ 785 /* Disallow parallel joins. */
739 response = JOIN_DISALLOW; 786 response.packet.code = JOIN_DISALLOW;
740 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) { 787 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
741 mlog(0, "node %u trying to join, but recovery " 788 mlog(0, "node %u trying to join, but recovery "
742 "is ongoing.\n", bit); 789 "is ongoing.\n", bit);
743 response = JOIN_DISALLOW; 790 response.packet.code = JOIN_DISALLOW;
744 } else if (test_bit(bit, dlm->recovery_map)) { 791 } else if (test_bit(bit, dlm->recovery_map)) {
745 mlog(0, "node %u trying to join, but it " 792 mlog(0, "node %u trying to join, but it "
746 "still needs recovery.\n", bit); 793 "still needs recovery.\n", bit);
747 response = JOIN_DISALLOW; 794 response.packet.code = JOIN_DISALLOW;
748 } else if (test_bit(bit, dlm->domain_map)) { 795 } else if (test_bit(bit, dlm->domain_map)) {
749 mlog(0, "node %u trying to join, but it " 796 mlog(0, "node %u trying to join, but it "
750 "is still in the domain! needs recovery?\n", 797 "is still in the domain! needs recovery?\n",
751 bit); 798 bit);
752 response = JOIN_DISALLOW; 799 response.packet.code = JOIN_DISALLOW;
753 } else { 800 } else {
754 /* Alright we're fully a part of this domain 801 /* Alright we're fully a part of this domain
755 * so we keep some state as to who's joining 802 * so we keep some state as to who's joining
756 * and indicate to him that needs to be fixed 803 * and indicate to him that needs to be fixed
757 * up. */ 804 * up. */
758 response = JOIN_OK; 805
759 __dlm_set_joining_node(dlm, query->node_idx); 806 /* Make sure we speak compatible locking protocols. */
807 if (dlm_query_join_proto_check("DLM", bit,
808 &dlm->dlm_locking_proto,
809 &query->dlm_proto)) {
810 response.packet.code =
811 JOIN_PROTOCOL_MISMATCH;
812 } else if (dlm_query_join_proto_check("fs", bit,
813 &dlm->fs_locking_proto,
814 &query->fs_proto)) {
815 response.packet.code =
816 JOIN_PROTOCOL_MISMATCH;
817 } else {
818 response.packet.dlm_minor =
819 query->dlm_proto.pv_minor;
820 response.packet.fs_minor =
821 query->fs_proto.pv_minor;
822 response.packet.code = JOIN_OK;
823 __dlm_set_joining_node(dlm, query->node_idx);
824 }
760 } 825 }
761 826
762 spin_unlock(&dlm->spinlock); 827 spin_unlock(&dlm->spinlock);
@@ -765,9 +830,9 @@ unlock_respond:
765 spin_unlock(&dlm_domain_lock); 830 spin_unlock(&dlm_domain_lock);
766 831
767respond: 832respond:
768 mlog(0, "We respond with %u\n", response); 833 mlog(0, "We respond with %u\n", response.packet.code);
769 834
770 return response; 835 return response.intval;
771} 836}
772 837
773static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data, 838static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
@@ -899,10 +964,11 @@ static int dlm_send_join_cancels(struct dlm_ctxt *dlm,
899 964
900static int dlm_request_join(struct dlm_ctxt *dlm, 965static int dlm_request_join(struct dlm_ctxt *dlm,
901 int node, 966 int node,
902 enum dlm_query_join_response *response) 967 enum dlm_query_join_response_code *response)
903{ 968{
904 int status, retval; 969 int status;
905 struct dlm_query_join_request join_msg; 970 struct dlm_query_join_request join_msg;
971 union dlm_query_join_response join_resp;
906 972
907 mlog(0, "querying node %d\n", node); 973 mlog(0, "querying node %d\n", node);
908 974
@@ -910,12 +976,15 @@ static int dlm_request_join(struct dlm_ctxt *dlm,
910 join_msg.node_idx = dlm->node_num; 976 join_msg.node_idx = dlm->node_num;
911 join_msg.name_len = strlen(dlm->name); 977 join_msg.name_len = strlen(dlm->name);
912 memcpy(join_msg.domain, dlm->name, join_msg.name_len); 978 memcpy(join_msg.domain, dlm->name, join_msg.name_len);
979 join_msg.dlm_proto = dlm->dlm_locking_proto;
980 join_msg.fs_proto = dlm->fs_locking_proto;
913 981
914 /* copy live node map to join message */ 982 /* copy live node map to join message */
915 byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES); 983 byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES);
916 984
917 status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg, 985 status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
918 sizeof(join_msg), node, &retval); 986 sizeof(join_msg), node,
987 &join_resp.intval);
919 if (status < 0 && status != -ENOPROTOOPT) { 988 if (status < 0 && status != -ENOPROTOOPT) {
920 mlog_errno(status); 989 mlog_errno(status);
921 goto bail; 990 goto bail;
@@ -928,14 +997,41 @@ static int dlm_request_join(struct dlm_ctxt *dlm,
928 if (status == -ENOPROTOOPT) { 997 if (status == -ENOPROTOOPT) {
929 status = 0; 998 status = 0;
930 *response = JOIN_OK_NO_MAP; 999 *response = JOIN_OK_NO_MAP;
931 } else if (retval == JOIN_DISALLOW || 1000 } else if (join_resp.packet.code == JOIN_DISALLOW ||
932 retval == JOIN_OK || 1001 join_resp.packet.code == JOIN_OK_NO_MAP) {
933 retval == JOIN_OK_NO_MAP) { 1002 *response = join_resp.packet.code;
934 *response = retval; 1003 } else if (join_resp.packet.code == JOIN_PROTOCOL_MISMATCH) {
1004 mlog(ML_NOTICE,
1005 "This node requested DLM locking protocol %u.%u and "
1006 "filesystem locking protocol %u.%u. At least one of "
1007 "the protocol versions on node %d is not compatible, "
1008 "disconnecting\n",
1009 dlm->dlm_locking_proto.pv_major,
1010 dlm->dlm_locking_proto.pv_minor,
1011 dlm->fs_locking_proto.pv_major,
1012 dlm->fs_locking_proto.pv_minor,
1013 node);
1014 status = -EPROTO;
1015 *response = join_resp.packet.code;
1016 } else if (join_resp.packet.code == JOIN_OK) {
1017 *response = join_resp.packet.code;
1018 /* Use the same locking protocol as the remote node */
1019 dlm->dlm_locking_proto.pv_minor =
1020 join_resp.packet.dlm_minor;
1021 dlm->fs_locking_proto.pv_minor =
1022 join_resp.packet.fs_minor;
1023 mlog(0,
1024 "Node %d responds JOIN_OK with DLM locking protocol "
1025 "%u.%u and fs locking protocol %u.%u\n",
1026 node,
1027 dlm->dlm_locking_proto.pv_major,
1028 dlm->dlm_locking_proto.pv_minor,
1029 dlm->fs_locking_proto.pv_major,
1030 dlm->fs_locking_proto.pv_minor);
935 } else { 1031 } else {
936 status = -EINVAL; 1032 status = -EINVAL;
937 mlog(ML_ERROR, "invalid response %d from node %u\n", retval, 1033 mlog(ML_ERROR, "invalid response %d from node %u\n",
938 node); 1034 join_resp.packet.code, node);
939 } 1035 }
940 1036
941 mlog(0, "status %d, node %d response is %d\n", status, node, 1037 mlog(0, "status %d, node %d response is %d\n", status, node,
@@ -1008,7 +1104,7 @@ struct domain_join_ctxt {
1008 1104
1009static int dlm_should_restart_join(struct dlm_ctxt *dlm, 1105static int dlm_should_restart_join(struct dlm_ctxt *dlm,
1010 struct domain_join_ctxt *ctxt, 1106 struct domain_join_ctxt *ctxt,
1011 enum dlm_query_join_response response) 1107 enum dlm_query_join_response_code response)
1012{ 1108{
1013 int ret; 1109 int ret;
1014 1110
@@ -1034,7 +1130,7 @@ static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
1034{ 1130{
1035 int status = 0, tmpstat, node; 1131 int status = 0, tmpstat, node;
1036 struct domain_join_ctxt *ctxt; 1132 struct domain_join_ctxt *ctxt;
1037 enum dlm_query_join_response response = JOIN_DISALLOW; 1133 enum dlm_query_join_response_code response = JOIN_DISALLOW;
1038 1134
1039 mlog_entry("%p", dlm); 1135 mlog_entry("%p", dlm);
1040 1136
@@ -1450,10 +1546,38 @@ leave:
1450} 1546}
1451 1547
1452/* 1548/*
1453 * dlm_register_domain: one-time setup per "domain" 1549 * Compare a requested locking protocol version against the current one.
1550 *
1551 * If the major numbers are different, they are incompatible.
1552 * If the current minor is greater than the request, they are incompatible.
1553 * If the current minor is less than or equal to the request, they are
1554 * compatible, and the requester should run at the current minor version.
1555 */
1556static int dlm_protocol_compare(struct dlm_protocol_version *existing,
1557 struct dlm_protocol_version *request)
1558{
1559 if (existing->pv_major != request->pv_major)
1560 return 1;
1561
1562 if (existing->pv_minor > request->pv_minor)
1563 return 1;
1564
1565 if (existing->pv_minor < request->pv_minor)
1566 request->pv_minor = existing->pv_minor;
1567
1568 return 0;
1569}
1570
1571/*
1572 * dlm_register_domain: one-time setup per "domain".
1573 *
1574 * The filesystem passes in the requested locking version via proto.
1575 * If registration was successful, proto will contain the negotiated
1576 * locking protocol.
1454 */ 1577 */
1455struct dlm_ctxt * dlm_register_domain(const char *domain, 1578struct dlm_ctxt * dlm_register_domain(const char *domain,
1456 u32 key) 1579 u32 key,
1580 struct dlm_protocol_version *fs_proto)
1457{ 1581{
1458 int ret; 1582 int ret;
1459 struct dlm_ctxt *dlm = NULL; 1583 struct dlm_ctxt *dlm = NULL;
@@ -1496,6 +1620,15 @@ retry:
1496 goto retry; 1620 goto retry;
1497 } 1621 }
1498 1622
1623 if (dlm_protocol_compare(&dlm->fs_locking_proto, fs_proto)) {
1624 mlog(ML_ERROR,
1625 "Requested locking protocol version is not "
1626 "compatible with already registered domain "
1627 "\"%s\"\n", domain);
1628 ret = -EPROTO;
1629 goto leave;
1630 }
1631
1499 __dlm_get(dlm); 1632 __dlm_get(dlm);
1500 dlm->num_joins++; 1633 dlm->num_joins++;
1501 1634
@@ -1526,6 +1659,13 @@ retry:
1526 list_add_tail(&dlm->list, &dlm_domains); 1659 list_add_tail(&dlm->list, &dlm_domains);
1527 spin_unlock(&dlm_domain_lock); 1660 spin_unlock(&dlm_domain_lock);
1528 1661
1662 /*
1663 * Pass the locking protocol version into the join. If the join
1664 * succeeds, it will have the negotiated protocol set.
1665 */
1666 dlm->dlm_locking_proto = dlm_protocol;
1667 dlm->fs_locking_proto = *fs_proto;
1668
1529 ret = dlm_join_domain(dlm); 1669 ret = dlm_join_domain(dlm);
1530 if (ret) { 1670 if (ret) {
1531 mlog_errno(ret); 1671 mlog_errno(ret);
@@ -1533,6 +1673,9 @@ retry:
1533 goto leave; 1673 goto leave;
1534 } 1674 }
1535 1675
1676 /* Tell the caller what locking protocol we negotiated */
1677 *fs_proto = dlm->fs_locking_proto;
1678
1536 ret = 0; 1679 ret = 0;
1537leave: 1680leave:
1538 if (new_ctxt) 1681 if (new_ctxt)
diff --git a/fs/ocfs2/dlm/dlmfs.c b/fs/ocfs2/dlm/dlmfs.c
index 6639baab0798..61a000f8524c 100644
--- a/fs/ocfs2/dlm/dlmfs.c
+++ b/fs/ocfs2/dlm/dlmfs.c
@@ -60,6 +60,8 @@
60#define MLOG_MASK_PREFIX ML_DLMFS 60#define MLOG_MASK_PREFIX ML_DLMFS
61#include "cluster/masklog.h" 61#include "cluster/masklog.h"
62 62
63#include "ocfs2_lockingver.h"
64
63static const struct super_operations dlmfs_ops; 65static const struct super_operations dlmfs_ops;
64static const struct file_operations dlmfs_file_operations; 66static const struct file_operations dlmfs_file_operations;
65static const struct inode_operations dlmfs_dir_inode_operations; 67static const struct inode_operations dlmfs_dir_inode_operations;
@@ -70,6 +72,16 @@ static struct kmem_cache *dlmfs_inode_cache;
70struct workqueue_struct *user_dlm_worker; 72struct workqueue_struct *user_dlm_worker;
71 73
72/* 74/*
75 * This is the userdlmfs locking protocol version.
76 *
77 * See fs/ocfs2/dlmglue.c for more details on locking versions.
78 */
79static const struct dlm_protocol_version user_locking_protocol = {
80 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
81 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
82};
83
84/*
73 * decodes a set of open flags into a valid lock level and a set of flags. 85 * decodes a set of open flags into a valid lock level and a set of flags.
74 * returns < 0 if we have invalid flags 86 * returns < 0 if we have invalid flags
75 * flags which mean something to us: 87 * flags which mean something to us:
@@ -416,6 +428,7 @@ static int dlmfs_mkdir(struct inode * dir,
416 struct qstr *domain = &dentry->d_name; 428 struct qstr *domain = &dentry->d_name;
417 struct dlmfs_inode_private *ip; 429 struct dlmfs_inode_private *ip;
418 struct dlm_ctxt *dlm; 430 struct dlm_ctxt *dlm;
431 struct dlm_protocol_version proto = user_locking_protocol;
419 432
420 mlog(0, "mkdir %.*s\n", domain->len, domain->name); 433 mlog(0, "mkdir %.*s\n", domain->len, domain->name);
421 434
@@ -435,7 +448,7 @@ static int dlmfs_mkdir(struct inode * dir,
435 448
436 ip = DLMFS_I(inode); 449 ip = DLMFS_I(inode);
437 450
438 dlm = user_dlm_register_context(domain); 451 dlm = user_dlm_register_context(domain, &proto);
439 if (IS_ERR(dlm)) { 452 if (IS_ERR(dlm)) {
440 status = PTR_ERR(dlm); 453 status = PTR_ERR(dlm);
441 mlog(ML_ERROR, "Error %d could not register domain \"%.*s\"\n", 454 mlog(ML_ERROR, "Error %d could not register domain \"%.*s\"\n",
diff --git a/fs/ocfs2/dlm/userdlm.c b/fs/ocfs2/dlm/userdlm.c
index 7d2f578b267d..4cb1d3dae250 100644
--- a/fs/ocfs2/dlm/userdlm.c
+++ b/fs/ocfs2/dlm/userdlm.c
@@ -645,7 +645,8 @@ bail:
645 return status; 645 return status;
646} 646}
647 647
648struct dlm_ctxt *user_dlm_register_context(struct qstr *name) 648struct dlm_ctxt *user_dlm_register_context(struct qstr *name,
649 struct dlm_protocol_version *proto)
649{ 650{
650 struct dlm_ctxt *dlm; 651 struct dlm_ctxt *dlm;
651 u32 dlm_key; 652 u32 dlm_key;
@@ -661,7 +662,7 @@ struct dlm_ctxt *user_dlm_register_context(struct qstr *name)
661 662
662 snprintf(domain, name->len + 1, "%.*s", name->len, name->name); 663 snprintf(domain, name->len + 1, "%.*s", name->len, name->name);
663 664
664 dlm = dlm_register_domain(domain, dlm_key); 665 dlm = dlm_register_domain(domain, dlm_key, proto);
665 if (IS_ERR(dlm)) 666 if (IS_ERR(dlm))
666 mlog_errno(PTR_ERR(dlm)); 667 mlog_errno(PTR_ERR(dlm));
667 668
diff --git a/fs/ocfs2/dlm/userdlm.h b/fs/ocfs2/dlm/userdlm.h
index c400e93bbf79..39ec27738499 100644
--- a/fs/ocfs2/dlm/userdlm.h
+++ b/fs/ocfs2/dlm/userdlm.h
@@ -83,7 +83,8 @@ void user_dlm_write_lvb(struct inode *inode,
83void user_dlm_read_lvb(struct inode *inode, 83void user_dlm_read_lvb(struct inode *inode,
84 char *val, 84 char *val,
85 unsigned int len); 85 unsigned int len);
86struct dlm_ctxt *user_dlm_register_context(struct qstr *name); 86struct dlm_ctxt *user_dlm_register_context(struct qstr *name,
87 struct dlm_protocol_version *proto);
87void user_dlm_unregister_context(struct dlm_ctxt *dlm); 88void user_dlm_unregister_context(struct dlm_ctxt *dlm);
88 89
89struct dlmfs_inode_private { 90struct dlmfs_inode_private {