author     Glenn Elliott <gelliott@cs.unc.edu>    2012-03-04 19:47:13 -0500
committer  Glenn Elliott <gelliott@cs.unc.edu>    2012-03-04 19:47:13 -0500
commit     c71c03bda1e86c9d5198c5d83f712e695c4f2a1e (patch)
tree       ecb166cb3e2b7e2adb3b5e292245fefd23381ac8 /fs/dlm
parent     ea53c912f8a86a8567697115b6a0d8152beee5c8 (diff)
parent     6a00f206debf8a5c8899055726ad127dbeeed098 (diff)
Merge branch 'mpi-master' into wip-k-fmlp
Conflicts: litmus/sched_cedf.c
Diffstat (limited to 'fs/dlm')
-rw-r--r--  fs/dlm/Kconfig           3
-rw-r--r--  fs/dlm/ast.c           257
-rw-r--r--  fs/dlm/ast.h             7
-rw-r--r--  fs/dlm/config.c         13
-rw-r--r--  fs/dlm/config.h          1
-rw-r--r--  fs/dlm/debug_fs.c        7
-rw-r--r--  fs/dlm/dlm_internal.h   38
-rw-r--r--  fs/dlm/lock.c          225
-rw-r--r--  fs/dlm/lock.h            1
-rw-r--r--  fs/dlm/lockspace.c       6
-rw-r--r--  fs/dlm/lowcomms.c       65
-rw-r--r--  fs/dlm/main.c            2
-rw-r--r--  fs/dlm/plock.c          68
-rw-r--r--  fs/dlm/rcom.c            4
-rw-r--r--  fs/dlm/recover.c         2
-rw-r--r--  fs/dlm/user.c          189
-rw-r--r--  fs/dlm/user.h            3
17 files changed, 597 insertions, 294 deletions
diff --git a/fs/dlm/Kconfig b/fs/dlm/Kconfig
index 2dbb422e8116..1897eb1b4b6a 100644
--- a/fs/dlm/Kconfig
+++ b/fs/dlm/Kconfig
@@ -1,8 +1,7 @@
1menuconfig DLM 1menuconfig DLM
2 tristate "Distributed Lock Manager (DLM)" 2 tristate "Distributed Lock Manager (DLM)"
3 depends on EXPERIMENTAL && INET 3 depends on EXPERIMENTAL && INET
4 depends on SYSFS && (IPV6 || IPV6=n) 4 depends on SYSFS && CONFIGFS_FS && (IPV6 || IPV6=n)
5 select CONFIGFS_FS
6 select IP_SCTP 5 select IP_SCTP
7 help 6 help
8 A general purpose distributed lock manager for kernel or userspace 7 A general purpose distributed lock manager for kernel or userspace
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 4314f0d48d85..abc49f292454 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -18,6 +18,7 @@
18 18
19#define WAKE_ASTS 0 19#define WAKE_ASTS 0
20 20
21static uint64_t ast_seq_count;
21static struct list_head ast_queue; 22static struct list_head ast_queue;
22static spinlock_t ast_queue_lock; 23static spinlock_t ast_queue_lock;
23static struct task_struct * astd_task; 24static struct task_struct * astd_task;
@@ -25,40 +26,186 @@ static unsigned long astd_wakeflags;
25static struct mutex astd_running; 26static struct mutex astd_running;
26 27
27 28
29static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb)
30{
31 int i;
32
33 log_print("last_bast %x %llu flags %x mode %d sb %d %x",
34 lkb->lkb_id,
35 (unsigned long long)lkb->lkb_last_bast.seq,
36 lkb->lkb_last_bast.flags,
37 lkb->lkb_last_bast.mode,
38 lkb->lkb_last_bast.sb_status,
39 lkb->lkb_last_bast.sb_flags);
40
41 log_print("last_cast %x %llu flags %x mode %d sb %d %x",
42 lkb->lkb_id,
43 (unsigned long long)lkb->lkb_last_cast.seq,
44 lkb->lkb_last_cast.flags,
45 lkb->lkb_last_cast.mode,
46 lkb->lkb_last_cast.sb_status,
47 lkb->lkb_last_cast.sb_flags);
48
49 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
50 log_print("cb %x %llu flags %x mode %d sb %d %x",
51 lkb->lkb_id,
52 (unsigned long long)lkb->lkb_callbacks[i].seq,
53 lkb->lkb_callbacks[i].flags,
54 lkb->lkb_callbacks[i].mode,
55 lkb->lkb_callbacks[i].sb_status,
56 lkb->lkb_callbacks[i].sb_flags);
57 }
58}
59
28void dlm_del_ast(struct dlm_lkb *lkb) 60void dlm_del_ast(struct dlm_lkb *lkb)
29{ 61{
30 spin_lock(&ast_queue_lock); 62 spin_lock(&ast_queue_lock);
31 if (lkb->lkb_ast_type & (AST_COMP | AST_BAST)) 63 if (!list_empty(&lkb->lkb_astqueue))
32 list_del(&lkb->lkb_astqueue); 64 list_del_init(&lkb->lkb_astqueue);
33 spin_unlock(&ast_queue_lock); 65 spin_unlock(&ast_queue_lock);
34} 66}
35 67
36void dlm_add_ast(struct dlm_lkb *lkb, int type, int mode) 68int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
69 int status, uint32_t sbflags, uint64_t seq)
37{ 70{
71 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
72 uint64_t prev_seq;
73 int prev_mode;
74 int i;
75
76 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
77 if (lkb->lkb_callbacks[i].seq)
78 continue;
79
80 /*
81 * Suppress some redundant basts here, do more on removal.
82 * Don't even add a bast if the callback just before it
83 * is a bast for the same mode or a more restrictive mode.
84 * (the addional > PR check is needed for PR/CW inversion)
85 */
86
87 if ((i > 0) && (flags & DLM_CB_BAST) &&
88 (lkb->lkb_callbacks[i-1].flags & DLM_CB_BAST)) {
89
90 prev_seq = lkb->lkb_callbacks[i-1].seq;
91 prev_mode = lkb->lkb_callbacks[i-1].mode;
92
93 if ((prev_mode == mode) ||
94 (prev_mode > mode && prev_mode > DLM_LOCK_PR)) {
95
96 log_debug(ls, "skip %x add bast %llu mode %d "
97 "for bast %llu mode %d",
98 lkb->lkb_id,
99 (unsigned long long)seq,
100 mode,
101 (unsigned long long)prev_seq,
102 prev_mode);
103 return 0;
104 }
105 }
106
107 lkb->lkb_callbacks[i].seq = seq;
108 lkb->lkb_callbacks[i].flags = flags;
109 lkb->lkb_callbacks[i].mode = mode;
110 lkb->lkb_callbacks[i].sb_status = status;
111 lkb->lkb_callbacks[i].sb_flags = (sbflags & 0x000000FF);
112 break;
113 }
114
115 if (i == DLM_CALLBACKS_SIZE) {
116 log_error(ls, "no callbacks %x %llu flags %x mode %d sb %d %x",
117 lkb->lkb_id, (unsigned long long)seq,
118 flags, mode, status, sbflags);
119 dlm_dump_lkb_callbacks(lkb);
120 return -1;
121 }
122
123 return 0;
124}
125
126int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
127 struct dlm_callback *cb, int *resid)
128{
129 int i;
130
131 *resid = 0;
132
133 if (!lkb->lkb_callbacks[0].seq)
134 return -ENOENT;
135
136 /* oldest undelivered cb is callbacks[0] */
137
138 memcpy(cb, &lkb->lkb_callbacks[0], sizeof(struct dlm_callback));
139 memset(&lkb->lkb_callbacks[0], 0, sizeof(struct dlm_callback));
140
141 /* shift others down */
142
143 for (i = 1; i < DLM_CALLBACKS_SIZE; i++) {
144 if (!lkb->lkb_callbacks[i].seq)
145 break;
146 memcpy(&lkb->lkb_callbacks[i-1], &lkb->lkb_callbacks[i],
147 sizeof(struct dlm_callback));
148 memset(&lkb->lkb_callbacks[i], 0, sizeof(struct dlm_callback));
149 (*resid)++;
150 }
151
152 /* if cb is a bast, it should be skipped if the blocking mode is
153 compatible with the last granted mode */
154
155 if ((cb->flags & DLM_CB_BAST) && lkb->lkb_last_cast.seq) {
156 if (dlm_modes_compat(cb->mode, lkb->lkb_last_cast.mode)) {
157 cb->flags |= DLM_CB_SKIP;
158
159 log_debug(ls, "skip %x bast %llu mode %d "
160 "for cast %llu mode %d",
161 lkb->lkb_id,
162 (unsigned long long)cb->seq,
163 cb->mode,
164 (unsigned long long)lkb->lkb_last_cast.seq,
165 lkb->lkb_last_cast.mode);
166 return 0;
167 }
168 }
169
170 if (cb->flags & DLM_CB_CAST) {
171 memcpy(&lkb->lkb_last_cast, cb, sizeof(struct dlm_callback));
172 lkb->lkb_last_cast_time = ktime_get();
173 }
174
175 if (cb->flags & DLM_CB_BAST) {
176 memcpy(&lkb->lkb_last_bast, cb, sizeof(struct dlm_callback));
177 lkb->lkb_last_bast_time = ktime_get();
178 }
179
180 return 0;
181}
182
183void dlm_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
184 uint32_t sbflags)
185{
186 uint64_t seq;
187 int rv;
188
189 spin_lock(&ast_queue_lock);
190
191 seq = ++ast_seq_count;
192
38 if (lkb->lkb_flags & DLM_IFL_USER) { 193 if (lkb->lkb_flags & DLM_IFL_USER) {
39 dlm_user_add_ast(lkb, type, mode); 194 spin_unlock(&ast_queue_lock);
195 dlm_user_add_ast(lkb, flags, mode, status, sbflags, seq);
40 return; 196 return;
41 } 197 }
42 198
43 spin_lock(&ast_queue_lock); 199 rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, seq);
44 if (!(lkb->lkb_ast_type & (AST_COMP | AST_BAST))) { 200 if (rv < 0) {
201 spin_unlock(&ast_queue_lock);
202 return;
203 }
204
205 if (list_empty(&lkb->lkb_astqueue)) {
45 kref_get(&lkb->lkb_ref); 206 kref_get(&lkb->lkb_ref);
46 list_add_tail(&lkb->lkb_astqueue, &ast_queue); 207 list_add_tail(&lkb->lkb_astqueue, &ast_queue);
47 lkb->lkb_ast_first = type;
48 } 208 }
49
50 /* sanity check, this should not happen */
51
52 if ((type == AST_COMP) && (lkb->lkb_ast_type & AST_COMP))
53 log_print("repeat cast %d castmode %d lock %x %s",
54 mode, lkb->lkb_castmode,
55 lkb->lkb_id, lkb->lkb_resource->res_name);
56
57 lkb->lkb_ast_type |= type;
58 if (type == AST_BAST)
59 lkb->lkb_bastmode = mode;
60 else
61 lkb->lkb_castmode = mode;
62 spin_unlock(&ast_queue_lock); 209 spin_unlock(&ast_queue_lock);
63 210
64 set_bit(WAKE_ASTS, &astd_wakeflags); 211 set_bit(WAKE_ASTS, &astd_wakeflags);
@@ -72,7 +219,8 @@ static void process_asts(void)
72 struct dlm_lkb *lkb; 219 struct dlm_lkb *lkb;
73 void (*castfn) (void *astparam); 220 void (*castfn) (void *astparam);
74 void (*bastfn) (void *astparam, int mode); 221 void (*bastfn) (void *astparam, int mode);
75 int type, first, bastmode, castmode, do_bast, do_cast, last_castmode; 222 struct dlm_callback callbacks[DLM_CALLBACKS_SIZE];
223 int i, rv, resid;
76 224
77repeat: 225repeat:
78 spin_lock(&ast_queue_lock); 226 spin_lock(&ast_queue_lock);
@@ -83,54 +231,45 @@ repeat:
83 if (dlm_locking_stopped(ls)) 231 if (dlm_locking_stopped(ls))
84 continue; 232 continue;
85 233
86 list_del(&lkb->lkb_astqueue); 234 /* we remove from astqueue list and remove everything in
87 type = lkb->lkb_ast_type; 235 lkb_callbacks before releasing the spinlock so empty
88 lkb->lkb_ast_type = 0; 236 lkb_astqueue is always consistent with empty lkb_callbacks */
89 first = lkb->lkb_ast_first; 237
90 lkb->lkb_ast_first = 0; 238 list_del_init(&lkb->lkb_astqueue);
91 bastmode = lkb->lkb_bastmode; 239
92 castmode = lkb->lkb_castmode;
93 castfn = lkb->lkb_astfn; 240 castfn = lkb->lkb_astfn;
94 bastfn = lkb->lkb_bastfn; 241 bastfn = lkb->lkb_bastfn;
95 spin_unlock(&ast_queue_lock);
96 242
97 do_cast = (type & AST_COMP) && castfn; 243 memset(&callbacks, 0, sizeof(callbacks));
98 do_bast = (type & AST_BAST) && bastfn;
99 244
100 /* Skip a bast if its blocking mode is compatible with the 245 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
101 granted mode of the preceding cast. */ 246 rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid);
247 if (rv < 0)
248 break;
249 }
250 spin_unlock(&ast_queue_lock);
102 251
103 if (do_bast) { 252 if (resid) {
104 if (first == AST_COMP) 253 /* shouldn't happen, for loop should have removed all */
105 last_castmode = castmode; 254 log_error(ls, "callback resid %d lkb %x",
106 else 255 resid, lkb->lkb_id);
107 last_castmode = lkb->lkb_castmode_done;
108 if (dlm_modes_compat(bastmode, last_castmode))
109 do_bast = 0;
110 } 256 }
111 257
112 if (first == AST_COMP) { 258 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
113 if (do_cast) 259 if (!callbacks[i].seq)
114 castfn(lkb->lkb_astparam); 260 break;
115 if (do_bast) 261 if (callbacks[i].flags & DLM_CB_SKIP) {
116 bastfn(lkb->lkb_astparam, bastmode); 262 continue;
117 } else if (first == AST_BAST) { 263 } else if (callbacks[i].flags & DLM_CB_BAST) {
118 if (do_bast) 264 bastfn(lkb->lkb_astparam, callbacks[i].mode);
119 bastfn(lkb->lkb_astparam, bastmode); 265 } else if (callbacks[i].flags & DLM_CB_CAST) {
120 if (do_cast) 266 lkb->lkb_lksb->sb_status = callbacks[i].sb_status;
267 lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags;
121 castfn(lkb->lkb_astparam); 268 castfn(lkb->lkb_astparam);
122 } else { 269 }
123 log_error(ls, "bad ast_first %d ast_type %d",
124 first, type);
125 } 270 }
126 271
127 if (do_cast) 272 /* removes ref for ast_queue, may cause lkb to be freed */
128 lkb->lkb_castmode_done = castmode;
129 if (do_bast)
130 lkb->lkb_bastmode_done = bastmode;
131
132 /* this removes the reference added by dlm_add_ast
133 and may result in the lkb being freed */
134 dlm_put_lkb(lkb); 273 dlm_put_lkb(lkb);
135 274
136 cond_resched(); 275 cond_resched();
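
[Editor's sketch, not part of the commit] The ast.c changes above replace the old AST_COMP/AST_BAST bits with a fixed-size per-lock callback queue: dlm_add_lkb_callback() fills the first free slot, and dlm_rem_lkb_callback() pops the oldest slot and shifts the rest down before delivery. The following is a minimal userspace sketch of that queue pattern; all names, types, and sizes here are simplified assumptions, not the kernel API.

/* Simplified model of the fixed-size per-lock callback queue. */
#include <stdio.h>
#include <string.h>
#include <stdint.h>

#define CALLBACKS_SIZE 6
#define CB_CAST 0x1
#define CB_BAST 0x2

struct callback {
	uint64_t seq;   /* 0 means the slot is free */
	uint32_t flags; /* CB_CAST or CB_BAST */
	int mode;
};

struct lock {
	struct callback cbs[CALLBACKS_SIZE];
};

/* queue a callback in the first free slot; -1 if the queue is full */
static int add_callback(struct lock *lk, uint32_t flags, int mode, uint64_t seq)
{
	int i;

	for (i = 0; i < CALLBACKS_SIZE; i++) {
		if (lk->cbs[i].seq)
			continue;
		lk->cbs[i].seq = seq;
		lk->cbs[i].flags = flags;
		lk->cbs[i].mode = mode;
		return 0;
	}
	return -1;
}

/* pop the oldest callback (slot 0), shift the rest down, and report how
   many undelivered callbacks remain in *resid; -1 if the queue is empty */
static int rem_callback(struct lock *lk, struct callback *out, int *resid)
{
	int i;

	*resid = 0;
	if (!lk->cbs[0].seq)
		return -1;

	*out = lk->cbs[0];
	memset(&lk->cbs[0], 0, sizeof(lk->cbs[0]));

	for (i = 1; i < CALLBACKS_SIZE; i++) {
		if (!lk->cbs[i].seq)
			break;
		lk->cbs[i - 1] = lk->cbs[i];
		memset(&lk->cbs[i], 0, sizeof(lk->cbs[i]));
		(*resid)++;
	}
	return 0;
}

int main(void)
{
	struct lock lk = { { { 0 } } };
	struct callback cb;
	int resid;

	add_callback(&lk, CB_CAST, 5, 1);
	add_callback(&lk, CB_BAST, 3, 2);

	/* deliver in order: seq 1 (cast), then seq 2 (bast) */
	while (!rem_callback(&lk, &cb, &resid))
		printf("seq %llu flags %x mode %d (left %d)\n",
		       (unsigned long long)cb.seq, cb.flags, cb.mode, resid);
	return 0;
}
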
diff --git a/fs/dlm/ast.h b/fs/dlm/ast.h
index bcb1aaba519d..8aa89c9b5611 100644
--- a/fs/dlm/ast.h
+++ b/fs/dlm/ast.h
@@ -13,8 +13,13 @@
13#ifndef __ASTD_DOT_H__ 13#ifndef __ASTD_DOT_H__
14#define __ASTD_DOT_H__ 14#define __ASTD_DOT_H__
15 15
16void dlm_add_ast(struct dlm_lkb *lkb, int type, int mode);
17void dlm_del_ast(struct dlm_lkb *lkb); 16void dlm_del_ast(struct dlm_lkb *lkb);
17int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
18 int status, uint32_t sbflags, uint64_t seq);
19int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
20 struct dlm_callback *cb, int *resid);
21void dlm_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
22 uint32_t sbflags);
18 23
19void dlm_astd_wake(void); 24void dlm_astd_wake(void);
20int dlm_astd_start(void); 25int dlm_astd_start(void);
diff --git a/fs/dlm/config.c b/fs/dlm/config.c
index b54bca03d92f..9b026ea8baa9 100644
--- a/fs/dlm/config.c
+++ b/fs/dlm/config.c
@@ -100,6 +100,7 @@ struct dlm_cluster {
100 unsigned int cl_log_debug; 100 unsigned int cl_log_debug;
101 unsigned int cl_protocol; 101 unsigned int cl_protocol;
102 unsigned int cl_timewarn_cs; 102 unsigned int cl_timewarn_cs;
103 unsigned int cl_waitwarn_us;
103}; 104};
104 105
105enum { 106enum {
@@ -114,6 +115,7 @@ enum {
114 CLUSTER_ATTR_LOG_DEBUG, 115 CLUSTER_ATTR_LOG_DEBUG,
115 CLUSTER_ATTR_PROTOCOL, 116 CLUSTER_ATTR_PROTOCOL,
116 CLUSTER_ATTR_TIMEWARN_CS, 117 CLUSTER_ATTR_TIMEWARN_CS,
118 CLUSTER_ATTR_WAITWARN_US,
117}; 119};
118 120
119struct cluster_attribute { 121struct cluster_attribute {
@@ -166,6 +168,7 @@ CLUSTER_ATTR(scan_secs, 1);
166CLUSTER_ATTR(log_debug, 0); 168CLUSTER_ATTR(log_debug, 0);
167CLUSTER_ATTR(protocol, 0); 169CLUSTER_ATTR(protocol, 0);
168CLUSTER_ATTR(timewarn_cs, 1); 170CLUSTER_ATTR(timewarn_cs, 1);
171CLUSTER_ATTR(waitwarn_us, 0);
169 172
170static struct configfs_attribute *cluster_attrs[] = { 173static struct configfs_attribute *cluster_attrs[] = {
171 [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr, 174 [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr,
@@ -179,6 +182,7 @@ static struct configfs_attribute *cluster_attrs[] = {
179 [CLUSTER_ATTR_LOG_DEBUG] = &cluster_attr_log_debug.attr, 182 [CLUSTER_ATTR_LOG_DEBUG] = &cluster_attr_log_debug.attr,
180 [CLUSTER_ATTR_PROTOCOL] = &cluster_attr_protocol.attr, 183 [CLUSTER_ATTR_PROTOCOL] = &cluster_attr_protocol.attr,
181 [CLUSTER_ATTR_TIMEWARN_CS] = &cluster_attr_timewarn_cs.attr, 184 [CLUSTER_ATTR_TIMEWARN_CS] = &cluster_attr_timewarn_cs.attr,
185 [CLUSTER_ATTR_WAITWARN_US] = &cluster_attr_waitwarn_us.attr,
182 NULL, 186 NULL,
183}; 187};
184 188
@@ -439,6 +443,7 @@ static struct config_group *make_cluster(struct config_group *g,
439 cl->cl_log_debug = dlm_config.ci_log_debug; 443 cl->cl_log_debug = dlm_config.ci_log_debug;
440 cl->cl_protocol = dlm_config.ci_protocol; 444 cl->cl_protocol = dlm_config.ci_protocol;
441 cl->cl_timewarn_cs = dlm_config.ci_timewarn_cs; 445 cl->cl_timewarn_cs = dlm_config.ci_timewarn_cs;
446 cl->cl_waitwarn_us = dlm_config.ci_waitwarn_us;
442 447
443 space_list = &sps->ss_group; 448 space_list = &sps->ss_group;
444 comm_list = &cms->cs_group; 449 comm_list = &cms->cs_group;
@@ -977,15 +982,16 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
977/* Config file defaults */ 982/* Config file defaults */
978#define DEFAULT_TCP_PORT 21064 983#define DEFAULT_TCP_PORT 21064
979#define DEFAULT_BUFFER_SIZE 4096 984#define DEFAULT_BUFFER_SIZE 4096
980#define DEFAULT_RSBTBL_SIZE 256 985#define DEFAULT_RSBTBL_SIZE 1024
981#define DEFAULT_LKBTBL_SIZE 1024 986#define DEFAULT_LKBTBL_SIZE 1024
982#define DEFAULT_DIRTBL_SIZE 512 987#define DEFAULT_DIRTBL_SIZE 1024
983#define DEFAULT_RECOVER_TIMER 5 988#define DEFAULT_RECOVER_TIMER 5
984#define DEFAULT_TOSS_SECS 10 989#define DEFAULT_TOSS_SECS 10
985#define DEFAULT_SCAN_SECS 5 990#define DEFAULT_SCAN_SECS 5
986#define DEFAULT_LOG_DEBUG 0 991#define DEFAULT_LOG_DEBUG 0
987#define DEFAULT_PROTOCOL 0 992#define DEFAULT_PROTOCOL 0
988#define DEFAULT_TIMEWARN_CS 500 /* 5 sec = 500 centiseconds */ 993#define DEFAULT_TIMEWARN_CS 500 /* 5 sec = 500 centiseconds */
994#define DEFAULT_WAITWARN_US 0
989 995
990struct dlm_config_info dlm_config = { 996struct dlm_config_info dlm_config = {
991 .ci_tcp_port = DEFAULT_TCP_PORT, 997 .ci_tcp_port = DEFAULT_TCP_PORT,
@@ -998,6 +1004,7 @@ struct dlm_config_info dlm_config = {
998 .ci_scan_secs = DEFAULT_SCAN_SECS, 1004 .ci_scan_secs = DEFAULT_SCAN_SECS,
999 .ci_log_debug = DEFAULT_LOG_DEBUG, 1005 .ci_log_debug = DEFAULT_LOG_DEBUG,
1000 .ci_protocol = DEFAULT_PROTOCOL, 1006 .ci_protocol = DEFAULT_PROTOCOL,
1001 .ci_timewarn_cs = DEFAULT_TIMEWARN_CS 1007 .ci_timewarn_cs = DEFAULT_TIMEWARN_CS,
1008 .ci_waitwarn_us = DEFAULT_WAITWARN_US
1002}; 1009};
1003 1010
diff --git a/fs/dlm/config.h b/fs/dlm/config.h
index 4f1d6fce58c5..dd0ce24d5a80 100644
--- a/fs/dlm/config.h
+++ b/fs/dlm/config.h
@@ -28,6 +28,7 @@ struct dlm_config_info {
28 int ci_log_debug; 28 int ci_log_debug;
29 int ci_protocol; 29 int ci_protocol;
30 int ci_timewarn_cs; 30 int ci_timewarn_cs;
31 int ci_waitwarn_us;
31}; 32};
32 33
33extern struct dlm_config_info dlm_config; 34extern struct dlm_config_info dlm_config;
diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index c6cf25158746..59779237e2b4 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -257,12 +257,12 @@ static int print_format3_lock(struct seq_file *s, struct dlm_lkb *lkb,
257 lkb->lkb_status, 257 lkb->lkb_status,
258 lkb->lkb_grmode, 258 lkb->lkb_grmode,
259 lkb->lkb_rqmode, 259 lkb->lkb_rqmode,
260 lkb->lkb_bastmode, 260 lkb->lkb_last_bast.mode,
261 rsb_lookup, 261 rsb_lookup,
262 lkb->lkb_wait_type, 262 lkb->lkb_wait_type,
263 lkb->lkb_lvbseq, 263 lkb->lkb_lvbseq,
264 (unsigned long long)ktime_to_ns(lkb->lkb_timestamp), 264 (unsigned long long)ktime_to_ns(lkb->lkb_timestamp),
265 (unsigned long long)ktime_to_ns(lkb->lkb_time_bast)); 265 (unsigned long long)ktime_to_ns(lkb->lkb_last_bast_time));
266 return rv; 266 return rv;
267} 267}
268 268
@@ -643,7 +643,8 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
643static const struct file_operations waiters_fops = { 643static const struct file_operations waiters_fops = {
644 .owner = THIS_MODULE, 644 .owner = THIS_MODULE,
645 .open = waiters_open, 645 .open = waiters_open,
646 .read = waiters_read 646 .read = waiters_read,
647 .llseek = default_llseek,
647}; 648};
648 649
649void dlm_delete_debug_file(struct dlm_ls *ls) 650void dlm_delete_debug_file(struct dlm_ls *ls)
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index f632b58cd222..0262451eb9c6 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -192,11 +192,6 @@ struct dlm_args {
192 * lkb is a process copy, the nodeid specifies the lock master. 192 * lkb is a process copy, the nodeid specifies the lock master.
193 */ 193 */
194 194
195/* lkb_ast_type */
196
197#define AST_COMP 1
198#define AST_BAST 2
199
200/* lkb_status */ 195/* lkb_status */
201 196
202#define DLM_LKSTS_WAITING 1 197#define DLM_LKSTS_WAITING 1
@@ -214,9 +209,24 @@ struct dlm_args {
214#define DLM_IFL_WATCH_TIMEWARN 0x00400000 209#define DLM_IFL_WATCH_TIMEWARN 0x00400000
215#define DLM_IFL_TIMEOUT_CANCEL 0x00800000 210#define DLM_IFL_TIMEOUT_CANCEL 0x00800000
216#define DLM_IFL_DEADLOCK_CANCEL 0x01000000 211#define DLM_IFL_DEADLOCK_CANCEL 0x01000000
212#define DLM_IFL_STUB_MS 0x02000000 /* magic number for m_flags */
217#define DLM_IFL_USER 0x00000001 213#define DLM_IFL_USER 0x00000001
218#define DLM_IFL_ORPHAN 0x00000002 214#define DLM_IFL_ORPHAN 0x00000002
219 215
216#define DLM_CALLBACKS_SIZE 6
217
218#define DLM_CB_CAST 0x00000001
219#define DLM_CB_BAST 0x00000002
220#define DLM_CB_SKIP 0x00000004
221
222struct dlm_callback {
223 uint64_t seq;
224 uint32_t flags; /* DLM_CBF_ */
225 int sb_status; /* copy to lksb status */
226 uint8_t sb_flags; /* copy to lksb flags */
227 int8_t mode; /* rq mode of bast, gr mode of cast */
228};
229
220struct dlm_lkb { 230struct dlm_lkb {
221 struct dlm_rsb *lkb_resource; /* the rsb */ 231 struct dlm_rsb *lkb_resource; /* the rsb */
222 struct kref lkb_ref; 232 struct kref lkb_ref;
@@ -236,13 +246,7 @@ struct dlm_lkb {
236 246
237 int8_t lkb_wait_type; /* type of reply waiting for */ 247 int8_t lkb_wait_type; /* type of reply waiting for */
238 int8_t lkb_wait_count; 248 int8_t lkb_wait_count;
239 int8_t lkb_ast_type; /* type of ast queued for */ 249 int lkb_wait_nodeid; /* for debugging */
240 int8_t lkb_ast_first; /* type of first ast queued */
241
242 int8_t lkb_bastmode; /* req mode of queued bast */
243 int8_t lkb_castmode; /* gr mode of queued cast */
244 int8_t lkb_bastmode_done; /* last delivered bastmode */
245 int8_t lkb_castmode_done; /* last delivered castmode */
246 250
247 struct list_head lkb_idtbl_list; /* lockspace lkbtbl */ 251 struct list_head lkb_idtbl_list; /* lockspace lkbtbl */
248 struct list_head lkb_statequeue; /* rsb g/c/w list */ 252 struct list_head lkb_statequeue; /* rsb g/c/w list */
@@ -251,10 +255,16 @@ struct dlm_lkb {
251 struct list_head lkb_astqueue; /* need ast to be sent */ 255 struct list_head lkb_astqueue; /* need ast to be sent */
252 struct list_head lkb_ownqueue; /* list of locks for a process */ 256 struct list_head lkb_ownqueue; /* list of locks for a process */
253 struct list_head lkb_time_list; 257 struct list_head lkb_time_list;
254 ktime_t lkb_time_bast; /* for debugging */
255 ktime_t lkb_timestamp; 258 ktime_t lkb_timestamp;
259 ktime_t lkb_wait_time;
256 unsigned long lkb_timeout_cs; 260 unsigned long lkb_timeout_cs;
257 261
262 struct dlm_callback lkb_callbacks[DLM_CALLBACKS_SIZE];
263 struct dlm_callback lkb_last_cast;
264 struct dlm_callback lkb_last_bast;
265 ktime_t lkb_last_cast_time; /* for debugging */
266 ktime_t lkb_last_bast_time; /* for debugging */
267
258 char *lkb_lvbptr; 268 char *lkb_lvbptr;
259 struct dlm_lksb *lkb_lksb; /* caller's status block */ 269 struct dlm_lksb *lkb_lksb; /* caller's status block */
260 void (*lkb_astfn) (void *astparam); 270 void (*lkb_astfn) (void *astparam);
@@ -544,8 +554,6 @@ struct dlm_user_args {
544 (dlm_user_proc) on the struct file, 554 (dlm_user_proc) on the struct file,
545 the process's locks point back to it*/ 555 the process's locks point back to it*/
546 struct dlm_lksb lksb; 556 struct dlm_lksb lksb;
547 int old_mode;
548 int update_user_lvb;
549 struct dlm_lksb __user *user_lksb; 557 struct dlm_lksb __user *user_lksb;
550 void __user *castparam; 558 void __user *castparam;
551 void __user *castaddr; 559 void __user *castaddr;
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 031dbe3a15ca..f71d0b5abd95 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -160,10 +160,10 @@ static const int __quecvt_compat_matrix[8][8] = {
160void dlm_print_lkb(struct dlm_lkb *lkb) 160void dlm_print_lkb(struct dlm_lkb *lkb)
161{ 161{
162 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n" 162 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
163 " status %d rqmode %d grmode %d wait_type %d ast_type %d\n", 163 " status %d rqmode %d grmode %d wait_type %d\n",
164 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags, 164 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
165 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode, 165 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
166 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type); 166 lkb->lkb_grmode, lkb->lkb_wait_type);
167} 167}
168 168
169static void dlm_print_rsb(struct dlm_rsb *r) 169static void dlm_print_rsb(struct dlm_rsb *r)
@@ -305,10 +305,7 @@ static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
305 rv = -EDEADLK; 305 rv = -EDEADLK;
306 } 306 }
307 307
308 lkb->lkb_lksb->sb_status = rv; 308 dlm_add_ast(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
309 lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
310
311 dlm_add_ast(lkb, AST_COMP, lkb->lkb_grmode);
312} 309}
313 310
314static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb) 311static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
@@ -319,13 +316,10 @@ static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
319 316
320static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode) 317static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
321{ 318{
322 lkb->lkb_time_bast = ktime_get();
323
324 if (is_master_copy(lkb)) { 319 if (is_master_copy(lkb)) {
325 lkb->lkb_bastmode = rqmode; /* printed by debugfs */
326 send_bast(r, lkb, rqmode); 320 send_bast(r, lkb, rqmode);
327 } else { 321 } else {
328 dlm_add_ast(lkb, AST_BAST, rqmode); 322 dlm_add_ast(lkb, DLM_CB_BAST, rqmode, 0, 0);
329 } 323 }
330} 324}
331 325
@@ -525,7 +519,7 @@ static void toss_rsb(struct kref *kref)
525 } 519 }
526} 520}
527 521
528/* When all references to the rsb are gone it's transfered to 522/* When all references to the rsb are gone it's transferred to
529 the tossed list for later disposal. */ 523 the tossed list for later disposal. */
530 524
531static void put_rsb(struct dlm_rsb *r) 525static void put_rsb(struct dlm_rsb *r)
@@ -600,6 +594,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
600 INIT_LIST_HEAD(&lkb->lkb_ownqueue); 594 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
601 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); 595 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
602 INIT_LIST_HEAD(&lkb->lkb_time_list); 596 INIT_LIST_HEAD(&lkb->lkb_time_list);
597 INIT_LIST_HEAD(&lkb->lkb_astqueue);
603 598
604 get_random_bytes(&bucket, sizeof(bucket)); 599 get_random_bytes(&bucket, sizeof(bucket));
605 bucket &= (ls->ls_lkbtbl_size - 1); 600 bucket &= (ls->ls_lkbtbl_size - 1);
@@ -804,10 +799,84 @@ static int msg_reply_type(int mstype)
804 return -1; 799 return -1;
805} 800}
806 801
802static int nodeid_warned(int nodeid, int num_nodes, int *warned)
803{
804 int i;
805
806 for (i = 0; i < num_nodes; i++) {
807 if (!warned[i]) {
808 warned[i] = nodeid;
809 return 0;
810 }
811 if (warned[i] == nodeid)
812 return 1;
813 }
814 return 0;
815}
816
817void dlm_scan_waiters(struct dlm_ls *ls)
818{
819 struct dlm_lkb *lkb;
820 ktime_t zero = ktime_set(0, 0);
821 s64 us;
822 s64 debug_maxus = 0;
823 u32 debug_scanned = 0;
824 u32 debug_expired = 0;
825 int num_nodes = 0;
826 int *warned = NULL;
827
828 if (!dlm_config.ci_waitwarn_us)
829 return;
830
831 mutex_lock(&ls->ls_waiters_mutex);
832
833 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
834 if (ktime_equal(lkb->lkb_wait_time, zero))
835 continue;
836
837 debug_scanned++;
838
839 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
840
841 if (us < dlm_config.ci_waitwarn_us)
842 continue;
843
844 lkb->lkb_wait_time = zero;
845
846 debug_expired++;
847 if (us > debug_maxus)
848 debug_maxus = us;
849
850 if (!num_nodes) {
851 num_nodes = ls->ls_num_nodes;
852 warned = kmalloc(GFP_KERNEL, num_nodes * sizeof(int));
853 if (warned)
854 memset(warned, 0, num_nodes * sizeof(int));
855 }
856 if (!warned)
857 continue;
858 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
859 continue;
860
861 log_error(ls, "waitwarn %x %lld %d us check connection to "
862 "node %d", lkb->lkb_id, (long long)us,
863 dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
864 }
865 mutex_unlock(&ls->ls_waiters_mutex);
866
867 if (warned)
868 kfree(warned);
869
870 if (debug_expired)
871 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
872 debug_scanned, debug_expired,
873 dlm_config.ci_waitwarn_us, (long long)debug_maxus);
874}
875
807/* add/remove lkb from global waiters list of lkb's waiting for 876/* add/remove lkb from global waiters list of lkb's waiting for
808 a reply from a remote node */ 877 a reply from a remote node */
809 878
810static int add_to_waiters(struct dlm_lkb *lkb, int mstype) 879static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
811{ 880{
812 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 881 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
813 int error = 0; 882 int error = 0;
@@ -847,6 +916,8 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
847 916
848 lkb->lkb_wait_count++; 917 lkb->lkb_wait_count++;
849 lkb->lkb_wait_type = mstype; 918 lkb->lkb_wait_type = mstype;
919 lkb->lkb_wait_time = ktime_get();
920 lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
850 hold_lkb(lkb); 921 hold_lkb(lkb);
851 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters); 922 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
852 out: 923 out:
@@ -966,10 +1037,10 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
966 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1037 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
967 int error; 1038 int error;
968 1039
969 if (ms != &ls->ls_stub_ms) 1040 if (ms->m_flags != DLM_IFL_STUB_MS)
970 mutex_lock(&ls->ls_waiters_mutex); 1041 mutex_lock(&ls->ls_waiters_mutex);
971 error = _remove_from_waiters(lkb, ms->m_type, ms); 1042 error = _remove_from_waiters(lkb, ms->m_type, ms);
972 if (ms != &ls->ls_stub_ms) 1043 if (ms->m_flags != DLM_IFL_STUB_MS)
973 mutex_unlock(&ls->ls_waiters_mutex); 1044 mutex_unlock(&ls->ls_waiters_mutex);
974 return error; 1045 return error;
975} 1046}
@@ -1162,6 +1233,16 @@ void dlm_adjust_timeouts(struct dlm_ls *ls)
1162 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) 1233 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1163 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us); 1234 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1164 mutex_unlock(&ls->ls_timeout_mutex); 1235 mutex_unlock(&ls->ls_timeout_mutex);
1236
1237 if (!dlm_config.ci_waitwarn_us)
1238 return;
1239
1240 mutex_lock(&ls->ls_waiters_mutex);
1241 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1242 if (ktime_to_us(lkb->lkb_wait_time))
1243 lkb->lkb_wait_time = ktime_get();
1244 }
1245 mutex_unlock(&ls->ls_waiters_mutex);
1165} 1246}
1166 1247
1167/* lkb is master or local copy */ 1248/* lkb is master or local copy */
@@ -1381,14 +1462,8 @@ static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1381 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become 1462 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1382 compatible with other granted locks */ 1463 compatible with other granted locks */
1383 1464
1384static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms) 1465static void munge_demoted(struct dlm_lkb *lkb)
1385{ 1466{
1386 if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1387 log_print("munge_demoted %x invalid reply type %d",
1388 lkb->lkb_id, ms->m_type);
1389 return;
1390 }
1391
1392 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) { 1467 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1393 log_print("munge_demoted %x invalid modes gr %d rq %d", 1468 log_print("munge_demoted %x invalid modes gr %d rq %d",
1394 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode); 1469 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
@@ -1846,6 +1921,9 @@ static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1846 struct dlm_lkb *gr; 1921 struct dlm_lkb *gr;
1847 1922
1848 list_for_each_entry(gr, head, lkb_statequeue) { 1923 list_for_each_entry(gr, head, lkb_statequeue) {
1924 /* skip self when sending basts to convertqueue */
1925 if (gr == lkb)
1926 continue;
1849 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) { 1927 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1850 queue_bast(r, gr, lkb->lkb_rqmode); 1928 queue_bast(r, gr, lkb->lkb_rqmode);
1851 gr->lkb_highbast = lkb->lkb_rqmode; 1929 gr->lkb_highbast = lkb->lkb_rqmode;
@@ -2816,9 +2894,9 @@ static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2816 not from lkb fields */ 2894 not from lkb fields */
2817 2895
2818 if (lkb->lkb_bastfn) 2896 if (lkb->lkb_bastfn)
2819 ms->m_asts |= AST_BAST; 2897 ms->m_asts |= DLM_CB_BAST;
2820 if (lkb->lkb_astfn) 2898 if (lkb->lkb_astfn)
2821 ms->m_asts |= AST_COMP; 2899 ms->m_asts |= DLM_CB_CAST;
2822 2900
2823 /* compare with switch in create_message; send_remove() doesn't 2901 /* compare with switch in create_message; send_remove() doesn't
2824 use send_args() */ 2902 use send_args() */
@@ -2846,12 +2924,12 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2846 struct dlm_mhandle *mh; 2924 struct dlm_mhandle *mh;
2847 int to_nodeid, error; 2925 int to_nodeid, error;
2848 2926
2849 error = add_to_waiters(lkb, mstype); 2927 to_nodeid = r->res_nodeid;
2928
2929 error = add_to_waiters(lkb, mstype, to_nodeid);
2850 if (error) 2930 if (error)
2851 return error; 2931 return error;
2852 2932
2853 to_nodeid = r->res_nodeid;
2854
2855 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh); 2933 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2856 if (error) 2934 if (error)
2857 goto fail; 2935 goto fail;
@@ -2882,9 +2960,9 @@ static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2882 /* down conversions go without a reply from the master */ 2960 /* down conversions go without a reply from the master */
2883 if (!error && down_conversion(lkb)) { 2961 if (!error && down_conversion(lkb)) {
2884 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY); 2962 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2963 r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
2885 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; 2964 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2886 r->res_ls->ls_stub_ms.m_result = 0; 2965 r->res_ls->ls_stub_ms.m_result = 0;
2887 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2888 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms); 2966 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2889 } 2967 }
2890 2968
@@ -2953,12 +3031,12 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2953 struct dlm_mhandle *mh; 3031 struct dlm_mhandle *mh;
2954 int to_nodeid, error; 3032 int to_nodeid, error;
2955 3033
2956 error = add_to_waiters(lkb, DLM_MSG_LOOKUP); 3034 to_nodeid = dlm_dir_nodeid(r);
3035
3036 error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
2957 if (error) 3037 if (error)
2958 return error; 3038 return error;
2959 3039
2960 to_nodeid = dlm_dir_nodeid(r);
2961
2962 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh); 3040 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2963 if (error) 3041 if (error)
2964 goto fail; 3042 goto fail;
@@ -3072,6 +3150,9 @@ static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3072 3150
3073static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3151static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3074{ 3152{
3153 if (ms->m_flags == DLM_IFL_STUB_MS)
3154 return;
3155
3075 lkb->lkb_sbflags = ms->m_sbflags; 3156 lkb->lkb_sbflags = ms->m_sbflags;
3076 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) | 3157 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3077 (ms->m_flags & 0x0000FFFF); 3158 (ms->m_flags & 0x0000FFFF);
@@ -3119,8 +3200,8 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3119 lkb->lkb_grmode = DLM_LOCK_IV; 3200 lkb->lkb_grmode = DLM_LOCK_IV;
3120 lkb->lkb_rqmode = ms->m_rqmode; 3201 lkb->lkb_rqmode = ms->m_rqmode;
3121 3202
3122 lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL; 3203 lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3123 lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL; 3204 lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3124 3205
3125 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 3206 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3126 /* lkb was just created so there won't be an lvb yet */ 3207 /* lkb was just created so there won't be an lvb yet */
@@ -3614,7 +3695,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3614 /* convert was queued on remote master */ 3695 /* convert was queued on remote master */
3615 receive_flags_reply(lkb, ms); 3696 receive_flags_reply(lkb, ms);
3616 if (is_demoted(lkb)) 3697 if (is_demoted(lkb))
3617 munge_demoted(lkb, ms); 3698 munge_demoted(lkb);
3618 del_lkb(r, lkb); 3699 del_lkb(r, lkb);
3619 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 3700 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3620 add_timeout(lkb); 3701 add_timeout(lkb);
@@ -3624,7 +3705,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3624 /* convert was granted on remote master */ 3705 /* convert was granted on remote master */
3625 receive_flags_reply(lkb, ms); 3706 receive_flags_reply(lkb, ms);
3626 if (is_demoted(lkb)) 3707 if (is_demoted(lkb))
3627 munge_demoted(lkb, ms); 3708 munge_demoted(lkb);
3628 grant_lock_pc(r, lkb, ms); 3709 grant_lock_pc(r, lkb, ms);
3629 queue_cast(r, lkb, 0); 3710 queue_cast(r, lkb, 0);
3630 break; 3711 break;
@@ -3998,15 +4079,17 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid)
3998 dlm_put_lockspace(ls); 4079 dlm_put_lockspace(ls);
3999} 4080}
4000 4081
4001static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb) 4082static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4083 struct dlm_message *ms_stub)
4002{ 4084{
4003 if (middle_conversion(lkb)) { 4085 if (middle_conversion(lkb)) {
4004 hold_lkb(lkb); 4086 hold_lkb(lkb);
4005 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; 4087 memset(ms_stub, 0, sizeof(struct dlm_message));
4006 ls->ls_stub_ms.m_result = -EINPROGRESS; 4088 ms_stub->m_flags = DLM_IFL_STUB_MS;
4007 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 4089 ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
4008 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; 4090 ms_stub->m_result = -EINPROGRESS;
4009 _receive_convert_reply(lkb, &ls->ls_stub_ms); 4091 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4092 _receive_convert_reply(lkb, ms_stub);
4010 4093
4011 /* Same special case as in receive_rcom_lock_args() */ 4094 /* Same special case as in receive_rcom_lock_args() */
4012 lkb->lkb_grmode = DLM_LOCK_IV; 4095 lkb->lkb_grmode = DLM_LOCK_IV;
@@ -4047,13 +4130,27 @@ static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
4047void dlm_recover_waiters_pre(struct dlm_ls *ls) 4130void dlm_recover_waiters_pre(struct dlm_ls *ls)
4048{ 4131{
4049 struct dlm_lkb *lkb, *safe; 4132 struct dlm_lkb *lkb, *safe;
4133 struct dlm_message *ms_stub;
4050 int wait_type, stub_unlock_result, stub_cancel_result; 4134 int wait_type, stub_unlock_result, stub_cancel_result;
4051 4135
4136 ms_stub = kmalloc(GFP_KERNEL, sizeof(struct dlm_message));
4137 if (!ms_stub) {
4138 log_error(ls, "dlm_recover_waiters_pre no mem");
4139 return;
4140 }
4141
4052 mutex_lock(&ls->ls_waiters_mutex); 4142 mutex_lock(&ls->ls_waiters_mutex);
4053 4143
4054 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) { 4144 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4055 log_debug(ls, "pre recover waiter lkid %x type %d flags %x", 4145
4056 lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags); 4146 /* exclude debug messages about unlocks because there can be so
4147 many and they aren't very interesting */
4148
4149 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4150 log_debug(ls, "recover_waiter %x nodeid %d "
4151 "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
4152 lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
4153 }
4057 4154
4058 /* all outstanding lookups, regardless of destination will be 4155 /* all outstanding lookups, regardless of destination will be
4059 resent after recovery is done */ 4156 resent after recovery is done */
@@ -4099,26 +4196,28 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
4099 break; 4196 break;
4100 4197
4101 case DLM_MSG_CONVERT: 4198 case DLM_MSG_CONVERT:
4102 recover_convert_waiter(ls, lkb); 4199 recover_convert_waiter(ls, lkb, ms_stub);
4103 break; 4200 break;
4104 4201
4105 case DLM_MSG_UNLOCK: 4202 case DLM_MSG_UNLOCK:
4106 hold_lkb(lkb); 4203 hold_lkb(lkb);
4107 ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY; 4204 memset(ms_stub, 0, sizeof(struct dlm_message));
4108 ls->ls_stub_ms.m_result = stub_unlock_result; 4205 ms_stub->m_flags = DLM_IFL_STUB_MS;
4109 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 4206 ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
4110 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; 4207 ms_stub->m_result = stub_unlock_result;
4111 _receive_unlock_reply(lkb, &ls->ls_stub_ms); 4208 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4209 _receive_unlock_reply(lkb, ms_stub);
4112 dlm_put_lkb(lkb); 4210 dlm_put_lkb(lkb);
4113 break; 4211 break;
4114 4212
4115 case DLM_MSG_CANCEL: 4213 case DLM_MSG_CANCEL:
4116 hold_lkb(lkb); 4214 hold_lkb(lkb);
4117 ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY; 4215 memset(ms_stub, 0, sizeof(struct dlm_message));
4118 ls->ls_stub_ms.m_result = stub_cancel_result; 4216 ms_stub->m_flags = DLM_IFL_STUB_MS;
4119 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 4217 ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
4120 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; 4218 ms_stub->m_result = stub_cancel_result;
4121 _receive_cancel_reply(lkb, &ls->ls_stub_ms); 4219 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4220 _receive_cancel_reply(lkb, ms_stub);
4122 dlm_put_lkb(lkb); 4221 dlm_put_lkb(lkb);
4123 break; 4222 break;
4124 4223
@@ -4129,6 +4228,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
4129 schedule(); 4228 schedule();
4130 } 4229 }
4131 mutex_unlock(&ls->ls_waiters_mutex); 4230 mutex_unlock(&ls->ls_waiters_mutex);
4231 kfree(ms_stub);
4132} 4232}
4133 4233
4134static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls) 4234static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
@@ -4193,8 +4293,8 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
4193 ou = is_overlap_unlock(lkb); 4293 ou = is_overlap_unlock(lkb);
4194 err = 0; 4294 err = 0;
4195 4295
4196 log_debug(ls, "recover_waiters_post %x type %d flags %x %s", 4296 log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
4197 lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name); 4297 lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);
4198 4298
4199 /* At this point we assume that we won't get a reply to any 4299 /* At this point we assume that we won't get a reply to any
4200 previous op or overlap op on this lock. First, do a big 4300 previous op or overlap op on this lock. First, do a big
@@ -4409,8 +4509,8 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4409 lkb->lkb_grmode = rl->rl_grmode; 4509 lkb->lkb_grmode = rl->rl_grmode;
4410 /* don't set lkb_status because add_lkb wants to itself */ 4510 /* don't set lkb_status because add_lkb wants to itself */
4411 4511
4412 lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL; 4512 lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
4413 lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL; 4513 lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
4414 4514
4415 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 4515 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4416 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) - 4516 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
@@ -4586,7 +4686,6 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4586 error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs, 4686 error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4587 fake_astfn, ua, fake_bastfn, &args); 4687 fake_astfn, ua, fake_bastfn, &args);
4588 lkb->lkb_flags |= DLM_IFL_USER; 4688 lkb->lkb_flags |= DLM_IFL_USER;
4589 ua->old_mode = DLM_LOCK_IV;
4590 4689
4591 if (error) { 4690 if (error) {
4592 __put_lkb(ls, lkb); 4691 __put_lkb(ls, lkb);
@@ -4655,7 +4754,6 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4655 ua->bastparam = ua_tmp->bastparam; 4754 ua->bastparam = ua_tmp->bastparam;
4656 ua->bastaddr = ua_tmp->bastaddr; 4755 ua->bastaddr = ua_tmp->bastaddr;
4657 ua->user_lksb = ua_tmp->user_lksb; 4756 ua->user_lksb = ua_tmp->user_lksb;
4658 ua->old_mode = lkb->lkb_grmode;
4659 4757
4660 error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs, 4758 error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4661 fake_astfn, ua, fake_bastfn, &args); 4759 fake_astfn, ua, fake_bastfn, &args);
@@ -4914,8 +5012,9 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4914 } 5012 }
4915 5013
4916 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) { 5014 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4917 lkb->lkb_ast_type = 0; 5015 memset(&lkb->lkb_callbacks, 0,
4918 list_del(&lkb->lkb_astqueue); 5016 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5017 list_del_init(&lkb->lkb_astqueue);
4919 dlm_put_lkb(lkb); 5018 dlm_put_lkb(lkb);
4920 } 5019 }
4921 5020
@@ -4955,7 +5054,9 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4955 5054
4956 spin_lock(&proc->asts_spin); 5055 spin_lock(&proc->asts_spin);
4957 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) { 5056 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4958 list_del(&lkb->lkb_astqueue); 5057 memset(&lkb->lkb_callbacks, 0,
5058 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5059 list_del_init(&lkb->lkb_astqueue);
4959 dlm_put_lkb(lkb); 5060 dlm_put_lkb(lkb);
4960 } 5061 }
4961 spin_unlock(&proc->asts_spin); 5062 spin_unlock(&proc->asts_spin);
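
[Editor's sketch, not part of the commit] The lock.c changes above add dlm_scan_waiters(): each request records when it started waiting for a remote reply, and a periodic scan logs at most one warning per remote node once a wait exceeds the configurable waitwarn_us threshold. Below is a minimal userspace sketch of that once-per-node warning pattern; the names, types, and helper layout are simplified assumptions, not the kernel code.

/* Simplified model of the waitwarn scan. */
#include <stdio.h>
#include <stdlib.h>

struct waiter {
	unsigned int id;
	int nodeid;
	long long wait_us;	/* how long this request has been waiting */
};

/* return 1 if this nodeid was already warned about, else record it */
static int nodeid_warned(int nodeid, int num_nodes, int *warned)
{
	int i;

	for (i = 0; i < num_nodes; i++) {
		if (!warned[i]) {
			warned[i] = nodeid;
			return 0;
		}
		if (warned[i] == nodeid)
			return 1;
	}
	return 0;
}

static void scan_waiters(struct waiter *w, int count, int num_nodes,
			 long long waitwarn_us)
{
	int *warned;
	int i;

	if (!waitwarn_us)
		return;		/* threshold of 0 disables the feature */

	warned = calloc(num_nodes, sizeof(int));
	if (!warned)
		return;

	for (i = 0; i < count; i++) {
		if (w[i].wait_us < waitwarn_us)
			continue;
		if (nodeid_warned(w[i].nodeid, num_nodes, warned))
			continue;
		printf("waitwarn %x %lld us, check connection to node %d\n",
		       w[i].id, w[i].wait_us, w[i].nodeid);
	}
	free(warned);
}

int main(void)
{
	struct waiter w[] = {
		{ 0x101, 2, 1500000 },
		{ 0x102, 2, 2000000 },	/* same node: only one warning */
		{ 0x103, 3, 100 },	/* under threshold: no warning */
	};

	scan_waiters(w, 3, 4, 1000000);
	return 0;
}
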
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 88e93c80cc22..265017a7c3e7 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -24,6 +24,7 @@ int dlm_put_lkb(struct dlm_lkb *lkb);
24void dlm_scan_rsbs(struct dlm_ls *ls); 24void dlm_scan_rsbs(struct dlm_ls *ls);
25int dlm_lock_recovery_try(struct dlm_ls *ls); 25int dlm_lock_recovery_try(struct dlm_ls *ls);
26void dlm_unlock_recovery(struct dlm_ls *ls); 26void dlm_unlock_recovery(struct dlm_ls *ls);
27void dlm_scan_waiters(struct dlm_ls *ls);
27void dlm_scan_timeout(struct dlm_ls *ls); 28void dlm_scan_timeout(struct dlm_ls *ls);
28void dlm_adjust_timeouts(struct dlm_ls *ls); 29void dlm_adjust_timeouts(struct dlm_ls *ls);
29 30
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index f994a7dfda85..14cbf4099753 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -243,7 +243,6 @@ static struct dlm_ls *find_ls_to_scan(void)
243static int dlm_scand(void *data) 243static int dlm_scand(void *data)
244{ 244{
245 struct dlm_ls *ls; 245 struct dlm_ls *ls;
246 int timeout_jiffies = dlm_config.ci_scan_secs * HZ;
247 246
248 while (!kthread_should_stop()) { 247 while (!kthread_should_stop()) {
249 ls = find_ls_to_scan(); 248 ls = find_ls_to_scan();
@@ -252,13 +251,14 @@ static int dlm_scand(void *data)
252 ls->ls_scan_time = jiffies; 251 ls->ls_scan_time = jiffies;
253 dlm_scan_rsbs(ls); 252 dlm_scan_rsbs(ls);
254 dlm_scan_timeout(ls); 253 dlm_scan_timeout(ls);
254 dlm_scan_waiters(ls);
255 dlm_unlock_recovery(ls); 255 dlm_unlock_recovery(ls);
256 } else { 256 } else {
257 ls->ls_scan_time += HZ; 257 ls->ls_scan_time += HZ;
258 } 258 }
259 } else { 259 continue;
260 schedule_timeout_interruptible(timeout_jiffies);
261 } 260 }
261 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
262 } 262 }
263 return 0; 263 return 0;
264} 264}
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 37a34c2c622a..5e2c71f05e46 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -63,6 +63,9 @@
63#define NEEDED_RMEM (4*1024*1024) 63#define NEEDED_RMEM (4*1024*1024)
64#define CONN_HASH_SIZE 32 64#define CONN_HASH_SIZE 32
65 65
66/* Number of messages to send before rescheduling */
67#define MAX_SEND_MSG_COUNT 25
68
66struct cbuf { 69struct cbuf {
67 unsigned int base; 70 unsigned int base;
68 unsigned int len; 71 unsigned int len;
@@ -108,6 +111,7 @@ struct connection {
108#define CF_INIT_PENDING 4 111#define CF_INIT_PENDING 4
109#define CF_IS_OTHERCON 5 112#define CF_IS_OTHERCON 5
110#define CF_CLOSE 6 113#define CF_CLOSE 6
114#define CF_APP_LIMITED 7
111 struct list_head writequeue; /* List of outgoing writequeue_entries */ 115 struct list_head writequeue; /* List of outgoing writequeue_entries */
112 spinlock_t writequeue_lock; 116 spinlock_t writequeue_lock;
113 int (*rx_action) (struct connection *); /* What to do when active */ 117 int (*rx_action) (struct connection *); /* What to do when active */
@@ -295,7 +299,17 @@ static void lowcomms_write_space(struct sock *sk)
295{ 299{
296 struct connection *con = sock2con(sk); 300 struct connection *con = sock2con(sk);
297 301
298 if (con && !test_and_set_bit(CF_WRITE_PENDING, &con->flags)) 302 if (!con)
303 return;
304
305 clear_bit(SOCK_NOSPACE, &con->sock->flags);
306
307 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
308 con->sock->sk->sk_write_pending--;
309 clear_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags);
310 }
311
312 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
299 queue_work(send_workqueue, &con->swork); 313 queue_work(send_workqueue, &con->swork);
300} 314}
301 315
@@ -796,7 +810,7 @@ static int tcp_accept_from_sock(struct connection *con)
796 810
797 /* 811 /*
798 * Add it to the active queue in case we got data 812 * Add it to the active queue in case we got data
799 * beween processing the accept adding the socket 813 * between processing the accept adding the socket
800 * to the read_sockets list 814 * to the read_sockets list
801 */ 815 */
802 if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags)) 816 if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
@@ -915,6 +929,7 @@ static void tcp_connect_to_sock(struct connection *con)
915 struct sockaddr_storage saddr, src_addr; 929 struct sockaddr_storage saddr, src_addr;
916 int addr_len; 930 int addr_len;
917 struct socket *sock = NULL; 931 struct socket *sock = NULL;
932 int one = 1;
918 933
919 if (con->nodeid == 0) { 934 if (con->nodeid == 0) {
920 log_print("attempt to connect sock 0 foiled"); 935 log_print("attempt to connect sock 0 foiled");
@@ -960,6 +975,11 @@ static void tcp_connect_to_sock(struct connection *con)
960 make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len); 975 make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
961 976
962 log_print("connecting to %d", con->nodeid); 977 log_print("connecting to %d", con->nodeid);
978
979 /* Turn off Nagle's algorithm */
980 kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
981 sizeof(one));
982
963 result = 983 result =
964 sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, 984 sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
965 O_NONBLOCK); 985 O_NONBLOCK);
@@ -1011,6 +1031,10 @@ static struct socket *tcp_create_listen_sock(struct connection *con,
1011 goto create_out; 1031 goto create_out;
1012 } 1032 }
1013 1033
1034 /* Turn off Nagle's algorithm */
1035 kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1036 sizeof(one));
1037
1014 result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 1038 result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
1015 (char *)&one, sizeof(one)); 1039 (char *)&one, sizeof(one));
1016 1040
@@ -1297,6 +1321,7 @@ static void send_to_sock(struct connection *con)
1297 const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; 1321 const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
1298 struct writequeue_entry *e; 1322 struct writequeue_entry *e;
1299 int len, offset; 1323 int len, offset;
1324 int count = 0;
1300 1325
1301 mutex_lock(&con->sock_mutex); 1326 mutex_lock(&con->sock_mutex);
1302 if (con->sock == NULL) 1327 if (con->sock == NULL)
@@ -1319,14 +1344,27 @@ static void send_to_sock(struct connection *con)
1319 ret = kernel_sendpage(con->sock, e->page, offset, len, 1344 ret = kernel_sendpage(con->sock, e->page, offset, len,
1320 msg_flags); 1345 msg_flags);
1321 if (ret == -EAGAIN || ret == 0) { 1346 if (ret == -EAGAIN || ret == 0) {
1347 if (ret == -EAGAIN &&
1348 test_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags) &&
1349 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
1350 /* Notify TCP that we're limited by the
1351 * application window size.
1352 */
1353 set_bit(SOCK_NOSPACE, &con->sock->flags);
1354 con->sock->sk->sk_write_pending++;
1355 }
1322 cond_resched(); 1356 cond_resched();
1323 goto out; 1357 goto out;
1324 } 1358 }
1325 if (ret <= 0) 1359 if (ret <= 0)
1326 goto send_error; 1360 goto send_error;
1327 } 1361 }
1328 /* Don't starve people filling buffers */ 1362
1363 /* Don't starve people filling buffers */
1364 if (++count >= MAX_SEND_MSG_COUNT) {
1329 cond_resched(); 1365 cond_resched();
1366 count = 0;
1367 }
1330 1368
1331 spin_lock(&con->writequeue_lock); 1369 spin_lock(&con->writequeue_lock);
1332 e->offset += ret; 1370 e->offset += ret;
@@ -1430,20 +1468,19 @@ static void work_stop(void)
1430 1468
1431static int work_start(void) 1469static int work_start(void)
1432{ 1470{
1433 int error; 1471 recv_workqueue = alloc_workqueue("dlm_recv",
1434 recv_workqueue = create_workqueue("dlm_recv"); 1472 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1435 error = IS_ERR(recv_workqueue); 1473 if (!recv_workqueue) {
1436 if (error) { 1474 log_print("can't start dlm_recv");
1437 log_print("can't start dlm_recv %d", error); 1475 return -ENOMEM;
1438 return error;
1439 } 1476 }
1440 1477
1441 send_workqueue = create_singlethread_workqueue("dlm_send"); 1478 send_workqueue = alloc_workqueue("dlm_send",
1442 error = IS_ERR(send_workqueue); 1479 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1443 if (error) { 1480 if (!send_workqueue) {
1444 log_print("can't start dlm_send %d", error); 1481 log_print("can't start dlm_send");
1445 destroy_workqueue(recv_workqueue); 1482 destroy_workqueue(recv_workqueue);
1446 return error; 1483 return -ENOMEM;
1447 } 1484 }
1448 1485
1449 return 0; 1486 return 0;
diff --git a/fs/dlm/main.c b/fs/dlm/main.c
index b80e0aa3cfa5..5a59efa0bb46 100644
--- a/fs/dlm/main.c
+++ b/fs/dlm/main.c
@@ -50,7 +50,7 @@ static int __init init_dlm(void)
50 if (error) 50 if (error)
51 goto out_netlink; 51 goto out_netlink;
52 52
53 printk("DLM (built %s %s) installed\n", __DATE__, __TIME__); 53 printk("DLM installed\n");
54 54
55 return 0; 55 return 0;
56 56
diff --git a/fs/dlm/plock.c b/fs/dlm/plock.c
index d45c02db6943..e2b878004364 100644
--- a/fs/dlm/plock.c
+++ b/fs/dlm/plock.c
@@ -71,6 +71,36 @@ static void send_op(struct plock_op *op)
71 wake_up(&send_wq); 71 wake_up(&send_wq);
72} 72}
73 73
74/* If a process was killed while waiting for the only plock on a file,
75 locks_remove_posix will not see any lock on the file so it won't
76 send an unlock-close to us to pass on to userspace to clean up the
77 abandoned waiter. So, we have to insert the unlock-close when the
78 lock call is interrupted. */
79
80static void do_unlock_close(struct dlm_ls *ls, u64 number,
81 struct file *file, struct file_lock *fl)
82{
83 struct plock_op *op;
84
85 op = kzalloc(sizeof(*op), GFP_NOFS);
86 if (!op)
87 return;
88
89 op->info.optype = DLM_PLOCK_OP_UNLOCK;
90 op->info.pid = fl->fl_pid;
91 op->info.fsid = ls->ls_global_id;
92 op->info.number = number;
93 op->info.start = 0;
94 op->info.end = OFFSET_MAX;
95 if (fl->fl_lmops && fl->fl_lmops->fl_grant)
96 op->info.owner = (__u64) fl->fl_pid;
97 else
98 op->info.owner = (__u64)(long) fl->fl_owner;
99
100 op->info.flags |= DLM_PLOCK_FL_CLOSE;
101 send_op(op);
102}
103
74int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file, 104int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file,
75 int cmd, struct file_lock *fl) 105 int cmd, struct file_lock *fl)
76{ 106{
@@ -114,9 +144,19 @@ int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file,
114 144
115 send_op(op); 145 send_op(op);
116 146
117 if (xop->callback == NULL) 147 if (xop->callback == NULL) {
118 wait_event(recv_wq, (op->done != 0)); 148 rv = wait_event_killable(recv_wq, (op->done != 0));
119 else { 149 if (rv == -ERESTARTSYS) {
150 log_debug(ls, "dlm_posix_lock: wait killed %llx",
151 (unsigned long long)number);
152 spin_lock(&ops_lock);
153 list_del(&op->list);
154 spin_unlock(&ops_lock);
155 kfree(xop);
156 do_unlock_close(ls, number, file, fl);
157 goto out;
158 }
159 } else {
120 rv = FILE_LOCK_DEFERRED; 160 rv = FILE_LOCK_DEFERRED;
121 goto out; 161 goto out;
122 } 162 }
@@ -233,6 +273,13 @@ int dlm_posix_unlock(dlm_lockspace_t *lockspace, u64 number, struct file *file,
233 else 273 else
234 op->info.owner = (__u64)(long) fl->fl_owner; 274 op->info.owner = (__u64)(long) fl->fl_owner;
235 275
276 if (fl->fl_flags & FL_CLOSE) {
277 op->info.flags |= DLM_PLOCK_FL_CLOSE;
278 send_op(op);
279 rv = 0;
280 goto out;
281 }
282
236 send_op(op); 283 send_op(op);
237 wait_event(recv_wq, (op->done != 0)); 284 wait_event(recv_wq, (op->done != 0));
238 285
@@ -334,7 +381,10 @@ static ssize_t dev_read(struct file *file, char __user *u, size_t count,
334 spin_lock(&ops_lock); 381 spin_lock(&ops_lock);
335 if (!list_empty(&send_list)) { 382 if (!list_empty(&send_list)) {
336 op = list_entry(send_list.next, struct plock_op, list); 383 op = list_entry(send_list.next, struct plock_op, list);
337 list_move(&op->list, &recv_list); 384 if (op->info.flags & DLM_PLOCK_FL_CLOSE)
385 list_del(&op->list);
386 else
387 list_move(&op->list, &recv_list);
338 memcpy(&info, &op->info, sizeof(info)); 388 memcpy(&info, &op->info, sizeof(info));
339 } 389 }
340 spin_unlock(&ops_lock); 390 spin_unlock(&ops_lock);
@@ -342,6 +392,13 @@ static ssize_t dev_read(struct file *file, char __user *u, size_t count,
342 if (!op) 392 if (!op)
343 return -EAGAIN; 393 return -EAGAIN;
344 394
395 /* there is no need to get a reply from userspace for unlocks
396 that were generated by the vfs cleaning up for a close
397 (the process did not make an unlock call). */
398
399 if (op->info.flags & DLM_PLOCK_FL_CLOSE)
400 kfree(op);
401
345 if (copy_to_user(u, &info, sizeof(info))) 402 if (copy_to_user(u, &info, sizeof(info)))
346 return -EFAULT; 403 return -EFAULT;
347 return sizeof(info); 404 return sizeof(info);
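These two hunks cooperate: a DLM_PLOCK_FL_CLOSE op is taken off send_list without being parked on recv_list, since no reply will ever arrive for it, and is freed as soon as its dlm_plock_info has been copied out. The device protocol seen by the daemon is otherwise unchanged; a minimal, illustrative consumer loop (dlm_controld does the real work) might look like:

#include <fcntl.h>
#include <unistd.h>
#include <linux/dlm_plock.h>

/* Minimal, illustrative consumer of the plock misc device (dlm_controld does
 * the real resolution against cluster-wide plock state).  Opening the device
 * and the reply policy are simplified.
 */
static void consume(int fd)
{
	struct dlm_plock_info info;

	while (read(fd, &info, sizeof(info)) == sizeof(info)) {
		/* ... resolve the request against local plock state ... */

		if (info.flags & DLM_PLOCK_FL_CLOSE)
			continue;	/* close-time unlock: kernel expects no reply */

		info.rv = 0;		/* operation result */
		write(fd, &info, sizeof(info));
	}
}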
@@ -412,7 +469,8 @@ static const struct file_operations dev_fops = {
412 .read = dev_read, 469 .read = dev_read,
413 .write = dev_write, 470 .write = dev_write,
414 .poll = dev_poll, 471 .poll = dev_poll,
415 .owner = THIS_MODULE 472 .owner = THIS_MODULE,
473 .llseek = noop_llseek,
416}; 474};
417 475
418static struct miscdevice plock_dev_misc = { 476static struct miscdevice plock_dev_misc = {
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 3c83a49a48a3..f10a50f24e8f 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -321,9 +321,9 @@ static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
321 rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type); 321 rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
322 322
323 if (lkb->lkb_bastfn) 323 if (lkb->lkb_bastfn)
324 rl->rl_asts |= AST_BAST; 324 rl->rl_asts |= DLM_CB_BAST;
325 if (lkb->lkb_astfn) 325 if (lkb->lkb_astfn)
326 rl->rl_asts |= AST_COMP; 326 rl->rl_asts |= DLM_CB_CAST;
327 327
328 rl->rl_namelen = cpu_to_le16(r->res_length); 328 rl->rl_namelen = cpu_to_le16(r->res_length);
329 memcpy(rl->rl_name, r->res_name, r->res_length); 329 memcpy(rl->rl_name, r->res_name, r->res_length);
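pack_rcom_lock() now stamps rl_asts with the new DLM_CB_CAST/DLM_CB_BAST bits instead of the old AST_COMP/AST_BAST values, so recovery messages use the same callback flag namespace as the rest of this series. A hedged sketch of the receiving side (abridged, in the style of the rcom unpacking code; the placeholder callback names are illustrative):

/* Sketch only: the unpacking mirror of the hunk above.  On the node that
 * receives the rcom data, the same bits decide which callback pointers the
 * re-created lkb needs; the placeholder functions are illustrative.
 */
lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
lkb->lkb_astfn  = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;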
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index eda43f362616..14638235f7b2 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -304,7 +304,7 @@ static void set_master_lkbs(struct dlm_rsb *r)
304} 304}
305 305
306/* 306/*
307 * Propogate the new master nodeid to locks 307 * Propagate the new master nodeid to locks
308 * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider. 308 * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
309 * The NEW_MASTER2 flag tells recover_lvb() and set_locks_purged() which 309 * The NEW_MASTER2 flag tells recover_lvb() and set_locks_purged() which
310 * rsb's to consider. 310 * rsb's to consider.
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index b6272853130c..e96bf3e9be88 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -24,6 +24,7 @@
24#include "lock.h" 24#include "lock.h"
25#include "lvb_table.h" 25#include "lvb_table.h"
26#include "user.h" 26#include "user.h"
27#include "ast.h"
27 28
28static const char name_prefix[] = "dlm"; 29static const char name_prefix[] = "dlm";
29static const struct file_operations device_fops; 30static const struct file_operations device_fops;
@@ -152,19 +153,16 @@ static void compat_output(struct dlm_lock_result *res,
152 not related to the lifetime of the lkb struct which is managed 153 not related to the lifetime of the lkb struct which is managed
153 entirely by refcount. */ 154 entirely by refcount. */
154 155
155static int lkb_is_endoflife(struct dlm_lkb *lkb, int sb_status, int type) 156static int lkb_is_endoflife(int mode, int status)
156{ 157{
157 switch (sb_status) { 158 switch (status) {
158 case -DLM_EUNLOCK: 159 case -DLM_EUNLOCK:
159 return 1; 160 return 1;
160 case -DLM_ECANCEL: 161 case -DLM_ECANCEL:
161 case -ETIMEDOUT: 162 case -ETIMEDOUT:
162 case -EDEADLK: 163 case -EDEADLK:
163 if (lkb->lkb_grmode == DLM_LOCK_IV)
164 return 1;
165 break;
166 case -EAGAIN: 164 case -EAGAIN:
167 if (type == AST_COMP && lkb->lkb_grmode == DLM_LOCK_IV) 165 if (mode == DLM_LOCK_IV)
168 return 1; 166 return 1;
169 break; 167 break;
170 } 168 }
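The reworked predicate no longer looks inside the lkb: the caller hands it the granted mode and the completion status, and a lock is end-of-life either because it was explicitly unlocked or because a cancel/timeout/deadlock/EAGAIN result landed on a lock that never reached a granted mode (DLM_LOCK_IV). A quick reading aid, with hypothetical calls:

/* Reading aid for the predicate above (status values per dlm.h, negated as in
 * the switch; the calls are hypothetical):
 *
 *   lkb_is_endoflife(DLM_LOCK_PR, -DLM_EUNLOCK)  -> 1  unlock always ends the lock
 *   lkb_is_endoflife(DLM_LOCK_IV, -DLM_ECANCEL)  -> 1  cancelled before it was ever granted
 *   lkb_is_endoflife(DLM_LOCK_PR, -DLM_ECANCEL)  -> 0  cancelled a conversion; PR is still held
 *   lkb_is_endoflife(DLM_LOCK_IV, -EAGAIN)       -> 1  NOQUEUE request that never got the lock
 */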
@@ -174,12 +172,13 @@ static int lkb_is_endoflife(struct dlm_lkb *lkb, int sb_status, int type)
174/* we could possibly check if the cancel of an orphan has resulted in the lkb 172/* we could possibly check if the cancel of an orphan has resulted in the lkb
175 being removed and then remove that lkb from the orphans list and free it */ 173 being removed and then remove that lkb from the orphans list and free it */
176 174
177void dlm_user_add_ast(struct dlm_lkb *lkb, int type, int mode) 175void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
176 int status, uint32_t sbflags, uint64_t seq)
178{ 177{
179 struct dlm_ls *ls; 178 struct dlm_ls *ls;
180 struct dlm_user_args *ua; 179 struct dlm_user_args *ua;
181 struct dlm_user_proc *proc; 180 struct dlm_user_proc *proc;
182 int eol = 0, ast_type; 181 int rv;
183 182
184 if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD)) 183 if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD))
185 return; 184 return;
@@ -200,49 +199,29 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, int type, int mode)
200 ua = lkb->lkb_ua; 199 ua = lkb->lkb_ua;
201 proc = ua->proc; 200 proc = ua->proc;
202 201
203 if (type == AST_BAST && ua->bastaddr == NULL) 202 if ((flags & DLM_CB_BAST) && ua->bastaddr == NULL)
204 goto out; 203 goto out;
205 204
205 if ((flags & DLM_CB_CAST) && lkb_is_endoflife(mode, status))
206 lkb->lkb_flags |= DLM_IFL_ENDOFLIFE;
207
206 spin_lock(&proc->asts_spin); 208 spin_lock(&proc->asts_spin);
207 209
208 ast_type = lkb->lkb_ast_type; 210 rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, seq);
209 lkb->lkb_ast_type |= type; 211 if (rv < 0) {
210 if (type == AST_BAST) 212 spin_unlock(&proc->asts_spin);
211 lkb->lkb_bastmode = mode; 213 goto out;
212 else 214 }
213 lkb->lkb_castmode = mode;
214 215
215 if (!ast_type) { 216 if (list_empty(&lkb->lkb_astqueue)) {
216 kref_get(&lkb->lkb_ref); 217 kref_get(&lkb->lkb_ref);
217 list_add_tail(&lkb->lkb_astqueue, &proc->asts); 218 list_add_tail(&lkb->lkb_astqueue, &proc->asts);
218 lkb->lkb_ast_first = type;
219 wake_up_interruptible(&proc->wait); 219 wake_up_interruptible(&proc->wait);
220 } 220 }
221 if (type == AST_COMP && (ast_type & AST_COMP))
222 log_debug(ls, "ast overlap %x status %x %x",
223 lkb->lkb_id, ua->lksb.sb_status, lkb->lkb_flags);
224
225 eol = lkb_is_endoflife(lkb, ua->lksb.sb_status, type);
226 if (eol) {
227 lkb->lkb_flags |= DLM_IFL_ENDOFLIFE;
228 }
229
230 /* We want to copy the lvb to userspace when the completion
231 ast is read if the status is 0, the lock has an lvb and
232 lvb_ops says we should. We could probably have set_lvb_lock()
233 set update_user_lvb instead and not need old_mode */
234
235 if ((lkb->lkb_ast_type & AST_COMP) &&
236 (lkb->lkb_lksb->sb_status == 0) &&
237 lkb->lkb_lksb->sb_lvbptr &&
238 dlm_lvb_operations[ua->old_mode + 1][lkb->lkb_grmode + 1])
239 ua->update_user_lvb = 1;
240 else
241 ua->update_user_lvb = 0;
242
243 spin_unlock(&proc->asts_spin); 221 spin_unlock(&proc->asts_spin);
244 222
245 if (eol) { 223 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
224 /* N.B. spin_lock locks_spin, not asts_spin */
246 spin_lock(&proc->locks_spin); 225 spin_lock(&proc->locks_spin);
247 if (!list_empty(&lkb->lkb_ownqueue)) { 226 if (!list_empty(&lkb->lkb_ownqueue)) {
248 list_del_init(&lkb->lkb_ownqueue); 227 list_del_init(&lkb->lkb_ownqueue);
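dlm_user_add_ast() no longer juggles a single pending cast plus a single pending bast in lkb_ast_type; each callback is recorded with its own seq/flags/mode/status via dlm_add_lkb_callback(), and the lkb sits on proc->asts (holding one reference) exactly while it has recorded callbacks. A conceptual sketch of that recording step, using the lkb_callbacks[] slots and dlm_callback fields shown in ast.c's dump helper; this is not the real helper, which additionally drops basts made redundant by later callbacks:

/* Conceptual sketch of "record one callback" under the new scheme.  Not the
 * real dlm_add_lkb_callback(); the lkb_callbacks[] slots and dlm_callback
 * fields match the dump code in ast.c.
 */
static int record_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
			   int status, uint32_t sbflags, uint64_t seq)
{
	int i;

	for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
		if (lkb->lkb_callbacks[i].seq)
			continue;			/* slot still pending */
		lkb->lkb_callbacks[i].seq       = seq;
		lkb->lkb_callbacks[i].flags     = flags;   /* DLM_CB_CAST / DLM_CB_BAST */
		lkb->lkb_callbacks[i].mode      = mode;
		lkb->lkb_callbacks[i].sb_status = status;
		lkb->lkb_callbacks[i].sb_flags  = sbflags;
		return 0;
	}
	return -1;	/* no free slot: corresponds to the rv < 0 bail-out above */
}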
@@ -632,7 +611,6 @@ static ssize_t device_write(struct file *file, const char __user *buf,
632 611
633 out_sig: 612 out_sig:
634 sigprocmask(SIG_SETMASK, &tmpsig, NULL); 613 sigprocmask(SIG_SETMASK, &tmpsig, NULL);
635 recalc_sigpending();
636 out_free: 614 out_free:
637 kfree(kbuf); 615 kfree(kbuf);
638 return error; 616 return error;
@@ -705,8 +683,9 @@ static int device_close(struct inode *inode, struct file *file)
705 return 0; 683 return 0;
706} 684}
707 685
708static int copy_result_to_user(struct dlm_user_args *ua, int compat, int type, 686static int copy_result_to_user(struct dlm_user_args *ua, int compat,
709 int mode, char __user *buf, size_t count) 687 uint32_t flags, int mode, int copy_lvb,
688 char __user *buf, size_t count)
710{ 689{
711#ifdef CONFIG_COMPAT 690#ifdef CONFIG_COMPAT
712 struct dlm_lock_result32 result32; 691 struct dlm_lock_result32 result32;
@@ -730,7 +709,7 @@ static int copy_result_to_user(struct dlm_user_args *ua, int compat, int type,
730 notes that a new blocking AST address and parameter are set even if 709 notes that a new blocking AST address and parameter are set even if
731 the conversion fails, so maybe we should just do that. */ 710 the conversion fails, so maybe we should just do that. */
732 711
733 if (type == AST_BAST) { 712 if (flags & DLM_CB_BAST) {
734 result.user_astaddr = ua->bastaddr; 713 result.user_astaddr = ua->bastaddr;
735 result.user_astparam = ua->bastparam; 714 result.user_astparam = ua->bastparam;
736 result.bast_mode = mode; 715 result.bast_mode = mode;
@@ -750,8 +729,7 @@ static int copy_result_to_user(struct dlm_user_args *ua, int compat, int type,
750 /* copy lvb to userspace if there is one, it's been updated, and 729 /* copy lvb to userspace if there is one, it's been updated, and
751 the user buffer has space for it */ 730 the user buffer has space for it */
752 731
753 if (ua->update_user_lvb && ua->lksb.sb_lvbptr && 732 if (copy_lvb && ua->lksb.sb_lvbptr && count >= len + DLM_USER_LVB_LEN) {
754 count >= len + DLM_USER_LVB_LEN) {
755 if (copy_to_user(buf+len, ua->lksb.sb_lvbptr, 733 if (copy_to_user(buf+len, ua->lksb.sb_lvbptr,
756 DLM_USER_LVB_LEN)) { 734 DLM_USER_LVB_LEN)) {
757 error = -EFAULT; 735 error = -EFAULT;
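The update_user_lvb bookkeeping is gone; copy_lvb is now computed at read time in device_read() below, from the cast's status, the presence of an LVB, and the dlm_lvb_operations table indexed by the previous and new granted modes. The +1 offsets exist because lock modes start at DLM_LOCK_IV == -1; for example:

/* Illustration of the mode+1 indexing used with dlm_lvb_operations[][]
 * (lvb_table.h): modes run from DLM_LOCK_IV (-1) up to DLM_LOCK_EX (5),
 * so row/column 0 is IV.  Whether a given transition returns the LVB is
 * read from the table; the PR example below is only an assumption used
 * to make the indexing concrete.
 */
int old_mode = DLM_LOCK_IV;	/* nothing was granted before this cast */
int new_mode = DLM_LOCK_PR;	/* completion grants protected-read */
int copy_lvb = 0;

if (dlm_lvb_operations[old_mode + 1][new_mode + 1])
	copy_lvb = 1;		/* this transition returns the LVB to the caller */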
@@ -801,13 +779,12 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
801 struct dlm_user_proc *proc = file->private_data; 779 struct dlm_user_proc *proc = file->private_data;
802 struct dlm_lkb *lkb; 780 struct dlm_lkb *lkb;
803 DECLARE_WAITQUEUE(wait, current); 781 DECLARE_WAITQUEUE(wait, current);
804 int error = 0, removed; 782 struct dlm_callback cb;
805 int ret_type, ret_mode; 783 int rv, resid, copy_lvb = 0;
806 int bastmode, castmode, do_bast, do_cast;
807 784
808 if (count == sizeof(struct dlm_device_version)) { 785 if (count == sizeof(struct dlm_device_version)) {
809 error = copy_version_to_user(buf, count); 786 rv = copy_version_to_user(buf, count);
810 return error; 787 return rv;
811 } 788 }
812 789
813 if (!proc) { 790 if (!proc) {
@@ -854,92 +831,57 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
854 } 831 }
855 } 832 }
856 833
857 /* there may be both completion and blocking asts to return for 834 /* if we empty lkb_callbacks, we don't want to unlock the spinlock
858 the lkb, don't remove lkb from asts list unless no asts remain */ 835 without removing lkb_astqueue; so empty lkb_astqueue is always
836 consistent with empty lkb_callbacks */
859 837
860 lkb = list_entry(proc->asts.next, struct dlm_lkb, lkb_astqueue); 838 lkb = list_entry(proc->asts.next, struct dlm_lkb, lkb_astqueue);
861 839
862 removed = 0; 840 rv = dlm_rem_lkb_callback(lkb->lkb_resource->res_ls, lkb, &cb, &resid);
863 ret_type = 0; 841 if (rv < 0) {
864 ret_mode = 0; 842 /* this shouldn't happen; lkb should have been removed from
865 do_bast = lkb->lkb_ast_type & AST_BAST; 843 list when resid was zero */
866 do_cast = lkb->lkb_ast_type & AST_COMP; 844 log_print("dlm_rem_lkb_callback empty %x", lkb->lkb_id);
867 bastmode = lkb->lkb_bastmode; 845 list_del_init(&lkb->lkb_astqueue);
868 castmode = lkb->lkb_castmode; 846 spin_unlock(&proc->asts_spin);
869 847 /* removes ref for proc->asts, may cause lkb to be freed */
870 /* when both are queued figure out which to do first and 848 dlm_put_lkb(lkb);
871 switch first so the other goes in the next read */ 849 goto try_another;
872
873 if (do_cast && do_bast) {
874 if (lkb->lkb_ast_first == AST_COMP) {
875 ret_type = AST_COMP;
876 ret_mode = castmode;
877 lkb->lkb_ast_type &= ~AST_COMP;
878 lkb->lkb_ast_first = AST_BAST;
879 } else {
880 ret_type = AST_BAST;
881 ret_mode = bastmode;
882 lkb->lkb_ast_type &= ~AST_BAST;
883 lkb->lkb_ast_first = AST_COMP;
884 }
885 } else {
886 ret_type = lkb->lkb_ast_first;
887 ret_mode = (ret_type == AST_COMP) ? castmode : bastmode;
888 lkb->lkb_ast_type &= ~ret_type;
889 lkb->lkb_ast_first = 0;
890 } 850 }
851 if (!resid)
852 list_del_init(&lkb->lkb_astqueue);
853 spin_unlock(&proc->asts_spin);
891 854
892 /* if we're doing a bast but the bast is unnecessary, then 855 if (cb.flags & DLM_CB_SKIP) {
893 switch to do nothing or do a cast if that was needed next */ 856 /* removes ref for proc->asts, may cause lkb to be freed */
894 857 if (!resid)
895 if ((ret_type == AST_BAST) && 858 dlm_put_lkb(lkb);
896 dlm_modes_compat(bastmode, lkb->lkb_castmode_done)) { 859 goto try_another;
897 ret_type = 0;
898 ret_mode = 0;
899
900 if (do_cast) {
901 ret_type = AST_COMP;
902 ret_mode = castmode;
903 lkb->lkb_ast_type &= ~AST_COMP;
904 lkb->lkb_ast_first = 0;
905 }
906 } 860 }
907 861
908 if (lkb->lkb_ast_first != lkb->lkb_ast_type) { 862 if (cb.flags & DLM_CB_CAST) {
909 log_print("device_read %x ast_first %x ast_type %x", 863 int old_mode, new_mode;
910 lkb->lkb_id, lkb->lkb_ast_first, lkb->lkb_ast_type);
911 }
912 864
913 if (!lkb->lkb_ast_type) { 865 old_mode = lkb->lkb_last_cast.mode;
914 list_del(&lkb->lkb_astqueue); 866 new_mode = cb.mode;
915 removed = 1;
916 }
917 spin_unlock(&proc->asts_spin);
918 867
919 if (ret_type) { 868 if (!cb.sb_status && lkb->lkb_lksb->sb_lvbptr &&
920 error = copy_result_to_user(lkb->lkb_ua, 869 dlm_lvb_operations[old_mode + 1][new_mode + 1])
921 test_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags), 870 copy_lvb = 1;
922 ret_type, ret_mode, buf, count);
923 871
924 if (ret_type == AST_COMP) 872 lkb->lkb_lksb->sb_status = cb.sb_status;
925 lkb->lkb_castmode_done = castmode; 873 lkb->lkb_lksb->sb_flags = cb.sb_flags;
926 if (ret_type == AST_BAST)
927 lkb->lkb_bastmode_done = bastmode;
928 } 874 }
929 875
930 /* removes reference for the proc->asts lists added by 876 rv = copy_result_to_user(lkb->lkb_ua,
931 dlm_user_add_ast() and may result in the lkb being freed */ 877 test_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags),
878 cb.flags, cb.mode, copy_lvb, buf, count);
932 879
933 if (removed) 880 /* removes ref for proc->asts, may cause lkb to be freed */
881 if (!resid)
934 dlm_put_lkb(lkb); 882 dlm_put_lkb(lkb);
935 883
936 /* the bast that was queued was eliminated (see unnecessary above), 884 return rv;
937 leaving nothing to return */
938
939 if (!ret_type)
940 goto try_another;
941
942 return error;
943} 885}
944 886
945static unsigned int device_poll(struct file *file, poll_table *wait) 887static unsigned int device_poll(struct file *file, poll_table *wait)
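device_read() now returns exactly one recorded callback per read(): dlm_rem_lkb_callback() pops the oldest entry and reports how many remain (resid), the lkb keeps its proc->asts reference until resid reaches zero, skipped basts loop back to try_another, and for casts the old and new modes decide copy_lvb before copy_result_to_user() runs. From userspace the contract is one struct dlm_lock_result per successful read, optionally followed by the LVB. A hedged sketch of a libdlm-style reader (device setup and callback dispatch omitted):

#include <poll.h>
#include <unistd.h>
#include <linux/dlm_device.h>

/* Hedged sketch of the consumer side: one dlm_lock_result per read(), with
 * room for the LVB that device_read() may append.  Obtaining the lockspace
 * device fd and dispatching the user's ast/bast functions are omitted;
 * libdlm's dispatch code is the authoritative consumer.
 */
static void drain_callbacks(int fd)
{
	struct pollfd pfd = { .fd = fd, .events = POLLIN };
	char buf[sizeof(struct dlm_lock_result) + DLM_USER_LVB_LEN];

	while (poll(&pfd, 1, -1) > 0) {
		ssize_t n = read(fd, buf, sizeof(buf));

		if (n < (ssize_t)sizeof(struct dlm_lock_result))
			break;
		/* buf now holds one completion or blocking callback, described
		 * by the user_astaddr/user_astparam/bast_mode fields filled in
		 * by copy_result_to_user(). */
	}
}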
@@ -1009,6 +951,7 @@ static const struct file_operations device_fops = {
1009 .write = device_write, 951 .write = device_write,
1010 .poll = device_poll, 952 .poll = device_poll,
1011 .owner = THIS_MODULE, 953 .owner = THIS_MODULE,
954 .llseek = noop_llseek,
1012}; 955};
1013 956
1014static const struct file_operations ctl_device_fops = { 957static const struct file_operations ctl_device_fops = {
@@ -1017,6 +960,7 @@ static const struct file_operations ctl_device_fops = {
1017 .read = device_read, 960 .read = device_read,
1018 .write = device_write, 961 .write = device_write,
1019 .owner = THIS_MODULE, 962 .owner = THIS_MODULE,
963 .llseek = noop_llseek,
1020}; 964};
1021 965
1022static struct miscdevice ctl_device = { 966static struct miscdevice ctl_device = {
@@ -1029,6 +973,7 @@ static const struct file_operations monitor_device_fops = {
1029 .open = monitor_device_open, 973 .open = monitor_device_open,
1030 .release = monitor_device_close, 974 .release = monitor_device_close,
1031 .owner = THIS_MODULE, 975 .owner = THIS_MODULE,
976 .llseek = noop_llseek,
1032}; 977};
1033 978
1034static struct miscdevice monitor_device = { 979static struct miscdevice monitor_device = {
diff --git a/fs/dlm/user.h b/fs/dlm/user.h
index f196091dd7ff..00499ab8835f 100644
--- a/fs/dlm/user.h
+++ b/fs/dlm/user.h
@@ -9,7 +9,8 @@
9#ifndef __USER_DOT_H__ 9#ifndef __USER_DOT_H__
10#define __USER_DOT_H__ 10#define __USER_DOT_H__
11 11
12void dlm_user_add_ast(struct dlm_lkb *lkb, int type, int mode); 12void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
13 int status, uint32_t sbflags, uint64_t seq);
13int dlm_user_init(void); 14int dlm_user_init(void);
14void dlm_user_exit(void); 15void dlm_user_exit(void);
15int dlm_device_deregister(struct dlm_ls *ls); 16int dlm_device_deregister(struct dlm_ls *ls);
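The widened prototype gives the user-lock path the full callback description: the DLM_CB_* flags, the mode, the completion status, the lksb flags, and the sequence number that orders callbacks. A hedged sketch of a call site, modelled on how this series routes callbacks for user-space locks; the locking around the counter and the kernel-lock branch are abridged:

/* Hedged sketch of a call site for the new prototype; the sequence counter
 * matches the ast_seq_count introduced in ast.c, but the locking around it
 * and the kernel-lock handling are abridged.
 */
static void deliver_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
			     int status, uint32_t sbflags)
{
	uint64_t seq = ++ast_seq_count;		/* orders casts and basts per lkb */

	if (lkb->lkb_flags & DLM_IFL_USER) {
		dlm_user_add_ast(lkb, flags, mode, status, sbflags, seq);
		return;
	}
	/* kernel locks: record the callback and wake the ast daemon instead */
}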