aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm/lock.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r--fs/dlm/lock.c955
1 files changed, 739 insertions, 216 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index e725005fafd0..d8d6e729f96b 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1,7 +1,7 @@
1/****************************************************************************** 1/******************************************************************************
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) 2005 Red Hat, Inc. All rights reserved. 4** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
5** 5**
6** This copyrighted material is made available to anyone wishing to use, 6** This copyrighted material is made available to anyone wishing to use,
7** modify, copy, or redistribute it subject to the terms and conditions 7** modify, copy, or redistribute it subject to the terms and conditions
@@ -85,6 +85,7 @@ static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 85static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
86 struct dlm_message *ms); 86 struct dlm_message *ms);
87static int receive_extralen(struct dlm_message *ms); 87static int receive_extralen(struct dlm_message *ms);
88static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
88 89
89/* 90/*
90 * Lock compatibilty matrix - thanks Steve 91 * Lock compatibilty matrix - thanks Steve
@@ -223,6 +224,16 @@ static inline int is_demoted(struct dlm_lkb *lkb)
223 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED); 224 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
224} 225}
225 226
227static inline int is_altmode(struct dlm_lkb *lkb)
228{
229 return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
230}
231
232static inline int is_granted(struct dlm_lkb *lkb)
233{
234 return (lkb->lkb_status == DLM_LKSTS_GRANTED);
235}
236
226static inline int is_remote(struct dlm_rsb *r) 237static inline int is_remote(struct dlm_rsb *r)
227{ 238{
228 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r);); 239 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
@@ -254,6 +265,22 @@ static inline int down_conversion(struct dlm_lkb *lkb)
254 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode); 265 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
255} 266}
256 267
268static inline int is_overlap_unlock(struct dlm_lkb *lkb)
269{
270 return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
271}
272
273static inline int is_overlap_cancel(struct dlm_lkb *lkb)
274{
275 return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
276}
277
278static inline int is_overlap(struct dlm_lkb *lkb)
279{
280 return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
281 DLM_IFL_OVERLAP_CANCEL));
282}
283
257static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 284static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
258{ 285{
259 if (is_master_copy(lkb)) 286 if (is_master_copy(lkb))
@@ -267,6 +294,12 @@ static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
267 dlm_add_ast(lkb, AST_COMP); 294 dlm_add_ast(lkb, AST_COMP);
268} 295}
269 296
297static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
298{
299 queue_cast(r, lkb,
300 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
301}
302
270static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode) 303static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
271{ 304{
272 if (is_master_copy(lkb)) 305 if (is_master_copy(lkb))
@@ -547,6 +580,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
547 lkb->lkb_grmode = DLM_LOCK_IV; 580 lkb->lkb_grmode = DLM_LOCK_IV;
548 kref_init(&lkb->lkb_ref); 581 kref_init(&lkb->lkb_ref);
549 INIT_LIST_HEAD(&lkb->lkb_ownqueue); 582 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
583 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
550 584
551 get_random_bytes(&bucket, sizeof(bucket)); 585 get_random_bytes(&bucket, sizeof(bucket));
552 bucket &= (ls->ls_lkbtbl_size - 1); 586 bucket &= (ls->ls_lkbtbl_size - 1);
@@ -556,7 +590,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
556 /* counter can roll over so we must verify lkid is not in use */ 590 /* counter can roll over so we must verify lkid is not in use */
557 591
558 while (lkid == 0) { 592 while (lkid == 0) {
559 lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16); 593 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
560 594
561 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list, 595 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
562 lkb_idtbl_list) { 596 lkb_idtbl_list) {
@@ -577,8 +611,8 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
577 611
578static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid) 612static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
579{ 613{
580 uint16_t bucket = lkid & 0xFFFF;
581 struct dlm_lkb *lkb; 614 struct dlm_lkb *lkb;
615 uint16_t bucket = (lkid >> 16);
582 616
583 list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) { 617 list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
584 if (lkb->lkb_id == lkid) 618 if (lkb->lkb_id == lkid)
@@ -590,7 +624,7 @@ static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
590static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret) 624static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
591{ 625{
592 struct dlm_lkb *lkb; 626 struct dlm_lkb *lkb;
593 uint16_t bucket = lkid & 0xFFFF; 627 uint16_t bucket = (lkid >> 16);
594 628
595 if (bucket >= ls->ls_lkbtbl_size) 629 if (bucket >= ls->ls_lkbtbl_size)
596 return -EBADSLT; 630 return -EBADSLT;
@@ -620,7 +654,7 @@ static void kill_lkb(struct kref *kref)
620 654
621static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) 655static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
622{ 656{
623 uint16_t bucket = lkb->lkb_id & 0xFFFF; 657 uint16_t bucket = (lkb->lkb_id >> 16);
624 658
625 write_lock(&ls->ls_lkbtbl[bucket].lock); 659 write_lock(&ls->ls_lkbtbl[bucket].lock);
626 if (kref_put(&lkb->lkb_ref, kill_lkb)) { 660 if (kref_put(&lkb->lkb_ref, kill_lkb)) {
@@ -735,23 +769,75 @@ static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
735 unhold_lkb(lkb); 769 unhold_lkb(lkb);
736} 770}
737 771
772static int msg_reply_type(int mstype)
773{
774 switch (mstype) {
775 case DLM_MSG_REQUEST:
776 return DLM_MSG_REQUEST_REPLY;
777 case DLM_MSG_CONVERT:
778 return DLM_MSG_CONVERT_REPLY;
779 case DLM_MSG_UNLOCK:
780 return DLM_MSG_UNLOCK_REPLY;
781 case DLM_MSG_CANCEL:
782 return DLM_MSG_CANCEL_REPLY;
783 case DLM_MSG_LOOKUP:
784 return DLM_MSG_LOOKUP_REPLY;
785 }
786 return -1;
787}
788
738/* add/remove lkb from global waiters list of lkb's waiting for 789/* add/remove lkb from global waiters list of lkb's waiting for
739 a reply from a remote node */ 790 a reply from a remote node */
740 791
741static void add_to_waiters(struct dlm_lkb *lkb, int mstype) 792static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
742{ 793{
743 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 794 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
795 int error = 0;
744 796
745 mutex_lock(&ls->ls_waiters_mutex); 797 mutex_lock(&ls->ls_waiters_mutex);
746 if (lkb->lkb_wait_type) { 798
747 log_print("add_to_waiters error %d", lkb->lkb_wait_type); 799 if (is_overlap_unlock(lkb) ||
800 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
801 error = -EINVAL;
802 goto out;
803 }
804
805 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
806 switch (mstype) {
807 case DLM_MSG_UNLOCK:
808 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
809 break;
810 case DLM_MSG_CANCEL:
811 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
812 break;
813 default:
814 error = -EBUSY;
815 goto out;
816 }
817 lkb->lkb_wait_count++;
818 hold_lkb(lkb);
819
820 log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
821 lkb->lkb_id, lkb->lkb_wait_type, mstype,
822 lkb->lkb_wait_count, lkb->lkb_flags);
748 goto out; 823 goto out;
749 } 824 }
825
826 DLM_ASSERT(!lkb->lkb_wait_count,
827 dlm_print_lkb(lkb);
828 printk("wait_count %d\n", lkb->lkb_wait_count););
829
830 lkb->lkb_wait_count++;
750 lkb->lkb_wait_type = mstype; 831 lkb->lkb_wait_type = mstype;
751 kref_get(&lkb->lkb_ref); 832 hold_lkb(lkb);
752 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters); 833 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
753 out: 834 out:
835 if (error)
836 log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
837 lkb->lkb_id, error, lkb->lkb_flags, mstype,
838 lkb->lkb_wait_type, lkb->lkb_resource->res_name);
754 mutex_unlock(&ls->ls_waiters_mutex); 839 mutex_unlock(&ls->ls_waiters_mutex);
840 return error;
755} 841}
756 842
757/* We clear the RESEND flag because we might be taking an lkb off the waiters 843/* We clear the RESEND flag because we might be taking an lkb off the waiters
@@ -759,34 +845,85 @@ static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
759 request reply on the requestqueue) between dlm_recover_waiters_pre() which 845 request reply on the requestqueue) between dlm_recover_waiters_pre() which
760 set RESEND and dlm_recover_waiters_post() */ 846 set RESEND and dlm_recover_waiters_post() */
761 847
762static int _remove_from_waiters(struct dlm_lkb *lkb) 848static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
763{ 849{
764 int error = 0; 850 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
851 int overlap_done = 0;
765 852
766 if (!lkb->lkb_wait_type) { 853 if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
767 log_print("remove_from_waiters error"); 854 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
768 error = -EINVAL; 855 overlap_done = 1;
769 goto out; 856 goto out_del;
857 }
858
859 if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
860 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
861 overlap_done = 1;
862 goto out_del;
863 }
864
865 /* N.B. type of reply may not always correspond to type of original
866 msg due to lookup->request optimization, verify others? */
867
868 if (lkb->lkb_wait_type) {
869 lkb->lkb_wait_type = 0;
870 goto out_del;
871 }
872
873 log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
874 lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
875 return -1;
876
877 out_del:
878 /* the force-unlock/cancel has completed and we haven't recvd a reply
879 to the op that was in progress prior to the unlock/cancel; we
880 give up on any reply to the earlier op. FIXME: not sure when/how
881 this would happen */
882
883 if (overlap_done && lkb->lkb_wait_type) {
884 log_error(ls, "remove_from_waiters %x reply %d give up on %d",
885 lkb->lkb_id, mstype, lkb->lkb_wait_type);
886 lkb->lkb_wait_count--;
887 lkb->lkb_wait_type = 0;
770 } 888 }
771 lkb->lkb_wait_type = 0; 889
890 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
891
772 lkb->lkb_flags &= ~DLM_IFL_RESEND; 892 lkb->lkb_flags &= ~DLM_IFL_RESEND;
773 list_del(&lkb->lkb_wait_reply); 893 lkb->lkb_wait_count--;
894 if (!lkb->lkb_wait_count)
895 list_del_init(&lkb->lkb_wait_reply);
774 unhold_lkb(lkb); 896 unhold_lkb(lkb);
775 out: 897 return 0;
776 return error;
777} 898}
778 899
779static int remove_from_waiters(struct dlm_lkb *lkb) 900static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
780{ 901{
781 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 902 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
782 int error; 903 int error;
783 904
784 mutex_lock(&ls->ls_waiters_mutex); 905 mutex_lock(&ls->ls_waiters_mutex);
785 error = _remove_from_waiters(lkb); 906 error = _remove_from_waiters(lkb, mstype);
786 mutex_unlock(&ls->ls_waiters_mutex); 907 mutex_unlock(&ls->ls_waiters_mutex);
787 return error; 908 return error;
788} 909}
789 910
911/* Handles situations where we might be processing a "fake" or "stub" reply in
912 which we can't try to take waiters_mutex again. */
913
914static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
915{
916 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
917 int error;
918
919 if (ms != &ls->ls_stub_ms)
920 mutex_lock(&ls->ls_waiters_mutex);
921 error = _remove_from_waiters(lkb, ms->m_type);
922 if (ms != &ls->ls_stub_ms)
923 mutex_unlock(&ls->ls_waiters_mutex);
924 return error;
925}
926
790static void dir_remove(struct dlm_rsb *r) 927static void dir_remove(struct dlm_rsb *r)
791{ 928{
792 int to_nodeid; 929 int to_nodeid;
@@ -988,8 +1125,14 @@ static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
988 _remove_lock(r, lkb); 1125 _remove_lock(r, lkb);
989} 1126}
990 1127
991static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1128/* returns: 0 did nothing
1129 1 moved lock to granted
1130 -1 removed lock */
1131
1132static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
992{ 1133{
1134 int rv = 0;
1135
993 lkb->lkb_rqmode = DLM_LOCK_IV; 1136 lkb->lkb_rqmode = DLM_LOCK_IV;
994 1137
995 switch (lkb->lkb_status) { 1138 switch (lkb->lkb_status) {
@@ -997,6 +1140,7 @@ static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
997 break; 1140 break;
998 case DLM_LKSTS_CONVERT: 1141 case DLM_LKSTS_CONVERT:
999 move_lkb(r, lkb, DLM_LKSTS_GRANTED); 1142 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1143 rv = 1;
1000 break; 1144 break;
1001 case DLM_LKSTS_WAITING: 1145 case DLM_LKSTS_WAITING:
1002 del_lkb(r, lkb); 1146 del_lkb(r, lkb);
@@ -1004,15 +1148,17 @@ static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1004 /* this unhold undoes the original ref from create_lkb() 1148 /* this unhold undoes the original ref from create_lkb()
1005 so this leads to the lkb being freed */ 1149 so this leads to the lkb being freed */
1006 unhold_lkb(lkb); 1150 unhold_lkb(lkb);
1151 rv = -1;
1007 break; 1152 break;
1008 default: 1153 default:
1009 log_print("invalid status for revert %d", lkb->lkb_status); 1154 log_print("invalid status for revert %d", lkb->lkb_status);
1010 } 1155 }
1156 return rv;
1011} 1157}
1012 1158
1013static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) 1159static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1014{ 1160{
1015 revert_lock(r, lkb); 1161 return revert_lock(r, lkb);
1016} 1162}
1017 1163
1018static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1164static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
@@ -1055,6 +1201,50 @@ static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1055 queue_cast(r, lkb, 0); 1201 queue_cast(r, lkb, 0);
1056} 1202}
1057 1203
1204/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1205 change the granted/requested modes. We're munging things accordingly in
1206 the process copy.
1207 CONVDEADLK: our grmode may have been forced down to NL to resolve a
1208 conversion deadlock
1209 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1210 compatible with other granted locks */
1211
1212static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1213{
1214 if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1215 log_print("munge_demoted %x invalid reply type %d",
1216 lkb->lkb_id, ms->m_type);
1217 return;
1218 }
1219
1220 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1221 log_print("munge_demoted %x invalid modes gr %d rq %d",
1222 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1223 return;
1224 }
1225
1226 lkb->lkb_grmode = DLM_LOCK_NL;
1227}
1228
1229static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1230{
1231 if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1232 ms->m_type != DLM_MSG_GRANT) {
1233 log_print("munge_altmode %x invalid reply type %d",
1234 lkb->lkb_id, ms->m_type);
1235 return;
1236 }
1237
1238 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1239 lkb->lkb_rqmode = DLM_LOCK_PR;
1240 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1241 lkb->lkb_rqmode = DLM_LOCK_CW;
1242 else {
1243 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1244 dlm_print_lkb(lkb);
1245 }
1246}
1247
1058static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head) 1248static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1059{ 1249{
1060 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb, 1250 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
@@ -1499,7 +1689,7 @@ static void process_lookup_list(struct dlm_rsb *r)
1499 struct dlm_lkb *lkb, *safe; 1689 struct dlm_lkb *lkb, *safe;
1500 1690
1501 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) { 1691 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1502 list_del(&lkb->lkb_rsb_lookup); 1692 list_del_init(&lkb->lkb_rsb_lookup);
1503 _request_lock(r, lkb); 1693 _request_lock(r, lkb);
1504 schedule(); 1694 schedule();
1505 } 1695 }
@@ -1530,7 +1720,7 @@ static void confirm_master(struct dlm_rsb *r, int error)
1530 if (!list_empty(&r->res_lookup)) { 1720 if (!list_empty(&r->res_lookup)) {
1531 lkb = list_entry(r->res_lookup.next, struct dlm_lkb, 1721 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1532 lkb_rsb_lookup); 1722 lkb_rsb_lookup);
1533 list_del(&lkb->lkb_rsb_lookup); 1723 list_del_init(&lkb->lkb_rsb_lookup);
1534 r->res_first_lkid = lkb->lkb_id; 1724 r->res_first_lkid = lkb->lkb_id;
1535 _request_lock(r, lkb); 1725 _request_lock(r, lkb);
1536 } else 1726 } else
@@ -1614,6 +1804,9 @@ static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1614 DLM_LKF_FORCEUNLOCK)) 1804 DLM_LKF_FORCEUNLOCK))
1615 return -EINVAL; 1805 return -EINVAL;
1616 1806
1807 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
1808 return -EINVAL;
1809
1617 args->flags = flags; 1810 args->flags = flags;
1618 args->astparam = (long) astarg; 1811 args->astparam = (long) astarg;
1619 return 0; 1812 return 0;
@@ -1638,6 +1831,9 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1638 1831
1639 if (lkb->lkb_wait_type) 1832 if (lkb->lkb_wait_type)
1640 goto out; 1833 goto out;
1834
1835 if (is_overlap(lkb))
1836 goto out;
1641 } 1837 }
1642 1838
1643 lkb->lkb_exflags = args->flags; 1839 lkb->lkb_exflags = args->flags;
@@ -1654,35 +1850,126 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1654 return rv; 1850 return rv;
1655} 1851}
1656 1852
1853/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
1854 for success */
1855
1856/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
1857 because there may be a lookup in progress and it's valid to do
1858 cancel/unlockf on it */
1859
1657static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) 1860static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1658{ 1861{
1862 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1659 int rv = -EINVAL; 1863 int rv = -EINVAL;
1660 1864
1661 if (lkb->lkb_flags & DLM_IFL_MSTCPY) 1865 if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
1866 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
1867 dlm_print_lkb(lkb);
1662 goto out; 1868 goto out;
1869 }
1663 1870
1664 if (args->flags & DLM_LKF_FORCEUNLOCK) 1871 /* an lkb may still exist even though the lock is EOL'ed due to a
1665 goto out_ok; 1872 cancel, unlock or failed noqueue request; an app can't use these
1873 locks; return same error as if the lkid had not been found at all */
1666 1874
1667 if (args->flags & DLM_LKF_CANCEL && 1875 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
1668 lkb->lkb_status == DLM_LKSTS_GRANTED) 1876 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
1877 rv = -ENOENT;
1669 goto out; 1878 goto out;
1879 }
1670 1880
1671 if (!(args->flags & DLM_LKF_CANCEL) && 1881 /* an lkb may be waiting for an rsb lookup to complete where the
1672 lkb->lkb_status != DLM_LKSTS_GRANTED) 1882 lookup was initiated by another lock */
1673 goto out; 1883
1884 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
1885 if (!list_empty(&lkb->lkb_rsb_lookup)) {
1886 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
1887 list_del_init(&lkb->lkb_rsb_lookup);
1888 queue_cast(lkb->lkb_resource, lkb,
1889 args->flags & DLM_LKF_CANCEL ?
1890 -DLM_ECANCEL : -DLM_EUNLOCK);
1891 unhold_lkb(lkb); /* undoes create_lkb() */
1892 rv = -EBUSY;
1893 goto out;
1894 }
1895 }
1896
1897 /* cancel not allowed with another cancel/unlock in progress */
1898
1899 if (args->flags & DLM_LKF_CANCEL) {
1900 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
1901 goto out;
1902
1903 if (is_overlap(lkb))
1904 goto out;
1905
1906 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1907 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1908 rv = -EBUSY;
1909 goto out;
1910 }
1911
1912 switch (lkb->lkb_wait_type) {
1913 case DLM_MSG_LOOKUP:
1914 case DLM_MSG_REQUEST:
1915 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1916 rv = -EBUSY;
1917 goto out;
1918 case DLM_MSG_UNLOCK:
1919 case DLM_MSG_CANCEL:
1920 goto out;
1921 }
1922 /* add_to_waiters() will set OVERLAP_CANCEL */
1923 goto out_ok;
1924 }
1925
1926 /* do we need to allow a force-unlock if there's a normal unlock
1927 already in progress? in what conditions could the normal unlock
1928 fail such that we'd want to send a force-unlock to be sure? */
1929
1930 if (args->flags & DLM_LKF_FORCEUNLOCK) {
1931 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
1932 goto out;
1933
1934 if (is_overlap_unlock(lkb))
1935 goto out;
1674 1936
1937 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1938 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1939 rv = -EBUSY;
1940 goto out;
1941 }
1942
1943 switch (lkb->lkb_wait_type) {
1944 case DLM_MSG_LOOKUP:
1945 case DLM_MSG_REQUEST:
1946 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1947 rv = -EBUSY;
1948 goto out;
1949 case DLM_MSG_UNLOCK:
1950 goto out;
1951 }
1952 /* add_to_waiters() will set OVERLAP_UNLOCK */
1953 goto out_ok;
1954 }
1955
1956 /* normal unlock not allowed if there's any op in progress */
1675 rv = -EBUSY; 1957 rv = -EBUSY;
1676 if (lkb->lkb_wait_type) 1958 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
1677 goto out; 1959 goto out;
1678 1960
1679 out_ok: 1961 out_ok:
1680 lkb->lkb_exflags = args->flags; 1962 /* an overlapping op shouldn't blow away exflags from other op */
1963 lkb->lkb_exflags |= args->flags;
1681 lkb->lkb_sbflags = 0; 1964 lkb->lkb_sbflags = 0;
1682 lkb->lkb_astparam = args->astparam; 1965 lkb->lkb_astparam = args->astparam;
1683
1684 rv = 0; 1966 rv = 0;
1685 out: 1967 out:
1968 if (rv)
1969 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
1970 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
1971 args->flags, lkb->lkb_wait_type,
1972 lkb->lkb_resource->res_name);
1686 return rv; 1973 return rv;
1687} 1974}
1688 1975
@@ -1732,9 +2019,24 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1732 goto out; 2019 goto out;
1733 } 2020 }
1734 2021
1735 if (can_be_queued(lkb)) { 2022 /* is_demoted() means the can_be_granted() above set the grmode
1736 if (is_demoted(lkb)) 2023 to NL, and left us on the granted queue. This auto-demotion
2024 (due to CONVDEADLK) might mean other locks, and/or this lock, are
2025 now grantable. We have to try to grant other converting locks
2026 before we try again to grant this one. */
2027
2028 if (is_demoted(lkb)) {
2029 grant_pending_convert(r, DLM_LOCK_IV);
2030 if (_can_be_granted(r, lkb, 1)) {
2031 grant_lock(r, lkb);
2032 queue_cast(r, lkb, 0);
1737 grant_pending_locks(r); 2033 grant_pending_locks(r);
2034 goto out;
2035 }
2036 /* else fall through and move to convert queue */
2037 }
2038
2039 if (can_be_queued(lkb)) {
1738 error = -EINPROGRESS; 2040 error = -EINPROGRESS;
1739 del_lkb(r, lkb); 2041 del_lkb(r, lkb);
1740 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 2042 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
@@ -1759,17 +2061,19 @@ static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1759 return -DLM_EUNLOCK; 2061 return -DLM_EUNLOCK;
1760} 2062}
1761 2063
1762/* FIXME: if revert_lock() finds that the lkb is granted, we should 2064/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
1763 skip the queue_cast(ECANCEL). It indicates that the request/convert
1764 completed (and queued a normal ast) just before the cancel; we don't
1765 want to clobber the sb_result for the normal ast with ECANCEL. */
1766 2065
1767static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) 2066static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1768{ 2067{
1769 revert_lock(r, lkb); 2068 int error;
1770 queue_cast(r, lkb, -DLM_ECANCEL); 2069
1771 grant_pending_locks(r); 2070 error = revert_lock(r, lkb);
1772 return -DLM_ECANCEL; 2071 if (error) {
2072 queue_cast(r, lkb, -DLM_ECANCEL);
2073 grant_pending_locks(r);
2074 return -DLM_ECANCEL;
2075 }
2076 return 0;
1773} 2077}
1774 2078
1775/* 2079/*
@@ -2035,6 +2339,8 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
2035 2339
2036 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL) 2340 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2037 error = 0; 2341 error = 0;
2342 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2343 error = 0;
2038 out_put: 2344 out_put:
2039 dlm_put_lkb(lkb); 2345 dlm_put_lkb(lkb);
2040 out: 2346 out:
@@ -2065,31 +2371,14 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
2065 * receive_lookup_reply send_lookup_reply 2371 * receive_lookup_reply send_lookup_reply
2066 */ 2372 */
2067 2373
2068static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, 2374static int _create_message(struct dlm_ls *ls, int mb_len,
2069 int to_nodeid, int mstype, 2375 int to_nodeid, int mstype,
2070 struct dlm_message **ms_ret, 2376 struct dlm_message **ms_ret,
2071 struct dlm_mhandle **mh_ret) 2377 struct dlm_mhandle **mh_ret)
2072{ 2378{
2073 struct dlm_message *ms; 2379 struct dlm_message *ms;
2074 struct dlm_mhandle *mh; 2380 struct dlm_mhandle *mh;
2075 char *mb; 2381 char *mb;
2076 int mb_len = sizeof(struct dlm_message);
2077
2078 switch (mstype) {
2079 case DLM_MSG_REQUEST:
2080 case DLM_MSG_LOOKUP:
2081 case DLM_MSG_REMOVE:
2082 mb_len += r->res_length;
2083 break;
2084 case DLM_MSG_CONVERT:
2085 case DLM_MSG_UNLOCK:
2086 case DLM_MSG_REQUEST_REPLY:
2087 case DLM_MSG_CONVERT_REPLY:
2088 case DLM_MSG_GRANT:
2089 if (lkb && lkb->lkb_lvbptr)
2090 mb_len += r->res_ls->ls_lvblen;
2091 break;
2092 }
2093 2382
2094 /* get_buffer gives us a message handle (mh) that we need to 2383 /* get_buffer gives us a message handle (mh) that we need to
2095 pass into lowcomms_commit and a message buffer (mb) that we 2384 pass into lowcomms_commit and a message buffer (mb) that we
@@ -2104,7 +2393,7 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2104 ms = (struct dlm_message *) mb; 2393 ms = (struct dlm_message *) mb;
2105 2394
2106 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR); 2395 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2107 ms->m_header.h_lockspace = r->res_ls->ls_global_id; 2396 ms->m_header.h_lockspace = ls->ls_global_id;
2108 ms->m_header.h_nodeid = dlm_our_nodeid(); 2397 ms->m_header.h_nodeid = dlm_our_nodeid();
2109 ms->m_header.h_length = mb_len; 2398 ms->m_header.h_length = mb_len;
2110 ms->m_header.h_cmd = DLM_MSG; 2399 ms->m_header.h_cmd = DLM_MSG;
@@ -2116,6 +2405,33 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2116 return 0; 2405 return 0;
2117} 2406}
2118 2407
2408static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2409 int to_nodeid, int mstype,
2410 struct dlm_message **ms_ret,
2411 struct dlm_mhandle **mh_ret)
2412{
2413 int mb_len = sizeof(struct dlm_message);
2414
2415 switch (mstype) {
2416 case DLM_MSG_REQUEST:
2417 case DLM_MSG_LOOKUP:
2418 case DLM_MSG_REMOVE:
2419 mb_len += r->res_length;
2420 break;
2421 case DLM_MSG_CONVERT:
2422 case DLM_MSG_UNLOCK:
2423 case DLM_MSG_REQUEST_REPLY:
2424 case DLM_MSG_CONVERT_REPLY:
2425 case DLM_MSG_GRANT:
2426 if (lkb && lkb->lkb_lvbptr)
2427 mb_len += r->res_ls->ls_lvblen;
2428 break;
2429 }
2430
2431 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2432 ms_ret, mh_ret);
2433}
2434
2119/* further lowcomms enhancements or alternate implementations may make 2435/* further lowcomms enhancements or alternate implementations may make
2120 the return value from this function useful at some point */ 2436 the return value from this function useful at some point */
2121 2437
@@ -2176,7 +2492,9 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2176 struct dlm_mhandle *mh; 2492 struct dlm_mhandle *mh;
2177 int to_nodeid, error; 2493 int to_nodeid, error;
2178 2494
2179 add_to_waiters(lkb, mstype); 2495 error = add_to_waiters(lkb, mstype);
2496 if (error)
2497 return error;
2180 2498
2181 to_nodeid = r->res_nodeid; 2499 to_nodeid = r->res_nodeid;
2182 2500
@@ -2192,7 +2510,7 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2192 return 0; 2510 return 0;
2193 2511
2194 fail: 2512 fail:
2195 remove_from_waiters(lkb); 2513 remove_from_waiters(lkb, msg_reply_type(mstype));
2196 return error; 2514 return error;
2197} 2515}
2198 2516
@@ -2209,7 +2527,8 @@ static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2209 2527
2210 /* down conversions go without a reply from the master */ 2528 /* down conversions go without a reply from the master */
2211 if (!error && down_conversion(lkb)) { 2529 if (!error && down_conversion(lkb)) {
2212 remove_from_waiters(lkb); 2530 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2531 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2213 r->res_ls->ls_stub_ms.m_result = 0; 2532 r->res_ls->ls_stub_ms.m_result = 0;
2214 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags; 2533 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2215 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms); 2534 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
@@ -2280,7 +2599,9 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2280 struct dlm_mhandle *mh; 2599 struct dlm_mhandle *mh;
2281 int to_nodeid, error; 2600 int to_nodeid, error;
2282 2601
2283 add_to_waiters(lkb, DLM_MSG_LOOKUP); 2602 error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2603 if (error)
2604 return error;
2284 2605
2285 to_nodeid = dlm_dir_nodeid(r); 2606 to_nodeid = dlm_dir_nodeid(r);
2286 2607
@@ -2296,7 +2617,7 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2296 return 0; 2617 return 0;
2297 2618
2298 fail: 2619 fail:
2299 remove_from_waiters(lkb); 2620 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2300 return error; 2621 return error;
2301} 2622}
2302 2623
@@ -2656,6 +2977,8 @@ static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2656 lock_rsb(r); 2977 lock_rsb(r);
2657 2978
2658 receive_flags_reply(lkb, ms); 2979 receive_flags_reply(lkb, ms);
2980 if (is_altmode(lkb))
2981 munge_altmode(lkb, ms);
2659 grant_lock_pc(r, lkb, ms); 2982 grant_lock_pc(r, lkb, ms);
2660 queue_cast(r, lkb, 0); 2983 queue_cast(r, lkb, 0);
2661 2984
@@ -2736,11 +3059,16 @@ static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2736 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len); 3059 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2737} 3060}
2738 3061
3062static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3063{
3064 do_purge(ls, ms->m_nodeid, ms->m_pid);
3065}
3066
2739static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) 3067static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2740{ 3068{
2741 struct dlm_lkb *lkb; 3069 struct dlm_lkb *lkb;
2742 struct dlm_rsb *r; 3070 struct dlm_rsb *r;
2743 int error, mstype; 3071 int error, mstype, result;
2744 3072
2745 error = find_lkb(ls, ms->m_remid, &lkb); 3073 error = find_lkb(ls, ms->m_remid, &lkb);
2746 if (error) { 3074 if (error) {
@@ -2749,20 +3077,15 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2749 } 3077 }
2750 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 3078 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2751 3079
2752 mstype = lkb->lkb_wait_type;
2753 error = remove_from_waiters(lkb);
2754 if (error) {
2755 log_error(ls, "receive_request_reply not on waiters");
2756 goto out;
2757 }
2758
2759 /* this is the value returned from do_request() on the master */
2760 error = ms->m_result;
2761
2762 r = lkb->lkb_resource; 3080 r = lkb->lkb_resource;
2763 hold_rsb(r); 3081 hold_rsb(r);
2764 lock_rsb(r); 3082 lock_rsb(r);
2765 3083
3084 mstype = lkb->lkb_wait_type;
3085 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3086 if (error)
3087 goto out;
3088
2766 /* Optimization: the dir node was also the master, so it took our 3089 /* Optimization: the dir node was also the master, so it took our
2767 lookup as a request and sent request reply instead of lookup reply */ 3090 lookup as a request and sent request reply instead of lookup reply */
2768 if (mstype == DLM_MSG_LOOKUP) { 3091 if (mstype == DLM_MSG_LOOKUP) {
@@ -2770,14 +3093,15 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2770 lkb->lkb_nodeid = r->res_nodeid; 3093 lkb->lkb_nodeid = r->res_nodeid;
2771 } 3094 }
2772 3095
2773 switch (error) { 3096 /* this is the value returned from do_request() on the master */
3097 result = ms->m_result;
3098
3099 switch (result) {
2774 case -EAGAIN: 3100 case -EAGAIN:
2775 /* request would block (be queued) on remote master; 3101 /* request would block (be queued) on remote master */
2776 the unhold undoes the original ref from create_lkb()
2777 so it leads to the lkb being freed */
2778 queue_cast(r, lkb, -EAGAIN); 3102 queue_cast(r, lkb, -EAGAIN);
2779 confirm_master(r, -EAGAIN); 3103 confirm_master(r, -EAGAIN);
2780 unhold_lkb(lkb); 3104 unhold_lkb(lkb); /* undoes create_lkb() */
2781 break; 3105 break;
2782 3106
2783 case -EINPROGRESS: 3107 case -EINPROGRESS:
@@ -2785,41 +3109,64 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2785 /* request was queued or granted on remote master */ 3109 /* request was queued or granted on remote master */
2786 receive_flags_reply(lkb, ms); 3110 receive_flags_reply(lkb, ms);
2787 lkb->lkb_remid = ms->m_lkid; 3111 lkb->lkb_remid = ms->m_lkid;
2788 if (error) 3112 if (is_altmode(lkb))
3113 munge_altmode(lkb, ms);
3114 if (result)
2789 add_lkb(r, lkb, DLM_LKSTS_WAITING); 3115 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2790 else { 3116 else {
2791 grant_lock_pc(r, lkb, ms); 3117 grant_lock_pc(r, lkb, ms);
2792 queue_cast(r, lkb, 0); 3118 queue_cast(r, lkb, 0);
2793 } 3119 }
2794 confirm_master(r, error); 3120 confirm_master(r, result);
2795 break; 3121 break;
2796 3122
2797 case -EBADR: 3123 case -EBADR:
2798 case -ENOTBLK: 3124 case -ENOTBLK:
2799 /* find_rsb failed to find rsb or rsb wasn't master */ 3125 /* find_rsb failed to find rsb or rsb wasn't master */
3126 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3127 lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
2800 r->res_nodeid = -1; 3128 r->res_nodeid = -1;
2801 lkb->lkb_nodeid = -1; 3129 lkb->lkb_nodeid = -1;
2802 _request_lock(r, lkb); 3130
3131 if (is_overlap(lkb)) {
3132 /* we'll ignore error in cancel/unlock reply */
3133 queue_cast_overlap(r, lkb);
3134 unhold_lkb(lkb); /* undoes create_lkb() */
3135 } else
3136 _request_lock(r, lkb);
2803 break; 3137 break;
2804 3138
2805 default: 3139 default:
2806 log_error(ls, "receive_request_reply error %d", error); 3140 log_error(ls, "receive_request_reply %x error %d",
3141 lkb->lkb_id, result);
2807 } 3142 }
2808 3143
3144 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3145 log_debug(ls, "receive_request_reply %x result %d unlock",
3146 lkb->lkb_id, result);
3147 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3148 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3149 send_unlock(r, lkb);
3150 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3151 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3152 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3153 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3154 send_cancel(r, lkb);
3155 } else {
3156 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3157 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3158 }
3159 out:
2809 unlock_rsb(r); 3160 unlock_rsb(r);
2810 put_rsb(r); 3161 put_rsb(r);
2811 out:
2812 dlm_put_lkb(lkb); 3162 dlm_put_lkb(lkb);
2813} 3163}
2814 3164
2815static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 3165static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2816 struct dlm_message *ms) 3166 struct dlm_message *ms)
2817{ 3167{
2818 int error = ms->m_result;
2819
2820 /* this is the value returned from do_convert() on the master */ 3168 /* this is the value returned from do_convert() on the master */
2821 3169 switch (ms->m_result) {
2822 switch (error) {
2823 case -EAGAIN: 3170 case -EAGAIN:
2824 /* convert would block (be queued) on remote master */ 3171 /* convert would block (be queued) on remote master */
2825 queue_cast(r, lkb, -EAGAIN); 3172 queue_cast(r, lkb, -EAGAIN);
@@ -2827,6 +3174,9 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2827 3174
2828 case -EINPROGRESS: 3175 case -EINPROGRESS:
2829 /* convert was queued on remote master */ 3176 /* convert was queued on remote master */
3177 receive_flags_reply(lkb, ms);
3178 if (is_demoted(lkb))
3179 munge_demoted(lkb, ms);
2830 del_lkb(r, lkb); 3180 del_lkb(r, lkb);
2831 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 3181 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2832 break; 3182 break;
@@ -2834,24 +3184,33 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2834 case 0: 3184 case 0:
2835 /* convert was granted on remote master */ 3185 /* convert was granted on remote master */
2836 receive_flags_reply(lkb, ms); 3186 receive_flags_reply(lkb, ms);
3187 if (is_demoted(lkb))
3188 munge_demoted(lkb, ms);
2837 grant_lock_pc(r, lkb, ms); 3189 grant_lock_pc(r, lkb, ms);
2838 queue_cast(r, lkb, 0); 3190 queue_cast(r, lkb, 0);
2839 break; 3191 break;
2840 3192
2841 default: 3193 default:
2842 log_error(r->res_ls, "receive_convert_reply error %d", error); 3194 log_error(r->res_ls, "receive_convert_reply %x error %d",
3195 lkb->lkb_id, ms->m_result);
2843 } 3196 }
2844} 3197}
2845 3198
2846static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3199static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2847{ 3200{
2848 struct dlm_rsb *r = lkb->lkb_resource; 3201 struct dlm_rsb *r = lkb->lkb_resource;
3202 int error;
2849 3203
2850 hold_rsb(r); 3204 hold_rsb(r);
2851 lock_rsb(r); 3205 lock_rsb(r);
2852 3206
2853 __receive_convert_reply(r, lkb, ms); 3207 /* stub reply can happen with waiters_mutex held */
3208 error = remove_from_waiters_ms(lkb, ms);
3209 if (error)
3210 goto out;
2854 3211
3212 __receive_convert_reply(r, lkb, ms);
3213 out:
2855 unlock_rsb(r); 3214 unlock_rsb(r);
2856 put_rsb(r); 3215 put_rsb(r);
2857} 3216}
@@ -2868,37 +3227,38 @@ static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
2868 } 3227 }
2869 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 3228 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2870 3229
2871 error = remove_from_waiters(lkb);
2872 if (error) {
2873 log_error(ls, "receive_convert_reply not on waiters");
2874 goto out;
2875 }
2876
2877 _receive_convert_reply(lkb, ms); 3230 _receive_convert_reply(lkb, ms);
2878 out:
2879 dlm_put_lkb(lkb); 3231 dlm_put_lkb(lkb);
2880} 3232}
2881 3233
2882static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3234static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2883{ 3235{
2884 struct dlm_rsb *r = lkb->lkb_resource; 3236 struct dlm_rsb *r = lkb->lkb_resource;
2885 int error = ms->m_result; 3237 int error;
2886 3238
2887 hold_rsb(r); 3239 hold_rsb(r);
2888 lock_rsb(r); 3240 lock_rsb(r);
2889 3241
3242 /* stub reply can happen with waiters_mutex held */
3243 error = remove_from_waiters_ms(lkb, ms);
3244 if (error)
3245 goto out;
3246
2890 /* this is the value returned from do_unlock() on the master */ 3247 /* this is the value returned from do_unlock() on the master */
2891 3248
2892 switch (error) { 3249 switch (ms->m_result) {
2893 case -DLM_EUNLOCK: 3250 case -DLM_EUNLOCK:
2894 receive_flags_reply(lkb, ms); 3251 receive_flags_reply(lkb, ms);
2895 remove_lock_pc(r, lkb); 3252 remove_lock_pc(r, lkb);
2896 queue_cast(r, lkb, -DLM_EUNLOCK); 3253 queue_cast(r, lkb, -DLM_EUNLOCK);
2897 break; 3254 break;
3255 case -ENOENT:
3256 break;
2898 default: 3257 default:
2899 log_error(r->res_ls, "receive_unlock_reply error %d", error); 3258 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3259 lkb->lkb_id, ms->m_result);
2900 } 3260 }
2901 3261 out:
2902 unlock_rsb(r); 3262 unlock_rsb(r);
2903 put_rsb(r); 3263 put_rsb(r);
2904} 3264}
@@ -2915,37 +3275,39 @@ static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
2915 } 3275 }
2916 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 3276 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2917 3277
2918 error = remove_from_waiters(lkb);
2919 if (error) {
2920 log_error(ls, "receive_unlock_reply not on waiters");
2921 goto out;
2922 }
2923
2924 _receive_unlock_reply(lkb, ms); 3278 _receive_unlock_reply(lkb, ms);
2925 out:
2926 dlm_put_lkb(lkb); 3279 dlm_put_lkb(lkb);
2927} 3280}
2928 3281
2929static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3282static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2930{ 3283{
2931 struct dlm_rsb *r = lkb->lkb_resource; 3284 struct dlm_rsb *r = lkb->lkb_resource;
2932 int error = ms->m_result; 3285 int error;
2933 3286
2934 hold_rsb(r); 3287 hold_rsb(r);
2935 lock_rsb(r); 3288 lock_rsb(r);
2936 3289
3290 /* stub reply can happen with waiters_mutex held */
3291 error = remove_from_waiters_ms(lkb, ms);
3292 if (error)
3293 goto out;
3294
2937 /* this is the value returned from do_cancel() on the master */ 3295 /* this is the value returned from do_cancel() on the master */
2938 3296
2939 switch (error) { 3297 switch (ms->m_result) {
2940 case -DLM_ECANCEL: 3298 case -DLM_ECANCEL:
2941 receive_flags_reply(lkb, ms); 3299 receive_flags_reply(lkb, ms);
2942 revert_lock_pc(r, lkb); 3300 revert_lock_pc(r, lkb);
2943 queue_cast(r, lkb, -DLM_ECANCEL); 3301 if (ms->m_result)
3302 queue_cast(r, lkb, -DLM_ECANCEL);
3303 break;
3304 case 0:
2944 break; 3305 break;
2945 default: 3306 default:
2946 log_error(r->res_ls, "receive_cancel_reply error %d", error); 3307 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3308 lkb->lkb_id, ms->m_result);
2947 } 3309 }
2948 3310 out:
2949 unlock_rsb(r); 3311 unlock_rsb(r);
2950 put_rsb(r); 3312 put_rsb(r);
2951} 3313}
@@ -2962,14 +3324,7 @@ static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
2962 } 3324 }
2963 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 3325 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2964 3326
2965 error = remove_from_waiters(lkb);
2966 if (error) {
2967 log_error(ls, "receive_cancel_reply not on waiters");
2968 goto out;
2969 }
2970
2971 _receive_cancel_reply(lkb, ms); 3327 _receive_cancel_reply(lkb, ms);
2972 out:
2973 dlm_put_lkb(lkb); 3328 dlm_put_lkb(lkb);
2974} 3329}
2975 3330
@@ -2985,20 +3340,17 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
2985 return; 3340 return;
2986 } 3341 }
2987 3342
2988 error = remove_from_waiters(lkb); 3343 /* ms->m_result is the value returned by dlm_dir_lookup on dir node
2989 if (error) {
2990 log_error(ls, "receive_lookup_reply not on waiters");
2991 goto out;
2992 }
2993
2994 /* this is the value returned by dlm_dir_lookup on dir node
2995 FIXME: will a non-zero error ever be returned? */ 3344 FIXME: will a non-zero error ever be returned? */
2996 error = ms->m_result;
2997 3345
2998 r = lkb->lkb_resource; 3346 r = lkb->lkb_resource;
2999 hold_rsb(r); 3347 hold_rsb(r);
3000 lock_rsb(r); 3348 lock_rsb(r);
3001 3349
3350 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3351 if (error)
3352 goto out;
3353
3002 ret_nodeid = ms->m_nodeid; 3354 ret_nodeid = ms->m_nodeid;
3003 if (ret_nodeid == dlm_our_nodeid()) { 3355 if (ret_nodeid == dlm_our_nodeid()) {
3004 r->res_nodeid = 0; 3356 r->res_nodeid = 0;
@@ -3009,14 +3361,22 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3009 r->res_nodeid = ret_nodeid; 3361 r->res_nodeid = ret_nodeid;
3010 } 3362 }
3011 3363
3364 if (is_overlap(lkb)) {
3365 log_debug(ls, "receive_lookup_reply %x unlock %x",
3366 lkb->lkb_id, lkb->lkb_flags);
3367 queue_cast_overlap(r, lkb);
3368 unhold_lkb(lkb); /* undoes create_lkb() */
3369 goto out_list;
3370 }
3371
3012 _request_lock(r, lkb); 3372 _request_lock(r, lkb);
3013 3373
3374 out_list:
3014 if (!ret_nodeid) 3375 if (!ret_nodeid)
3015 process_lookup_list(r); 3376 process_lookup_list(r);
3016 3377 out:
3017 unlock_rsb(r); 3378 unlock_rsb(r);
3018 put_rsb(r); 3379 put_rsb(r);
3019 out:
3020 dlm_put_lkb(lkb); 3380 dlm_put_lkb(lkb);
3021} 3381}
3022 3382
@@ -3133,6 +3493,12 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3133 receive_lookup_reply(ls, ms); 3493 receive_lookup_reply(ls, ms);
3134 break; 3494 break;
3135 3495
3496 /* other messages */
3497
3498 case DLM_MSG_PURGE:
3499 receive_purge(ls, ms);
3500 break;
3501
3136 default: 3502 default:
3137 log_error(ls, "unknown message type %d", ms->m_type); 3503 log_error(ls, "unknown message type %d", ms->m_type);
3138 } 3504 }
@@ -3153,9 +3519,9 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3153{ 3519{
3154 if (middle_conversion(lkb)) { 3520 if (middle_conversion(lkb)) {
3155 hold_lkb(lkb); 3521 hold_lkb(lkb);
3522 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3156 ls->ls_stub_ms.m_result = -EINPROGRESS; 3523 ls->ls_stub_ms.m_result = -EINPROGRESS;
3157 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 3524 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3158 _remove_from_waiters(lkb);
3159 _receive_convert_reply(lkb, &ls->ls_stub_ms); 3525 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3160 3526
3161 /* Same special case as in receive_rcom_lock_args() */ 3527 /* Same special case as in receive_rcom_lock_args() */
@@ -3227,18 +3593,18 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
3227 3593
3228 case DLM_MSG_UNLOCK: 3594 case DLM_MSG_UNLOCK:
3229 hold_lkb(lkb); 3595 hold_lkb(lkb);
3596 ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
3230 ls->ls_stub_ms.m_result = -DLM_EUNLOCK; 3597 ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3231 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 3598 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3232 _remove_from_waiters(lkb);
3233 _receive_unlock_reply(lkb, &ls->ls_stub_ms); 3599 _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3234 dlm_put_lkb(lkb); 3600 dlm_put_lkb(lkb);
3235 break; 3601 break;
3236 3602
3237 case DLM_MSG_CANCEL: 3603 case DLM_MSG_CANCEL:
3238 hold_lkb(lkb); 3604 hold_lkb(lkb);
3605 ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
3239 ls->ls_stub_ms.m_result = -DLM_ECANCEL; 3606 ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3240 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 3607 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3241 _remove_from_waiters(lkb);
3242 _receive_cancel_reply(lkb, &ls->ls_stub_ms); 3608 _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3243 dlm_put_lkb(lkb); 3609 dlm_put_lkb(lkb);
3244 break; 3610 break;
@@ -3252,37 +3618,47 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
3252 mutex_unlock(&ls->ls_waiters_mutex); 3618 mutex_unlock(&ls->ls_waiters_mutex);
3253} 3619}
3254 3620
3255static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) 3621static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
3256{ 3622{
3257 struct dlm_lkb *lkb; 3623 struct dlm_lkb *lkb;
3258 int rv = 0; 3624 int found = 0;
3259 3625
3260 mutex_lock(&ls->ls_waiters_mutex); 3626 mutex_lock(&ls->ls_waiters_mutex);
3261 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) { 3627 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3262 if (lkb->lkb_flags & DLM_IFL_RESEND) { 3628 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3263 rv = lkb->lkb_wait_type; 3629 hold_lkb(lkb);
3264 _remove_from_waiters(lkb); 3630 found = 1;
3265 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3266 break; 3631 break;
3267 } 3632 }
3268 } 3633 }
3269 mutex_unlock(&ls->ls_waiters_mutex); 3634 mutex_unlock(&ls->ls_waiters_mutex);
3270 3635
3271 if (!rv) 3636 if (!found)
3272 lkb = NULL; 3637 lkb = NULL;
3273 *lkb_ret = lkb; 3638 return lkb;
3274 return rv;
3275} 3639}
3276 3640
3277/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the 3641/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
3278 master or dir-node for r. Processing the lkb may result in it being placed 3642 master or dir-node for r. Processing the lkb may result in it being placed
3279 back on waiters. */ 3643 back on waiters. */
3280 3644
3645/* We do this after normal locking has been enabled and any saved messages
3646 (in requestqueue) have been processed. We should be confident that at
3647 this point we won't get or process a reply to any of these waiting
3648 operations. But, new ops may be coming in on the rsbs/locks here from
3649 userspace or remotely. */
3650
3651/* there may have been an overlap unlock/cancel prior to recovery or after
3652 recovery. if before, the lkb may still have a pos wait_count; if after, the
3653 overlap flag would just have been set and nothing new sent. we can be
3654 confident here than any replies to either the initial op or overlap ops
3655 prior to recovery have been received. */
3656
3281int dlm_recover_waiters_post(struct dlm_ls *ls) 3657int dlm_recover_waiters_post(struct dlm_ls *ls)
3282{ 3658{
3283 struct dlm_lkb *lkb; 3659 struct dlm_lkb *lkb;
3284 struct dlm_rsb *r; 3660 struct dlm_rsb *r;
3285 int error = 0, mstype; 3661 int error = 0, mstype, err, oc, ou;
3286 3662
3287 while (1) { 3663 while (1) {
3288 if (dlm_locking_stopped(ls)) { 3664 if (dlm_locking_stopped(ls)) {
@@ -3291,48 +3667,78 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
3291 break; 3667 break;
3292 } 3668 }
3293 3669
3294 mstype = remove_resend_waiter(ls, &lkb); 3670 lkb = find_resend_waiter(ls);
3295 if (!mstype) 3671 if (!lkb)
3296 break; 3672 break;
3297 3673
3298 r = lkb->lkb_resource; 3674 r = lkb->lkb_resource;
3675 hold_rsb(r);
3676 lock_rsb(r);
3677
3678 mstype = lkb->lkb_wait_type;
3679 oc = is_overlap_cancel(lkb);
3680 ou = is_overlap_unlock(lkb);
3681 err = 0;
3299 3682
3300 log_debug(ls, "recover_waiters_post %x type %d flags %x %s", 3683 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3301 lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name); 3684 lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3302 3685
3303 switch (mstype) { 3686 /* At this point we assume that we won't get a reply to any
3304 3687 previous op or overlap op on this lock. First, do a big
3305 case DLM_MSG_LOOKUP: 3688 remove_from_waiters() for all previous ops. */
3306 hold_rsb(r); 3689
3307 lock_rsb(r); 3690 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3308 _request_lock(r, lkb); 3691 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3309 if (is_master(r)) 3692 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3310 confirm_master(r, 0); 3693 lkb->lkb_wait_type = 0;
3311 unlock_rsb(r); 3694 lkb->lkb_wait_count = 0;
3312 put_rsb(r); 3695 mutex_lock(&ls->ls_waiters_mutex);
3313 break; 3696 list_del_init(&lkb->lkb_wait_reply);
3314 3697 mutex_unlock(&ls->ls_waiters_mutex);
3315 case DLM_MSG_REQUEST: 3698 unhold_lkb(lkb); /* for waiters list */
3316 hold_rsb(r); 3699
3317 lock_rsb(r); 3700 if (oc || ou) {
3318 _request_lock(r, lkb); 3701 /* do an unlock or cancel instead of resending */
3319 if (is_master(r)) 3702 switch (mstype) {
3320 confirm_master(r, 0); 3703 case DLM_MSG_LOOKUP:
3321 unlock_rsb(r); 3704 case DLM_MSG_REQUEST:
3322 put_rsb(r); 3705 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
3323 break; 3706 -DLM_ECANCEL);
3324 3707 unhold_lkb(lkb); /* undoes create_lkb() */
3325 case DLM_MSG_CONVERT: 3708 break;
3326 hold_rsb(r); 3709 case DLM_MSG_CONVERT:
3327 lock_rsb(r); 3710 if (oc) {
3328 _convert_lock(r, lkb); 3711 queue_cast(r, lkb, -DLM_ECANCEL);
3329 unlock_rsb(r); 3712 } else {
3330 put_rsb(r); 3713 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
3331 break; 3714 _unlock_lock(r, lkb);
3332 3715 }
3333 default: 3716 break;
3334 log_error(ls, "recover_waiters_post type %d", mstype); 3717 default:
3718 err = 1;
3719 }
3720 } else {
3721 switch (mstype) {
3722 case DLM_MSG_LOOKUP:
3723 case DLM_MSG_REQUEST:
3724 _request_lock(r, lkb);
3725 if (is_master(r))
3726 confirm_master(r, 0);
3727 break;
3728 case DLM_MSG_CONVERT:
3729 _convert_lock(r, lkb);
3730 break;
3731 default:
3732 err = 1;
3733 }
3335 } 3734 }
3735
3736 if (err)
3737 log_error(ls, "recover_waiters_post %x %d %x %d %d",
3738 lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
3739 unlock_rsb(r);
3740 put_rsb(r);
3741 dlm_put_lkb(lkb);
3336 } 3742 }
3337 3743
3338 return error; 3744 return error;
@@ -3684,7 +4090,7 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
3684 4090
3685 /* add this new lkb to the per-process list of locks */ 4091 /* add this new lkb to the per-process list of locks */
3686 spin_lock(&ua->proc->locks_spin); 4092 spin_lock(&ua->proc->locks_spin);
3687 kref_get(&lkb->lkb_ref); 4093 hold_lkb(lkb);
3688 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks); 4094 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
3689 spin_unlock(&ua->proc->locks_spin); 4095 spin_unlock(&ua->proc->locks_spin);
3690 out: 4096 out:
@@ -3774,6 +4180,9 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3774 4180
3775 if (error == -DLM_EUNLOCK) 4181 if (error == -DLM_EUNLOCK)
3776 error = 0; 4182 error = 0;
4183 /* from validate_unlock_args() */
4184 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4185 error = 0;
3777 if (error) 4186 if (error)
3778 goto out_put; 4187 goto out_put;
3779 4188
@@ -3786,6 +4195,7 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3786 dlm_put_lkb(lkb); 4195 dlm_put_lkb(lkb);
3787 out: 4196 out:
3788 unlock_recovery(ls); 4197 unlock_recovery(ls);
4198 kfree(ua_tmp);
3789 return error; 4199 return error;
3790} 4200}
3791 4201
@@ -3815,33 +4225,37 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3815 4225
3816 if (error == -DLM_ECANCEL) 4226 if (error == -DLM_ECANCEL)
3817 error = 0; 4227 error = 0;
3818 if (error) 4228 /* from validate_unlock_args() */
3819 goto out_put; 4229 if (error == -EBUSY)
3820 4230 error = 0;
3821 /* this lkb was removed from the WAITING queue */
3822 if (lkb->lkb_grmode == DLM_LOCK_IV) {
3823 spin_lock(&ua->proc->locks_spin);
3824 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
3825 spin_unlock(&ua->proc->locks_spin);
3826 }
3827 out_put: 4231 out_put:
3828 dlm_put_lkb(lkb); 4232 dlm_put_lkb(lkb);
3829 out: 4233 out:
3830 unlock_recovery(ls); 4234 unlock_recovery(ls);
4235 kfree(ua_tmp);
3831 return error; 4236 return error;
3832} 4237}
3833 4238
4239/* lkb's that are removed from the waiters list by revert are just left on the
4240 orphans list with the granted orphan locks, to be freed by purge */
4241
3834static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) 4242static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3835{ 4243{
3836 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam; 4244 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4245 struct dlm_args args;
4246 int error;
3837 4247
3838 if (ua->lksb.sb_lvbptr) 4248 hold_lkb(lkb);
3839 kfree(ua->lksb.sb_lvbptr); 4249 mutex_lock(&ls->ls_orphans_mutex);
3840 kfree(ua); 4250 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
3841 lkb->lkb_astparam = (long)NULL; 4251 mutex_unlock(&ls->ls_orphans_mutex);
3842 4252
3843 /* TODO: propogate to master if needed */ 4253 set_unlock_args(0, ua, &args);
3844 return 0; 4254
4255 error = cancel_lock(ls, lkb, &args);
4256 if (error == -DLM_ECANCEL)
4257 error = 0;
4258 return error;
3845} 4259}
3846 4260
3847/* The force flag allows the unlock to go ahead even if the lkb isn't granted. 4261/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
@@ -3853,10 +4267,6 @@ static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3853 struct dlm_args args; 4267 struct dlm_args args;
3854 int error; 4268 int error;
3855 4269
3856 /* FIXME: we need to handle the case where the lkb is in limbo
3857 while the rsb is being looked up, currently we assert in
3858 _unlock_lock/is_remote because rsb nodeid is -1. */
3859
3860 set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args); 4270 set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
3861 4271
3862 error = unlock_lock(ls, lkb, &args); 4272 error = unlock_lock(ls, lkb, &args);
@@ -3865,6 +4275,31 @@ static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3865 return error; 4275 return error;
3866} 4276}
3867 4277
4278/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4279 (which does lock_rsb) due to deadlock with receiving a message that does
4280 lock_rsb followed by dlm_user_add_ast() */
4281
4282static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4283 struct dlm_user_proc *proc)
4284{
4285 struct dlm_lkb *lkb = NULL;
4286
4287 mutex_lock(&ls->ls_clear_proc_locks);
4288 if (list_empty(&proc->locks))
4289 goto out;
4290
4291 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4292 list_del_init(&lkb->lkb_ownqueue);
4293
4294 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4295 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4296 else
4297 lkb->lkb_flags |= DLM_IFL_DEAD;
4298 out:
4299 mutex_unlock(&ls->ls_clear_proc_locks);
4300 return lkb;
4301}
4302
3868/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which 4303/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
3869 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts, 4304 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
3870 which we clear here. */ 4305 which we clear here. */
@@ -3880,18 +4315,15 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
3880 struct dlm_lkb *lkb, *safe; 4315 struct dlm_lkb *lkb, *safe;
3881 4316
3882 lock_recovery(ls); 4317 lock_recovery(ls);
3883 mutex_lock(&ls->ls_clear_proc_locks);
3884 4318
3885 list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) { 4319 while (1) {
3886 list_del_init(&lkb->lkb_ownqueue); 4320 lkb = del_proc_lock(ls, proc);
3887 4321 if (!lkb)
3888 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) { 4322 break;
3889 lkb->lkb_flags |= DLM_IFL_ORPHAN; 4323 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
3890 orphan_proc_lock(ls, lkb); 4324 orphan_proc_lock(ls, lkb);
3891 } else { 4325 else
3892 lkb->lkb_flags |= DLM_IFL_DEAD;
3893 unlock_proc_lock(ls, lkb); 4326 unlock_proc_lock(ls, lkb);
3894 }
3895 4327
3896 /* this removes the reference for the proc->locks list 4328 /* this removes the reference for the proc->locks list
3897 added by dlm_user_request, it may result in the lkb 4329 added by dlm_user_request, it may result in the lkb
@@ -3900,6 +4332,8 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
3900 dlm_put_lkb(lkb); 4332 dlm_put_lkb(lkb);
3901 } 4333 }
3902 4334
4335 mutex_lock(&ls->ls_clear_proc_locks);
4336
3903 /* in-progress unlocks */ 4337 /* in-progress unlocks */
3904 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) { 4338 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
3905 list_del_init(&lkb->lkb_ownqueue); 4339 list_del_init(&lkb->lkb_ownqueue);
@@ -3916,3 +4350,92 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
3916 unlock_recovery(ls); 4350 unlock_recovery(ls);
3917} 4351}
3918 4352
4353static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4354{
4355 struct dlm_lkb *lkb, *safe;
4356
4357 while (1) {
4358 lkb = NULL;
4359 spin_lock(&proc->locks_spin);
4360 if (!list_empty(&proc->locks)) {
4361 lkb = list_entry(proc->locks.next, struct dlm_lkb,
4362 lkb_ownqueue);
4363 list_del_init(&lkb->lkb_ownqueue);
4364 }
4365 spin_unlock(&proc->locks_spin);
4366
4367 if (!lkb)
4368 break;
4369
4370 lkb->lkb_flags |= DLM_IFL_DEAD;
4371 unlock_proc_lock(ls, lkb);
4372 dlm_put_lkb(lkb); /* ref from proc->locks list */
4373 }
4374
4375 spin_lock(&proc->locks_spin);
4376 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4377 list_del_init(&lkb->lkb_ownqueue);
4378 lkb->lkb_flags |= DLM_IFL_DEAD;
4379 dlm_put_lkb(lkb);
4380 }
4381 spin_unlock(&proc->locks_spin);
4382
4383 spin_lock(&proc->asts_spin);
4384 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4385 list_del(&lkb->lkb_astqueue);
4386 dlm_put_lkb(lkb);
4387 }
4388 spin_unlock(&proc->asts_spin);
4389}
4390
4391/* pid of 0 means purge all orphans */
4392
4393static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4394{
4395 struct dlm_lkb *lkb, *safe;
4396
4397 mutex_lock(&ls->ls_orphans_mutex);
4398 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4399 if (pid && lkb->lkb_ownpid != pid)
4400 continue;
4401 unlock_proc_lock(ls, lkb);
4402 list_del_init(&lkb->lkb_ownqueue);
4403 dlm_put_lkb(lkb);
4404 }
4405 mutex_unlock(&ls->ls_orphans_mutex);
4406}
4407
4408static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4409{
4410 struct dlm_message *ms;
4411 struct dlm_mhandle *mh;
4412 int error;
4413
4414 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4415 DLM_MSG_PURGE, &ms, &mh);
4416 if (error)
4417 return error;
4418 ms->m_nodeid = nodeid;
4419 ms->m_pid = pid;
4420
4421 return send_message(mh, ms);
4422}
4423
4424int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4425 int nodeid, int pid)
4426{
4427 int error = 0;
4428
4429 if (nodeid != dlm_our_nodeid()) {
4430 error = send_purge(ls, nodeid, pid);
4431 } else {
4432 lock_recovery(ls);
4433 if (pid == current->pid)
4434 purge_proc_locks(ls, proc);
4435 else
4436 do_purge(ls, nodeid, pid);
4437 unlock_recovery(ls);
4438 }
4439 return error;
4440}
4441