-rw-r--r--  fs/dlm/dlm_internal.h   10
-rw-r--r--  fs/dlm/lock.c          710
-rw-r--r--  fs/dlm/lockspace.c       4
-rw-r--r--  fs/dlm/user.c           77
4 files changed, 577 insertions, 224 deletions
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 61d93201e1b2..178931cca67c 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -2,7 +2,7 @@
 *******************************************************************************
 **
 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
-** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
+** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved.
 **
 ** This copyrighted material is made available to anyone wishing to use,
 ** modify, copy, or redistribute it subject to the terms and conditions
@@ -210,6 +210,9 @@ struct dlm_args {
 #define DLM_IFL_MSTCPY          0x00010000
 #define DLM_IFL_RESEND          0x00020000
 #define DLM_IFL_DEAD            0x00040000
+#define DLM_IFL_OVERLAP_UNLOCK  0x00080000
+#define DLM_IFL_OVERLAP_CANCEL  0x00100000
+#define DLM_IFL_ENDOFLIFE       0x00200000
 #define DLM_IFL_USER            0x00000001
 #define DLM_IFL_ORPHAN          0x00000002
 
@@ -230,8 +233,8 @@ struct dlm_lkb {
     int8_t lkb_grmode;      /* granted lock mode */
     int8_t lkb_bastmode;    /* requested mode */
     int8_t lkb_highbast;    /* highest mode bast sent for */
-
     int8_t lkb_wait_type;   /* type of reply waiting for */
+    int8_t lkb_wait_count;
     int8_t lkb_ast_type;    /* type of ast queued for */
 
     struct list_head lkb_idtbl_list; /* lockspace lkbtbl */
@@ -440,6 +443,9 @@ struct dlm_ls {
     struct mutex ls_waiters_mutex;
     struct list_head ls_waiters;     /* lkbs needing a reply */
 
+    struct mutex ls_orphans_mutex;
+    struct list_head ls_orphans;
+
     struct list_head ls_nodes;       /* current nodes in ls */
     struct list_head ls_nodes_gone;  /* dead node list, recovery */
     int ls_num_nodes;                /* number of nodes in ls */
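
Aside (not part of the patch): the three new DLM_IFL_* bits sit in the upper, kernel-internal half of lkb_flags next to MSTCPY/RESEND/DEAD. A standalone compile-time check that the new bits don't collide with their neighbours, using the values from the hunk above:

#define IFL_MSTCPY          0x00010000  /* values copied from dlm_internal.h */
#define IFL_RESEND          0x00020000
#define IFL_DEAD            0x00040000
#define IFL_OVERLAP_UNLOCK  0x00080000
#define IFL_OVERLAP_CANCEL  0x00100000
#define IFL_ENDOFLIFE       0x00200000

/* fails to compile if any new bit overlaps an existing one */
typedef char dlm_ifl_bits_are_distinct[
    ((IFL_OVERLAP_UNLOCK | IFL_OVERLAP_CANCEL | IFL_ENDOFLIFE) &
     (IFL_MSTCPY | IFL_RESEND | IFL_DEAD)) == 0 ? 1 : -1];
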
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index e725005fafd0..b865a46059dd 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-** Copyright (C) 2005 Red Hat, Inc. All rights reserved.
+** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
 **
 ** This copyrighted material is made available to anyone wishing to use,
 ** modify, copy, or redistribute it subject to the terms and conditions
@@ -254,6 +254,22 @@ static inline int down_conversion(struct dlm_lkb *lkb)
     return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
 }
 
+static inline int is_overlap_unlock(struct dlm_lkb *lkb)
+{
+    return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
+}
+
+static inline int is_overlap_cancel(struct dlm_lkb *lkb)
+{
+    return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
+}
+
+static inline int is_overlap(struct dlm_lkb *lkb)
+{
+    return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
+                              DLM_IFL_OVERLAP_CANCEL));
+}
+
 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
 {
     if (is_master_copy(lkb))
@@ -267,6 +283,12 @@ static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
     dlm_add_ast(lkb, AST_COMP);
 }
 
+static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
+{
+    queue_cast(r, lkb,
+               is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
+}
+
 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
 {
     if (is_master_copy(lkb))
@@ -547,6 +569,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
     lkb->lkb_grmode = DLM_LOCK_IV;
     kref_init(&lkb->lkb_ref);
     INIT_LIST_HEAD(&lkb->lkb_ownqueue);
+    INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
 
     get_random_bytes(&bucket, sizeof(bucket));
     bucket &= (ls->ls_lkbtbl_size - 1);
@@ -735,23 +758,75 @@ static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
     unhold_lkb(lkb);
 }
 
+static int msg_reply_type(int mstype)
+{
+    switch (mstype) {
+    case DLM_MSG_REQUEST:
+        return DLM_MSG_REQUEST_REPLY;
+    case DLM_MSG_CONVERT:
+        return DLM_MSG_CONVERT_REPLY;
+    case DLM_MSG_UNLOCK:
+        return DLM_MSG_UNLOCK_REPLY;
+    case DLM_MSG_CANCEL:
+        return DLM_MSG_CANCEL_REPLY;
+    case DLM_MSG_LOOKUP:
+        return DLM_MSG_LOOKUP_REPLY;
+    }
+    return -1;
+}
+
 /* add/remove lkb from global waiters list of lkb's waiting for
    a reply from a remote node */
 
-static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
+static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
 {
     struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+    int error = 0;
 
     mutex_lock(&ls->ls_waiters_mutex);
-    if (lkb->lkb_wait_type) {
-        log_print("add_to_waiters error %d", lkb->lkb_wait_type);
+
+    if (is_overlap_unlock(lkb) ||
+        (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
+        error = -EINVAL;
+        goto out;
+    }
+
+    if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
+        switch (mstype) {
+        case DLM_MSG_UNLOCK:
+            lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
+            break;
+        case DLM_MSG_CANCEL:
+            lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
+            break;
+        default:
+            error = -EBUSY;
+            goto out;
+        }
+        lkb->lkb_wait_count++;
+        hold_lkb(lkb);
+
+        log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
+                  lkb->lkb_id, lkb->lkb_wait_type, mstype,
+                  lkb->lkb_wait_count, lkb->lkb_flags);
         goto out;
     }
+
+    DLM_ASSERT(!lkb->lkb_wait_count,
+               dlm_print_lkb(lkb);
+               printk("wait_count %d\n", lkb->lkb_wait_count););
+
+    lkb->lkb_wait_count++;
     lkb->lkb_wait_type = mstype;
-    kref_get(&lkb->lkb_ref);
+    hold_lkb(lkb);
     list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
  out:
+    if (error)
+        log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
+                  lkb->lkb_id, error, lkb->lkb_flags, mstype,
+                  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
     mutex_unlock(&ls->ls_waiters_mutex);
+    return error;
 }
 
 /* We clear the RESEND flag because we might be taking an lkb off the waiters
@@ -759,34 +834,85 @@ static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
    request reply on the requestqueue) between dlm_recover_waiters_pre() which
    set RESEND and dlm_recover_waiters_post() */
 
-static int _remove_from_waiters(struct dlm_lkb *lkb)
+static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
 {
-    int error = 0;
+    struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+    int overlap_done = 0;
 
-    if (!lkb->lkb_wait_type) {
-        log_print("remove_from_waiters error");
-        error = -EINVAL;
-        goto out;
+    if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
+        lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
+        overlap_done = 1;
+        goto out_del;
     }
-    lkb->lkb_wait_type = 0;
+
+    if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
+        lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
+        overlap_done = 1;
+        goto out_del;
+    }
+
+    /* N.B. type of reply may not always correspond to type of original
+       msg due to lookup->request optimization, verify others? */
+
+    if (lkb->lkb_wait_type) {
+        lkb->lkb_wait_type = 0;
+        goto out_del;
+    }
+
+    log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
+              lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
+    return -1;
+
+ out_del:
+    /* the force-unlock/cancel has completed and we haven't recvd a reply
+       to the op that was in progress prior to the unlock/cancel; we
+       give up on any reply to the earlier op. FIXME: not sure when/how
+       this would happen */
+
+    if (overlap_done && lkb->lkb_wait_type) {
+        log_error(ls, "remove_from_waiters %x reply %d give up on %d",
+                  lkb->lkb_id, mstype, lkb->lkb_wait_type);
+        lkb->lkb_wait_count--;
+        lkb->lkb_wait_type = 0;
+    }
+
+    DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
+
     lkb->lkb_flags &= ~DLM_IFL_RESEND;
-    list_del(&lkb->lkb_wait_reply);
+    lkb->lkb_wait_count--;
+    if (!lkb->lkb_wait_count)
+        list_del_init(&lkb->lkb_wait_reply);
     unhold_lkb(lkb);
- out:
-    return error;
+    return 0;
 }
 
-static int remove_from_waiters(struct dlm_lkb *lkb)
+static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
 {
     struct dlm_ls *ls = lkb->lkb_resource->res_ls;
     int error;
 
     mutex_lock(&ls->ls_waiters_mutex);
-    error = _remove_from_waiters(lkb);
+    error = _remove_from_waiters(lkb, mstype);
     mutex_unlock(&ls->ls_waiters_mutex);
     return error;
 }
 
+/* Handles situations where we might be processing a "fake" or "stub" reply in
+   which we can't try to take waiters_mutex again. */
+
+static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
+{
+    struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+    int error;
+
+    if (ms != &ls->ls_stub_ms)
+        mutex_lock(&ls->ls_waiters_mutex);
+    error = _remove_from_waiters(lkb, ms->m_type);
+    if (ms != &ls->ls_stub_ms)
+        mutex_unlock(&ls->ls_waiters_mutex);
+    return error;
+}
+
 static void dir_remove(struct dlm_rsb *r)
 {
     int to_nodeid;
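
Aside (not part of the patch): add_to_waiters()/_remove_from_waiters() above amount to a small accounting scheme: lkb_wait_count counts replies still expected, and the OVERLAP_UNLOCK/OVERLAP_CANCEL flags remember an unlock or cancel issued while an earlier request, convert or lookup was still waiting. Below is a minimal standalone model of just that accounting, with made-up names and none of the real locking, refcounting or messaging:

#include <stdio.h>

enum { MSG_REQUEST = 1, MSG_UNLOCK, MSG_CANCEL,
       MSG_REQUEST_REPLY, MSG_UNLOCK_REPLY, MSG_CANCEL_REPLY };

#define OVERLAP_UNLOCK 0x1
#define OVERLAP_CANCEL 0x2

struct model_lkb {
    int flags;
    int wait_type;   /* original op still waiting for a reply */
    int wait_count;  /* replies still expected */
};

/* roughly add_to_waiters(): a second op overlapping the first just sets a
   flag and bumps the count; only unlock/cancel may overlap */
static int model_add_waiter(struct model_lkb *lkb, int mstype)
{
    if (lkb->wait_type) {
        if (mstype == MSG_UNLOCK)
            lkb->flags |= OVERLAP_UNLOCK;
        else if (mstype == MSG_CANCEL)
            lkb->flags |= OVERLAP_CANCEL;
        else
            return -1;
        lkb->wait_count++;
        return 0;
    }
    lkb->wait_type = mstype;
    lkb->wait_count = 1;
    return 0;
}

/* roughly _remove_from_waiters(): each reply clears the matching state and
   the lkb leaves the waiters list only when wait_count reaches zero */
static void model_reply(struct model_lkb *lkb, int reply)
{
    if (reply == MSG_UNLOCK_REPLY && (lkb->flags & OVERLAP_UNLOCK))
        lkb->flags &= ~OVERLAP_UNLOCK;
    else if (reply == MSG_CANCEL_REPLY && (lkb->flags & OVERLAP_CANCEL))
        lkb->flags &= ~OVERLAP_CANCEL;
    else
        lkb->wait_type = 0;
    lkb->wait_count--;
    if (!lkb->wait_count)
        printf("off waiters list\n");
}

int main(void)
{
    struct model_lkb lkb = {0};

    model_add_waiter(&lkb, MSG_REQUEST);  /* original request */
    model_add_waiter(&lkb, MSG_UNLOCK);   /* overlapping unlock */
    model_reply(&lkb, MSG_REQUEST_REPLY); /* still waiting: count is 1 */
    model_reply(&lkb, MSG_UNLOCK_REPLY);  /* count 0: off waiters list */
    return 0;
}
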
@@ -988,8 +1114,14 @@ static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
     _remove_lock(r, lkb);
 }
 
-static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+/* returns: 0 did nothing
+            1 moved lock to granted
+           -1 removed lock */
+
+static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
+    int rv = 0;
+
     lkb->lkb_rqmode = DLM_LOCK_IV;
 
     switch (lkb->lkb_status) {
@@ -997,6 +1129,7 @@ static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
         break;
     case DLM_LKSTS_CONVERT:
         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
+        rv = 1;
         break;
     case DLM_LKSTS_WAITING:
         del_lkb(r, lkb);
@@ -1004,15 +1137,17 @@ static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
         /* this unhold undoes the original ref from create_lkb()
            so this leads to the lkb being freed */
         unhold_lkb(lkb);
+        rv = -1;
         break;
     default:
         log_print("invalid status for revert %d", lkb->lkb_status);
     }
+    return rv;
 }
 
-static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
+static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
-    revert_lock(r, lkb);
+    return revert_lock(r, lkb);
 }
 
 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
@@ -1499,7 +1634,7 @@ static void process_lookup_list(struct dlm_rsb *r)
     struct dlm_lkb *lkb, *safe;
 
     list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
-        list_del(&lkb->lkb_rsb_lookup);
+        list_del_init(&lkb->lkb_rsb_lookup);
         _request_lock(r, lkb);
         schedule();
     }
@@ -1530,7 +1665,7 @@ static void confirm_master(struct dlm_rsb *r, int error)
     if (!list_empty(&r->res_lookup)) {
         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
                          lkb_rsb_lookup);
-        list_del(&lkb->lkb_rsb_lookup);
+        list_del_init(&lkb->lkb_rsb_lookup);
         r->res_first_lkid = lkb->lkb_id;
         _request_lock(r, lkb);
     } else
@@ -1614,6 +1749,9 @@ static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
                       DLM_LKF_FORCEUNLOCK))
         return -EINVAL;
 
+    if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
+        return -EINVAL;
+
     args->flags = flags;
     args->astparam = (long) astarg;
     return 0;
@@ -1638,6 +1776,9 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 
         if (lkb->lkb_wait_type)
             goto out;
+
+        if (is_overlap(lkb))
+            goto out;
     }
 
     lkb->lkb_exflags = args->flags;
@@ -1654,35 +1795,126 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
     return rv;
 }
 
+/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
+   for success */
+
+/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
+   because there may be a lookup in progress and it's valid to do
+   cancel/unlockf on it */
+
 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
 {
+    struct dlm_ls *ls = lkb->lkb_resource->res_ls;
     int rv = -EINVAL;
 
-    if (lkb->lkb_flags & DLM_IFL_MSTCPY)
+    if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
+        log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
+        dlm_print_lkb(lkb);
         goto out;
+    }
 
-    if (args->flags & DLM_LKF_FORCEUNLOCK)
-        goto out_ok;
+    /* an lkb may still exist even though the lock is EOL'ed due to a
+       cancel, unlock or failed noqueue request; an app can't use these
+       locks; return same error as if the lkid had not been found at all */
 
-    if (args->flags & DLM_LKF_CANCEL &&
-        lkb->lkb_status == DLM_LKSTS_GRANTED)
+    if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
+        log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
+        rv = -ENOENT;
         goto out;
+    }
 
-    if (!(args->flags & DLM_LKF_CANCEL) &&
-        lkb->lkb_status != DLM_LKSTS_GRANTED)
-        goto out;
+    /* an lkb may be waiting for an rsb lookup to complete where the
+       lookup was initiated by another lock */
+
+    if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
+        if (!list_empty(&lkb->lkb_rsb_lookup)) {
+            log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
+            list_del_init(&lkb->lkb_rsb_lookup);
+            queue_cast(lkb->lkb_resource, lkb,
+                       args->flags & DLM_LKF_CANCEL ?
+                       -DLM_ECANCEL : -DLM_EUNLOCK);
+            unhold_lkb(lkb); /* undoes create_lkb() */
+            rv = -EBUSY;
+            goto out;
+        }
+    }
+
+    /* cancel not allowed with another cancel/unlock in progress */
+
+    if (args->flags & DLM_LKF_CANCEL) {
+        if (lkb->lkb_exflags & DLM_LKF_CANCEL)
+            goto out;
+
+        if (is_overlap(lkb))
+            goto out;
+
+        if (lkb->lkb_flags & DLM_IFL_RESEND) {
+            lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
+            rv = -EBUSY;
+            goto out;
+        }
+
+        switch (lkb->lkb_wait_type) {
+        case DLM_MSG_LOOKUP:
+        case DLM_MSG_REQUEST:
+            lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
+            rv = -EBUSY;
+            goto out;
+        case DLM_MSG_UNLOCK:
+        case DLM_MSG_CANCEL:
+            goto out;
+        }
+        /* add_to_waiters() will set OVERLAP_CANCEL */
+        goto out_ok;
+    }
+
+    /* do we need to allow a force-unlock if there's a normal unlock
+       already in progress? in what conditions could the normal unlock
+       fail such that we'd want to send a force-unlock to be sure? */
+
+    if (args->flags & DLM_LKF_FORCEUNLOCK) {
+        if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
+            goto out;
+
+        if (is_overlap_unlock(lkb))
+            goto out;
 
+        if (lkb->lkb_flags & DLM_IFL_RESEND) {
+            lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
+            rv = -EBUSY;
+            goto out;
+        }
+
+        switch (lkb->lkb_wait_type) {
+        case DLM_MSG_LOOKUP:
+        case DLM_MSG_REQUEST:
+            lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
+            rv = -EBUSY;
+            goto out;
+        case DLM_MSG_UNLOCK:
+            goto out;
+        }
+        /* add_to_waiters() will set OVERLAP_UNLOCK */
+        goto out_ok;
+    }
+
+    /* normal unlock not allowed if there's any op in progress */
     rv = -EBUSY;
-    if (lkb->lkb_wait_type)
+    if (lkb->lkb_wait_type || lkb->lkb_wait_count)
         goto out;
 
  out_ok:
-    lkb->lkb_exflags = args->flags;
+    /* an overlapping op shouldn't blow away exflags from other op */
+    lkb->lkb_exflags |= args->flags;
     lkb->lkb_sbflags = 0;
     lkb->lkb_astparam = args->astparam;
-
     rv = 0;
  out:
+    if (rv)
+        log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
+                  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
+                  args->flags, lkb->lkb_wait_type,
+                  lkb->lkb_resource->res_name);
     return rv;
 }
 
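
Aside (not part of the patch): taken together with the dlm_unlock()/dlm_user_unlock()/dlm_user_cancel() changes later in this patch, the caller-visible effect is a simple mapping of internal results to return values. A self-contained sketch using placeholder constants (the real DLM_EUNLOCK/DLM_ECANCEL values live in the dlm headers; the helper name is invented):

#include <errno.h>
#include <stdio.h>

#define DLM_EUNLOCK_X   0x10001   /* placeholder for the DLM_EUNLOCK code */
#define DLM_ECANCEL_X   0x10002   /* placeholder for the DLM_ECANCEL code */
#define LKF_CANCEL      0x1       /* placeholder for DLM_LKF_CANCEL */
#define LKF_FORCEUNLOCK 0x2       /* placeholder for DLM_LKF_FORCEUNLOCK */

static int map_unlock_result(int error, unsigned int flags)
{
    /* the unlock or cancel actually happened */
    if (error == -DLM_EUNLOCK_X || error == -DLM_ECANCEL_X)
        return 0;
    /* cancel/force-unlock overlapped an op still waiting for a reply:
       it was recorded via the OVERLAP_* flags and completes later,
       so the caller still sees success */
    if (error == -EBUSY && (flags & (LKF_CANCEL | LKF_FORCEUNLOCK)))
        return 0;
    return error;   /* e.g. -ENOENT for an ENDOFLIFE lkb */
}

int main(void)
{
    printf("%d\n", map_unlock_result(-EBUSY, LKF_CANCEL));   /* 0 */
    printf("%d\n", map_unlock_result(-EBUSY, 0));            /* -EBUSY */
    printf("%d\n", map_unlock_result(-DLM_EUNLOCK_X, 0));    /* 0 */
    return 0;
}
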
@@ -1759,17 +1991,19 @@ static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
     return -DLM_EUNLOCK;
 }
 
-/* FIXME: if revert_lock() finds that the lkb is granted, we should
-   skip the queue_cast(ECANCEL). It indicates that the request/convert
-   completed (and queued a normal ast) just before the cancel; we don't
-   want to clobber the sb_result for the normal ast with ECANCEL. */
+/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
 
 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
-    revert_lock(r, lkb);
-    queue_cast(r, lkb, -DLM_ECANCEL);
-    grant_pending_locks(r);
-    return -DLM_ECANCEL;
+    int error;
+
+    error = revert_lock(r, lkb);
+    if (error) {
+        queue_cast(r, lkb, -DLM_ECANCEL);
+        grant_pending_locks(r);
+        return -DLM_ECANCEL;
+    }
+    return 0;
 }
 
 /*
@@ -2035,6 +2269,8 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
 
     if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
         error = 0;
+    if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
+        error = 0;
  out_put:
     dlm_put_lkb(lkb);
  out:
@@ -2176,7 +2412,9 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
     struct dlm_mhandle *mh;
     int to_nodeid, error;
 
-    add_to_waiters(lkb, mstype);
+    error = add_to_waiters(lkb, mstype);
+    if (error)
+        return error;
 
     to_nodeid = r->res_nodeid;
 
@@ -2192,7 +2430,7 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
     return 0;
 
  fail:
-    remove_from_waiters(lkb);
+    remove_from_waiters(lkb, msg_reply_type(mstype));
     return error;
 }
 
@@ -2209,7 +2447,8 @@ static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
 
     /* down conversions go without a reply from the master */
     if (!error && down_conversion(lkb)) {
-        remove_from_waiters(lkb);
+        remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
+        r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
         r->res_ls->ls_stub_ms.m_result = 0;
         r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
         __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
@@ -2280,7 +2519,9 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
     struct dlm_mhandle *mh;
     int to_nodeid, error;
 
-    add_to_waiters(lkb, DLM_MSG_LOOKUP);
+    error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
+    if (error)
+        return error;
 
     to_nodeid = dlm_dir_nodeid(r);
 
@@ -2296,7 +2537,7 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
     return 0;
 
  fail:
-    remove_from_waiters(lkb);
+    remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
     return error;
 }
 
@@ -2740,7 +2981,7 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
 {
     struct dlm_lkb *lkb;
     struct dlm_rsb *r;
-    int error, mstype;
+    int error, mstype, result;
 
     error = find_lkb(ls, ms->m_remid, &lkb);
     if (error) {
@@ -2749,20 +2990,15 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
     }
     DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
-    mstype = lkb->lkb_wait_type;
-    error = remove_from_waiters(lkb);
-    if (error) {
-        log_error(ls, "receive_request_reply not on waiters");
-        goto out;
-    }
-
-    /* this is the value returned from do_request() on the master */
-    error = ms->m_result;
-
     r = lkb->lkb_resource;
     hold_rsb(r);
     lock_rsb(r);
 
+    mstype = lkb->lkb_wait_type;
+    error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
+    if (error)
+        goto out;
+
     /* Optimization: the dir node was also the master, so it took our
        lookup as a request and sent request reply instead of lookup reply */
     if (mstype == DLM_MSG_LOOKUP) {
@@ -2770,14 +3006,15 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
         lkb->lkb_nodeid = r->res_nodeid;
     }
 
-    switch (error) {
+    /* this is the value returned from do_request() on the master */
+    result = ms->m_result;
+
+    switch (result) {
     case -EAGAIN:
-        /* request would block (be queued) on remote master;
-           the unhold undoes the original ref from create_lkb()
-           so it leads to the lkb being freed */
+        /* request would block (be queued) on remote master */
         queue_cast(r, lkb, -EAGAIN);
         confirm_master(r, -EAGAIN);
-        unhold_lkb(lkb);
+        unhold_lkb(lkb); /* undoes create_lkb() */
         break;
 
     case -EINPROGRESS:
@@ -2785,41 +3022,62 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
         /* request was queued or granted on remote master */
         receive_flags_reply(lkb, ms);
         lkb->lkb_remid = ms->m_lkid;
-        if (error)
+        if (result)
             add_lkb(r, lkb, DLM_LKSTS_WAITING);
         else {
             grant_lock_pc(r, lkb, ms);
             queue_cast(r, lkb, 0);
         }
-        confirm_master(r, error);
+        confirm_master(r, result);
         break;
 
     case -EBADR:
     case -ENOTBLK:
         /* find_rsb failed to find rsb or rsb wasn't master */
+        log_debug(ls, "receive_request_reply %x %x master diff %d %d",
+                  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
         r->res_nodeid = -1;
         lkb->lkb_nodeid = -1;
-        _request_lock(r, lkb);
+
+        if (is_overlap(lkb)) {
+            /* we'll ignore error in cancel/unlock reply */
+            queue_cast_overlap(r, lkb);
+            unhold_lkb(lkb); /* undoes create_lkb() */
+        } else
+            _request_lock(r, lkb);
         break;
 
     default:
-        log_error(ls, "receive_request_reply error %d", error);
+        log_error(ls, "receive_request_reply %x error %d",
+                  lkb->lkb_id, result);
     }
 
+    if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
+        log_debug(ls, "receive_request_reply %x result %d unlock",
+                  lkb->lkb_id, result);
+        lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
+        lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
+        send_unlock(r, lkb);
+    } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
+        log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
+        lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
+        lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
+        send_cancel(r, lkb);
+    } else {
+        lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
+        lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
+    }
+ out:
     unlock_rsb(r);
     put_rsb(r);
- out:
     dlm_put_lkb(lkb);
 }
 
 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
                                     struct dlm_message *ms)
 {
-    int error = ms->m_result;
-
     /* this is the value returned from do_convert() on the master */
-
-    switch (error) {
+    switch (ms->m_result) {
     case -EAGAIN:
         /* convert would block (be queued) on remote master */
         queue_cast(r, lkb, -EAGAIN);
@@ -2839,19 +3097,26 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
         break;
 
     default:
-        log_error(r->res_ls, "receive_convert_reply error %d", error);
+        log_error(r->res_ls, "receive_convert_reply %x error %d",
+                  lkb->lkb_id, ms->m_result);
     }
 }
 
 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
 {
     struct dlm_rsb *r = lkb->lkb_resource;
+    int error;
 
     hold_rsb(r);
     lock_rsb(r);
 
-    __receive_convert_reply(r, lkb, ms);
+    /* stub reply can happen with waiters_mutex held */
+    error = remove_from_waiters_ms(lkb, ms);
+    if (error)
+        goto out;
 
+    __receive_convert_reply(r, lkb, ms);
+ out:
     unlock_rsb(r);
     put_rsb(r);
 }
@@ -2868,37 +3133,38 @@ static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
     }
     DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
-    error = remove_from_waiters(lkb);
-    if (error) {
-        log_error(ls, "receive_convert_reply not on waiters");
-        goto out;
-    }
-
     _receive_convert_reply(lkb, ms);
- out:
     dlm_put_lkb(lkb);
 }
 
 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
 {
     struct dlm_rsb *r = lkb->lkb_resource;
-    int error = ms->m_result;
+    int error;
 
     hold_rsb(r);
     lock_rsb(r);
 
+    /* stub reply can happen with waiters_mutex held */
+    error = remove_from_waiters_ms(lkb, ms);
+    if (error)
+        goto out;
+
     /* this is the value returned from do_unlock() on the master */
 
-    switch (error) {
+    switch (ms->m_result) {
     case -DLM_EUNLOCK:
         receive_flags_reply(lkb, ms);
         remove_lock_pc(r, lkb);
         queue_cast(r, lkb, -DLM_EUNLOCK);
         break;
+    case -ENOENT:
+        break;
     default:
-        log_error(r->res_ls, "receive_unlock_reply error %d", error);
+        log_error(r->res_ls, "receive_unlock_reply %x error %d",
+                  lkb->lkb_id, ms->m_result);
     }
-
+ out:
     unlock_rsb(r);
     put_rsb(r);
 }
@@ -2915,37 +3181,39 @@ static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
     }
     DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
-    error = remove_from_waiters(lkb);
-    if (error) {
-        log_error(ls, "receive_unlock_reply not on waiters");
-        goto out;
-    }
-
     _receive_unlock_reply(lkb, ms);
- out:
     dlm_put_lkb(lkb);
 }
 
 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
 {
     struct dlm_rsb *r = lkb->lkb_resource;
-    int error = ms->m_result;
+    int error;
 
     hold_rsb(r);
     lock_rsb(r);
 
+    /* stub reply can happen with waiters_mutex held */
+    error = remove_from_waiters_ms(lkb, ms);
+    if (error)
+        goto out;
+
     /* this is the value returned from do_cancel() on the master */
 
-    switch (error) {
+    switch (ms->m_result) {
     case -DLM_ECANCEL:
         receive_flags_reply(lkb, ms);
         revert_lock_pc(r, lkb);
-        queue_cast(r, lkb, -DLM_ECANCEL);
+        if (ms->m_result)
+            queue_cast(r, lkb, -DLM_ECANCEL);
+        break;
+    case 0:
         break;
     default:
-        log_error(r->res_ls, "receive_cancel_reply error %d", error);
+        log_error(r->res_ls, "receive_cancel_reply %x error %d",
+                  lkb->lkb_id, ms->m_result);
     }
-
+ out:
     unlock_rsb(r);
     put_rsb(r);
 }
@@ -2962,14 +3230,7 @@ static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
     }
     DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
 
-    error = remove_from_waiters(lkb);
-    if (error) {
-        log_error(ls, "receive_cancel_reply not on waiters");
-        goto out;
-    }
-
     _receive_cancel_reply(lkb, ms);
- out:
     dlm_put_lkb(lkb);
 }
 
@@ -2985,20 +3246,17 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
         return;
     }
 
-    error = remove_from_waiters(lkb);
-    if (error) {
-        log_error(ls, "receive_lookup_reply not on waiters");
-        goto out;
-    }
-
-    /* this is the value returned by dlm_dir_lookup on dir node
+    /* ms->m_result is the value returned by dlm_dir_lookup on dir node
        FIXME: will a non-zero error ever be returned? */
-    error = ms->m_result;
 
     r = lkb->lkb_resource;
     hold_rsb(r);
     lock_rsb(r);
 
+    error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
+    if (error)
+        goto out;
+
     ret_nodeid = ms->m_nodeid;
     if (ret_nodeid == dlm_our_nodeid()) {
         r->res_nodeid = 0;
@@ -3009,14 +3267,22 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
         r->res_nodeid = ret_nodeid;
     }
 
+    if (is_overlap(lkb)) {
+        log_debug(ls, "receive_lookup_reply %x unlock %x",
+                  lkb->lkb_id, lkb->lkb_flags);
+        queue_cast_overlap(r, lkb);
+        unhold_lkb(lkb); /* undoes create_lkb() */
+        goto out_list;
+    }
+
     _request_lock(r, lkb);
 
+ out_list:
     if (!ret_nodeid)
         process_lookup_list(r);
-
+ out:
     unlock_rsb(r);
     put_rsb(r);
- out:
     dlm_put_lkb(lkb);
 }
 
@@ -3153,9 +3419,9 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
 {
     if (middle_conversion(lkb)) {
         hold_lkb(lkb);
+        ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
         ls->ls_stub_ms.m_result = -EINPROGRESS;
         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
-        _remove_from_waiters(lkb);
         _receive_convert_reply(lkb, &ls->ls_stub_ms);
 
         /* Same special case as in receive_rcom_lock_args() */
@@ -3227,18 +3493,18 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 
         case DLM_MSG_UNLOCK:
             hold_lkb(lkb);
+            ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
             ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
             ls->ls_stub_ms.m_flags = lkb->lkb_flags;
-            _remove_from_waiters(lkb);
             _receive_unlock_reply(lkb, &ls->ls_stub_ms);
             dlm_put_lkb(lkb);
             break;
 
         case DLM_MSG_CANCEL:
             hold_lkb(lkb);
+            ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
             ls->ls_stub_ms.m_result = -DLM_ECANCEL;
             ls->ls_stub_ms.m_flags = lkb->lkb_flags;
-            _remove_from_waiters(lkb);
             _receive_cancel_reply(lkb, &ls->ls_stub_ms);
             dlm_put_lkb(lkb);
             break;
@@ -3252,37 +3518,47 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
     mutex_unlock(&ls->ls_waiters_mutex);
 }
 
-static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
+static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
 {
     struct dlm_lkb *lkb;
-    int rv = 0;
+    int found = 0;
 
     mutex_lock(&ls->ls_waiters_mutex);
     list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
         if (lkb->lkb_flags & DLM_IFL_RESEND) {
-            rv = lkb->lkb_wait_type;
-            _remove_from_waiters(lkb);
-            lkb->lkb_flags &= ~DLM_IFL_RESEND;
+            hold_lkb(lkb);
+            found = 1;
             break;
         }
     }
     mutex_unlock(&ls->ls_waiters_mutex);
 
-    if (!rv)
+    if (!found)
         lkb = NULL;
-    *lkb_ret = lkb;
-    return rv;
+    return lkb;
 }
 
 /* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
    master or dir-node for r. Processing the lkb may result in it being placed
    back on waiters. */
 
+/* We do this after normal locking has been enabled and any saved messages
+   (in requestqueue) have been processed. We should be confident that at
+   this point we won't get or process a reply to any of these waiting
+   operations. But, new ops may be coming in on the rsbs/locks here from
+   userspace or remotely. */
+
+/* there may have been an overlap unlock/cancel prior to recovery or after
+   recovery. if before, the lkb may still have a pos wait_count; if after, the
+   overlap flag would just have been set and nothing new sent. we can be
+   confident here than any replies to either the initial op or overlap ops
+   prior to recovery have been received. */
+
 int dlm_recover_waiters_post(struct dlm_ls *ls)
 {
     struct dlm_lkb *lkb;
     struct dlm_rsb *r;
-    int error = 0, mstype;
+    int error = 0, mstype, err, oc, ou;
 
     while (1) {
         if (dlm_locking_stopped(ls)) {
@@ -3291,48 +3567,78 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
             break;
         }
 
-        mstype = remove_resend_waiter(ls, &lkb);
-        if (!mstype)
+        lkb = find_resend_waiter(ls);
+        if (!lkb)
             break;
 
         r = lkb->lkb_resource;
+        hold_rsb(r);
+        lock_rsb(r);
+
+        mstype = lkb->lkb_wait_type;
+        oc = is_overlap_cancel(lkb);
+        ou = is_overlap_unlock(lkb);
+        err = 0;
 
         log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
                   lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
 
-        switch (mstype) {
-
-        case DLM_MSG_LOOKUP:
-            hold_rsb(r);
-            lock_rsb(r);
-            _request_lock(r, lkb);
-            if (is_master(r))
-                confirm_master(r, 0);
-            unlock_rsb(r);
-            put_rsb(r);
-            break;
-
-        case DLM_MSG_REQUEST:
-            hold_rsb(r);
-            lock_rsb(r);
-            _request_lock(r, lkb);
-            if (is_master(r))
-                confirm_master(r, 0);
-            unlock_rsb(r);
-            put_rsb(r);
-            break;
-
-        case DLM_MSG_CONVERT:
-            hold_rsb(r);
-            lock_rsb(r);
-            _convert_lock(r, lkb);
-            unlock_rsb(r);
-            put_rsb(r);
-            break;
-
-        default:
-            log_error(ls, "recover_waiters_post type %d", mstype);
+        /* At this point we assume that we won't get a reply to any
+           previous op or overlap op on this lock. First, do a big
+           remove_from_waiters() for all previous ops. */
+
+        lkb->lkb_flags &= ~DLM_IFL_RESEND;
+        lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
+        lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
+        lkb->lkb_wait_type = 0;
+        lkb->lkb_wait_count = 0;
+        mutex_lock(&ls->ls_waiters_mutex);
+        list_del_init(&lkb->lkb_wait_reply);
+        mutex_unlock(&ls->ls_waiters_mutex);
+        unhold_lkb(lkb); /* for waiters list */
+
+        if (oc || ou) {
+            /* do an unlock or cancel instead of resending */
+            switch (mstype) {
+            case DLM_MSG_LOOKUP:
+            case DLM_MSG_REQUEST:
+                queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
+                                        -DLM_ECANCEL);
+                unhold_lkb(lkb); /* undoes create_lkb() */
+                break;
+            case DLM_MSG_CONVERT:
+                if (oc) {
+                    queue_cast(r, lkb, -DLM_ECANCEL);
+                } else {
+                    lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
+                    _unlock_lock(r, lkb);
+                }
+                break;
+            default:
+                err = 1;
+            }
+        } else {
+            switch (mstype) {
+            case DLM_MSG_LOOKUP:
+            case DLM_MSG_REQUEST:
+                _request_lock(r, lkb);
+                if (is_master(r))
+                    confirm_master(r, 0);
+                break;
+            case DLM_MSG_CONVERT:
+                _convert_lock(r, lkb);
+                break;
+            default:
+                err = 1;
+            }
         }
+
+        if (err)
+            log_error(ls, "recover_waiters_post %x %d %x %d %d",
+                      lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
+        unlock_rsb(r);
+        put_rsb(r);
+        dlm_put_lkb(lkb);
     }
 
     return error;
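
Aside (not part of the patch): the recovery logic above reduces to a per-lkb decision: if an overlap unlock or cancel was pending, complete that operation instead of resending the original one. A compact sketch of just the decision, with invented enum names:

/* Simplified decision table for dlm_recover_waiters_post(); names are
   illustrative, not from the patch. */
enum op   { OP_LOOKUP, OP_REQUEST, OP_CONVERT, OP_UNLOCK, OP_CANCEL };
enum todo { RESEND_REQUEST, RESEND_CONVERT, COMPLETE_CANCEL,
            COMPLETE_UNLOCK, FORCE_UNLOCK, BAD_STATE };

enum todo recovery_action(enum op waiting, int overlap_unlock,
                          int overlap_cancel)
{
    if (overlap_unlock || overlap_cancel) {
        switch (waiting) {
        case OP_LOOKUP:
        case OP_REQUEST:
            /* never granted: deliver -DLM_EUNLOCK/-DLM_ECANCEL directly */
            return overlap_unlock ? COMPLETE_UNLOCK : COMPLETE_CANCEL;
        case OP_CONVERT:
            /* cancel the convert, or force-unlock the granted lock */
            return overlap_cancel ? COMPLETE_CANCEL : FORCE_UNLOCK;
        default:
            return BAD_STATE;
        }
    }
    switch (waiting) {
    case OP_LOOKUP:
    case OP_REQUEST:
        return RESEND_REQUEST;
    case OP_CONVERT:
        return RESEND_CONVERT;
    default:
        return BAD_STATE;
    }
}
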
@@ -3684,7 +3990,7 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
 
     /* add this new lkb to the per-process list of locks */
     spin_lock(&ua->proc->locks_spin);
-    kref_get(&lkb->lkb_ref);
+    hold_lkb(lkb);
     list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
     spin_unlock(&ua->proc->locks_spin);
  out:
@@ -3774,6 +4080,9 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
 
     if (error == -DLM_EUNLOCK)
         error = 0;
+    /* from validate_unlock_args() */
+    if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
+        error = 0;
     if (error)
         goto out_put;
 
@@ -3786,6 +4095,7 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
     dlm_put_lkb(lkb);
  out:
     unlock_recovery(ls);
+    kfree(ua_tmp);
     return error;
 }
 
@@ -3815,33 +4125,37 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
 
     if (error == -DLM_ECANCEL)
         error = 0;
-    if (error)
-        goto out_put;
-
-    /* this lkb was removed from the WAITING queue */
-    if (lkb->lkb_grmode == DLM_LOCK_IV) {
-        spin_lock(&ua->proc->locks_spin);
-        list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
-        spin_unlock(&ua->proc->locks_spin);
-    }
+    /* from validate_unlock_args() */
+    if (error == -EBUSY)
+        error = 0;
  out_put:
     dlm_put_lkb(lkb);
  out:
     unlock_recovery(ls);
+    kfree(ua_tmp);
     return error;
 }
 
+/* lkb's that are removed from the waiters list by revert are just left on the
+   orphans list with the granted orphan locks, to be freed by purge */
+
 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
 {
     struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
+    struct dlm_args args;
+    int error;
 
-    if (ua->lksb.sb_lvbptr)
-        kfree(ua->lksb.sb_lvbptr);
-    kfree(ua);
-    lkb->lkb_astparam = (long)NULL;
+    hold_lkb(lkb);
+    mutex_lock(&ls->ls_orphans_mutex);
+    list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
+    mutex_unlock(&ls->ls_orphans_mutex);
 
-    /* TODO: propogate to master if needed */
-    return 0;
+    set_unlock_args(0, ua, &args);
+
+    error = cancel_lock(ls, lkb, &args);
+    if (error == -DLM_ECANCEL)
+        error = 0;
+    return error;
 }
 
 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
@@ -3853,10 +4167,6 @@ static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
     struct dlm_args args;
     int error;
 
-    /* FIXME: we need to handle the case where the lkb is in limbo
-       while the rsb is being looked up, currently we assert in
-       _unlock_lock/is_remote because rsb nodeid is -1. */
-
     set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
 
     error = unlock_lock(ls, lkb, &args);
@@ -3865,6 +4175,31 @@ static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
     return error;
 }
 
+/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
+   (which does lock_rsb) due to deadlock with receiving a message that does
+   lock_rsb followed by dlm_user_add_ast() */
+
+static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
+                                     struct dlm_user_proc *proc)
+{
+    struct dlm_lkb *lkb = NULL;
+
+    mutex_lock(&ls->ls_clear_proc_locks);
+    if (list_empty(&proc->locks))
+        goto out;
+
+    lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
+    list_del_init(&lkb->lkb_ownqueue);
+
+    if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
+        lkb->lkb_flags |= DLM_IFL_ORPHAN;
+    else
+        lkb->lkb_flags |= DLM_IFL_DEAD;
+ out:
+    mutex_unlock(&ls->ls_clear_proc_locks);
+    return lkb;
+}
+
 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
    which we clear here. */
@@ -3880,18 +4215,15 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
     struct dlm_lkb *lkb, *safe;
 
     lock_recovery(ls);
-    mutex_lock(&ls->ls_clear_proc_locks);
 
-    list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
-        list_del_init(&lkb->lkb_ownqueue);
-
-        if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
-            lkb->lkb_flags |= DLM_IFL_ORPHAN;
+    while (1) {
+        lkb = del_proc_lock(ls, proc);
+        if (!lkb)
+            break;
+        if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
             orphan_proc_lock(ls, lkb);
-        } else {
-            lkb->lkb_flags |= DLM_IFL_DEAD;
+        else
             unlock_proc_lock(ls, lkb);
-        }
 
         /* this removes the reference for the proc->locks list
            added by dlm_user_request, it may result in the lkb
@@ -3900,6 +4232,8 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
         dlm_put_lkb(lkb);
     }
 
+    mutex_lock(&ls->ls_clear_proc_locks);
+
     /* in-progress unlocks */
     list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
         list_del_init(&lkb->lkb_ownqueue);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index f40817b53c6f..f607ca2f0792 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -2,7 +2,7 @@
 *******************************************************************************
 **
 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
-** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
+** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved.
 **
 ** This copyrighted material is made available to anyone wishing to use,
 ** modify, copy, or redistribute it subject to the terms and conditions
@@ -459,6 +459,8 @@ static int new_lockspace(char *name, int namelen, void **lockspace,
 
     INIT_LIST_HEAD(&ls->ls_waiters);
     mutex_init(&ls->ls_waiters_mutex);
+    INIT_LIST_HEAD(&ls->ls_orphans);
+    mutex_init(&ls->ls_orphans_mutex);
 
     INIT_LIST_HEAD(&ls->ls_nodes);
     INIT_LIST_HEAD(&ls->ls_nodes_gone);
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index 27a75ce571cf..c978c67b1eff 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2006 Red Hat, Inc. All rights reserved.
+ * Copyright (C) 2006-2007 Red Hat, Inc. All rights reserved.
  *
  * This copyrighted material is made available to anyone wishing to use,
  * modify, copy, or redistribute it subject to the terms and conditions
@@ -128,35 +128,30 @@ static void compat_output(struct dlm_lock_result *res,
 }
 #endif
 
+/* we could possibly check if the cancel of an orphan has resulted in the lkb
+   being removed and then remove that lkb from the orphans list and free it */
+
 void dlm_user_add_ast(struct dlm_lkb *lkb, int type)
 {
     struct dlm_ls *ls;
     struct dlm_user_args *ua;
     struct dlm_user_proc *proc;
-    int remove_ownqueue = 0;
+    int eol = 0, ast_type;
 
-    /* dlm_clear_proc_locks() sets ORPHAN/DEAD flag on each
-       lkb before dealing with it. We need to check this
-       flag before taking ls_clear_proc_locks mutex because if
-       it's set, dlm_clear_proc_locks() holds the mutex. */
-
-    if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD)) {
-        /* log_print("user_add_ast skip1 %x", lkb->lkb_flags); */
+    if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD))
         return;
-    }
 
     ls = lkb->lkb_resource->res_ls;
     mutex_lock(&ls->ls_clear_proc_locks);
 
     /* If ORPHAN/DEAD flag is set, it means the process is dead so an ast
        can't be delivered. For ORPHAN's, dlm_clear_proc_locks() freed
-       lkb->ua so we can't try to use it. */
+       lkb->ua so we can't try to use it. This second check is necessary
+       for cases where a completion ast is received for an operation that
+       began before clear_proc_locks did its cancel/unlock. */
 
-    if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD)) {
-        /* log_print("user_add_ast skip2 %x", lkb->lkb_flags); */
+    if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD))
         goto out;
-    }
 
     DLM_ASSERT(lkb->lkb_astparam, dlm_print_lkb(lkb););
     ua = (struct dlm_user_args *)lkb->lkb_astparam;
@@ -166,28 +161,42 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, int type)
         goto out;
 
     spin_lock(&proc->asts_spin);
-    if (!(lkb->lkb_ast_type & (AST_COMP | AST_BAST))) {
+
+    ast_type = lkb->lkb_ast_type;
+    lkb->lkb_ast_type |= type;
+
+    if (!ast_type) {
         kref_get(&lkb->lkb_ref);
         list_add_tail(&lkb->lkb_astqueue, &proc->asts);
-        lkb->lkb_ast_type |= type;
         wake_up_interruptible(&proc->wait);
     }
-
-    /* noqueue requests that fail may need to be removed from the
-       proc's locks list, there should be a better way of detecting
-       this situation than checking all these things... */
-
-    if (type == AST_COMP && lkb->lkb_grmode == DLM_LOCK_IV &&
-        ua->lksb.sb_status == -EAGAIN && !list_empty(&lkb->lkb_ownqueue))
-        remove_ownqueue = 1;
-
-    /* unlocks or cancels of waiting requests need to be removed from the
-       proc's unlocking list, again there must be a better way... */
-
-    if (ua->lksb.sb_status == -DLM_EUNLOCK ||
+    if (type == AST_COMP && (ast_type & AST_COMP))
+        log_debug(ls, "ast overlap %x status %x %x",
+                  lkb->lkb_id, ua->lksb.sb_status, lkb->lkb_flags);
+
+    /* Figure out if this lock is at the end of its life and no longer
+       available for the application to use. The lkb still exists until
+       the final ast is read. A lock becomes EOL in three situations:
+         1. a noqueue request fails with EAGAIN
+         2. an unlock completes with EUNLOCK
+         3. a cancel of a waiting request completes with ECANCEL
+       An EOL lock needs to be removed from the process's list of locks.
+       And we can't allow any new operation on an EOL lock. This is
+       not related to the lifetime of the lkb struct which is managed
+       entirely by refcount. */
+
+    if (type == AST_COMP &&
+        lkb->lkb_grmode == DLM_LOCK_IV &&
+        ua->lksb.sb_status == -EAGAIN)
+        eol = 1;
+    else if (ua->lksb.sb_status == -DLM_EUNLOCK ||
         (ua->lksb.sb_status == -DLM_ECANCEL &&
          lkb->lkb_grmode == DLM_LOCK_IV))
-        remove_ownqueue = 1;
+        eol = 1;
+    if (eol) {
+        lkb->lkb_ast_type &= ~AST_BAST;
+        lkb->lkb_flags |= DLM_IFL_ENDOFLIFE;
+    }
 
     /* We want to copy the lvb to userspace when the completion
        ast is read if the status is 0, the lock has an lvb and
@@ -204,11 +213,13 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, int type)
 
     spin_unlock(&proc->asts_spin);
 
-    if (remove_ownqueue) {
+    if (eol) {
         spin_lock(&ua->proc->locks_spin);
-        list_del_init(&lkb->lkb_ownqueue);
+        if (!list_empty(&lkb->lkb_ownqueue)) {
+            list_del_init(&lkb->lkb_ownqueue);
+            dlm_put_lkb(lkb);
+        }
         spin_unlock(&ua->proc->locks_spin);
-        dlm_put_lkb(lkb);
     }
  out:
     mutex_unlock(&ls->ls_clear_proc_locks);
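
Aside (not part of the patch): the end-of-life test that replaces remove_ownqueue above can be read as a pure predicate over the completion status. A self-contained restatement with placeholder constants (the real DLM_LOCK_IV, DLM_EUNLOCK and DLM_ECANCEL values come from the dlm headers):

#define LOCK_IV     (-1)     /* placeholder for DLM_LOCK_IV ("no mode") */
#define ST_EAGAIN   (-11)    /* EAGAIN, used here only as a stand-in value */
#define ST_EUNLOCK  (-1001)  /* placeholder for -DLM_EUNLOCK */
#define ST_ECANCEL  (-1002)  /* placeholder for -DLM_ECANCEL */

/* is_completion: this ast is a completion (AST_COMP), not a blocking ast */
int lock_is_end_of_life(int is_completion, int grmode, int sb_status)
{
    /* 1. a noqueue request failed with EAGAIN */
    if (is_completion && grmode == LOCK_IV && sb_status == ST_EAGAIN)
        return 1;
    /* 2. an unlock completed */
    if (sb_status == ST_EUNLOCK)
        return 1;
    /* 3. a cancel completed while the request was still waiting */
    if (sb_status == ST_ECANCEL && grmode == LOCK_IV)
        return 1;
    return 0;
}
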