aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm')
-rw-r--r--fs/ocfs2/dlm/dlmcommon.h21
-rw-r--r--fs/ocfs2/dlm/dlmconvert.c2
-rw-r--r--fs/ocfs2/dlm/dlmdomain.c103
-rw-r--r--fs/ocfs2/dlm/dlmmaster.c24
-rw-r--r--fs/ocfs2/dlm/dlmrecovery.c57
-rw-r--r--fs/ocfs2/dlm/dlmthread.c6
6 files changed, 148 insertions, 65 deletions
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h
index 9843ee17ea27..dc8ea666efdb 100644
--- a/fs/ocfs2/dlm/dlmcommon.h
+++ b/fs/ocfs2/dlm/dlmcommon.h
@@ -176,6 +176,7 @@ struct dlm_mig_lockres_priv
176{ 176{
177 struct dlm_lock_resource *lockres; 177 struct dlm_lock_resource *lockres;
178 u8 real_master; 178 u8 real_master;
179 u8 extra_ref;
179}; 180};
180 181
181struct dlm_assert_master_priv 182struct dlm_assert_master_priv
@@ -602,17 +603,19 @@ enum dlm_query_join_response_code {
602 JOIN_PROTOCOL_MISMATCH, 603 JOIN_PROTOCOL_MISMATCH,
603}; 604};
604 605
606struct dlm_query_join_packet {
607 u8 code; /* Response code. dlm_minor and fs_minor
608 are only valid if this is JOIN_OK */
609 u8 dlm_minor; /* The minor version of the protocol the
610 dlm is speaking. */
611 u8 fs_minor; /* The minor version of the protocol the
612 filesystem is speaking. */
613 u8 reserved;
614};
615
605union dlm_query_join_response { 616union dlm_query_join_response {
606 u32 intval; 617 u32 intval;
607 struct { 618 struct dlm_query_join_packet packet;
608 u8 code; /* Response code. dlm_minor and fs_minor
609 are only valid if this is JOIN_OK */
610 u8 dlm_minor; /* The minor version of the protocol the
611 dlm is speaking. */
612 u8 fs_minor; /* The minor version of the protocol the
613 filesystem is speaking. */
614 u8 reserved;
615 } packet;
616}; 619};
617 620
618struct dlm_lock_request 621struct dlm_lock_request
diff --git a/fs/ocfs2/dlm/dlmconvert.c b/fs/ocfs2/dlm/dlmconvert.c
index ecb4d997221e..75997b4deaf3 100644
--- a/fs/ocfs2/dlm/dlmconvert.c
+++ b/fs/ocfs2/dlm/dlmconvert.c
@@ -487,7 +487,7 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data,
487 "cookie=%u:%llu\n", 487 "cookie=%u:%llu\n",
488 dlm_get_lock_cookie_node(be64_to_cpu(cnv->cookie)), 488 dlm_get_lock_cookie_node(be64_to_cpu(cnv->cookie)),
489 dlm_get_lock_cookie_seq(be64_to_cpu(cnv->cookie))); 489 dlm_get_lock_cookie_seq(be64_to_cpu(cnv->cookie)));
490 __dlm_print_one_lock_resource(res); 490 dlm_print_one_lock_resource(res);
491 goto leave; 491 goto leave;
492 } 492 }
493 493
diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c
index 638d2ebb892b..0879d86113e3 100644
--- a/fs/ocfs2/dlm/dlmdomain.c
+++ b/fs/ocfs2/dlm/dlmdomain.c
@@ -713,14 +713,46 @@ static int dlm_query_join_proto_check(char *proto_type, int node,
713 return rc; 713 return rc;
714} 714}
715 715
716/*
717 * struct dlm_query_join_packet is made up of four one-byte fields. They
718 * are effectively in big-endian order already. However, little-endian
719 * machines swap them before putting the packet on the wire (because
720 * query_join's response is a status, and that status is treated as a u32
721 * on the wire). Thus, a big-endian and little-endian machines will treat
722 * this structure differently.
723 *
724 * The solution is to have little-endian machines swap the structure when
725 * converting from the structure to the u32 representation. This will
726 * result in the structure having the correct format on the wire no matter
727 * the host endian format.
728 */
729static void dlm_query_join_packet_to_wire(struct dlm_query_join_packet *packet,
730 u32 *wire)
731{
732 union dlm_query_join_response response;
733
734 response.packet = *packet;
735 *wire = cpu_to_be32(response.intval);
736}
737
738static void dlm_query_join_wire_to_packet(u32 wire,
739 struct dlm_query_join_packet *packet)
740{
741 union dlm_query_join_response response;
742
743 response.intval = cpu_to_be32(wire);
744 *packet = response.packet;
745}
746
716static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data, 747static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
717 void **ret_data) 748 void **ret_data)
718{ 749{
719 struct dlm_query_join_request *query; 750 struct dlm_query_join_request *query;
720 union dlm_query_join_response response = { 751 struct dlm_query_join_packet packet = {
721 .packet.code = JOIN_DISALLOW, 752 .code = JOIN_DISALLOW,
722 }; 753 };
723 struct dlm_ctxt *dlm = NULL; 754 struct dlm_ctxt *dlm = NULL;
755 u32 response;
724 u8 nodenum; 756 u8 nodenum;
725 757
726 query = (struct dlm_query_join_request *) msg->buf; 758 query = (struct dlm_query_join_request *) msg->buf;
@@ -737,11 +769,11 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
737 mlog(0, "node %u is not in our live map yet\n", 769 mlog(0, "node %u is not in our live map yet\n",
738 query->node_idx); 770 query->node_idx);
739 771
740 response.packet.code = JOIN_DISALLOW; 772 packet.code = JOIN_DISALLOW;
741 goto respond; 773 goto respond;
742 } 774 }
743 775
744 response.packet.code = JOIN_OK_NO_MAP; 776 packet.code = JOIN_OK_NO_MAP;
745 777
746 spin_lock(&dlm_domain_lock); 778 spin_lock(&dlm_domain_lock);
747 dlm = __dlm_lookup_domain_full(query->domain, query->name_len); 779 dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
@@ -760,7 +792,7 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
760 mlog(0, "disallow join as node %u does not " 792 mlog(0, "disallow join as node %u does not "
761 "have node %u in its nodemap\n", 793 "have node %u in its nodemap\n",
762 query->node_idx, nodenum); 794 query->node_idx, nodenum);
763 response.packet.code = JOIN_DISALLOW; 795 packet.code = JOIN_DISALLOW;
764 goto unlock_respond; 796 goto unlock_respond;
765 } 797 }
766 } 798 }
@@ -780,23 +812,23 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
780 /*If this is a brand new context and we 812 /*If this is a brand new context and we
781 * haven't started our join process yet, then 813 * haven't started our join process yet, then
782 * the other node won the race. */ 814 * the other node won the race. */
783 response.packet.code = JOIN_OK_NO_MAP; 815 packet.code = JOIN_OK_NO_MAP;
784 } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) { 816 } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
785 /* Disallow parallel joins. */ 817 /* Disallow parallel joins. */
786 response.packet.code = JOIN_DISALLOW; 818 packet.code = JOIN_DISALLOW;
787 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) { 819 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
788 mlog(0, "node %u trying to join, but recovery " 820 mlog(0, "node %u trying to join, but recovery "
789 "is ongoing.\n", bit); 821 "is ongoing.\n", bit);
790 response.packet.code = JOIN_DISALLOW; 822 packet.code = JOIN_DISALLOW;
791 } else if (test_bit(bit, dlm->recovery_map)) { 823 } else if (test_bit(bit, dlm->recovery_map)) {
792 mlog(0, "node %u trying to join, but it " 824 mlog(0, "node %u trying to join, but it "
793 "still needs recovery.\n", bit); 825 "still needs recovery.\n", bit);
794 response.packet.code = JOIN_DISALLOW; 826 packet.code = JOIN_DISALLOW;
795 } else if (test_bit(bit, dlm->domain_map)) { 827 } else if (test_bit(bit, dlm->domain_map)) {
796 mlog(0, "node %u trying to join, but it " 828 mlog(0, "node %u trying to join, but it "
797 "is still in the domain! needs recovery?\n", 829 "is still in the domain! needs recovery?\n",
798 bit); 830 bit);
799 response.packet.code = JOIN_DISALLOW; 831 packet.code = JOIN_DISALLOW;
800 } else { 832 } else {
801 /* Alright we're fully a part of this domain 833 /* Alright we're fully a part of this domain
802 * so we keep some state as to who's joining 834 * so we keep some state as to who's joining
@@ -807,19 +839,15 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
807 if (dlm_query_join_proto_check("DLM", bit, 839 if (dlm_query_join_proto_check("DLM", bit,
808 &dlm->dlm_locking_proto, 840 &dlm->dlm_locking_proto,
809 &query->dlm_proto)) { 841 &query->dlm_proto)) {
810 response.packet.code = 842 packet.code = JOIN_PROTOCOL_MISMATCH;
811 JOIN_PROTOCOL_MISMATCH;
812 } else if (dlm_query_join_proto_check("fs", bit, 843 } else if (dlm_query_join_proto_check("fs", bit,
813 &dlm->fs_locking_proto, 844 &dlm->fs_locking_proto,
814 &query->fs_proto)) { 845 &query->fs_proto)) {
815 response.packet.code = 846 packet.code = JOIN_PROTOCOL_MISMATCH;
816 JOIN_PROTOCOL_MISMATCH;
817 } else { 847 } else {
818 response.packet.dlm_minor = 848 packet.dlm_minor = query->dlm_proto.pv_minor;
819 query->dlm_proto.pv_minor; 849 packet.fs_minor = query->fs_proto.pv_minor;
820 response.packet.fs_minor = 850 packet.code = JOIN_OK;
821 query->fs_proto.pv_minor;
822 response.packet.code = JOIN_OK;
823 __dlm_set_joining_node(dlm, query->node_idx); 851 __dlm_set_joining_node(dlm, query->node_idx);
824 } 852 }
825 } 853 }
@@ -830,9 +858,10 @@ unlock_respond:
830 spin_unlock(&dlm_domain_lock); 858 spin_unlock(&dlm_domain_lock);
831 859
832respond: 860respond:
833 mlog(0, "We respond with %u\n", response.packet.code); 861 mlog(0, "We respond with %u\n", packet.code);
834 862
835 return response.intval; 863 dlm_query_join_packet_to_wire(&packet, &response);
864 return response;
836} 865}
837 866
838static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data, 867static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
@@ -937,7 +966,7 @@ static int dlm_send_join_cancels(struct dlm_ctxt *dlm,
937 sizeof(unsigned long))) { 966 sizeof(unsigned long))) {
938 mlog(ML_ERROR, 967 mlog(ML_ERROR,
939 "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n", 968 "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n",
940 map_size, BITS_TO_LONGS(O2NM_MAX_NODES)); 969 map_size, (unsigned)BITS_TO_LONGS(O2NM_MAX_NODES));
941 return -EINVAL; 970 return -EINVAL;
942 } 971 }
943 972
@@ -968,7 +997,8 @@ static int dlm_request_join(struct dlm_ctxt *dlm,
968{ 997{
969 int status; 998 int status;
970 struct dlm_query_join_request join_msg; 999 struct dlm_query_join_request join_msg;
971 union dlm_query_join_response join_resp; 1000 struct dlm_query_join_packet packet;
1001 u32 join_resp;
972 1002
973 mlog(0, "querying node %d\n", node); 1003 mlog(0, "querying node %d\n", node);
974 1004
@@ -984,11 +1014,12 @@ static int dlm_request_join(struct dlm_ctxt *dlm,
984 1014
985 status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg, 1015 status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
986 sizeof(join_msg), node, 1016 sizeof(join_msg), node,
987 &join_resp.intval); 1017 &join_resp);
988 if (status < 0 && status != -ENOPROTOOPT) { 1018 if (status < 0 && status != -ENOPROTOOPT) {
989 mlog_errno(status); 1019 mlog_errno(status);
990 goto bail; 1020 goto bail;
991 } 1021 }
1022 dlm_query_join_wire_to_packet(join_resp, &packet);
992 1023
993 /* -ENOPROTOOPT from the net code means the other side isn't 1024 /* -ENOPROTOOPT from the net code means the other side isn't
994 listening for our message type -- that's fine, it means 1025 listening for our message type -- that's fine, it means
@@ -997,10 +1028,10 @@ static int dlm_request_join(struct dlm_ctxt *dlm,
997 if (status == -ENOPROTOOPT) { 1028 if (status == -ENOPROTOOPT) {
998 status = 0; 1029 status = 0;
999 *response = JOIN_OK_NO_MAP; 1030 *response = JOIN_OK_NO_MAP;
1000 } else if (join_resp.packet.code == JOIN_DISALLOW || 1031 } else if (packet.code == JOIN_DISALLOW ||
1001 join_resp.packet.code == JOIN_OK_NO_MAP) { 1032 packet.code == JOIN_OK_NO_MAP) {
1002 *response = join_resp.packet.code; 1033 *response = packet.code;
1003 } else if (join_resp.packet.code == JOIN_PROTOCOL_MISMATCH) { 1034 } else if (packet.code == JOIN_PROTOCOL_MISMATCH) {
1004 mlog(ML_NOTICE, 1035 mlog(ML_NOTICE,
1005 "This node requested DLM locking protocol %u.%u and " 1036 "This node requested DLM locking protocol %u.%u and "
1006 "filesystem locking protocol %u.%u. At least one of " 1037 "filesystem locking protocol %u.%u. At least one of "
@@ -1012,14 +1043,12 @@ static int dlm_request_join(struct dlm_ctxt *dlm,
1012 dlm->fs_locking_proto.pv_minor, 1043 dlm->fs_locking_proto.pv_minor,
1013 node); 1044 node);
1014 status = -EPROTO; 1045 status = -EPROTO;
1015 *response = join_resp.packet.code; 1046 *response = packet.code;
1016 } else if (join_resp.packet.code == JOIN_OK) { 1047 } else if (packet.code == JOIN_OK) {
1017 *response = join_resp.packet.code; 1048 *response = packet.code;
1018 /* Use the same locking protocol as the remote node */ 1049 /* Use the same locking protocol as the remote node */
1019 dlm->dlm_locking_proto.pv_minor = 1050 dlm->dlm_locking_proto.pv_minor = packet.dlm_minor;
1020 join_resp.packet.dlm_minor; 1051 dlm->fs_locking_proto.pv_minor = packet.fs_minor;
1021 dlm->fs_locking_proto.pv_minor =
1022 join_resp.packet.fs_minor;
1023 mlog(0, 1052 mlog(0,
1024 "Node %d responds JOIN_OK with DLM locking protocol " 1053 "Node %d responds JOIN_OK with DLM locking protocol "
1025 "%u.%u and fs locking protocol %u.%u\n", 1054 "%u.%u and fs locking protocol %u.%u\n",
@@ -1031,11 +1060,11 @@ static int dlm_request_join(struct dlm_ctxt *dlm,
1031 } else { 1060 } else {
1032 status = -EINVAL; 1061 status = -EINVAL;
1033 mlog(ML_ERROR, "invalid response %d from node %u\n", 1062 mlog(ML_ERROR, "invalid response %d from node %u\n",
1034 join_resp.packet.code, node); 1063 packet.code, node);
1035 } 1064 }
1036 1065
1037 mlog(0, "status %d, node %d response is %d\n", status, node, 1066 mlog(0, "status %d, node %d response is %d\n", status, node,
1038 *response); 1067 *response);
1039 1068
1040bail: 1069bail:
1041 return status; 1070 return status;
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index a54d33d95ada..ea6b89577860 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -1663,7 +1663,12 @@ way_up_top:
1663 dlm_put_mle(tmpmle); 1663 dlm_put_mle(tmpmle);
1664 } 1664 }
1665send_response: 1665send_response:
1666 1666 /*
1667 * __dlm_lookup_lockres() grabbed a reference to this lockres.
1668 * The reference is released by dlm_assert_master_worker() under
1669 * the call to dlm_dispatch_assert_master(). If
1670 * dlm_assert_master_worker() isn't called, we drop it here.
1671 */
1667 if (dispatch_assert) { 1672 if (dispatch_assert) {
1668 if (response != DLM_MASTER_RESP_YES) 1673 if (response != DLM_MASTER_RESP_YES)
1669 mlog(ML_ERROR, "invalid response %d\n", response); 1674 mlog(ML_ERROR, "invalid response %d\n", response);
@@ -1678,7 +1683,11 @@ send_response:
1678 if (ret < 0) { 1683 if (ret < 0) {
1679 mlog(ML_ERROR, "failed to dispatch assert master work\n"); 1684 mlog(ML_ERROR, "failed to dispatch assert master work\n");
1680 response = DLM_MASTER_RESP_ERROR; 1685 response = DLM_MASTER_RESP_ERROR;
1686 dlm_lockres_put(res);
1681 } 1687 }
1688 } else {
1689 if (res)
1690 dlm_lockres_put(res);
1682 } 1691 }
1683 1692
1684 dlm_put(dlm); 1693 dlm_put(dlm);
@@ -1695,9 +1704,9 @@ send_response:
1695 * can periodically run all locks owned by this node 1704 * can periodically run all locks owned by this node
1696 * and re-assert across the cluster... 1705 * and re-assert across the cluster...
1697 */ 1706 */
1698int dlm_do_assert_master(struct dlm_ctxt *dlm, 1707static int dlm_do_assert_master(struct dlm_ctxt *dlm,
1699 struct dlm_lock_resource *res, 1708 struct dlm_lock_resource *res,
1700 void *nodemap, u32 flags) 1709 void *nodemap, u32 flags)
1701{ 1710{
1702 struct dlm_assert_master assert; 1711 struct dlm_assert_master assert;
1703 int to, tmpret; 1712 int to, tmpret;
@@ -2348,7 +2357,7 @@ int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
2348 mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref " 2357 mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2349 "but it is already dropped!\n", dlm->name, 2358 "but it is already dropped!\n", dlm->name,
2350 res->lockname.len, res->lockname.name, node); 2359 res->lockname.len, res->lockname.name, node);
2351 __dlm_print_one_lock_resource(res); 2360 dlm_print_one_lock_resource(res);
2352 } 2361 }
2353 ret = 0; 2362 ret = 0;
2354 goto done; 2363 goto done;
@@ -2408,7 +2417,7 @@ static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
2408 mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref " 2417 mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2409 "but it is already dropped!\n", dlm->name, 2418 "but it is already dropped!\n", dlm->name,
2410 res->lockname.len, res->lockname.name, node); 2419 res->lockname.len, res->lockname.name, node);
2411 __dlm_print_one_lock_resource(res); 2420 dlm_print_one_lock_resource(res);
2412 } 2421 }
2413 2422
2414 dlm_lockres_put(res); 2423 dlm_lockres_put(res);
@@ -2933,6 +2942,9 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2933 dlm_lockres_clear_refmap_bit(lock->ml.node, res); 2942 dlm_lockres_clear_refmap_bit(lock->ml.node, res);
2934 list_del_init(&lock->list); 2943 list_del_init(&lock->list);
2935 dlm_lock_put(lock); 2944 dlm_lock_put(lock);
2945 /* In a normal unlock, we would have added a
2946 * DLM_UNLOCK_FREE_LOCK action. Force it. */
2947 dlm_lock_put(lock);
2936 } 2948 }
2937 } 2949 }
2938 queue++; 2950 queue++;
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 91f747b8a538..bcb9260c3735 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -519,9 +519,9 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
519 return 0; 519 return 0;
520 520
521master_here: 521master_here:
522 mlog(0, "(%d) mastering recovery of %s:%u here(this=%u)!\n", 522 mlog(ML_NOTICE, "(%d) Node %u is the Recovery Master for the Dead Node "
523 task_pid_nr(dlm->dlm_reco_thread_task), 523 "%u for Domain %s\n", task_pid_nr(dlm->dlm_reco_thread_task),
524 dlm->name, dlm->reco.dead_node, dlm->node_num); 524 dlm->node_num, dlm->reco.dead_node, dlm->name);
525 525
526 status = dlm_remaster_locks(dlm, dlm->reco.dead_node); 526 status = dlm_remaster_locks(dlm, dlm->reco.dead_node);
527 if (status < 0) { 527 if (status < 0) {
@@ -1191,7 +1191,7 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock,
1191 (ml->type == LKM_EXMODE || 1191 (ml->type == LKM_EXMODE ||
1192 memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) { 1192 memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
1193 mlog(ML_ERROR, "mismatched lvbs!\n"); 1193 mlog(ML_ERROR, "mismatched lvbs!\n");
1194 __dlm_print_one_lock_resource(lock->lockres); 1194 dlm_print_one_lock_resource(lock->lockres);
1195 BUG(); 1195 BUG();
1196 } 1196 }
1197 memcpy(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN); 1197 memcpy(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN);
@@ -1327,6 +1327,7 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
1327 (struct dlm_migratable_lockres *)msg->buf; 1327 (struct dlm_migratable_lockres *)msg->buf;
1328 int ret = 0; 1328 int ret = 0;
1329 u8 real_master; 1329 u8 real_master;
1330 u8 extra_refs = 0;
1330 char *buf = NULL; 1331 char *buf = NULL;
1331 struct dlm_work_item *item = NULL; 1332 struct dlm_work_item *item = NULL;
1332 struct dlm_lock_resource *res = NULL; 1333 struct dlm_lock_resource *res = NULL;
@@ -1404,16 +1405,28 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
1404 __dlm_insert_lockres(dlm, res); 1405 __dlm_insert_lockres(dlm, res);
1405 spin_unlock(&dlm->spinlock); 1406 spin_unlock(&dlm->spinlock);
1406 1407
1408 /* Add an extra ref for this lock-less lockres lest the
1409 * dlm_thread purges it before we get the chance to add
1410 * locks to it */
1411 dlm_lockres_get(res);
1412
1413 /* There are three refs that need to be put.
1414 * 1. Taken above.
1415 * 2. kref_init in dlm_new_lockres()->dlm_init_lockres().
1416 * 3. dlm_lookup_lockres()
1417 * The first one is handled at the end of this function. The
1418 * other two are handled in the worker thread after locks have
1419 * been attached. Yes, we don't wait for purge time to match
1420 * kref_init. The lockres will still have atleast one ref
1421 * added because it is in the hash __dlm_insert_lockres() */
1422 extra_refs++;
1423
1407 /* now that the new lockres is inserted, 1424 /* now that the new lockres is inserted,
1408 * make it usable by other processes */ 1425 * make it usable by other processes */
1409 spin_lock(&res->spinlock); 1426 spin_lock(&res->spinlock);
1410 res->state &= ~DLM_LOCK_RES_IN_PROGRESS; 1427 res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
1411 spin_unlock(&res->spinlock); 1428 spin_unlock(&res->spinlock);
1412 wake_up(&res->wq); 1429 wake_up(&res->wq);
1413
1414 /* add an extra ref for just-allocated lockres
1415 * otherwise the lockres will be purged immediately */
1416 dlm_lockres_get(res);
1417 } 1430 }
1418 1431
1419 /* at this point we have allocated everything we need, 1432 /* at this point we have allocated everything we need,
@@ -1443,12 +1456,17 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
1443 dlm_init_work_item(dlm, item, dlm_mig_lockres_worker, buf); 1456 dlm_init_work_item(dlm, item, dlm_mig_lockres_worker, buf);
1444 item->u.ml.lockres = res; /* already have a ref */ 1457 item->u.ml.lockres = res; /* already have a ref */
1445 item->u.ml.real_master = real_master; 1458 item->u.ml.real_master = real_master;
1459 item->u.ml.extra_ref = extra_refs;
1446 spin_lock(&dlm->work_lock); 1460 spin_lock(&dlm->work_lock);
1447 list_add_tail(&item->list, &dlm->work_list); 1461 list_add_tail(&item->list, &dlm->work_list);
1448 spin_unlock(&dlm->work_lock); 1462 spin_unlock(&dlm->work_lock);
1449 queue_work(dlm->dlm_worker, &dlm->dispatched_work); 1463 queue_work(dlm->dlm_worker, &dlm->dispatched_work);
1450 1464
1451leave: 1465leave:
1466 /* One extra ref taken needs to be put here */
1467 if (extra_refs)
1468 dlm_lockres_put(res);
1469
1452 dlm_put(dlm); 1470 dlm_put(dlm);
1453 if (ret < 0) { 1471 if (ret < 0) {
1454 if (buf) 1472 if (buf)
@@ -1464,17 +1482,19 @@ leave:
1464 1482
1465static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data) 1483static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data)
1466{ 1484{
1467 struct dlm_ctxt *dlm = data; 1485 struct dlm_ctxt *dlm;
1468 struct dlm_migratable_lockres *mres; 1486 struct dlm_migratable_lockres *mres;
1469 int ret = 0; 1487 int ret = 0;
1470 struct dlm_lock_resource *res; 1488 struct dlm_lock_resource *res;
1471 u8 real_master; 1489 u8 real_master;
1490 u8 extra_ref;
1472 1491
1473 dlm = item->dlm; 1492 dlm = item->dlm;
1474 mres = (struct dlm_migratable_lockres *)data; 1493 mres = (struct dlm_migratable_lockres *)data;
1475 1494
1476 res = item->u.ml.lockres; 1495 res = item->u.ml.lockres;
1477 real_master = item->u.ml.real_master; 1496 real_master = item->u.ml.real_master;
1497 extra_ref = item->u.ml.extra_ref;
1478 1498
1479 if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) { 1499 if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) {
1480 /* this case is super-rare. only occurs if 1500 /* this case is super-rare. only occurs if
@@ -1517,6 +1537,12 @@ again:
1517 } 1537 }
1518 1538
1519leave: 1539leave:
1540 /* See comment in dlm_mig_lockres_handler() */
1541 if (res) {
1542 if (extra_ref)
1543 dlm_lockres_put(res);
1544 dlm_lockres_put(res);
1545 }
1520 kfree(data); 1546 kfree(data);
1521 mlog_exit(ret); 1547 mlog_exit(ret);
1522} 1548}
@@ -1644,7 +1670,8 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data,
1644 /* retry!? */ 1670 /* retry!? */
1645 BUG(); 1671 BUG();
1646 } 1672 }
1647 } 1673 } else /* put.. incase we are not the master */
1674 dlm_lockres_put(res);
1648 spin_unlock(&res->spinlock); 1675 spin_unlock(&res->spinlock);
1649 } 1676 }
1650 spin_unlock(&dlm->spinlock); 1677 spin_unlock(&dlm->spinlock);
@@ -1921,6 +1948,7 @@ void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm,
1921 "Recovering res %s:%.*s, is already on recovery list!\n", 1948 "Recovering res %s:%.*s, is already on recovery list!\n",
1922 dlm->name, res->lockname.len, res->lockname.name); 1949 dlm->name, res->lockname.len, res->lockname.name);
1923 list_del_init(&res->recovering); 1950 list_del_init(&res->recovering);
1951 dlm_lockres_put(res);
1924 } 1952 }
1925 /* We need to hold a reference while on the recovery list */ 1953 /* We need to hold a reference while on the recovery list */
1926 dlm_lockres_get(res); 1954 dlm_lockres_get(res);
@@ -2130,11 +2158,16 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
2130 assert_spin_locked(&dlm->spinlock); 2158 assert_spin_locked(&dlm->spinlock);
2131 assert_spin_locked(&res->spinlock); 2159 assert_spin_locked(&res->spinlock);
2132 2160
2161 /* We do two dlm_lock_put(). One for removing from list and the other is
2162 * to force the DLM_UNLOCK_FREE_LOCK action so as to free the locks */
2163
2133 /* TODO: check pending_asts, pending_basts here */ 2164 /* TODO: check pending_asts, pending_basts here */
2134 list_for_each_entry_safe(lock, next, &res->granted, list) { 2165 list_for_each_entry_safe(lock, next, &res->granted, list) {
2135 if (lock->ml.node == dead_node) { 2166 if (lock->ml.node == dead_node) {
2136 list_del_init(&lock->list); 2167 list_del_init(&lock->list);
2137 dlm_lock_put(lock); 2168 dlm_lock_put(lock);
2169 /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */
2170 dlm_lock_put(lock);
2138 freed++; 2171 freed++;
2139 } 2172 }
2140 } 2173 }
@@ -2142,6 +2175,8 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
2142 if (lock->ml.node == dead_node) { 2175 if (lock->ml.node == dead_node) {
2143 list_del_init(&lock->list); 2176 list_del_init(&lock->list);
2144 dlm_lock_put(lock); 2177 dlm_lock_put(lock);
2178 /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */
2179 dlm_lock_put(lock);
2145 freed++; 2180 freed++;
2146 } 2181 }
2147 } 2182 }
@@ -2149,6 +2184,8 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
2149 if (lock->ml.node == dead_node) { 2184 if (lock->ml.node == dead_node) {
2150 list_del_init(&lock->list); 2185 list_del_init(&lock->list);
2151 dlm_lock_put(lock); 2186 dlm_lock_put(lock);
2187 /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */
2188 dlm_lock_put(lock);
2152 freed++; 2189 freed++;
2153 } 2190 }
2154 } 2191 }
diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c
index cebd089f8955..4060bb328bc8 100644
--- a/fs/ocfs2/dlm/dlmthread.c
+++ b/fs/ocfs2/dlm/dlmthread.c
@@ -176,12 +176,14 @@ static int dlm_purge_lockres(struct dlm_ctxt *dlm,
176 res->lockname.name, master); 176 res->lockname.name, master);
177 177
178 if (!master) { 178 if (!master) {
179 /* drop spinlock... retake below */
180 spin_unlock(&dlm->spinlock);
181
179 spin_lock(&res->spinlock); 182 spin_lock(&res->spinlock);
180 /* This ensures that clear refmap is sent after the set */ 183 /* This ensures that clear refmap is sent after the set */
181 __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG); 184 __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
182 spin_unlock(&res->spinlock); 185 spin_unlock(&res->spinlock);
183 /* drop spinlock to do messaging, retake below */ 186
184 spin_unlock(&dlm->spinlock);
185 /* clear our bit from the master's refmap, ignore errors */ 187 /* clear our bit from the master's refmap, ignore errors */
186 ret = dlm_drop_lockres_ref(dlm, res); 188 ret = dlm_drop_lockres_ref(dlm, res);
187 if (ret < 0) { 189 if (ret < 0) {