aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm/lock.c
diff options
context:
space:
mode:
authorDavid Teigland <teigland@redhat.com>2007-05-18 09:59:31 -0400
committerSteven Whitehouse <swhiteho@redhat.com>2007-07-09 03:22:33 -0400
commit3ae1acf93a21512512f8a78430fcde5992dd208e (patch)
tree878ed3c619530c6acf862ecf663063f66fc47a06 /fs/dlm/lock.c
parent85e86edf951a8a39954c0ba1edbe4a58827dcd5c (diff)
[DLM] add lock timeouts and warnings [2/6]
New features: lock timeouts and time warnings. If the DLM_LKF_TIMEOUT flag is set, the request/conversion is canceled after it has waited the specified number of centiseconds (specified per lock). This feature is currently available only for locks requested through libdlm (it can be enabled for kernel dlm users if there is a use for it). If the new DLM_LSFL_TIMEWARN flag is set when the lockspace is created, a warning message is sent to userspace (using genetlink) after a request/conversion has been waiting for a given number of centiseconds (configurable per node). The time warnings will be used in the future to do deadlock detection in userspace.
Signed-off-by: David Teigland <teigland@redhat.com>
Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r--fs/dlm/lock.c146
1 files changed, 144 insertions, 2 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 09668ec2e279..ab986dfbe6d3 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -82,10 +82,13 @@ static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb); 82static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83static int send_remove(struct dlm_rsb *r); 83static int send_remove(struct dlm_rsb *r);
84static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); 84static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 86static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
86 struct dlm_message *ms); 87 struct dlm_message *ms);
87static int receive_extralen(struct dlm_message *ms); 88static int receive_extralen(struct dlm_message *ms);
88static void do_purge(struct dlm_ls *ls, int nodeid, int pid); 89static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
90static void del_timeout(struct dlm_lkb *lkb);
91void dlm_timeout_warn(struct dlm_lkb *lkb);
89 92
90/* 93/*
91 * Lock compatibilty matrix - thanks Steve 94 * Lock compatibilty matrix - thanks Steve
@@ -286,8 +289,17 @@ static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
286 if (is_master_copy(lkb)) 289 if (is_master_copy(lkb))
287 return; 290 return;
288 291
292 del_timeout(lkb);
293
289 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb);); 294 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
290 295
296 /* if the operation was a cancel, then return -DLM_ECANCEL, if a
297 timeout caused the cancel then return -ETIMEDOUT */
298 if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
299 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
300 rv = -ETIMEDOUT;
301 }
302
291 lkb->lkb_lksb->sb_status = rv; 303 lkb->lkb_lksb->sb_status = rv;
292 lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags; 304 lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
293 305
@@ -581,6 +593,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
581 kref_init(&lkb->lkb_ref); 593 kref_init(&lkb->lkb_ref);
582 INIT_LIST_HEAD(&lkb->lkb_ownqueue); 594 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
583 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); 595 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
596 INIT_LIST_HEAD(&lkb->lkb_time_list);
584 597
585 get_random_bytes(&bucket, sizeof(bucket)); 598 get_random_bytes(&bucket, sizeof(bucket));
586 bucket &= (ls->ls_lkbtbl_size - 1); 599 bucket &= (ls->ls_lkbtbl_size - 1);
@@ -993,6 +1006,125 @@ void dlm_scan_rsbs(struct dlm_ls *ls)
993 } 1006 }
994} 1007}
995 1008
1009static void add_timeout(struct dlm_lkb *lkb)
1010{
1011 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1012
1013 if (is_master_copy(lkb))
1014 return;
1015
1016 if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1017 goto add_it;
1018
1019 if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1020 !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1021 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1022 goto add_it;
1023 }
1024 return;
1025
1026 add_it:
1027 DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1028 mutex_lock(&ls->ls_timeout_mutex);
1029 hold_lkb(lkb);
1030 lkb->lkb_timestamp = jiffies;
1031 list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1032 mutex_unlock(&ls->ls_timeout_mutex);
1033}
1034
1035static void del_timeout(struct dlm_lkb *lkb)
1036{
1037 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1038
1039 mutex_lock(&ls->ls_timeout_mutex);
1040 if (!list_empty(&lkb->lkb_time_list)) {
1041 list_del_init(&lkb->lkb_time_list);
1042 unhold_lkb(lkb);
1043 }
1044 mutex_unlock(&ls->ls_timeout_mutex);
1045}
1046
1047/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1048 lkb_lksb_timeout without lock_rsb? Note: we can't lock timeout_mutex
1049 and then lock rsb because of lock ordering in add_timeout. We may need
1050 to specify some special timeout-related bits in the lkb that are just to
1051 be accessed under the timeout_mutex. */
1052
1053void dlm_scan_timeout(struct dlm_ls *ls)
1054{
1055 struct dlm_rsb *r;
1056 struct dlm_lkb *lkb;
1057 int do_cancel, do_warn;
1058
1059 for (;;) {
1060 if (dlm_locking_stopped(ls))
1061 break;
1062
1063 do_cancel = 0;
1064 do_warn = 0;
1065 mutex_lock(&ls->ls_timeout_mutex);
1066 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1067
1068 if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1069 time_after_eq(jiffies, lkb->lkb_timestamp +
1070 lkb->lkb_timeout_cs * HZ/100))
1071 do_cancel = 1;
1072
1073 if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1074 time_after_eq(jiffies, lkb->lkb_timestamp +
1075 dlm_config.ci_timewarn_cs * HZ/100))
1076 do_warn = 1;
1077
1078 if (!do_cancel && !do_warn)
1079 continue;
1080 hold_lkb(lkb);
1081 break;
1082 }
1083 mutex_unlock(&ls->ls_timeout_mutex);
1084
1085 if (!do_cancel && !do_warn)
1086 break;
1087
1088 r = lkb->lkb_resource;
1089 hold_rsb(r);
1090 lock_rsb(r);
1091
1092 if (do_warn) {
1093 /* clear flag so we only warn once */
1094 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1095 if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1096 del_timeout(lkb);
1097 dlm_timeout_warn(lkb);
1098 }
1099
1100 if (do_cancel) {
1101 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1102 lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1103 del_timeout(lkb);
1104 _cancel_lock(r, lkb);
1105 }
1106
1107 unlock_rsb(r);
1108 unhold_rsb(r);
1109 dlm_put_lkb(lkb);
1110 }
1111}
1112
1113/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1114 dlm_recoverd before checking/setting ls_recover_begin. */
1115
1116void dlm_adjust_timeouts(struct dlm_ls *ls)
1117{
1118 struct dlm_lkb *lkb;
1119 long adj = jiffies - ls->ls_recover_begin;
1120
1121 ls->ls_recover_begin = 0;
1122 mutex_lock(&ls->ls_timeout_mutex);
1123 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1124 lkb->lkb_timestamp += adj;
1125 mutex_unlock(&ls->ls_timeout_mutex);
1126}
1127
996/* lkb is master or local copy */ 1128/* lkb is master or local copy */
997 1129
998static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1130static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
@@ -1902,6 +2034,9 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1902 if (is_overlap(lkb)) 2034 if (is_overlap(lkb))
1903 goto out; 2035 goto out;
1904 2036
2037 /* don't let scand try to do a cancel */
2038 del_timeout(lkb);
2039
1905 if (lkb->lkb_flags & DLM_IFL_RESEND) { 2040 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1906 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; 2041 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1907 rv = -EBUSY; 2042 rv = -EBUSY;
@@ -1933,6 +2068,9 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1933 if (is_overlap_unlock(lkb)) 2068 if (is_overlap_unlock(lkb))
1934 goto out; 2069 goto out;
1935 2070
2071 /* don't let scand try to do a cancel */
2072 del_timeout(lkb);
2073
1936 if (lkb->lkb_flags & DLM_IFL_RESEND) { 2074 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1937 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; 2075 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1938 rv = -EBUSY; 2076 rv = -EBUSY;
@@ -1993,6 +2131,7 @@ static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1993 error = -EINPROGRESS; 2131 error = -EINPROGRESS;
1994 add_lkb(r, lkb, DLM_LKSTS_WAITING); 2132 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1995 send_blocking_asts(r, lkb); 2133 send_blocking_asts(r, lkb);
2134 add_timeout(lkb);
1996 goto out; 2135 goto out;
1997 } 2136 }
1998 2137
@@ -2040,6 +2179,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2040 del_lkb(r, lkb); 2179 del_lkb(r, lkb);
2041 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 2180 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2042 send_blocking_asts(r, lkb); 2181 send_blocking_asts(r, lkb);
2182 add_timeout(lkb);
2043 goto out; 2183 goto out;
2044 } 2184 }
2045 2185
@@ -3110,9 +3250,10 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3110 lkb->lkb_remid = ms->m_lkid; 3250 lkb->lkb_remid = ms->m_lkid;
3111 if (is_altmode(lkb)) 3251 if (is_altmode(lkb))
3112 munge_altmode(lkb, ms); 3252 munge_altmode(lkb, ms);
3113 if (result) 3253 if (result) {
3114 add_lkb(r, lkb, DLM_LKSTS_WAITING); 3254 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3115 else { 3255 add_timeout(lkb);
3256 } else {
3116 grant_lock_pc(r, lkb, ms); 3257 grant_lock_pc(r, lkb, ms);
3117 queue_cast(r, lkb, 0); 3258 queue_cast(r, lkb, 0);
3118 } 3259 }
@@ -3178,6 +3319,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3178 munge_demoted(lkb, ms); 3319 munge_demoted(lkb, ms);
3179 del_lkb(r, lkb); 3320 del_lkb(r, lkb);
3180 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 3321 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3322 add_timeout(lkb);
3181 break; 3323 break;
3182 3324
3183 case 0: 3325 case 0: