aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm')
-rw-r--r--fs/dlm/ast.c265
-rw-r--r--fs/dlm/ast.h15
-rw-r--r--fs/dlm/config.c75
-rw-r--r--fs/dlm/config.h2
-rw-r--r--fs/dlm/dlm_internal.h29
-rw-r--r--fs/dlm/lock.c225
-rw-r--r--fs/dlm/lockspace.c177
-rw-r--r--fs/dlm/lowcomms.c9
-rw-r--r--fs/dlm/memory.c22
-rw-r--r--fs/dlm/memory.h2
-rw-r--r--fs/dlm/plock.c10
-rw-r--r--fs/dlm/recoverd.c12
-rw-r--r--fs/dlm/user.c12
13 files changed, 452 insertions, 403 deletions
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index abc49f29245..90e5997262e 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -14,17 +14,9 @@
14#include "dlm_internal.h" 14#include "dlm_internal.h"
15#include "lock.h" 15#include "lock.h"
16#include "user.h" 16#include "user.h"
17#include "ast.h"
18
19#define WAKE_ASTS 0
20
21static uint64_t ast_seq_count;
22static struct list_head ast_queue;
23static spinlock_t ast_queue_lock;
24static struct task_struct * astd_task;
25static unsigned long astd_wakeflags;
26static struct mutex astd_running;
27 17
18static uint64_t dlm_cb_seq;
19static spinlock_t dlm_cb_seq_spin;
28 20
29static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb) 21static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb)
30{ 22{
@@ -57,21 +49,13 @@ static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb)
57 } 49 }
58} 50}
59 51
60void dlm_del_ast(struct dlm_lkb *lkb)
61{
62 spin_lock(&ast_queue_lock);
63 if (!list_empty(&lkb->lkb_astqueue))
64 list_del_init(&lkb->lkb_astqueue);
65 spin_unlock(&ast_queue_lock);
66}
67
68int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, 52int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
69 int status, uint32_t sbflags, uint64_t seq) 53 int status, uint32_t sbflags, uint64_t seq)
70{ 54{
71 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 55 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
72 uint64_t prev_seq; 56 uint64_t prev_seq;
73 int prev_mode; 57 int prev_mode;
74 int i; 58 int i, rv;
75 59
76 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) { 60 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
77 if (lkb->lkb_callbacks[i].seq) 61 if (lkb->lkb_callbacks[i].seq)
@@ -100,7 +84,8 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
100 mode, 84 mode,
101 (unsigned long long)prev_seq, 85 (unsigned long long)prev_seq,
102 prev_mode); 86 prev_mode);
103 return 0; 87 rv = 0;
88 goto out;
104 } 89 }
105 } 90 }
106 91
@@ -109,6 +94,7 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
109 lkb->lkb_callbacks[i].mode = mode; 94 lkb->lkb_callbacks[i].mode = mode;
110 lkb->lkb_callbacks[i].sb_status = status; 95 lkb->lkb_callbacks[i].sb_status = status;
111 lkb->lkb_callbacks[i].sb_flags = (sbflags & 0x000000FF); 96 lkb->lkb_callbacks[i].sb_flags = (sbflags & 0x000000FF);
97 rv = 0;
112 break; 98 break;
113 } 99 }
114 100
@@ -117,21 +103,24 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
117 lkb->lkb_id, (unsigned long long)seq, 103 lkb->lkb_id, (unsigned long long)seq,
118 flags, mode, status, sbflags); 104 flags, mode, status, sbflags);
119 dlm_dump_lkb_callbacks(lkb); 105 dlm_dump_lkb_callbacks(lkb);
120 return -1; 106 rv = -1;
107 goto out;
121 } 108 }
122 109 out:
123 return 0; 110 return rv;
124} 111}
125 112
126int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb, 113int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
127 struct dlm_callback *cb, int *resid) 114 struct dlm_callback *cb, int *resid)
128{ 115{
129 int i; 116 int i, rv;
130 117
131 *resid = 0; 118 *resid = 0;
132 119
133 if (!lkb->lkb_callbacks[0].seq) 120 if (!lkb->lkb_callbacks[0].seq) {
134 return -ENOENT; 121 rv = -ENOENT;
122 goto out;
123 }
135 124
136 /* oldest undelivered cb is callbacks[0] */ 125 /* oldest undelivered cb is callbacks[0] */
137 126
@@ -163,7 +152,8 @@ int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
163 cb->mode, 152 cb->mode,
164 (unsigned long long)lkb->lkb_last_cast.seq, 153 (unsigned long long)lkb->lkb_last_cast.seq,
165 lkb->lkb_last_cast.mode); 154 lkb->lkb_last_cast.mode);
166 return 0; 155 rv = 0;
156 goto out;
167 } 157 }
168 } 158 }
169 159
@@ -176,171 +166,150 @@ int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
176 memcpy(&lkb->lkb_last_bast, cb, sizeof(struct dlm_callback)); 166 memcpy(&lkb->lkb_last_bast, cb, sizeof(struct dlm_callback));
177 lkb->lkb_last_bast_time = ktime_get(); 167 lkb->lkb_last_bast_time = ktime_get();
178 } 168 }
179 169 rv = 0;
180 return 0; 170 out:
171 return rv;
181} 172}
182 173
183void dlm_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, 174void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
184 uint32_t sbflags) 175 uint32_t sbflags)
185{ 176{
186 uint64_t seq; 177 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
178 uint64_t new_seq, prev_seq;
187 int rv; 179 int rv;
188 180
189 spin_lock(&ast_queue_lock); 181 spin_lock(&dlm_cb_seq_spin);
190 182 new_seq = ++dlm_cb_seq;
191 seq = ++ast_seq_count; 183 spin_unlock(&dlm_cb_seq_spin);
192 184
193 if (lkb->lkb_flags & DLM_IFL_USER) { 185 if (lkb->lkb_flags & DLM_IFL_USER) {
194 spin_unlock(&ast_queue_lock); 186 dlm_user_add_ast(lkb, flags, mode, status, sbflags, new_seq);
195 dlm_user_add_ast(lkb, flags, mode, status, sbflags, seq);
196 return; 187 return;
197 } 188 }
198 189
199 rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, seq); 190 mutex_lock(&lkb->lkb_cb_mutex);
200 if (rv < 0) { 191 prev_seq = lkb->lkb_callbacks[0].seq;
201 spin_unlock(&ast_queue_lock);
202 return;
203 }
204 192
205 if (list_empty(&lkb->lkb_astqueue)) { 193 rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, new_seq);
194 if (rv < 0)
195 goto out;
196
197 if (!prev_seq) {
206 kref_get(&lkb->lkb_ref); 198 kref_get(&lkb->lkb_ref);
207 list_add_tail(&lkb->lkb_astqueue, &ast_queue);
208 }
209 spin_unlock(&ast_queue_lock);
210 199
211 set_bit(WAKE_ASTS, &astd_wakeflags); 200 if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
212 wake_up_process(astd_task); 201 mutex_lock(&ls->ls_cb_mutex);
202 list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay);
203 mutex_unlock(&ls->ls_cb_mutex);
204 } else {
205 queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
206 }
207 }
208 out:
209 mutex_unlock(&lkb->lkb_cb_mutex);
213} 210}
214 211
215static void process_asts(void) 212void dlm_callback_work(struct work_struct *work)
216{ 213{
217 struct dlm_ls *ls = NULL; 214 struct dlm_lkb *lkb = container_of(work, struct dlm_lkb, lkb_cb_work);
218 struct dlm_rsb *r = NULL; 215 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
219 struct dlm_lkb *lkb;
220 void (*castfn) (void *astparam); 216 void (*castfn) (void *astparam);
221 void (*bastfn) (void *astparam, int mode); 217 void (*bastfn) (void *astparam, int mode);
222 struct dlm_callback callbacks[DLM_CALLBACKS_SIZE]; 218 struct dlm_callback callbacks[DLM_CALLBACKS_SIZE];
223 int i, rv, resid; 219 int i, rv, resid;
224 220
225repeat: 221 memset(&callbacks, 0, sizeof(callbacks));
226 spin_lock(&ast_queue_lock);
227 list_for_each_entry(lkb, &ast_queue, lkb_astqueue) {
228 r = lkb->lkb_resource;
229 ls = r->res_ls;
230 222
231 if (dlm_locking_stopped(ls)) 223 mutex_lock(&lkb->lkb_cb_mutex);
232 continue; 224 if (!lkb->lkb_callbacks[0].seq) {
233 225 /* no callback work exists, shouldn't happen */
234 /* we remove from astqueue list and remove everything in 226 log_error(ls, "dlm_callback_work %x no work", lkb->lkb_id);
235 lkb_callbacks before releasing the spinlock so empty 227 dlm_print_lkb(lkb);
236 lkb_astqueue is always consistent with empty lkb_callbacks */ 228 dlm_dump_lkb_callbacks(lkb);
237 229 }
238 list_del_init(&lkb->lkb_astqueue);
239
240 castfn = lkb->lkb_astfn;
241 bastfn = lkb->lkb_bastfn;
242 230
243 memset(&callbacks, 0, sizeof(callbacks)); 231 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
232 rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid);
233 if (rv < 0)
234 break;
235 }
244 236
245 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) { 237 if (resid) {
246 rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid); 238 /* cbs remain, loop should have removed all, shouldn't happen */
247 if (rv < 0) 239 log_error(ls, "dlm_callback_work %x resid %d", lkb->lkb_id,
248 break; 240 resid);
249 } 241 dlm_print_lkb(lkb);
250 spin_unlock(&ast_queue_lock); 242 dlm_dump_lkb_callbacks(lkb);
243 }
244 mutex_unlock(&lkb->lkb_cb_mutex);
251 245
252 if (resid) { 246 castfn = lkb->lkb_astfn;
253 /* shouldn't happen, for loop should have removed all */ 247 bastfn = lkb->lkb_bastfn;
254 log_error(ls, "callback resid %d lkb %x",
255 resid, lkb->lkb_id);
256 }
257 248
258 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) { 249 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
259 if (!callbacks[i].seq) 250 if (!callbacks[i].seq)
260 break; 251 break;
261 if (callbacks[i].flags & DLM_CB_SKIP) { 252 if (callbacks[i].flags & DLM_CB_SKIP) {
262 continue; 253 continue;
263 } else if (callbacks[i].flags & DLM_CB_BAST) { 254 } else if (callbacks[i].flags & DLM_CB_BAST) {
264 bastfn(lkb->lkb_astparam, callbacks[i].mode); 255 bastfn(lkb->lkb_astparam, callbacks[i].mode);
265 } else if (callbacks[i].flags & DLM_CB_CAST) { 256 } else if (callbacks[i].flags & DLM_CB_CAST) {
266 lkb->lkb_lksb->sb_status = callbacks[i].sb_status; 257 lkb->lkb_lksb->sb_status = callbacks[i].sb_status;
267 lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags; 258 lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags;
268 castfn(lkb->lkb_astparam); 259 castfn(lkb->lkb_astparam);
269 }
270 } 260 }
271
272 /* removes ref for ast_queue, may cause lkb to be freed */
273 dlm_put_lkb(lkb);
274
275 cond_resched();
276 goto repeat;
277 } 261 }
278 spin_unlock(&ast_queue_lock);
279}
280
281static inline int no_asts(void)
282{
283 int ret;
284 262
285 spin_lock(&ast_queue_lock); 263 /* undo kref_get from dlm_add_callback, may cause lkb to be freed */
286 ret = list_empty(&ast_queue); 264 dlm_put_lkb(lkb);
287 spin_unlock(&ast_queue_lock);
288 return ret;
289} 265}
290 266
291static int dlm_astd(void *data) 267int dlm_callback_start(struct dlm_ls *ls)
292{ 268{
293 while (!kthread_should_stop()) { 269 ls->ls_callback_wq = alloc_workqueue("dlm_callback",
294 set_current_state(TASK_INTERRUPTIBLE); 270 WQ_UNBOUND |
295 if (!test_bit(WAKE_ASTS, &astd_wakeflags)) 271 WQ_MEM_RECLAIM |
296 schedule(); 272 WQ_NON_REENTRANT,
297 set_current_state(TASK_RUNNING); 273 0);
298 274 if (!ls->ls_callback_wq) {
299 mutex_lock(&astd_running); 275 log_print("can't start dlm_callback workqueue");
300 if (test_and_clear_bit(WAKE_ASTS, &astd_wakeflags)) 276 return -ENOMEM;
301 process_asts();
302 mutex_unlock(&astd_running);
303 } 277 }
304 return 0; 278 return 0;
305} 279}
306 280
307void dlm_astd_wake(void) 281void dlm_callback_stop(struct dlm_ls *ls)
308{ 282{
309 if (!no_asts()) { 283 if (ls->ls_callback_wq)
310 set_bit(WAKE_ASTS, &astd_wakeflags); 284 destroy_workqueue(ls->ls_callback_wq);
311 wake_up_process(astd_task);
312 }
313} 285}
314 286
315int dlm_astd_start(void) 287void dlm_callback_suspend(struct dlm_ls *ls)
316{ 288{
317 struct task_struct *p; 289 set_bit(LSFL_CB_DELAY, &ls->ls_flags);
318 int error = 0;
319
320 INIT_LIST_HEAD(&ast_queue);
321 spin_lock_init(&ast_queue_lock);
322 mutex_init(&astd_running);
323
324 p = kthread_run(dlm_astd, NULL, "dlm_astd");
325 if (IS_ERR(p))
326 error = PTR_ERR(p);
327 else
328 astd_task = p;
329 return error;
330}
331 290
332void dlm_astd_stop(void) 291 if (ls->ls_callback_wq)
333{ 292 flush_workqueue(ls->ls_callback_wq);
334 kthread_stop(astd_task);
335} 293}
336 294
337void dlm_astd_suspend(void) 295void dlm_callback_resume(struct dlm_ls *ls)
338{ 296{
339 mutex_lock(&astd_running); 297 struct dlm_lkb *lkb, *safe;
340} 298 int count = 0;
341 299
342void dlm_astd_resume(void) 300 clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
343{ 301
344 mutex_unlock(&astd_running); 302 if (!ls->ls_callback_wq)
303 return;
304
305 mutex_lock(&ls->ls_cb_mutex);
306 list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
307 list_del_init(&lkb->lkb_cb_list);
308 queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
309 count++;
310 }
311 mutex_unlock(&ls->ls_cb_mutex);
312
313 log_debug(ls, "dlm_callback_resume %d", count);
345} 314}
346 315
diff --git a/fs/dlm/ast.h b/fs/dlm/ast.h
index 8aa89c9b561..757b551c682 100644
--- a/fs/dlm/ast.h
+++ b/fs/dlm/ast.h
@@ -18,14 +18,15 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
18 int status, uint32_t sbflags, uint64_t seq); 18 int status, uint32_t sbflags, uint64_t seq);
19int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb, 19int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
20 struct dlm_callback *cb, int *resid); 20 struct dlm_callback *cb, int *resid);
21void dlm_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, 21void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
22 uint32_t sbflags); 22 uint32_t sbflags);
23 23
24void dlm_astd_wake(void); 24void dlm_callback_work(struct work_struct *work);
25int dlm_astd_start(void); 25int dlm_callback_start(struct dlm_ls *ls);
26void dlm_astd_stop(void); 26void dlm_callback_stop(struct dlm_ls *ls);
27void dlm_astd_suspend(void); 27void dlm_callback_suspend(struct dlm_ls *ls);
28void dlm_astd_resume(void); 28void dlm_callback_resume(struct dlm_ls *ls);
29 29
30#endif 30#endif
31 31
32
diff --git a/fs/dlm/config.c b/fs/dlm/config.c
index 9b026ea8baa..6cf72fcc0d0 100644
--- a/fs/dlm/config.c
+++ b/fs/dlm/config.c
@@ -28,7 +28,8 @@
28 * /config/dlm/<cluster>/spaces/<space>/nodes/<node>/weight 28 * /config/dlm/<cluster>/spaces/<space>/nodes/<node>/weight
29 * /config/dlm/<cluster>/comms/<comm>/nodeid 29 * /config/dlm/<cluster>/comms/<comm>/nodeid
30 * /config/dlm/<cluster>/comms/<comm>/local 30 * /config/dlm/<cluster>/comms/<comm>/local
31 * /config/dlm/<cluster>/comms/<comm>/addr 31 * /config/dlm/<cluster>/comms/<comm>/addr (write only)
32 * /config/dlm/<cluster>/comms/<comm>/addr_list (read only)
32 * The <cluster> level is useless, but I haven't figured out how to avoid it. 33 * The <cluster> level is useless, but I haven't figured out how to avoid it.
33 */ 34 */
34 35
@@ -80,6 +81,7 @@ static ssize_t comm_local_write(struct dlm_comm *cm, const char *buf,
80 size_t len); 81 size_t len);
81static ssize_t comm_addr_write(struct dlm_comm *cm, const char *buf, 82static ssize_t comm_addr_write(struct dlm_comm *cm, const char *buf,
82 size_t len); 83 size_t len);
84static ssize_t comm_addr_list_read(struct dlm_comm *cm, char *buf);
83static ssize_t node_nodeid_read(struct dlm_node *nd, char *buf); 85static ssize_t node_nodeid_read(struct dlm_node *nd, char *buf);
84static ssize_t node_nodeid_write(struct dlm_node *nd, const char *buf, 86static ssize_t node_nodeid_write(struct dlm_node *nd, const char *buf,
85 size_t len); 87 size_t len);
@@ -92,7 +94,6 @@ struct dlm_cluster {
92 unsigned int cl_tcp_port; 94 unsigned int cl_tcp_port;
93 unsigned int cl_buffer_size; 95 unsigned int cl_buffer_size;
94 unsigned int cl_rsbtbl_size; 96 unsigned int cl_rsbtbl_size;
95 unsigned int cl_lkbtbl_size;
96 unsigned int cl_dirtbl_size; 97 unsigned int cl_dirtbl_size;
97 unsigned int cl_recover_timer; 98 unsigned int cl_recover_timer;
98 unsigned int cl_toss_secs; 99 unsigned int cl_toss_secs;
@@ -101,13 +102,13 @@ struct dlm_cluster {
101 unsigned int cl_protocol; 102 unsigned int cl_protocol;
102 unsigned int cl_timewarn_cs; 103 unsigned int cl_timewarn_cs;
103 unsigned int cl_waitwarn_us; 104 unsigned int cl_waitwarn_us;
105 unsigned int cl_new_rsb_count;
104}; 106};
105 107
106enum { 108enum {
107 CLUSTER_ATTR_TCP_PORT = 0, 109 CLUSTER_ATTR_TCP_PORT = 0,
108 CLUSTER_ATTR_BUFFER_SIZE, 110 CLUSTER_ATTR_BUFFER_SIZE,
109 CLUSTER_ATTR_RSBTBL_SIZE, 111 CLUSTER_ATTR_RSBTBL_SIZE,
110 CLUSTER_ATTR_LKBTBL_SIZE,
111 CLUSTER_ATTR_DIRTBL_SIZE, 112 CLUSTER_ATTR_DIRTBL_SIZE,
112 CLUSTER_ATTR_RECOVER_TIMER, 113 CLUSTER_ATTR_RECOVER_TIMER,
113 CLUSTER_ATTR_TOSS_SECS, 114 CLUSTER_ATTR_TOSS_SECS,
@@ -116,6 +117,7 @@ enum {
116 CLUSTER_ATTR_PROTOCOL, 117 CLUSTER_ATTR_PROTOCOL,
117 CLUSTER_ATTR_TIMEWARN_CS, 118 CLUSTER_ATTR_TIMEWARN_CS,
118 CLUSTER_ATTR_WAITWARN_US, 119 CLUSTER_ATTR_WAITWARN_US,
120 CLUSTER_ATTR_NEW_RSB_COUNT,
119}; 121};
120 122
121struct cluster_attribute { 123struct cluster_attribute {
@@ -160,7 +162,6 @@ __CONFIGFS_ATTR(name, 0644, name##_read, name##_write)
160CLUSTER_ATTR(tcp_port, 1); 162CLUSTER_ATTR(tcp_port, 1);
161CLUSTER_ATTR(buffer_size, 1); 163CLUSTER_ATTR(buffer_size, 1);
162CLUSTER_ATTR(rsbtbl_size, 1); 164CLUSTER_ATTR(rsbtbl_size, 1);
163CLUSTER_ATTR(lkbtbl_size, 1);
164CLUSTER_ATTR(dirtbl_size, 1); 165CLUSTER_ATTR(dirtbl_size, 1);
165CLUSTER_ATTR(recover_timer, 1); 166CLUSTER_ATTR(recover_timer, 1);
166CLUSTER_ATTR(toss_secs, 1); 167CLUSTER_ATTR(toss_secs, 1);
@@ -169,12 +170,12 @@ CLUSTER_ATTR(log_debug, 0);
169CLUSTER_ATTR(protocol, 0); 170CLUSTER_ATTR(protocol, 0);
170CLUSTER_ATTR(timewarn_cs, 1); 171CLUSTER_ATTR(timewarn_cs, 1);
171CLUSTER_ATTR(waitwarn_us, 0); 172CLUSTER_ATTR(waitwarn_us, 0);
173CLUSTER_ATTR(new_rsb_count, 0);
172 174
173static struct configfs_attribute *cluster_attrs[] = { 175static struct configfs_attribute *cluster_attrs[] = {
174 [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr, 176 [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr,
175 [CLUSTER_ATTR_BUFFER_SIZE] = &cluster_attr_buffer_size.attr, 177 [CLUSTER_ATTR_BUFFER_SIZE] = &cluster_attr_buffer_size.attr,
176 [CLUSTER_ATTR_RSBTBL_SIZE] = &cluster_attr_rsbtbl_size.attr, 178 [CLUSTER_ATTR_RSBTBL_SIZE] = &cluster_attr_rsbtbl_size.attr,
177 [CLUSTER_ATTR_LKBTBL_SIZE] = &cluster_attr_lkbtbl_size.attr,
178 [CLUSTER_ATTR_DIRTBL_SIZE] = &cluster_attr_dirtbl_size.attr, 179 [CLUSTER_ATTR_DIRTBL_SIZE] = &cluster_attr_dirtbl_size.attr,
179 [CLUSTER_ATTR_RECOVER_TIMER] = &cluster_attr_recover_timer.attr, 180 [CLUSTER_ATTR_RECOVER_TIMER] = &cluster_attr_recover_timer.attr,
180 [CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr, 181 [CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr,
@@ -183,6 +184,7 @@ static struct configfs_attribute *cluster_attrs[] = {
183 [CLUSTER_ATTR_PROTOCOL] = &cluster_attr_protocol.attr, 184 [CLUSTER_ATTR_PROTOCOL] = &cluster_attr_protocol.attr,
184 [CLUSTER_ATTR_TIMEWARN_CS] = &cluster_attr_timewarn_cs.attr, 185 [CLUSTER_ATTR_TIMEWARN_CS] = &cluster_attr_timewarn_cs.attr,
185 [CLUSTER_ATTR_WAITWARN_US] = &cluster_attr_waitwarn_us.attr, 186 [CLUSTER_ATTR_WAITWARN_US] = &cluster_attr_waitwarn_us.attr,
187 [CLUSTER_ATTR_NEW_RSB_COUNT] = &cluster_attr_new_rsb_count.attr,
186 NULL, 188 NULL,
187}; 189};
188 190
@@ -190,6 +192,7 @@ enum {
190 COMM_ATTR_NODEID = 0, 192 COMM_ATTR_NODEID = 0,
191 COMM_ATTR_LOCAL, 193 COMM_ATTR_LOCAL,
192 COMM_ATTR_ADDR, 194 COMM_ATTR_ADDR,
195 COMM_ATTR_ADDR_LIST,
193}; 196};
194 197
195struct comm_attribute { 198struct comm_attribute {
@@ -217,14 +220,22 @@ static struct comm_attribute comm_attr_local = {
217static struct comm_attribute comm_attr_addr = { 220static struct comm_attribute comm_attr_addr = {
218 .attr = { .ca_owner = THIS_MODULE, 221 .attr = { .ca_owner = THIS_MODULE,
219 .ca_name = "addr", 222 .ca_name = "addr",
220 .ca_mode = S_IRUGO | S_IWUSR }, 223 .ca_mode = S_IWUSR },
221 .store = comm_addr_write, 224 .store = comm_addr_write,
222}; 225};
223 226
227static struct comm_attribute comm_attr_addr_list = {
228 .attr = { .ca_owner = THIS_MODULE,
229 .ca_name = "addr_list",
230 .ca_mode = S_IRUGO },
231 .show = comm_addr_list_read,
232};
233
224static struct configfs_attribute *comm_attrs[] = { 234static struct configfs_attribute *comm_attrs[] = {
225 [COMM_ATTR_NODEID] = &comm_attr_nodeid.attr, 235 [COMM_ATTR_NODEID] = &comm_attr_nodeid.attr,
226 [COMM_ATTR_LOCAL] = &comm_attr_local.attr, 236 [COMM_ATTR_LOCAL] = &comm_attr_local.attr,
227 [COMM_ATTR_ADDR] = &comm_attr_addr.attr, 237 [COMM_ATTR_ADDR] = &comm_attr_addr.attr,
238 [COMM_ATTR_ADDR_LIST] = &comm_attr_addr_list.attr,
228 NULL, 239 NULL,
229}; 240};
230 241
@@ -435,7 +446,6 @@ static struct config_group *make_cluster(struct config_group *g,
435 cl->cl_tcp_port = dlm_config.ci_tcp_port; 446 cl->cl_tcp_port = dlm_config.ci_tcp_port;
436 cl->cl_buffer_size = dlm_config.ci_buffer_size; 447 cl->cl_buffer_size = dlm_config.ci_buffer_size;
437 cl->cl_rsbtbl_size = dlm_config.ci_rsbtbl_size; 448 cl->cl_rsbtbl_size = dlm_config.ci_rsbtbl_size;
438 cl->cl_lkbtbl_size = dlm_config.ci_lkbtbl_size;
439 cl->cl_dirtbl_size = dlm_config.ci_dirtbl_size; 449 cl->cl_dirtbl_size = dlm_config.ci_dirtbl_size;
440 cl->cl_recover_timer = dlm_config.ci_recover_timer; 450 cl->cl_recover_timer = dlm_config.ci_recover_timer;
441 cl->cl_toss_secs = dlm_config.ci_toss_secs; 451 cl->cl_toss_secs = dlm_config.ci_toss_secs;
@@ -444,6 +454,7 @@ static struct config_group *make_cluster(struct config_group *g,
444 cl->cl_protocol = dlm_config.ci_protocol; 454 cl->cl_protocol = dlm_config.ci_protocol;
445 cl->cl_timewarn_cs = dlm_config.ci_timewarn_cs; 455 cl->cl_timewarn_cs = dlm_config.ci_timewarn_cs;
446 cl->cl_waitwarn_us = dlm_config.ci_waitwarn_us; 456 cl->cl_waitwarn_us = dlm_config.ci_waitwarn_us;
457 cl->cl_new_rsb_count = dlm_config.ci_new_rsb_count;
447 458
448 space_list = &sps->ss_group; 459 space_list = &sps->ss_group;
449 comm_list = &cms->cs_group; 460 comm_list = &cms->cs_group;
@@ -720,6 +731,50 @@ static ssize_t comm_addr_write(struct dlm_comm *cm, const char *buf, size_t len)
720 return len; 731 return len;
721} 732}
722 733
734static ssize_t comm_addr_list_read(struct dlm_comm *cm, char *buf)
735{
736 ssize_t s;
737 ssize_t allowance;
738 int i;
739 struct sockaddr_storage *addr;
740 struct sockaddr_in *addr_in;
741 struct sockaddr_in6 *addr_in6;
742
743 /* Taken from ip6_addr_string() defined in lib/vsprintf.c */
744 char buf0[sizeof("AF_INET6 xxxx:xxxx:xxxx:xxxx:xxxx:xxxx:255.255.255.255\n")];
745
746
747 /* Derived from SIMPLE_ATTR_SIZE of fs/configfs/file.c */
748 allowance = 4096;
749 buf[0] = '\0';
750
751 for (i = 0; i < cm->addr_count; i++) {
752 addr = cm->addr[i];
753
754 switch(addr->ss_family) {
755 case AF_INET:
756 addr_in = (struct sockaddr_in *)addr;
757 s = sprintf(buf0, "AF_INET %pI4\n", &addr_in->sin_addr.s_addr);
758 break;
759 case AF_INET6:
760 addr_in6 = (struct sockaddr_in6 *)addr;
761 s = sprintf(buf0, "AF_INET6 %pI6\n", &addr_in6->sin6_addr);
762 break;
763 default:
764 s = sprintf(buf0, "%s\n", "<UNKNOWN>");
765 break;
766 }
767 allowance -= s;
768 if (allowance >= 0)
769 strcat(buf, buf0);
770 else {
771 allowance += s;
772 break;
773 }
774 }
775 return 4096 - allowance;
776}
777
723static ssize_t show_node(struct config_item *i, struct configfs_attribute *a, 778static ssize_t show_node(struct config_item *i, struct configfs_attribute *a,
724 char *buf) 779 char *buf)
725{ 780{
@@ -983,7 +1038,6 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
983#define DEFAULT_TCP_PORT 21064 1038#define DEFAULT_TCP_PORT 21064
984#define DEFAULT_BUFFER_SIZE 4096 1039#define DEFAULT_BUFFER_SIZE 4096
985#define DEFAULT_RSBTBL_SIZE 1024 1040#define DEFAULT_RSBTBL_SIZE 1024
986#define DEFAULT_LKBTBL_SIZE 1024
987#define DEFAULT_DIRTBL_SIZE 1024 1041#define DEFAULT_DIRTBL_SIZE 1024
988#define DEFAULT_RECOVER_TIMER 5 1042#define DEFAULT_RECOVER_TIMER 5
989#define DEFAULT_TOSS_SECS 10 1043#define DEFAULT_TOSS_SECS 10
@@ -992,12 +1046,12 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
992#define DEFAULT_PROTOCOL 0 1046#define DEFAULT_PROTOCOL 0
993#define DEFAULT_TIMEWARN_CS 500 /* 5 sec = 500 centiseconds */ 1047#define DEFAULT_TIMEWARN_CS 500 /* 5 sec = 500 centiseconds */
994#define DEFAULT_WAITWARN_US 0 1048#define DEFAULT_WAITWARN_US 0
1049#define DEFAULT_NEW_RSB_COUNT 128
995 1050
996struct dlm_config_info dlm_config = { 1051struct dlm_config_info dlm_config = {
997 .ci_tcp_port = DEFAULT_TCP_PORT, 1052 .ci_tcp_port = DEFAULT_TCP_PORT,
998 .ci_buffer_size = DEFAULT_BUFFER_SIZE, 1053 .ci_buffer_size = DEFAULT_BUFFER_SIZE,
999 .ci_rsbtbl_size = DEFAULT_RSBTBL_SIZE, 1054 .ci_rsbtbl_size = DEFAULT_RSBTBL_SIZE,
1000 .ci_lkbtbl_size = DEFAULT_LKBTBL_SIZE,
1001 .ci_dirtbl_size = DEFAULT_DIRTBL_SIZE, 1055 .ci_dirtbl_size = DEFAULT_DIRTBL_SIZE,
1002 .ci_recover_timer = DEFAULT_RECOVER_TIMER, 1056 .ci_recover_timer = DEFAULT_RECOVER_TIMER,
1003 .ci_toss_secs = DEFAULT_TOSS_SECS, 1057 .ci_toss_secs = DEFAULT_TOSS_SECS,
@@ -1005,6 +1059,7 @@ struct dlm_config_info dlm_config = {
1005 .ci_log_debug = DEFAULT_LOG_DEBUG, 1059 .ci_log_debug = DEFAULT_LOG_DEBUG,
1006 .ci_protocol = DEFAULT_PROTOCOL, 1060 .ci_protocol = DEFAULT_PROTOCOL,
1007 .ci_timewarn_cs = DEFAULT_TIMEWARN_CS, 1061 .ci_timewarn_cs = DEFAULT_TIMEWARN_CS,
1008 .ci_waitwarn_us = DEFAULT_WAITWARN_US 1062 .ci_waitwarn_us = DEFAULT_WAITWARN_US,
1063 .ci_new_rsb_count = DEFAULT_NEW_RSB_COUNT
1009}; 1064};
1010 1065
diff --git a/fs/dlm/config.h b/fs/dlm/config.h
index dd0ce24d5a8..3099d0dd26c 100644
--- a/fs/dlm/config.h
+++ b/fs/dlm/config.h
@@ -20,7 +20,6 @@ struct dlm_config_info {
20 int ci_tcp_port; 20 int ci_tcp_port;
21 int ci_buffer_size; 21 int ci_buffer_size;
22 int ci_rsbtbl_size; 22 int ci_rsbtbl_size;
23 int ci_lkbtbl_size;
24 int ci_dirtbl_size; 23 int ci_dirtbl_size;
25 int ci_recover_timer; 24 int ci_recover_timer;
26 int ci_toss_secs; 25 int ci_toss_secs;
@@ -29,6 +28,7 @@ struct dlm_config_info {
29 int ci_protocol; 28 int ci_protocol;
30 int ci_timewarn_cs; 29 int ci_timewarn_cs;
31 int ci_waitwarn_us; 30 int ci_waitwarn_us;
31 int ci_new_rsb_count;
32}; 32};
33 33
34extern struct dlm_config_info dlm_config; 34extern struct dlm_config_info dlm_config;
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 0262451eb9c..fe2860c0244 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -37,6 +37,7 @@
37#include <linux/jhash.h> 37#include <linux/jhash.h>
38#include <linux/miscdevice.h> 38#include <linux/miscdevice.h>
39#include <linux/mutex.h> 39#include <linux/mutex.h>
40#include <linux/idr.h>
40#include <asm/uaccess.h> 41#include <asm/uaccess.h>
41 42
42#include <linux/dlm.h> 43#include <linux/dlm.h>
@@ -52,7 +53,6 @@ struct dlm_ls;
52struct dlm_lkb; 53struct dlm_lkb;
53struct dlm_rsb; 54struct dlm_rsb;
54struct dlm_member; 55struct dlm_member;
55struct dlm_lkbtable;
56struct dlm_rsbtable; 56struct dlm_rsbtable;
57struct dlm_dirtable; 57struct dlm_dirtable;
58struct dlm_direntry; 58struct dlm_direntry;
@@ -108,11 +108,6 @@ struct dlm_rsbtable {
108 spinlock_t lock; 108 spinlock_t lock;
109}; 109};
110 110
111struct dlm_lkbtable {
112 struct list_head list;
113 rwlock_t lock;
114 uint16_t counter;
115};
116 111
117/* 112/*
118 * Lockspace member (per node in a ls) 113 * Lockspace member (per node in a ls)
@@ -248,17 +243,18 @@ struct dlm_lkb {
248 int8_t lkb_wait_count; 243 int8_t lkb_wait_count;
249 int lkb_wait_nodeid; /* for debugging */ 244 int lkb_wait_nodeid; /* for debugging */
250 245
251 struct list_head lkb_idtbl_list; /* lockspace lkbtbl */
252 struct list_head lkb_statequeue; /* rsb g/c/w list */ 246 struct list_head lkb_statequeue; /* rsb g/c/w list */
253 struct list_head lkb_rsb_lookup; /* waiting for rsb lookup */ 247 struct list_head lkb_rsb_lookup; /* waiting for rsb lookup */
254 struct list_head lkb_wait_reply; /* waiting for remote reply */ 248 struct list_head lkb_wait_reply; /* waiting for remote reply */
255 struct list_head lkb_astqueue; /* need ast to be sent */
256 struct list_head lkb_ownqueue; /* list of locks for a process */ 249 struct list_head lkb_ownqueue; /* list of locks for a process */
257 struct list_head lkb_time_list; 250 struct list_head lkb_time_list;
258 ktime_t lkb_timestamp; 251 ktime_t lkb_timestamp;
259 ktime_t lkb_wait_time; 252 ktime_t lkb_wait_time;
260 unsigned long lkb_timeout_cs; 253 unsigned long lkb_timeout_cs;
261 254
255 struct mutex lkb_cb_mutex;
256 struct work_struct lkb_cb_work;
257 struct list_head lkb_cb_list; /* for ls_cb_delay or proc->asts */
262 struct dlm_callback lkb_callbacks[DLM_CALLBACKS_SIZE]; 258 struct dlm_callback lkb_callbacks[DLM_CALLBACKS_SIZE];
263 struct dlm_callback lkb_last_cast; 259 struct dlm_callback lkb_last_cast;
264 struct dlm_callback lkb_last_bast; 260 struct dlm_callback lkb_last_bast;
@@ -299,7 +295,7 @@ struct dlm_rsb {
299 int res_recover_locks_count; 295 int res_recover_locks_count;
300 296
301 char *res_lvbptr; 297 char *res_lvbptr;
302 char res_name[1]; 298 char res_name[DLM_RESNAME_MAXLEN+1];
303}; 299};
304 300
305/* find_rsb() flags */ 301/* find_rsb() flags */
@@ -465,12 +461,12 @@ struct dlm_ls {
465 unsigned long ls_scan_time; 461 unsigned long ls_scan_time;
466 struct kobject ls_kobj; 462 struct kobject ls_kobj;
467 463
464 struct idr ls_lkbidr;
465 spinlock_t ls_lkbidr_spin;
466
468 struct dlm_rsbtable *ls_rsbtbl; 467 struct dlm_rsbtable *ls_rsbtbl;
469 uint32_t ls_rsbtbl_size; 468 uint32_t ls_rsbtbl_size;
470 469
471 struct dlm_lkbtable *ls_lkbtbl;
472 uint32_t ls_lkbtbl_size;
473
474 struct dlm_dirtable *ls_dirtbl; 470 struct dlm_dirtable *ls_dirtbl;
475 uint32_t ls_dirtbl_size; 471 uint32_t ls_dirtbl_size;
476 472
@@ -483,6 +479,10 @@ struct dlm_ls {
483 struct mutex ls_timeout_mutex; 479 struct mutex ls_timeout_mutex;
484 struct list_head ls_timeout; 480 struct list_head ls_timeout;
485 481
482 spinlock_t ls_new_rsb_spin;
483 int ls_new_rsb_count;
484 struct list_head ls_new_rsb; /* new rsb structs */
485
486 struct list_head ls_nodes; /* current nodes in ls */ 486 struct list_head ls_nodes; /* current nodes in ls */
487 struct list_head ls_nodes_gone; /* dead node list, recovery */ 487 struct list_head ls_nodes_gone; /* dead node list, recovery */
488 int ls_num_nodes; /* number of nodes in ls */ 488 int ls_num_nodes; /* number of nodes in ls */
@@ -506,8 +506,12 @@ struct dlm_ls {
506 506
507 struct miscdevice ls_device; 507 struct miscdevice ls_device;
508 508
509 struct workqueue_struct *ls_callback_wq;
510
509 /* recovery related */ 511 /* recovery related */
510 512
513 struct mutex ls_cb_mutex;
514 struct list_head ls_cb_delay; /* save for queue_work later */
511 struct timer_list ls_timer; 515 struct timer_list ls_timer;
512 struct task_struct *ls_recoverd_task; 516 struct task_struct *ls_recoverd_task;
513 struct mutex ls_recoverd_active; 517 struct mutex ls_recoverd_active;
@@ -544,6 +548,7 @@ struct dlm_ls {
544#define LSFL_RCOM_WAIT 4 548#define LSFL_RCOM_WAIT 4
545#define LSFL_UEVENT_WAIT 5 549#define LSFL_UEVENT_WAIT 5
546#define LSFL_TIMEWARN 6 550#define LSFL_TIMEWARN 6
551#define LSFL_CB_DELAY 7
547 552
548/* much of this is just saving user space pointers associated with the 553/* much of this is just saving user space pointers associated with the
549 lock that we pass back to the user lib with an ast */ 554 lock that we pass back to the user lib with an ast */
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index f71d0b5abd9..83b5e32514e 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -305,7 +305,7 @@ static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
305 rv = -EDEADLK; 305 rv = -EDEADLK;
306 } 306 }
307 307
308 dlm_add_ast(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags); 308 dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
309} 309}
310 310
311static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb) 311static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
@@ -319,7 +319,7 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
319 if (is_master_copy(lkb)) { 319 if (is_master_copy(lkb)) {
320 send_bast(r, lkb, rqmode); 320 send_bast(r, lkb, rqmode);
321 } else { 321 } else {
322 dlm_add_ast(lkb, DLM_CB_BAST, rqmode, 0, 0); 322 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
323 } 323 }
324} 324}
325 325
@@ -327,19 +327,68 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
327 * Basic operations on rsb's and lkb's 327 * Basic operations on rsb's and lkb's
328 */ 328 */
329 329
330static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len) 330static int pre_rsb_struct(struct dlm_ls *ls)
331{
332 struct dlm_rsb *r1, *r2;
333 int count = 0;
334
335 spin_lock(&ls->ls_new_rsb_spin);
336 if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
337 spin_unlock(&ls->ls_new_rsb_spin);
338 return 0;
339 }
340 spin_unlock(&ls->ls_new_rsb_spin);
341
342 r1 = dlm_allocate_rsb(ls);
343 r2 = dlm_allocate_rsb(ls);
344
345 spin_lock(&ls->ls_new_rsb_spin);
346 if (r1) {
347 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
348 ls->ls_new_rsb_count++;
349 }
350 if (r2) {
351 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
352 ls->ls_new_rsb_count++;
353 }
354 count = ls->ls_new_rsb_count;
355 spin_unlock(&ls->ls_new_rsb_spin);
356
357 if (!count)
358 return -ENOMEM;
359 return 0;
360}
361
362/* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
363 unlock any spinlocks, go back and call pre_rsb_struct again.
364 Otherwise, take an rsb off the list and return it. */
365
366static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
367 struct dlm_rsb **r_ret)
331{ 368{
332 struct dlm_rsb *r; 369 struct dlm_rsb *r;
370 int count;
333 371
334 r = dlm_allocate_rsb(ls, len); 372 spin_lock(&ls->ls_new_rsb_spin);
335 if (!r) 373 if (list_empty(&ls->ls_new_rsb)) {
336 return NULL; 374 count = ls->ls_new_rsb_count;
375 spin_unlock(&ls->ls_new_rsb_spin);
376 log_debug(ls, "find_rsb retry %d %d %s",
377 count, dlm_config.ci_new_rsb_count, name);
378 return -EAGAIN;
379 }
380
381 r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
382 list_del(&r->res_hashchain);
383 ls->ls_new_rsb_count--;
384 spin_unlock(&ls->ls_new_rsb_spin);
337 385
338 r->res_ls = ls; 386 r->res_ls = ls;
339 r->res_length = len; 387 r->res_length = len;
340 memcpy(r->res_name, name, len); 388 memcpy(r->res_name, name, len);
341 mutex_init(&r->res_mutex); 389 mutex_init(&r->res_mutex);
342 390
391 INIT_LIST_HEAD(&r->res_hashchain);
343 INIT_LIST_HEAD(&r->res_lookup); 392 INIT_LIST_HEAD(&r->res_lookup);
344 INIT_LIST_HEAD(&r->res_grantqueue); 393 INIT_LIST_HEAD(&r->res_grantqueue);
345 INIT_LIST_HEAD(&r->res_convertqueue); 394 INIT_LIST_HEAD(&r->res_convertqueue);
@@ -347,7 +396,8 @@ static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
347 INIT_LIST_HEAD(&r->res_root_list); 396 INIT_LIST_HEAD(&r->res_root_list);
348 INIT_LIST_HEAD(&r->res_recover_list); 397 INIT_LIST_HEAD(&r->res_recover_list);
349 398
350 return r; 399 *r_ret = r;
400 return 0;
351} 401}
352 402
353static int search_rsb_list(struct list_head *head, char *name, int len, 403static int search_rsb_list(struct list_head *head, char *name, int len,
@@ -405,16 +455,6 @@ static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
405 return error; 455 return error;
406} 456}
407 457
408static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
409 unsigned int flags, struct dlm_rsb **r_ret)
410{
411 int error;
412 spin_lock(&ls->ls_rsbtbl[b].lock);
413 error = _search_rsb(ls, name, len, b, flags, r_ret);
414 spin_unlock(&ls->ls_rsbtbl[b].lock);
415 return error;
416}
417
418/* 458/*
419 * Find rsb in rsbtbl and potentially create/add one 459 * Find rsb in rsbtbl and potentially create/add one
420 * 460 *
@@ -432,35 +472,48 @@ static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
432static int find_rsb(struct dlm_ls *ls, char *name, int namelen, 472static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
433 unsigned int flags, struct dlm_rsb **r_ret) 473 unsigned int flags, struct dlm_rsb **r_ret)
434{ 474{
435 struct dlm_rsb *r = NULL, *tmp; 475 struct dlm_rsb *r = NULL;
436 uint32_t hash, bucket; 476 uint32_t hash, bucket;
437 int error = -EINVAL; 477 int error;
438 478
439 if (namelen > DLM_RESNAME_MAXLEN) 479 if (namelen > DLM_RESNAME_MAXLEN) {
480 error = -EINVAL;
440 goto out; 481 goto out;
482 }
441 483
442 if (dlm_no_directory(ls)) 484 if (dlm_no_directory(ls))
443 flags |= R_CREATE; 485 flags |= R_CREATE;
444 486
445 error = 0;
446 hash = jhash(name, namelen, 0); 487 hash = jhash(name, namelen, 0);
447 bucket = hash & (ls->ls_rsbtbl_size - 1); 488 bucket = hash & (ls->ls_rsbtbl_size - 1);
448 489
449 error = search_rsb(ls, name, namelen, bucket, flags, &r); 490 retry:
491 if (flags & R_CREATE) {
492 error = pre_rsb_struct(ls);
493 if (error < 0)
494 goto out;
495 }
496
497 spin_lock(&ls->ls_rsbtbl[bucket].lock);
498
499 error = _search_rsb(ls, name, namelen, bucket, flags, &r);
450 if (!error) 500 if (!error)
451 goto out; 501 goto out_unlock;
452 502
453 if (error == -EBADR && !(flags & R_CREATE)) 503 if (error == -EBADR && !(flags & R_CREATE))
454 goto out; 504 goto out_unlock;
455 505
456 /* the rsb was found but wasn't a master copy */ 506 /* the rsb was found but wasn't a master copy */
457 if (error == -ENOTBLK) 507 if (error == -ENOTBLK)
458 goto out; 508 goto out_unlock;
459 509
460 error = -ENOMEM; 510 error = get_rsb_struct(ls, name, namelen, &r);
461 r = create_rsb(ls, name, namelen); 511 if (error == -EAGAIN) {
462 if (!r) 512 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
463 goto out; 513 goto retry;
514 }
515 if (error)
516 goto out_unlock;
464 517
465 r->res_hash = hash; 518 r->res_hash = hash;
466 r->res_bucket = bucket; 519 r->res_bucket = bucket;
@@ -474,18 +527,10 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
474 nodeid = 0; 527 nodeid = 0;
475 r->res_nodeid = nodeid; 528 r->res_nodeid = nodeid;
476 } 529 }
477
478 spin_lock(&ls->ls_rsbtbl[bucket].lock);
479 error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
480 if (!error) {
481 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
482 dlm_free_rsb(r);
483 r = tmp;
484 goto out;
485 }
486 list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list); 530 list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
487 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
488 error = 0; 531 error = 0;
532 out_unlock:
533 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
489 out: 534 out:
490 *r_ret = r; 535 *r_ret = r;
491 return error; 536 return error;
@@ -580,9 +625,8 @@ static void detach_lkb(struct dlm_lkb *lkb)
580 625
581static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) 626static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
582{ 627{
583 struct dlm_lkb *lkb, *tmp; 628 struct dlm_lkb *lkb;
584 uint32_t lkid = 0; 629 int rv, id;
585 uint16_t bucket;
586 630
587 lkb = dlm_allocate_lkb(ls); 631 lkb = dlm_allocate_lkb(ls);
588 if (!lkb) 632 if (!lkb)
@@ -594,60 +638,42 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
594 INIT_LIST_HEAD(&lkb->lkb_ownqueue); 638 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
595 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); 639 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
596 INIT_LIST_HEAD(&lkb->lkb_time_list); 640 INIT_LIST_HEAD(&lkb->lkb_time_list);
597 INIT_LIST_HEAD(&lkb->lkb_astqueue); 641 INIT_LIST_HEAD(&lkb->lkb_cb_list);
642 mutex_init(&lkb->lkb_cb_mutex);
643 INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
598 644
599 get_random_bytes(&bucket, sizeof(bucket)); 645 retry:
600 bucket &= (ls->ls_lkbtbl_size - 1); 646 rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
601 647 if (!rv)
602 write_lock(&ls->ls_lkbtbl[bucket].lock); 648 return -ENOMEM;
603 649
604 /* counter can roll over so we must verify lkid is not in use */ 650 spin_lock(&ls->ls_lkbidr_spin);
651 rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
652 if (!rv)
653 lkb->lkb_id = id;
654 spin_unlock(&ls->ls_lkbidr_spin);
605 655
606 while (lkid == 0) { 656 if (rv == -EAGAIN)
607 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++; 657 goto retry;
608 658
609 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list, 659 if (rv < 0) {
610 lkb_idtbl_list) { 660 log_error(ls, "create_lkb idr error %d", rv);
611 if (tmp->lkb_id != lkid) 661 return rv;
612 continue;
613 lkid = 0;
614 break;
615 }
616 } 662 }
617 663
618 lkb->lkb_id = lkid;
619 list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
620 write_unlock(&ls->ls_lkbtbl[bucket].lock);
621
622 *lkb_ret = lkb; 664 *lkb_ret = lkb;
623 return 0; 665 return 0;
624} 666}
625 667
626static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
627{
628 struct dlm_lkb *lkb;
629 uint16_t bucket = (lkid >> 16);
630
631 list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
632 if (lkb->lkb_id == lkid)
633 return lkb;
634 }
635 return NULL;
636}
637
638static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret) 668static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
639{ 669{
640 struct dlm_lkb *lkb; 670 struct dlm_lkb *lkb;
641 uint16_t bucket = (lkid >> 16);
642
643 if (bucket >= ls->ls_lkbtbl_size)
644 return -EBADSLT;
645 671
646 read_lock(&ls->ls_lkbtbl[bucket].lock); 672 spin_lock(&ls->ls_lkbidr_spin);
647 lkb = __find_lkb(ls, lkid); 673 lkb = idr_find(&ls->ls_lkbidr, lkid);
648 if (lkb) 674 if (lkb)
649 kref_get(&lkb->lkb_ref); 675 kref_get(&lkb->lkb_ref);
650 read_unlock(&ls->ls_lkbtbl[bucket].lock); 676 spin_unlock(&ls->ls_lkbidr_spin);
651 677
652 *lkb_ret = lkb; 678 *lkb_ret = lkb;
653 return lkb ? 0 : -ENOENT; 679 return lkb ? 0 : -ENOENT;
@@ -668,12 +694,12 @@ static void kill_lkb(struct kref *kref)
668 694
669static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) 695static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
670{ 696{
671 uint16_t bucket = (lkb->lkb_id >> 16); 697 uint32_t lkid = lkb->lkb_id;
672 698
673 write_lock(&ls->ls_lkbtbl[bucket].lock); 699 spin_lock(&ls->ls_lkbidr_spin);
674 if (kref_put(&lkb->lkb_ref, kill_lkb)) { 700 if (kref_put(&lkb->lkb_ref, kill_lkb)) {
675 list_del(&lkb->lkb_idtbl_list); 701 idr_remove(&ls->ls_lkbidr, lkid);
676 write_unlock(&ls->ls_lkbtbl[bucket].lock); 702 spin_unlock(&ls->ls_lkbidr_spin);
677 703
678 detach_lkb(lkb); 704 detach_lkb(lkb);
679 705
@@ -683,7 +709,7 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
683 dlm_free_lkb(lkb); 709 dlm_free_lkb(lkb);
684 return 1; 710 return 1;
685 } else { 711 } else {
686 write_unlock(&ls->ls_lkbtbl[bucket].lock); 712 spin_unlock(&ls->ls_lkbidr_spin);
687 return 0; 713 return 0;
688 } 714 }
689} 715}
@@ -849,9 +875,7 @@ void dlm_scan_waiters(struct dlm_ls *ls)
849 875
850 if (!num_nodes) { 876 if (!num_nodes) {
851 num_nodes = ls->ls_num_nodes; 877 num_nodes = ls->ls_num_nodes;
852 warned = kmalloc(GFP_KERNEL, num_nodes * sizeof(int)); 878 warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
853 if (warned)
854 memset(warned, 0, num_nodes * sizeof(int));
855 } 879 }
856 if (!warned) 880 if (!warned)
857 continue; 881 continue;
@@ -863,9 +887,7 @@ void dlm_scan_waiters(struct dlm_ls *ls)
863 dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid); 887 dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
864 } 888 }
865 mutex_unlock(&ls->ls_waiters_mutex); 889 mutex_unlock(&ls->ls_waiters_mutex);
866 890 kfree(warned);
867 if (warned)
868 kfree(warned);
869 891
870 if (debug_expired) 892 if (debug_expired)
871 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us", 893 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
@@ -2401,9 +2423,6 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2401 2423
2402 if (deadlk) { 2424 if (deadlk) {
2403 /* it's left on the granted queue */ 2425 /* it's left on the granted queue */
2404 log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2405 lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2406 lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2407 revert_lock(r, lkb); 2426 revert_lock(r, lkb);
2408 queue_cast(r, lkb, -EDEADLK); 2427 queue_cast(r, lkb, -EDEADLK);
2409 error = -EDEADLK; 2428 error = -EDEADLK;
@@ -3993,8 +4012,6 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3993 default: 4012 default:
3994 log_error(ls, "unknown message type %d", ms->m_type); 4013 log_error(ls, "unknown message type %d", ms->m_type);
3995 } 4014 }
3996
3997 dlm_astd_wake();
3998} 4015}
3999 4016
4000/* If the lockspace is in recovery mode (locking stopped), then normal 4017/* If the lockspace is in recovery mode (locking stopped), then normal
@@ -4133,7 +4150,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
4133 struct dlm_message *ms_stub; 4150 struct dlm_message *ms_stub;
4134 int wait_type, stub_unlock_result, stub_cancel_result; 4151 int wait_type, stub_unlock_result, stub_cancel_result;
4135 4152
4136 ms_stub = kmalloc(GFP_KERNEL, sizeof(struct dlm_message)); 4153 ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
4137 if (!ms_stub) { 4154 if (!ms_stub) {
4138 log_error(ls, "dlm_recover_waiters_pre no mem"); 4155 log_error(ls, "dlm_recover_waiters_pre no mem");
4139 return; 4156 return;
@@ -4809,7 +4826,7 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4809 goto out_put; 4826 goto out_put;
4810 4827
4811 spin_lock(&ua->proc->locks_spin); 4828 spin_lock(&ua->proc->locks_spin);
4812 /* dlm_user_add_ast() may have already taken lkb off the proc list */ 4829 /* dlm_user_add_cb() may have already taken lkb off the proc list */
4813 if (!list_empty(&lkb->lkb_ownqueue)) 4830 if (!list_empty(&lkb->lkb_ownqueue))
4814 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking); 4831 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4815 spin_unlock(&ua->proc->locks_spin); 4832 spin_unlock(&ua->proc->locks_spin);
@@ -4946,7 +4963,7 @@ static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4946 4963
4947/* We have to release clear_proc_locks mutex before calling unlock_proc_lock() 4964/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4948 (which does lock_rsb) due to deadlock with receiving a message that does 4965 (which does lock_rsb) due to deadlock with receiving a message that does
4949 lock_rsb followed by dlm_user_add_ast() */ 4966 lock_rsb followed by dlm_user_add_cb() */
4950 4967
4951static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls, 4968static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4952 struct dlm_user_proc *proc) 4969 struct dlm_user_proc *proc)
@@ -4969,7 +4986,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4969 return lkb; 4986 return lkb;
4970} 4987}
4971 4988
4972/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which 4989/* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
4973 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts, 4990 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4974 which we clear here. */ 4991 which we clear here. */
4975 4992
@@ -5011,10 +5028,10 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5011 dlm_put_lkb(lkb); 5028 dlm_put_lkb(lkb);
5012 } 5029 }
5013 5030
5014 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) { 5031 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5015 memset(&lkb->lkb_callbacks, 0, 5032 memset(&lkb->lkb_callbacks, 0,
5016 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE); 5033 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5017 list_del_init(&lkb->lkb_astqueue); 5034 list_del_init(&lkb->lkb_cb_list);
5018 dlm_put_lkb(lkb); 5035 dlm_put_lkb(lkb);
5019 } 5036 }
5020 5037
@@ -5053,10 +5070,10 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5053 spin_unlock(&proc->locks_spin); 5070 spin_unlock(&proc->locks_spin);
5054 5071
5055 spin_lock(&proc->asts_spin); 5072 spin_lock(&proc->asts_spin);
5056 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) { 5073 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5057 memset(&lkb->lkb_callbacks, 0, 5074 memset(&lkb->lkb_callbacks, 0,
5058 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE); 5075 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5059 list_del_init(&lkb->lkb_astqueue); 5076 list_del_init(&lkb->lkb_cb_list);
5060 dlm_put_lkb(lkb); 5077 dlm_put_lkb(lkb);
5061 } 5078 }
5062 spin_unlock(&proc->asts_spin); 5079 spin_unlock(&proc->asts_spin);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 14cbf409975..a1d8f1af144 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -15,7 +15,6 @@
15#include "lockspace.h" 15#include "lockspace.h"
16#include "member.h" 16#include "member.h"
17#include "recoverd.h" 17#include "recoverd.h"
18#include "ast.h"
19#include "dir.h" 18#include "dir.h"
20#include "lowcomms.h" 19#include "lowcomms.h"
21#include "config.h" 20#include "config.h"
@@ -24,6 +23,7 @@
24#include "recover.h" 23#include "recover.h"
25#include "requestqueue.h" 24#include "requestqueue.h"
26#include "user.h" 25#include "user.h"
26#include "ast.h"
27 27
28static int ls_count; 28static int ls_count;
29static struct mutex ls_lock; 29static struct mutex ls_lock;
@@ -359,17 +359,10 @@ static int threads_start(void)
359{ 359{
360 int error; 360 int error;
361 361
362 /* Thread which process lock requests for all lockspace's */
363 error = dlm_astd_start();
364 if (error) {
365 log_print("cannot start dlm_astd thread %d", error);
366 goto fail;
367 }
368
369 error = dlm_scand_start(); 362 error = dlm_scand_start();
370 if (error) { 363 if (error) {
371 log_print("cannot start dlm_scand thread %d", error); 364 log_print("cannot start dlm_scand thread %d", error);
372 goto astd_fail; 365 goto fail;
373 } 366 }
374 367
375 /* Thread for sending/receiving messages for all lockspace's */ 368 /* Thread for sending/receiving messages for all lockspace's */
@@ -383,8 +376,6 @@ static int threads_start(void)
383 376
384 scand_fail: 377 scand_fail:
385 dlm_scand_stop(); 378 dlm_scand_stop();
386 astd_fail:
387 dlm_astd_stop();
388 fail: 379 fail:
389 return error; 380 return error;
390} 381}
@@ -393,7 +384,6 @@ static void threads_stop(void)
393{ 384{
394 dlm_scand_stop(); 385 dlm_scand_stop();
395 dlm_lowcomms_stop(); 386 dlm_lowcomms_stop();
396 dlm_astd_stop();
397} 387}
398 388
399static int new_lockspace(const char *name, int namelen, void **lockspace, 389static int new_lockspace(const char *name, int namelen, void **lockspace,
@@ -463,7 +453,7 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
463 size = dlm_config.ci_rsbtbl_size; 453 size = dlm_config.ci_rsbtbl_size;
464 ls->ls_rsbtbl_size = size; 454 ls->ls_rsbtbl_size = size;
465 455
466 ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_NOFS); 456 ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
467 if (!ls->ls_rsbtbl) 457 if (!ls->ls_rsbtbl)
468 goto out_lsfree; 458 goto out_lsfree;
469 for (i = 0; i < size; i++) { 459 for (i = 0; i < size; i++) {
@@ -472,22 +462,13 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
472 spin_lock_init(&ls->ls_rsbtbl[i].lock); 462 spin_lock_init(&ls->ls_rsbtbl[i].lock);
473 } 463 }
474 464
475 size = dlm_config.ci_lkbtbl_size; 465 idr_init(&ls->ls_lkbidr);
476 ls->ls_lkbtbl_size = size; 466 spin_lock_init(&ls->ls_lkbidr_spin);
477
478 ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_NOFS);
479 if (!ls->ls_lkbtbl)
480 goto out_rsbfree;
481 for (i = 0; i < size; i++) {
482 INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
483 rwlock_init(&ls->ls_lkbtbl[i].lock);
484 ls->ls_lkbtbl[i].counter = 1;
485 }
486 467
487 size = dlm_config.ci_dirtbl_size; 468 size = dlm_config.ci_dirtbl_size;
488 ls->ls_dirtbl_size = size; 469 ls->ls_dirtbl_size = size;
489 470
490 ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_NOFS); 471 ls->ls_dirtbl = vmalloc(sizeof(struct dlm_dirtable) * size);
491 if (!ls->ls_dirtbl) 472 if (!ls->ls_dirtbl)
492 goto out_lkbfree; 473 goto out_lkbfree;
493 for (i = 0; i < size; i++) { 474 for (i = 0; i < size; i++) {
@@ -502,6 +483,9 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
502 INIT_LIST_HEAD(&ls->ls_timeout); 483 INIT_LIST_HEAD(&ls->ls_timeout);
503 mutex_init(&ls->ls_timeout_mutex); 484 mutex_init(&ls->ls_timeout_mutex);
504 485
486 INIT_LIST_HEAD(&ls->ls_new_rsb);
487 spin_lock_init(&ls->ls_new_rsb_spin);
488
505 INIT_LIST_HEAD(&ls->ls_nodes); 489 INIT_LIST_HEAD(&ls->ls_nodes);
506 INIT_LIST_HEAD(&ls->ls_nodes_gone); 490 INIT_LIST_HEAD(&ls->ls_nodes_gone);
507 ls->ls_num_nodes = 0; 491 ls->ls_num_nodes = 0;
@@ -520,6 +504,9 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
520 init_completion(&ls->ls_members_done); 504 init_completion(&ls->ls_members_done);
521 ls->ls_members_result = -1; 505 ls->ls_members_result = -1;
522 506
507 mutex_init(&ls->ls_cb_mutex);
508 INIT_LIST_HEAD(&ls->ls_cb_delay);
509
523 ls->ls_recoverd_task = NULL; 510 ls->ls_recoverd_task = NULL;
524 mutex_init(&ls->ls_recoverd_active); 511 mutex_init(&ls->ls_recoverd_active);
525 spin_lock_init(&ls->ls_recover_lock); 512 spin_lock_init(&ls->ls_recover_lock);
@@ -553,18 +540,26 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
553 list_add(&ls->ls_list, &lslist); 540 list_add(&ls->ls_list, &lslist);
554 spin_unlock(&lslist_lock); 541 spin_unlock(&lslist_lock);
555 542
543 if (flags & DLM_LSFL_FS) {
544 error = dlm_callback_start(ls);
545 if (error) {
546 log_error(ls, "can't start dlm_callback %d", error);
547 goto out_delist;
548 }
549 }
550
556 /* needs to find ls in lslist */ 551 /* needs to find ls in lslist */
557 error = dlm_recoverd_start(ls); 552 error = dlm_recoverd_start(ls);
558 if (error) { 553 if (error) {
559 log_error(ls, "can't start dlm_recoverd %d", error); 554 log_error(ls, "can't start dlm_recoverd %d", error);
560 goto out_delist; 555 goto out_callback;
561 } 556 }
562 557
563 ls->ls_kobj.kset = dlm_kset; 558 ls->ls_kobj.kset = dlm_kset;
564 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL, 559 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
565 "%s", ls->ls_name); 560 "%s", ls->ls_name);
566 if (error) 561 if (error)
567 goto out_stop; 562 goto out_recoverd;
568 kobject_uevent(&ls->ls_kobj, KOBJ_ADD); 563 kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
569 564
570 /* let kobject handle freeing of ls if there's an error */ 565 /* let kobject handle freeing of ls if there's an error */
@@ -578,7 +573,7 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
578 573
579 error = do_uevent(ls, 1); 574 error = do_uevent(ls, 1);
580 if (error) 575 if (error)
581 goto out_stop; 576 goto out_recoverd;
582 577
583 wait_for_completion(&ls->ls_members_done); 578 wait_for_completion(&ls->ls_members_done);
584 error = ls->ls_members_result; 579 error = ls->ls_members_result;
@@ -595,19 +590,20 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
595 do_uevent(ls, 0); 590 do_uevent(ls, 0);
596 dlm_clear_members(ls); 591 dlm_clear_members(ls);
597 kfree(ls->ls_node_array); 592 kfree(ls->ls_node_array);
598 out_stop: 593 out_recoverd:
599 dlm_recoverd_stop(ls); 594 dlm_recoverd_stop(ls);
595 out_callback:
596 dlm_callback_stop(ls);
600 out_delist: 597 out_delist:
601 spin_lock(&lslist_lock); 598 spin_lock(&lslist_lock);
602 list_del(&ls->ls_list); 599 list_del(&ls->ls_list);
603 spin_unlock(&lslist_lock); 600 spin_unlock(&lslist_lock);
604 kfree(ls->ls_recover_buf); 601 kfree(ls->ls_recover_buf);
605 out_dirfree: 602 out_dirfree:
606 kfree(ls->ls_dirtbl); 603 vfree(ls->ls_dirtbl);
607 out_lkbfree: 604 out_lkbfree:
608 kfree(ls->ls_lkbtbl); 605 idr_destroy(&ls->ls_lkbidr);
609 out_rsbfree: 606 vfree(ls->ls_rsbtbl);
610 kfree(ls->ls_rsbtbl);
611 out_lsfree: 607 out_lsfree:
612 if (do_unreg) 608 if (do_unreg)
613 kobject_put(&ls->ls_kobj); 609 kobject_put(&ls->ls_kobj);
@@ -641,50 +637,64 @@ int dlm_new_lockspace(const char *name, int namelen, void **lockspace,
641 return error; 637 return error;
642} 638}
643 639
644/* Return 1 if the lockspace still has active remote locks, 640static int lkb_idr_is_local(int id, void *p, void *data)
645 * 2 if the lockspace still has active local locks. 641{
646 */ 642 struct dlm_lkb *lkb = p;
647static int lockspace_busy(struct dlm_ls *ls) 643
648{ 644 if (!lkb->lkb_nodeid)
649 int i, lkb_found = 0; 645 return 1;
650 struct dlm_lkb *lkb; 646 return 0;
651 647}
652 /* NOTE: We check the lockidtbl here rather than the resource table. 648
653 This is because there may be LKBs queued as ASTs that have been 649static int lkb_idr_is_any(int id, void *p, void *data)
654 unlinked from their RSBs and are pending deletion once the AST has 650{
655 been delivered */ 651 return 1;
656 652}
657 for (i = 0; i < ls->ls_lkbtbl_size; i++) { 653
658 read_lock(&ls->ls_lkbtbl[i].lock); 654static int lkb_idr_free(int id, void *p, void *data)
659 if (!list_empty(&ls->ls_lkbtbl[i].list)) { 655{
660 lkb_found = 1; 656 struct dlm_lkb *lkb = p;
661 list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list, 657
662 lkb_idtbl_list) { 658 if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
663 if (!lkb->lkb_nodeid) { 659 dlm_free_lvb(lkb->lkb_lvbptr);
664 read_unlock(&ls->ls_lkbtbl[i].lock); 660
665 return 2; 661 dlm_free_lkb(lkb);
666 } 662 return 0;
667 } 663}
668 } 664
669 read_unlock(&ls->ls_lkbtbl[i].lock); 665/* NOTE: We check the lkbidr here rather than the resource table.
666 This is because there may be LKBs queued as ASTs that have been unlinked
667 from their RSBs and are pending deletion once the AST has been delivered */
668
669static int lockspace_busy(struct dlm_ls *ls, int force)
670{
671 int rv;
672
673 spin_lock(&ls->ls_lkbidr_spin);
674 if (force == 0) {
675 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
676 } else if (force == 1) {
677 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
678 } else {
679 rv = 0;
670 } 680 }
671 return lkb_found; 681 spin_unlock(&ls->ls_lkbidr_spin);
682 return rv;
672} 683}
673 684
674static int release_lockspace(struct dlm_ls *ls, int force) 685static int release_lockspace(struct dlm_ls *ls, int force)
675{ 686{
676 struct dlm_lkb *lkb;
677 struct dlm_rsb *rsb; 687 struct dlm_rsb *rsb;
678 struct list_head *head; 688 struct list_head *head;
679 int i, busy, rv; 689 int i, busy, rv;
680 690
681 busy = lockspace_busy(ls); 691 busy = lockspace_busy(ls, force);
682 692
683 spin_lock(&lslist_lock); 693 spin_lock(&lslist_lock);
684 if (ls->ls_create_count == 1) { 694 if (ls->ls_create_count == 1) {
685 if (busy > force) 695 if (busy) {
686 rv = -EBUSY; 696 rv = -EBUSY;
687 else { 697 } else {
688 /* remove_lockspace takes ls off lslist */ 698 /* remove_lockspace takes ls off lslist */
689 ls->ls_create_count = 0; 699 ls->ls_create_count = 0;
690 rv = 0; 700 rv = 0;
@@ -708,12 +718,12 @@ static int release_lockspace(struct dlm_ls *ls, int force)
708 718
709 dlm_recoverd_stop(ls); 719 dlm_recoverd_stop(ls);
710 720
721 dlm_callback_stop(ls);
722
711 remove_lockspace(ls); 723 remove_lockspace(ls);
712 724
713 dlm_delete_debug_file(ls); 725 dlm_delete_debug_file(ls);
714 726
715 dlm_astd_suspend();
716
717 kfree(ls->ls_recover_buf); 727 kfree(ls->ls_recover_buf);
718 728
719 /* 729 /*
@@ -721,31 +731,15 @@ static int release_lockspace(struct dlm_ls *ls, int force)
721 */ 731 */
722 732
723 dlm_dir_clear(ls); 733 dlm_dir_clear(ls);
724 kfree(ls->ls_dirtbl); 734 vfree(ls->ls_dirtbl);
725 735
726 /* 736 /*
727 * Free all lkb's on lkbtbl[] lists. 737 * Free all lkb's in idr
728 */ 738 */
729 739
730 for (i = 0; i < ls->ls_lkbtbl_size; i++) { 740 idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
731 head = &ls->ls_lkbtbl[i].list; 741 idr_remove_all(&ls->ls_lkbidr);
732 while (!list_empty(head)) { 742 idr_destroy(&ls->ls_lkbidr);
733 lkb = list_entry(head->next, struct dlm_lkb,
734 lkb_idtbl_list);
735
736 list_del(&lkb->lkb_idtbl_list);
737
738 dlm_del_ast(lkb);
739
740 if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
741 dlm_free_lvb(lkb->lkb_lvbptr);
742
743 dlm_free_lkb(lkb);
744 }
745 }
746 dlm_astd_resume();
747
748 kfree(ls->ls_lkbtbl);
749 743
750 /* 744 /*
751 * Free all rsb's on rsbtbl[] lists 745 * Free all rsb's on rsbtbl[] lists
@@ -770,7 +764,14 @@ static int release_lockspace(struct dlm_ls *ls, int force)
770 } 764 }
771 } 765 }
772 766
773 kfree(ls->ls_rsbtbl); 767 vfree(ls->ls_rsbtbl);
768
769 while (!list_empty(&ls->ls_new_rsb)) {
770 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
771 res_hashchain);
772 list_del(&rsb->res_hashchain);
773 dlm_free_rsb(rsb);
774 }
774 775
775 /* 776 /*
776 * Free structures on any other lists 777 * Free structures on any other lists
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 5e2c71f05e4..990626e7da8 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -512,12 +512,10 @@ static void process_sctp_notification(struct connection *con,
512 } 512 }
513 make_sockaddr(&prim.ssp_addr, 0, &addr_len); 513 make_sockaddr(&prim.ssp_addr, 0, &addr_len);
514 if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) { 514 if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
515 int i;
516 unsigned char *b=(unsigned char *)&prim.ssp_addr; 515 unsigned char *b=(unsigned char *)&prim.ssp_addr;
517 log_print("reject connect from unknown addr"); 516 log_print("reject connect from unknown addr");
518 for (i=0; i<sizeof(struct sockaddr_storage);i++) 517 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
519 printk("%02x ", b[i]); 518 b, sizeof(struct sockaddr_storage));
520 printk("\n");
521 sctp_send_shutdown(prim.ssp_assoc_id); 519 sctp_send_shutdown(prim.ssp_assoc_id);
522 return; 520 return;
523 } 521 }
@@ -748,7 +746,10 @@ static int tcp_accept_from_sock(struct connection *con)
748 /* Get the new node's NODEID */ 746 /* Get the new node's NODEID */
749 make_sockaddr(&peeraddr, 0, &len); 747 make_sockaddr(&peeraddr, 0, &len);
750 if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) { 748 if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
749 unsigned char *b=(unsigned char *)&peeraddr;
751 log_print("connect from non cluster node"); 750 log_print("connect from non cluster node");
751 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
752 b, sizeof(struct sockaddr_storage));
752 sock_release(newsock); 753 sock_release(newsock);
753 mutex_unlock(&con->sock_mutex); 754 mutex_unlock(&con->sock_mutex);
754 return -1; 755 return -1;
diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c
index 8e0d00db004..da64df7576e 100644
--- a/fs/dlm/memory.c
+++ b/fs/dlm/memory.c
@@ -16,6 +16,7 @@
16#include "memory.h" 16#include "memory.h"
17 17
18static struct kmem_cache *lkb_cache; 18static struct kmem_cache *lkb_cache;
19static struct kmem_cache *rsb_cache;
19 20
20 21
21int __init dlm_memory_init(void) 22int __init dlm_memory_init(void)
@@ -26,6 +27,14 @@ int __init dlm_memory_init(void)
26 __alignof__(struct dlm_lkb), 0, NULL); 27 __alignof__(struct dlm_lkb), 0, NULL);
27 if (!lkb_cache) 28 if (!lkb_cache)
28 ret = -ENOMEM; 29 ret = -ENOMEM;
30
31 rsb_cache = kmem_cache_create("dlm_rsb", sizeof(struct dlm_rsb),
32 __alignof__(struct dlm_rsb), 0, NULL);
33 if (!rsb_cache) {
34 kmem_cache_destroy(lkb_cache);
35 ret = -ENOMEM;
36 }
37
29 return ret; 38 return ret;
30} 39}
31 40
@@ -33,6 +42,8 @@ void dlm_memory_exit(void)
33{ 42{
34 if (lkb_cache) 43 if (lkb_cache)
35 kmem_cache_destroy(lkb_cache); 44 kmem_cache_destroy(lkb_cache);
45 if (rsb_cache)
46 kmem_cache_destroy(rsb_cache);
36} 47}
37 48
38char *dlm_allocate_lvb(struct dlm_ls *ls) 49char *dlm_allocate_lvb(struct dlm_ls *ls)
@@ -48,16 +59,11 @@ void dlm_free_lvb(char *p)
48 kfree(p); 59 kfree(p);
49} 60}
50 61
51/* FIXME: have some minimal space built-in to rsb for the name and 62struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls)
52 kmalloc a separate name if needed, like dentries are done */
53
54struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls, int namelen)
55{ 63{
56 struct dlm_rsb *r; 64 struct dlm_rsb *r;
57 65
58 DLM_ASSERT(namelen <= DLM_RESNAME_MAXLEN,); 66 r = kmem_cache_zalloc(rsb_cache, GFP_NOFS);
59
60 r = kzalloc(sizeof(*r) + namelen, GFP_NOFS);
61 return r; 67 return r;
62} 68}
63 69
@@ -65,7 +71,7 @@ void dlm_free_rsb(struct dlm_rsb *r)
65{ 71{
66 if (r->res_lvbptr) 72 if (r->res_lvbptr)
67 dlm_free_lvb(r->res_lvbptr); 73 dlm_free_lvb(r->res_lvbptr);
68 kfree(r); 74 kmem_cache_free(rsb_cache, r);
69} 75}
70 76
71struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls) 77struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls)
diff --git a/fs/dlm/memory.h b/fs/dlm/memory.h
index 485fb29143b..177c11cbb0a 100644
--- a/fs/dlm/memory.h
+++ b/fs/dlm/memory.h
@@ -16,7 +16,7 @@
16 16
17int dlm_memory_init(void); 17int dlm_memory_init(void);
18void dlm_memory_exit(void); 18void dlm_memory_exit(void);
19struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls, int namelen); 19struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls);
20void dlm_free_rsb(struct dlm_rsb *r); 20void dlm_free_rsb(struct dlm_rsb *r);
21struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls); 21struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls);
22void dlm_free_lkb(struct dlm_lkb *l); 22void dlm_free_lkb(struct dlm_lkb *l);
diff --git a/fs/dlm/plock.c b/fs/dlm/plock.c
index e2b87800436..01fd5c11a7f 100644
--- a/fs/dlm/plock.c
+++ b/fs/dlm/plock.c
@@ -92,7 +92,7 @@ static void do_unlock_close(struct dlm_ls *ls, u64 number,
92 op->info.number = number; 92 op->info.number = number;
93 op->info.start = 0; 93 op->info.start = 0;
94 op->info.end = OFFSET_MAX; 94 op->info.end = OFFSET_MAX;
95 if (fl->fl_lmops && fl->fl_lmops->fl_grant) 95 if (fl->fl_lmops && fl->fl_lmops->lm_grant)
96 op->info.owner = (__u64) fl->fl_pid; 96 op->info.owner = (__u64) fl->fl_pid;
97 else 97 else
98 op->info.owner = (__u64)(long) fl->fl_owner; 98 op->info.owner = (__u64)(long) fl->fl_owner;
@@ -128,11 +128,11 @@ int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file,
128 op->info.number = number; 128 op->info.number = number;
129 op->info.start = fl->fl_start; 129 op->info.start = fl->fl_start;
130 op->info.end = fl->fl_end; 130 op->info.end = fl->fl_end;
131 if (fl->fl_lmops && fl->fl_lmops->fl_grant) { 131 if (fl->fl_lmops && fl->fl_lmops->lm_grant) {
132 /* fl_owner is lockd which doesn't distinguish 132 /* fl_owner is lockd which doesn't distinguish
133 processes on the nfs client */ 133 processes on the nfs client */
134 op->info.owner = (__u64) fl->fl_pid; 134 op->info.owner = (__u64) fl->fl_pid;
135 xop->callback = fl->fl_lmops->fl_grant; 135 xop->callback = fl->fl_lmops->lm_grant;
136 locks_init_lock(&xop->flc); 136 locks_init_lock(&xop->flc);
137 locks_copy_lock(&xop->flc, fl); 137 locks_copy_lock(&xop->flc, fl);
138 xop->fl = fl; 138 xop->fl = fl;
@@ -268,7 +268,7 @@ int dlm_posix_unlock(dlm_lockspace_t *lockspace, u64 number, struct file *file,
268 op->info.number = number; 268 op->info.number = number;
269 op->info.start = fl->fl_start; 269 op->info.start = fl->fl_start;
270 op->info.end = fl->fl_end; 270 op->info.end = fl->fl_end;
271 if (fl->fl_lmops && fl->fl_lmops->fl_grant) 271 if (fl->fl_lmops && fl->fl_lmops->lm_grant)
272 op->info.owner = (__u64) fl->fl_pid; 272 op->info.owner = (__u64) fl->fl_pid;
273 else 273 else
274 op->info.owner = (__u64)(long) fl->fl_owner; 274 op->info.owner = (__u64)(long) fl->fl_owner;
@@ -327,7 +327,7 @@ int dlm_posix_get(dlm_lockspace_t *lockspace, u64 number, struct file *file,
327 op->info.number = number; 327 op->info.number = number;
328 op->info.start = fl->fl_start; 328 op->info.start = fl->fl_start;
329 op->info.end = fl->fl_end; 329 op->info.end = fl->fl_end;
330 if (fl->fl_lmops && fl->fl_lmops->fl_grant) 330 if (fl->fl_lmops && fl->fl_lmops->lm_grant)
331 op->info.owner = (__u64) fl->fl_pid; 331 op->info.owner = (__u64) fl->fl_pid;
332 else 332 else
333 op->info.owner = (__u64)(long) fl->fl_owner; 333 op->info.owner = (__u64)(long) fl->fl_owner;
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index fd677c8c3d3..774da3cf92c 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -58,13 +58,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
58 58
59 mutex_lock(&ls->ls_recoverd_active); 59 mutex_lock(&ls->ls_recoverd_active);
60 60
61 /* 61 dlm_callback_suspend(ls);
62 * Suspending and resuming dlm_astd ensures that no lkb's from this ls
63 * will be processed by dlm_astd during recovery.
64 */
65
66 dlm_astd_suspend();
67 dlm_astd_resume();
68 62
69 /* 63 /*
70 * Free non-master tossed rsb's. Master rsb's are kept on toss 64 * Free non-master tossed rsb's. Master rsb's are kept on toss
@@ -202,6 +196,8 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
202 196
203 dlm_adjust_timeouts(ls); 197 dlm_adjust_timeouts(ls);
204 198
199 dlm_callback_resume(ls);
200
205 error = enable_locking(ls, rv->seq); 201 error = enable_locking(ls, rv->seq);
206 if (error) { 202 if (error) {
207 log_debug(ls, "enable_locking failed %d", error); 203 log_debug(ls, "enable_locking failed %d", error);
@@ -222,8 +218,6 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
222 218
223 dlm_grant_after_purge(ls); 219 dlm_grant_after_purge(ls);
224 220
225 dlm_astd_wake();
226
227 log_debug(ls, "recover %llx done: %u ms", 221 log_debug(ls, "recover %llx done: %u ms",
228 (unsigned long long)rv->seq, 222 (unsigned long long)rv->seq,
229 jiffies_to_msecs(jiffies - start)); 223 jiffies_to_msecs(jiffies - start));
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index e96bf3e9be8..d8ea6075640 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -213,9 +213,9 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
213 goto out; 213 goto out;
214 } 214 }
215 215
216 if (list_empty(&lkb->lkb_astqueue)) { 216 if (list_empty(&lkb->lkb_cb_list)) {
217 kref_get(&lkb->lkb_ref); 217 kref_get(&lkb->lkb_ref);
218 list_add_tail(&lkb->lkb_astqueue, &proc->asts); 218 list_add_tail(&lkb->lkb_cb_list, &proc->asts);
219 wake_up_interruptible(&proc->wait); 219 wake_up_interruptible(&proc->wait);
220 } 220 }
221 spin_unlock(&proc->asts_spin); 221 spin_unlock(&proc->asts_spin);
@@ -832,24 +832,24 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
832 } 832 }
833 833
834 /* if we empty lkb_callbacks, we don't want to unlock the spinlock 834 /* if we empty lkb_callbacks, we don't want to unlock the spinlock
835 without removing lkb_astqueue; so empty lkb_astqueue is always 835 without removing lkb_cb_list; so empty lkb_cb_list is always
836 consistent with empty lkb_callbacks */ 836 consistent with empty lkb_callbacks */
837 837
838 lkb = list_entry(proc->asts.next, struct dlm_lkb, lkb_astqueue); 838 lkb = list_entry(proc->asts.next, struct dlm_lkb, lkb_cb_list);
839 839
840 rv = dlm_rem_lkb_callback(lkb->lkb_resource->res_ls, lkb, &cb, &resid); 840 rv = dlm_rem_lkb_callback(lkb->lkb_resource->res_ls, lkb, &cb, &resid);
841 if (rv < 0) { 841 if (rv < 0) {
842 /* this shouldn't happen; lkb should have been removed from 842 /* this shouldn't happen; lkb should have been removed from
843 list when resid was zero */ 843 list when resid was zero */
844 log_print("dlm_rem_lkb_callback empty %x", lkb->lkb_id); 844 log_print("dlm_rem_lkb_callback empty %x", lkb->lkb_id);
845 list_del_init(&lkb->lkb_astqueue); 845 list_del_init(&lkb->lkb_cb_list);
846 spin_unlock(&proc->asts_spin); 846 spin_unlock(&proc->asts_spin);
847 /* removes ref for proc->asts, may cause lkb to be freed */ 847 /* removes ref for proc->asts, may cause lkb to be freed */
848 dlm_put_lkb(lkb); 848 dlm_put_lkb(lkb);
849 goto try_another; 849 goto try_another;
850 } 850 }
851 if (!resid) 851 if (!resid)
852 list_del_init(&lkb->lkb_astqueue); 852 list_del_init(&lkb->lkb_cb_list);
853 spin_unlock(&proc->asts_spin); 853 spin_unlock(&proc->asts_spin);
854 854
855 if (cb.flags & DLM_CB_SKIP) { 855 if (cb.flags & DLM_CB_SKIP) {