aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2')
-rw-r--r--fs/ocfs2/cluster/heartbeat.c158
-rw-r--r--fs/ocfs2/cluster/tcp.c35
-rw-r--r--fs/ocfs2/cluster/tcp.h6
-rw-r--r--fs/ocfs2/cluster/tcp_internal.h12
-rw-r--r--fs/ocfs2/dlm/dlmast.c14
-rw-r--r--fs/ocfs2/dlm/dlmcommon.h130
-rw-r--r--fs/ocfs2/dlm/dlmconvert.c40
-rw-r--r--fs/ocfs2/dlm/dlmdebug.c30
-rw-r--r--fs/ocfs2/dlm/dlmdomain.c253
-rw-r--r--fs/ocfs2/dlm/dlmlock.c7
-rw-r--r--fs/ocfs2/dlm/dlmmaster.c579
-rw-r--r--fs/ocfs2/dlm/dlmrecovery.c182
-rw-r--r--fs/ocfs2/dlm/dlmthread.c200
-rw-r--r--fs/ocfs2/dlm/dlmunlock.c15
-rw-r--r--fs/ocfs2/vote.c8
15 files changed, 1205 insertions, 464 deletions
diff --git a/fs/ocfs2/cluster/heartbeat.c b/fs/ocfs2/cluster/heartbeat.c
index 277ca67a2ad6..5a9779bb9236 100644
--- a/fs/ocfs2/cluster/heartbeat.c
+++ b/fs/ocfs2/cluster/heartbeat.c
@@ -184,10 +184,9 @@ static void o2hb_disarm_write_timeout(struct o2hb_region *reg)
184 flush_scheduled_work(); 184 flush_scheduled_work();
185} 185}
186 186
187static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc, 187static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
188 unsigned int num_ios)
189{ 188{
190 atomic_set(&wc->wc_num_reqs, num_ios); 189 atomic_set(&wc->wc_num_reqs, 1);
191 init_completion(&wc->wc_io_complete); 190 init_completion(&wc->wc_io_complete);
192 wc->wc_error = 0; 191 wc->wc_error = 0;
193} 192}
@@ -212,6 +211,7 @@ static void o2hb_wait_on_io(struct o2hb_region *reg,
212 struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping; 211 struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping;
213 212
214 blk_run_address_space(mapping); 213 blk_run_address_space(mapping);
214 o2hb_bio_wait_dec(wc, 1);
215 215
216 wait_for_completion(&wc->wc_io_complete); 216 wait_for_completion(&wc->wc_io_complete);
217} 217}
@@ -231,6 +231,7 @@ static int o2hb_bio_end_io(struct bio *bio,
231 return 1; 231 return 1;
232 232
233 o2hb_bio_wait_dec(wc, 1); 233 o2hb_bio_wait_dec(wc, 1);
234 bio_put(bio);
234 return 0; 235 return 0;
235} 236}
236 237
@@ -238,23 +239,22 @@ static int o2hb_bio_end_io(struct bio *bio,
238 * start_slot. */ 239 * start_slot. */
239static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg, 240static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
240 struct o2hb_bio_wait_ctxt *wc, 241 struct o2hb_bio_wait_ctxt *wc,
241 unsigned int start_slot, 242 unsigned int *current_slot,
242 unsigned int num_slots) 243 unsigned int max_slots)
243{ 244{
244 int i, nr_vecs, len, first_page, last_page; 245 int len, current_page;
245 unsigned int vec_len, vec_start; 246 unsigned int vec_len, vec_start;
246 unsigned int bits = reg->hr_block_bits; 247 unsigned int bits = reg->hr_block_bits;
247 unsigned int spp = reg->hr_slots_per_page; 248 unsigned int spp = reg->hr_slots_per_page;
249 unsigned int cs = *current_slot;
248 struct bio *bio; 250 struct bio *bio;
249 struct page *page; 251 struct page *page;
250 252
251 nr_vecs = (num_slots + spp - 1) / spp;
252
253 /* Testing has shown this allocation to take long enough under 253 /* Testing has shown this allocation to take long enough under
254 * GFP_KERNEL that the local node can get fenced. It would be 254 * GFP_KERNEL that the local node can get fenced. It would be
255 * nicest if we could pre-allocate these bios and avoid this 255 * nicest if we could pre-allocate these bios and avoid this
256 * all together. */ 256 * all together. */
257 bio = bio_alloc(GFP_ATOMIC, nr_vecs); 257 bio = bio_alloc(GFP_ATOMIC, 16);
258 if (!bio) { 258 if (!bio) {
259 mlog(ML_ERROR, "Could not alloc slots BIO!\n"); 259 mlog(ML_ERROR, "Could not alloc slots BIO!\n");
260 bio = ERR_PTR(-ENOMEM); 260 bio = ERR_PTR(-ENOMEM);
@@ -262,137 +262,53 @@ static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
262 } 262 }
263 263
264 /* Must put everything in 512 byte sectors for the bio... */ 264 /* Must put everything in 512 byte sectors for the bio... */
265 bio->bi_sector = (reg->hr_start_block + start_slot) << (bits - 9); 265 bio->bi_sector = (reg->hr_start_block + cs) << (bits - 9);
266 bio->bi_bdev = reg->hr_bdev; 266 bio->bi_bdev = reg->hr_bdev;
267 bio->bi_private = wc; 267 bio->bi_private = wc;
268 bio->bi_end_io = o2hb_bio_end_io; 268 bio->bi_end_io = o2hb_bio_end_io;
269 269
270 first_page = start_slot / spp; 270 vec_start = (cs << bits) % PAGE_CACHE_SIZE;
271 last_page = first_page + nr_vecs; 271 while(cs < max_slots) {
272 vec_start = (start_slot << bits) % PAGE_CACHE_SIZE; 272 current_page = cs / spp;
273 for(i = first_page; i < last_page; i++) { 273 page = reg->hr_slot_data[current_page];
274 page = reg->hr_slot_data[i];
275 274
276 vec_len = PAGE_CACHE_SIZE; 275 vec_len = min(PAGE_CACHE_SIZE,
277 /* last page might be short */ 276 (max_slots-cs) * (PAGE_CACHE_SIZE/spp) );
278 if (((i + 1) * spp) > (start_slot + num_slots))
279 vec_len = ((num_slots + start_slot) % spp) << bits;
280 vec_len -= vec_start;
281 277
282 mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n", 278 mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
283 i, vec_len, vec_start); 279 current_page, vec_len, vec_start);
284 280
285 len = bio_add_page(bio, page, vec_len, vec_start); 281 len = bio_add_page(bio, page, vec_len, vec_start);
286 if (len != vec_len) { 282 if (len != vec_len) break;
287 bio_put(bio);
288 bio = ERR_PTR(-EIO);
289
290 mlog(ML_ERROR, "Error adding page to bio i = %d, "
291 "vec_len = %u, len = %d\n, start = %u\n",
292 i, vec_len, len, vec_start);
293 goto bail;
294 }
295 283
284 cs += vec_len / (PAGE_CACHE_SIZE/spp);
296 vec_start = 0; 285 vec_start = 0;
297 } 286 }
298 287
299bail: 288bail:
289 *current_slot = cs;
300 return bio; 290 return bio;
301} 291}
302 292
303/*
304 * Compute the maximum number of sectors the bdev can handle in one bio,
305 * as a power of two.
306 *
307 * Stolen from oracleasm, thanks Joel!
308 */
309static int compute_max_sectors(struct block_device *bdev)
310{
311 int max_pages, max_sectors, pow_two_sectors;
312
313 struct request_queue *q;
314
315 q = bdev_get_queue(bdev);
316 max_pages = q->max_sectors >> (PAGE_SHIFT - 9);
317 if (max_pages > BIO_MAX_PAGES)
318 max_pages = BIO_MAX_PAGES;
319 if (max_pages > q->max_phys_segments)
320 max_pages = q->max_phys_segments;
321 if (max_pages > q->max_hw_segments)
322 max_pages = q->max_hw_segments;
323 max_pages--; /* Handle I/Os that straddle a page */
324
325 if (max_pages) {
326 max_sectors = max_pages << (PAGE_SHIFT - 9);
327 } else {
328 /* If BIO contains 1 or less than 1 page. */
329 max_sectors = q->max_sectors;
330 }
331 /* Why is fls() 1-based???? */
332 pow_two_sectors = 1 << (fls(max_sectors) - 1);
333
334 return pow_two_sectors;
335}
336
337static inline void o2hb_compute_request_limits(struct o2hb_region *reg,
338 unsigned int num_slots,
339 unsigned int *num_bios,
340 unsigned int *slots_per_bio)
341{
342 unsigned int max_sectors, io_sectors;
343
344 max_sectors = compute_max_sectors(reg->hr_bdev);
345
346 io_sectors = num_slots << (reg->hr_block_bits - 9);
347
348 *num_bios = (io_sectors + max_sectors - 1) / max_sectors;
349 *slots_per_bio = max_sectors >> (reg->hr_block_bits - 9);
350
351 mlog(ML_HB_BIO, "My io size is %u sectors for %u slots. This "
352 "device can handle %u sectors of I/O\n", io_sectors, num_slots,
353 max_sectors);
354 mlog(ML_HB_BIO, "Will need %u bios holding %u slots each\n",
355 *num_bios, *slots_per_bio);
356}
357
358static int o2hb_read_slots(struct o2hb_region *reg, 293static int o2hb_read_slots(struct o2hb_region *reg,
359 unsigned int max_slots) 294 unsigned int max_slots)
360{ 295{
361 unsigned int num_bios, slots_per_bio, start_slot, num_slots; 296 unsigned int current_slot=0;
362 int i, status; 297 int status;
363 struct o2hb_bio_wait_ctxt wc; 298 struct o2hb_bio_wait_ctxt wc;
364 struct bio **bios;
365 struct bio *bio; 299 struct bio *bio;
366 300
367 o2hb_compute_request_limits(reg, max_slots, &num_bios, &slots_per_bio); 301 o2hb_bio_wait_init(&wc);
368 302
369 bios = kcalloc(num_bios, sizeof(struct bio *), GFP_KERNEL); 303 while(current_slot < max_slots) {
370 if (!bios) { 304 bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots);
371 status = -ENOMEM;
372 mlog_errno(status);
373 return status;
374 }
375
376 o2hb_bio_wait_init(&wc, num_bios);
377
378 num_slots = slots_per_bio;
379 for(i = 0; i < num_bios; i++) {
380 start_slot = i * slots_per_bio;
381
382 /* adjust num_slots at last bio */
383 if (max_slots < (start_slot + num_slots))
384 num_slots = max_slots - start_slot;
385
386 bio = o2hb_setup_one_bio(reg, &wc, start_slot, num_slots);
387 if (IS_ERR(bio)) { 305 if (IS_ERR(bio)) {
388 o2hb_bio_wait_dec(&wc, num_bios - i);
389
390 status = PTR_ERR(bio); 306 status = PTR_ERR(bio);
391 mlog_errno(status); 307 mlog_errno(status);
392 goto bail_and_wait; 308 goto bail_and_wait;
393 } 309 }
394 bios[i] = bio;
395 310
311 atomic_inc(&wc.wc_num_reqs);
396 submit_bio(READ, bio); 312 submit_bio(READ, bio);
397 } 313 }
398 314
@@ -403,38 +319,30 @@ bail_and_wait:
403 if (wc.wc_error && !status) 319 if (wc.wc_error && !status)
404 status = wc.wc_error; 320 status = wc.wc_error;
405 321
406 if (bios) {
407 for(i = 0; i < num_bios; i++)
408 if (bios[i])
409 bio_put(bios[i]);
410 kfree(bios);
411 }
412
413 return status; 322 return status;
414} 323}
415 324
416static int o2hb_issue_node_write(struct o2hb_region *reg, 325static int o2hb_issue_node_write(struct o2hb_region *reg,
417 struct bio **write_bio,
418 struct o2hb_bio_wait_ctxt *write_wc) 326 struct o2hb_bio_wait_ctxt *write_wc)
419{ 327{
420 int status; 328 int status;
421 unsigned int slot; 329 unsigned int slot;
422 struct bio *bio; 330 struct bio *bio;
423 331
424 o2hb_bio_wait_init(write_wc, 1); 332 o2hb_bio_wait_init(write_wc);
425 333
426 slot = o2nm_this_node(); 334 slot = o2nm_this_node();
427 335
428 bio = o2hb_setup_one_bio(reg, write_wc, slot, 1); 336 bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1);
429 if (IS_ERR(bio)) { 337 if (IS_ERR(bio)) {
430 status = PTR_ERR(bio); 338 status = PTR_ERR(bio);
431 mlog_errno(status); 339 mlog_errno(status);
432 goto bail; 340 goto bail;
433 } 341 }
434 342
343 atomic_inc(&write_wc->wc_num_reqs);
435 submit_bio(WRITE, bio); 344 submit_bio(WRITE, bio);
436 345
437 *write_bio = bio;
438 status = 0; 346 status = 0;
439bail: 347bail:
440 return status; 348 return status;
@@ -826,7 +734,6 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
826{ 734{
827 int i, ret, highest_node, change = 0; 735 int i, ret, highest_node, change = 0;
828 unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)]; 736 unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
829 struct bio *write_bio;
830 struct o2hb_bio_wait_ctxt write_wc; 737 struct o2hb_bio_wait_ctxt write_wc;
831 738
832 ret = o2nm_configured_node_map(configured_nodes, 739 ret = o2nm_configured_node_map(configured_nodes,
@@ -864,7 +771,7 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
864 771
865 /* And fire off the write. Note that we don't wait on this I/O 772 /* And fire off the write. Note that we don't wait on this I/O
866 * until later. */ 773 * until later. */
867 ret = o2hb_issue_node_write(reg, &write_bio, &write_wc); 774 ret = o2hb_issue_node_write(reg, &write_wc);
868 if (ret < 0) { 775 if (ret < 0) {
869 mlog_errno(ret); 776 mlog_errno(ret);
870 return ret; 777 return ret;
@@ -882,7 +789,6 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
882 * people we find in our steady state have seen us. 789 * people we find in our steady state have seen us.
883 */ 790 */
884 o2hb_wait_on_io(reg, &write_wc); 791 o2hb_wait_on_io(reg, &write_wc);
885 bio_put(write_bio);
886 if (write_wc.wc_error) { 792 if (write_wc.wc_error) {
887 /* Do not re-arm the write timeout on I/O error - we 793 /* Do not re-arm the write timeout on I/O error - we
888 * can't be sure that the new block ever made it to 794 * can't be sure that the new block ever made it to
@@ -943,7 +849,6 @@ static int o2hb_thread(void *data)
943{ 849{
944 int i, ret; 850 int i, ret;
945 struct o2hb_region *reg = data; 851 struct o2hb_region *reg = data;
946 struct bio *write_bio;
947 struct o2hb_bio_wait_ctxt write_wc; 852 struct o2hb_bio_wait_ctxt write_wc;
948 struct timeval before_hb, after_hb; 853 struct timeval before_hb, after_hb;
949 unsigned int elapsed_msec; 854 unsigned int elapsed_msec;
@@ -993,10 +898,9 @@ static int o2hb_thread(void *data)
993 * 898 *
994 * XXX: Should we skip this on unclean_stop? */ 899 * XXX: Should we skip this on unclean_stop? */
995 o2hb_prepare_block(reg, 0); 900 o2hb_prepare_block(reg, 0);
996 ret = o2hb_issue_node_write(reg, &write_bio, &write_wc); 901 ret = o2hb_issue_node_write(reg, &write_wc);
997 if (ret == 0) { 902 if (ret == 0) {
998 o2hb_wait_on_io(reg, &write_wc); 903 o2hb_wait_on_io(reg, &write_wc);
999 bio_put(write_bio);
1000 } else { 904 } else {
1001 mlog_errno(ret); 905 mlog_errno(ret);
1002 } 906 }
diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c
index ae4ff4a6636b..1718215fc018 100644
--- a/fs/ocfs2/cluster/tcp.c
+++ b/fs/ocfs2/cluster/tcp.c
@@ -556,6 +556,8 @@ static void o2net_register_callbacks(struct sock *sk,
556 sk->sk_data_ready = o2net_data_ready; 556 sk->sk_data_ready = o2net_data_ready;
557 sk->sk_state_change = o2net_state_change; 557 sk->sk_state_change = o2net_state_change;
558 558
559 mutex_init(&sc->sc_send_lock);
560
559 write_unlock_bh(&sk->sk_callback_lock); 561 write_unlock_bh(&sk->sk_callback_lock);
560} 562}
561 563
@@ -688,6 +690,7 @@ static void o2net_handler_put(struct o2net_msg_handler *nmh)
688 * be given to the handler if their payload is longer than the max. */ 690 * be given to the handler if their payload is longer than the max. */
689int o2net_register_handler(u32 msg_type, u32 key, u32 max_len, 691int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
690 o2net_msg_handler_func *func, void *data, 692 o2net_msg_handler_func *func, void *data,
693 o2net_post_msg_handler_func *post_func,
691 struct list_head *unreg_list) 694 struct list_head *unreg_list)
692{ 695{
693 struct o2net_msg_handler *nmh = NULL; 696 struct o2net_msg_handler *nmh = NULL;
@@ -722,6 +725,7 @@ int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
722 725
723 nmh->nh_func = func; 726 nmh->nh_func = func;
724 nmh->nh_func_data = data; 727 nmh->nh_func_data = data;
728 nmh->nh_post_func = post_func;
725 nmh->nh_msg_type = msg_type; 729 nmh->nh_msg_type = msg_type;
726 nmh->nh_max_len = max_len; 730 nmh->nh_max_len = max_len;
727 nmh->nh_key = key; 731 nmh->nh_key = key;
@@ -856,10 +860,12 @@ static void o2net_sendpage(struct o2net_sock_container *sc,
856 ssize_t ret; 860 ssize_t ret;
857 861
858 862
863 mutex_lock(&sc->sc_send_lock);
859 ret = sc->sc_sock->ops->sendpage(sc->sc_sock, 864 ret = sc->sc_sock->ops->sendpage(sc->sc_sock,
860 virt_to_page(kmalloced_virt), 865 virt_to_page(kmalloced_virt),
861 (long)kmalloced_virt & ~PAGE_MASK, 866 (long)kmalloced_virt & ~PAGE_MASK,
862 size, MSG_DONTWAIT); 867 size, MSG_DONTWAIT);
868 mutex_unlock(&sc->sc_send_lock);
863 if (ret != size) { 869 if (ret != size) {
864 mlog(ML_ERROR, "sendpage of size %zu to " SC_NODEF_FMT 870 mlog(ML_ERROR, "sendpage of size %zu to " SC_NODEF_FMT
865 " failed with %zd\n", size, SC_NODEF_ARGS(sc), ret); 871 " failed with %zd\n", size, SC_NODEF_ARGS(sc), ret);
@@ -974,8 +980,10 @@ int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *caller_vec,
974 980
975 /* finally, convert the message header to network byte-order 981 /* finally, convert the message header to network byte-order
976 * and send */ 982 * and send */
983 mutex_lock(&sc->sc_send_lock);
977 ret = o2net_send_tcp_msg(sc->sc_sock, vec, veclen, 984 ret = o2net_send_tcp_msg(sc->sc_sock, vec, veclen,
978 sizeof(struct o2net_msg) + caller_bytes); 985 sizeof(struct o2net_msg) + caller_bytes);
986 mutex_unlock(&sc->sc_send_lock);
979 msglog(msg, "sending returned %d\n", ret); 987 msglog(msg, "sending returned %d\n", ret);
980 if (ret < 0) { 988 if (ret < 0) {
981 mlog(0, "error returned from o2net_send_tcp_msg=%d\n", ret); 989 mlog(0, "error returned from o2net_send_tcp_msg=%d\n", ret);
@@ -1049,6 +1057,7 @@ static int o2net_process_message(struct o2net_sock_container *sc,
1049 int ret = 0, handler_status; 1057 int ret = 0, handler_status;
1050 enum o2net_system_error syserr; 1058 enum o2net_system_error syserr;
1051 struct o2net_msg_handler *nmh = NULL; 1059 struct o2net_msg_handler *nmh = NULL;
1060 void *ret_data = NULL;
1052 1061
1053 msglog(hdr, "processing message\n"); 1062 msglog(hdr, "processing message\n");
1054 1063
@@ -1101,17 +1110,26 @@ static int o2net_process_message(struct o2net_sock_container *sc,
1101 sc->sc_msg_type = be16_to_cpu(hdr->msg_type); 1110 sc->sc_msg_type = be16_to_cpu(hdr->msg_type);
1102 handler_status = (nmh->nh_func)(hdr, sizeof(struct o2net_msg) + 1111 handler_status = (nmh->nh_func)(hdr, sizeof(struct o2net_msg) +
1103 be16_to_cpu(hdr->data_len), 1112 be16_to_cpu(hdr->data_len),
1104 nmh->nh_func_data); 1113 nmh->nh_func_data, &ret_data);
1105 do_gettimeofday(&sc->sc_tv_func_stop); 1114 do_gettimeofday(&sc->sc_tv_func_stop);
1106 1115
1107out_respond: 1116out_respond:
1108 /* this destroys the hdr, so don't use it after this */ 1117 /* this destroys the hdr, so don't use it after this */
1118 mutex_lock(&sc->sc_send_lock);
1109 ret = o2net_send_status_magic(sc->sc_sock, hdr, syserr, 1119 ret = o2net_send_status_magic(sc->sc_sock, hdr, syserr,
1110 handler_status); 1120 handler_status);
1121 mutex_unlock(&sc->sc_send_lock);
1111 hdr = NULL; 1122 hdr = NULL;
1112 mlog(0, "sending handler status %d, syserr %d returned %d\n", 1123 mlog(0, "sending handler status %d, syserr %d returned %d\n",
1113 handler_status, syserr, ret); 1124 handler_status, syserr, ret);
1114 1125
1126 if (nmh) {
1127 BUG_ON(ret_data != NULL && nmh->nh_post_func == NULL);
1128 if (nmh->nh_post_func)
1129 (nmh->nh_post_func)(handler_status, nmh->nh_func_data,
1130 ret_data);
1131 }
1132
1115out: 1133out:
1116 if (nmh) 1134 if (nmh)
1117 o2net_handler_put(nmh); 1135 o2net_handler_put(nmh);
@@ -1795,13 +1813,13 @@ out:
1795 ready(sk, bytes); 1813 ready(sk, bytes);
1796} 1814}
1797 1815
1798static int o2net_open_listening_sock(__be16 port) 1816static int o2net_open_listening_sock(__be32 addr, __be16 port)
1799{ 1817{
1800 struct socket *sock = NULL; 1818 struct socket *sock = NULL;
1801 int ret; 1819 int ret;
1802 struct sockaddr_in sin = { 1820 struct sockaddr_in sin = {
1803 .sin_family = PF_INET, 1821 .sin_family = PF_INET,
1804 .sin_addr = { .s_addr = (__force u32)htonl(INADDR_ANY) }, 1822 .sin_addr = { .s_addr = (__force u32)addr },
1805 .sin_port = (__force u16)port, 1823 .sin_port = (__force u16)port,
1806 }; 1824 };
1807 1825
@@ -1824,15 +1842,15 @@ static int o2net_open_listening_sock(__be16 port)
1824 sock->sk->sk_reuse = 1; 1842 sock->sk->sk_reuse = 1;
1825 ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin)); 1843 ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin));
1826 if (ret < 0) { 1844 if (ret < 0) {
1827 mlog(ML_ERROR, "unable to bind socket to port %d, ret=%d\n", 1845 mlog(ML_ERROR, "unable to bind socket at %u.%u.%u.%u:%u, "
1828 ntohs(port), ret); 1846 "ret=%d\n", NIPQUAD(addr), ntohs(port), ret);
1829 goto out; 1847 goto out;
1830 } 1848 }
1831 1849
1832 ret = sock->ops->listen(sock, 64); 1850 ret = sock->ops->listen(sock, 64);
1833 if (ret < 0) { 1851 if (ret < 0) {
1834 mlog(ML_ERROR, "unable to listen on port %d, ret=%d\n", 1852 mlog(ML_ERROR, "unable to listen on %u.%u.%u.%u:%u, ret=%d\n",
1835 ntohs(port), ret); 1853 NIPQUAD(addr), ntohs(port), ret);
1836 } 1854 }
1837 1855
1838out: 1856out:
@@ -1865,7 +1883,8 @@ int o2net_start_listening(struct o2nm_node *node)
1865 return -ENOMEM; /* ? */ 1883 return -ENOMEM; /* ? */
1866 } 1884 }
1867 1885
1868 ret = o2net_open_listening_sock(node->nd_ipv4_port); 1886 ret = o2net_open_listening_sock(node->nd_ipv4_address,
1887 node->nd_ipv4_port);
1869 if (ret) { 1888 if (ret) {
1870 destroy_workqueue(o2net_wq); 1889 destroy_workqueue(o2net_wq);
1871 o2net_wq = NULL; 1890 o2net_wq = NULL;
diff --git a/fs/ocfs2/cluster/tcp.h b/fs/ocfs2/cluster/tcp.h
index 21a4e43df836..da880fc215f0 100644
--- a/fs/ocfs2/cluster/tcp.h
+++ b/fs/ocfs2/cluster/tcp.h
@@ -50,7 +50,10 @@ struct o2net_msg
50 __u8 buf[0]; 50 __u8 buf[0];
51}; 51};
52 52
53typedef int (o2net_msg_handler_func)(struct o2net_msg *msg, u32 len, void *data); 53typedef int (o2net_msg_handler_func)(struct o2net_msg *msg, u32 len, void *data,
54 void **ret_data);
55typedef void (o2net_post_msg_handler_func)(int status, void *data,
56 void *ret_data);
54 57
55#define O2NET_MAX_PAYLOAD_BYTES (4096 - sizeof(struct o2net_msg)) 58#define O2NET_MAX_PAYLOAD_BYTES (4096 - sizeof(struct o2net_msg))
56 59
@@ -99,6 +102,7 @@ int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *vec,
99 102
100int o2net_register_handler(u32 msg_type, u32 key, u32 max_len, 103int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
101 o2net_msg_handler_func *func, void *data, 104 o2net_msg_handler_func *func, void *data,
105 o2net_post_msg_handler_func *post_func,
102 struct list_head *unreg_list); 106 struct list_head *unreg_list);
103void o2net_unregister_handler_list(struct list_head *list); 107void o2net_unregister_handler_list(struct list_head *list);
104 108
diff --git a/fs/ocfs2/cluster/tcp_internal.h b/fs/ocfs2/cluster/tcp_internal.h
index b700dc9624d1..4dae5df5e467 100644
--- a/fs/ocfs2/cluster/tcp_internal.h
+++ b/fs/ocfs2/cluster/tcp_internal.h
@@ -38,6 +38,12 @@
38 * locking semantics of the file system using the protocol. It should 38 * locking semantics of the file system using the protocol. It should
39 * be somewhere else, I'm sure, but right now it isn't. 39 * be somewhere else, I'm sure, but right now it isn't.
40 * 40 *
41 * New in version 7:
42 * - DLM join domain includes the live nodemap
43 *
44 * New in version 6:
45 * - DLM lockres remote refcount fixes.
46 *
41 * New in version 5: 47 * New in version 5:
42 * - Network timeout checking protocol 48 * - Network timeout checking protocol
43 * 49 *
@@ -51,7 +57,7 @@
51 * - full 64 bit i_size in the metadata lock lvbs 57 * - full 64 bit i_size in the metadata lock lvbs
52 * - introduction of "rw" lock and pushing meta/data locking down 58 * - introduction of "rw" lock and pushing meta/data locking down
53 */ 59 */
54#define O2NET_PROTOCOL_VERSION 5ULL 60#define O2NET_PROTOCOL_VERSION 7ULL
55struct o2net_handshake { 61struct o2net_handshake {
56 __be64 protocol_version; 62 __be64 protocol_version;
57 __be64 connector_id; 63 __be64 connector_id;
@@ -149,6 +155,8 @@ struct o2net_sock_container {
149 struct timeval sc_tv_func_stop; 155 struct timeval sc_tv_func_stop;
150 u32 sc_msg_key; 156 u32 sc_msg_key;
151 u16 sc_msg_type; 157 u16 sc_msg_type;
158
159 struct mutex sc_send_lock;
152}; 160};
153 161
154struct o2net_msg_handler { 162struct o2net_msg_handler {
@@ -158,6 +166,8 @@ struct o2net_msg_handler {
158 u32 nh_key; 166 u32 nh_key;
159 o2net_msg_handler_func *nh_func; 167 o2net_msg_handler_func *nh_func;
160 o2net_msg_handler_func *nh_func_data; 168 o2net_msg_handler_func *nh_func_data;
169 o2net_post_msg_handler_func
170 *nh_post_func;
161 struct kref nh_kref; 171 struct kref nh_kref;
162 struct list_head nh_unregister_item; 172 struct list_head nh_unregister_item;
163}; 173};
diff --git a/fs/ocfs2/dlm/dlmast.c b/fs/ocfs2/dlm/dlmast.c
index 681046d51393..241cad342a48 100644
--- a/fs/ocfs2/dlm/dlmast.c
+++ b/fs/ocfs2/dlm/dlmast.c
@@ -263,7 +263,8 @@ void dlm_do_local_bast(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
263 263
264 264
265 265
266int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data) 266int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data,
267 void **ret_data)
267{ 268{
268 int ret; 269 int ret;
269 unsigned int locklen; 270 unsigned int locklen;
@@ -311,8 +312,8 @@ int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data)
311 past->type != DLM_BAST) { 312 past->type != DLM_BAST) {
312 mlog(ML_ERROR, "Unknown ast type! %d, cookie=%u:%llu" 313 mlog(ML_ERROR, "Unknown ast type! %d, cookie=%u:%llu"
313 "name=%.*s\n", past->type, 314 "name=%.*s\n", past->type,
314 dlm_get_lock_cookie_node(cookie), 315 dlm_get_lock_cookie_node(be64_to_cpu(cookie)),
315 dlm_get_lock_cookie_seq(cookie), 316 dlm_get_lock_cookie_seq(be64_to_cpu(cookie)),
316 locklen, name); 317 locklen, name);
317 ret = DLM_IVLOCKID; 318 ret = DLM_IVLOCKID;
318 goto leave; 319 goto leave;
@@ -323,8 +324,8 @@ int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data)
323 mlog(0, "got %sast for unknown lockres! " 324 mlog(0, "got %sast for unknown lockres! "
324 "cookie=%u:%llu, name=%.*s, namelen=%u\n", 325 "cookie=%u:%llu, name=%.*s, namelen=%u\n",
325 past->type == DLM_AST ? "" : "b", 326 past->type == DLM_AST ? "" : "b",
326 dlm_get_lock_cookie_node(cookie), 327 dlm_get_lock_cookie_node(be64_to_cpu(cookie)),
327 dlm_get_lock_cookie_seq(cookie), 328 dlm_get_lock_cookie_seq(be64_to_cpu(cookie)),
328 locklen, name, locklen); 329 locklen, name, locklen);
329 ret = DLM_IVLOCKID; 330 ret = DLM_IVLOCKID;
330 goto leave; 331 goto leave;
@@ -369,7 +370,8 @@ int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data)
369 370
370 mlog(0, "got %sast for unknown lock! cookie=%u:%llu, " 371 mlog(0, "got %sast for unknown lock! cookie=%u:%llu, "
371 "name=%.*s, namelen=%u\n", past->type == DLM_AST ? "" : "b", 372 "name=%.*s, namelen=%u\n", past->type == DLM_AST ? "" : "b",
372 dlm_get_lock_cookie_node(cookie), dlm_get_lock_cookie_seq(cookie), 373 dlm_get_lock_cookie_node(be64_to_cpu(cookie)),
374 dlm_get_lock_cookie_seq(be64_to_cpu(cookie)),
373 locklen, name, locklen); 375 locklen, name, locklen);
374 376
375 ret = DLM_NORMAL; 377 ret = DLM_NORMAL;
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h
index 6b6ff76538c5..e90b92f9ece1 100644
--- a/fs/ocfs2/dlm/dlmcommon.h
+++ b/fs/ocfs2/dlm/dlmcommon.h
@@ -180,6 +180,11 @@ struct dlm_assert_master_priv
180 unsigned ignore_higher:1; 180 unsigned ignore_higher:1;
181}; 181};
182 182
183struct dlm_deref_lockres_priv
184{
185 struct dlm_lock_resource *deref_res;
186 u8 deref_node;
187};
183 188
184struct dlm_work_item 189struct dlm_work_item
185{ 190{
@@ -191,6 +196,7 @@ struct dlm_work_item
191 struct dlm_request_all_locks_priv ral; 196 struct dlm_request_all_locks_priv ral;
192 struct dlm_mig_lockres_priv ml; 197 struct dlm_mig_lockres_priv ml;
193 struct dlm_assert_master_priv am; 198 struct dlm_assert_master_priv am;
199 struct dlm_deref_lockres_priv dl;
194 } u; 200 } u;
195}; 201};
196 202
@@ -222,6 +228,9 @@ static inline void __dlm_set_joining_node(struct dlm_ctxt *dlm,
222#define DLM_LOCK_RES_DIRTY 0x00000008 228#define DLM_LOCK_RES_DIRTY 0x00000008
223#define DLM_LOCK_RES_IN_PROGRESS 0x00000010 229#define DLM_LOCK_RES_IN_PROGRESS 0x00000010
224#define DLM_LOCK_RES_MIGRATING 0x00000020 230#define DLM_LOCK_RES_MIGRATING 0x00000020
231#define DLM_LOCK_RES_DROPPING_REF 0x00000040
232#define DLM_LOCK_RES_BLOCK_DIRTY 0x00001000
233#define DLM_LOCK_RES_SETREF_INPROG 0x00002000
225 234
226/* max milliseconds to wait to sync up a network failure with a node death */ 235/* max milliseconds to wait to sync up a network failure with a node death */
227#define DLM_NODE_DEATH_WAIT_MAX (5 * 1000) 236#define DLM_NODE_DEATH_WAIT_MAX (5 * 1000)
@@ -265,6 +274,8 @@ struct dlm_lock_resource
265 u8 owner; //node which owns the lock resource, or unknown 274 u8 owner; //node which owns the lock resource, or unknown
266 u16 state; 275 u16 state;
267 char lvb[DLM_LVB_LEN]; 276 char lvb[DLM_LVB_LEN];
277 unsigned int inflight_locks;
278 unsigned long refmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
268}; 279};
269 280
270struct dlm_migratable_lock 281struct dlm_migratable_lock
@@ -367,7 +378,7 @@ enum {
367 DLM_CONVERT_LOCK_MSG, /* 504 */ 378 DLM_CONVERT_LOCK_MSG, /* 504 */
368 DLM_PROXY_AST_MSG, /* 505 */ 379 DLM_PROXY_AST_MSG, /* 505 */
369 DLM_UNLOCK_LOCK_MSG, /* 506 */ 380 DLM_UNLOCK_LOCK_MSG, /* 506 */
370 DLM_UNUSED_MSG2, /* 507 */ 381 DLM_DEREF_LOCKRES_MSG, /* 507 */
371 DLM_MIGRATE_REQUEST_MSG, /* 508 */ 382 DLM_MIGRATE_REQUEST_MSG, /* 508 */
372 DLM_MIG_LOCKRES_MSG, /* 509 */ 383 DLM_MIG_LOCKRES_MSG, /* 509 */
373 DLM_QUERY_JOIN_MSG, /* 510 */ 384 DLM_QUERY_JOIN_MSG, /* 510 */
@@ -417,6 +428,9 @@ struct dlm_master_request
417 u8 name[O2NM_MAX_NAME_LEN]; 428 u8 name[O2NM_MAX_NAME_LEN];
418}; 429};
419 430
431#define DLM_ASSERT_RESPONSE_REASSERT 0x00000001
432#define DLM_ASSERT_RESPONSE_MASTERY_REF 0x00000002
433
420#define DLM_ASSERT_MASTER_MLE_CLEANUP 0x00000001 434#define DLM_ASSERT_MASTER_MLE_CLEANUP 0x00000001
421#define DLM_ASSERT_MASTER_REQUERY 0x00000002 435#define DLM_ASSERT_MASTER_REQUERY 0x00000002
422#define DLM_ASSERT_MASTER_FINISH_MIGRATION 0x00000004 436#define DLM_ASSERT_MASTER_FINISH_MIGRATION 0x00000004
@@ -430,6 +444,8 @@ struct dlm_assert_master
430 u8 name[O2NM_MAX_NAME_LEN]; 444 u8 name[O2NM_MAX_NAME_LEN];
431}; 445};
432 446
447#define DLM_MIGRATE_RESPONSE_MASTERY_REF 0x00000001
448
433struct dlm_migrate_request 449struct dlm_migrate_request
434{ 450{
435 u8 master; 451 u8 master;
@@ -609,12 +625,16 @@ struct dlm_begin_reco
609}; 625};
610 626
611 627
628#define BITS_PER_BYTE 8
629#define BITS_TO_BYTES(bits) (((bits)+BITS_PER_BYTE-1)/BITS_PER_BYTE)
630
612struct dlm_query_join_request 631struct dlm_query_join_request
613{ 632{
614 u8 node_idx; 633 u8 node_idx;
615 u8 pad1[2]; 634 u8 pad1[2];
616 u8 name_len; 635 u8 name_len;
617 u8 domain[O2NM_MAX_NAME_LEN]; 636 u8 domain[O2NM_MAX_NAME_LEN];
637 u8 node_map[BITS_TO_BYTES(O2NM_MAX_NODES)];
618}; 638};
619 639
620struct dlm_assert_joined 640struct dlm_assert_joined
@@ -648,6 +668,16 @@ struct dlm_finalize_reco
648 __be32 pad2; 668 __be32 pad2;
649}; 669};
650 670
671struct dlm_deref_lockres
672{
673 u32 pad1;
674 u16 pad2;
675 u8 node_idx;
676 u8 namelen;
677
678 u8 name[O2NM_MAX_NAME_LEN];
679};
680
651static inline enum dlm_status 681static inline enum dlm_status
652__dlm_lockres_state_to_status(struct dlm_lock_resource *res) 682__dlm_lockres_state_to_status(struct dlm_lock_resource *res)
653{ 683{
@@ -688,16 +718,20 @@ void dlm_lock_put(struct dlm_lock *lock);
688void dlm_lock_attach_lockres(struct dlm_lock *lock, 718void dlm_lock_attach_lockres(struct dlm_lock *lock,
689 struct dlm_lock_resource *res); 719 struct dlm_lock_resource *res);
690 720
691int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data); 721int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data,
692int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data); 722 void **ret_data);
693int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data); 723int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data,
724 void **ret_data);
725int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data,
726 void **ret_data);
694 727
695void dlm_revert_pending_convert(struct dlm_lock_resource *res, 728void dlm_revert_pending_convert(struct dlm_lock_resource *res,
696 struct dlm_lock *lock); 729 struct dlm_lock *lock);
697void dlm_revert_pending_lock(struct dlm_lock_resource *res, 730void dlm_revert_pending_lock(struct dlm_lock_resource *res,
698 struct dlm_lock *lock); 731 struct dlm_lock *lock);
699 732
700int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data); 733int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data,
734 void **ret_data);
701void dlm_commit_pending_cancel(struct dlm_lock_resource *res, 735void dlm_commit_pending_cancel(struct dlm_lock_resource *res,
702 struct dlm_lock *lock); 736 struct dlm_lock *lock);
703void dlm_commit_pending_unlock(struct dlm_lock_resource *res, 737void dlm_commit_pending_unlock(struct dlm_lock_resource *res,
@@ -721,8 +755,6 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
721 struct dlm_lock_resource *res); 755 struct dlm_lock_resource *res);
722void dlm_lockres_calc_usage(struct dlm_ctxt *dlm, 756void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
723 struct dlm_lock_resource *res); 757 struct dlm_lock_resource *res);
724void dlm_purge_lockres(struct dlm_ctxt *dlm,
725 struct dlm_lock_resource *lockres);
726static inline void dlm_lockres_get(struct dlm_lock_resource *res) 758static inline void dlm_lockres_get(struct dlm_lock_resource *res)
727{ 759{
728 /* This is called on every lookup, so it might be worth 760 /* This is called on every lookup, so it might be worth
@@ -733,6 +765,10 @@ void dlm_lockres_put(struct dlm_lock_resource *res);
733void __dlm_unhash_lockres(struct dlm_lock_resource *res); 765void __dlm_unhash_lockres(struct dlm_lock_resource *res);
734void __dlm_insert_lockres(struct dlm_ctxt *dlm, 766void __dlm_insert_lockres(struct dlm_ctxt *dlm,
735 struct dlm_lock_resource *res); 767 struct dlm_lock_resource *res);
768struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
769 const char *name,
770 unsigned int len,
771 unsigned int hash);
736struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, 772struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
737 const char *name, 773 const char *name,
738 unsigned int len, 774 unsigned int len,
@@ -753,6 +789,47 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
753 const char *name, 789 const char *name,
754 unsigned int namelen); 790 unsigned int namelen);
755 791
792#define dlm_lockres_set_refmap_bit(bit,res) \
793 __dlm_lockres_set_refmap_bit(bit,res,__FILE__,__LINE__)
794#define dlm_lockres_clear_refmap_bit(bit,res) \
795 __dlm_lockres_clear_refmap_bit(bit,res,__FILE__,__LINE__)
796
797static inline void __dlm_lockres_set_refmap_bit(int bit,
798 struct dlm_lock_resource *res,
799 const char *file,
800 int line)
801{
802 //printk("%s:%d:%.*s: setting bit %d\n", file, line,
803 // res->lockname.len, res->lockname.name, bit);
804 set_bit(bit, res->refmap);
805}
806
807static inline void __dlm_lockres_clear_refmap_bit(int bit,
808 struct dlm_lock_resource *res,
809 const char *file,
810 int line)
811{
812 //printk("%s:%d:%.*s: clearing bit %d\n", file, line,
813 // res->lockname.len, res->lockname.name, bit);
814 clear_bit(bit, res->refmap);
815}
816
817void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
818 struct dlm_lock_resource *res,
819 const char *file,
820 int line);
821void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
822 struct dlm_lock_resource *res,
823 int new_lockres,
824 const char *file,
825 int line);
826#define dlm_lockres_drop_inflight_ref(d,r) \
827 __dlm_lockres_drop_inflight_ref(d,r,__FILE__,__LINE__)
828#define dlm_lockres_grab_inflight_ref(d,r) \
829 __dlm_lockres_grab_inflight_ref(d,r,0,__FILE__,__LINE__)
830#define dlm_lockres_grab_inflight_ref_new(d,r) \
831 __dlm_lockres_grab_inflight_ref(d,r,1,__FILE__,__LINE__)
832
756void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock); 833void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
757void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock); 834void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
758void dlm_do_local_ast(struct dlm_ctxt *dlm, 835void dlm_do_local_ast(struct dlm_ctxt *dlm,
@@ -801,10 +878,7 @@ int dlm_heartbeat_init(struct dlm_ctxt *dlm);
801void dlm_hb_node_down_cb(struct o2nm_node *node, int idx, void *data); 878void dlm_hb_node_down_cb(struct o2nm_node *node, int idx, void *data);
802void dlm_hb_node_up_cb(struct o2nm_node *node, int idx, void *data); 879void dlm_hb_node_up_cb(struct o2nm_node *node, int idx, void *data);
803 880
804int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); 881int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res);
805int dlm_migrate_lockres(struct dlm_ctxt *dlm,
806 struct dlm_lock_resource *res,
807 u8 target);
808int dlm_finish_migration(struct dlm_ctxt *dlm, 882int dlm_finish_migration(struct dlm_ctxt *dlm,
809 struct dlm_lock_resource *res, 883 struct dlm_lock_resource *res,
810 u8 old_master); 884 u8 old_master);
@@ -812,15 +886,27 @@ void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
812 struct dlm_lock_resource *res); 886 struct dlm_lock_resource *res);
813void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res); 887void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res);
814 888
815int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data); 889int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
816int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data); 890 void **ret_data);
817int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data); 891int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
818int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data); 892 void **ret_data);
819int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data); 893void dlm_assert_master_post_handler(int status, void *data, void *ret_data);
820int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data); 894int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
821int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data); 895 void **ret_data);
822int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data); 896int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
823int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data); 897 void **ret_data);
898int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
899 void **ret_data);
900int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data,
901 void **ret_data);
902int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data,
903 void **ret_data);
904int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data,
905 void **ret_data);
906int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data,
907 void **ret_data);
908int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data,
909 void **ret_data);
824int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, 910int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
825 u8 nodenum, u8 *real_master); 911 u8 nodenum, u8 *real_master);
826 912
@@ -856,10 +942,12 @@ static inline void __dlm_wait_on_lockres(struct dlm_lock_resource *res)
856int dlm_init_mle_cache(void); 942int dlm_init_mle_cache(void);
857void dlm_destroy_mle_cache(void); 943void dlm_destroy_mle_cache(void);
858void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up); 944void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up);
945int dlm_drop_lockres_ref(struct dlm_ctxt *dlm,
946 struct dlm_lock_resource *res);
859void dlm_clean_master_list(struct dlm_ctxt *dlm, 947void dlm_clean_master_list(struct dlm_ctxt *dlm,
860 u8 dead_node); 948 u8 dead_node);
861int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock); 949int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock);
862 950int __dlm_lockres_has_locks(struct dlm_lock_resource *res);
863int __dlm_lockres_unused(struct dlm_lock_resource *res); 951int __dlm_lockres_unused(struct dlm_lock_resource *res);
864 952
865static inline const char * dlm_lock_mode_name(int mode) 953static inline const char * dlm_lock_mode_name(int mode)
diff --git a/fs/ocfs2/dlm/dlmconvert.c b/fs/ocfs2/dlm/dlmconvert.c
index c764dc8e40a2..ecb4d997221e 100644
--- a/fs/ocfs2/dlm/dlmconvert.c
+++ b/fs/ocfs2/dlm/dlmconvert.c
@@ -286,8 +286,8 @@ enum dlm_status dlmconvert_remote(struct dlm_ctxt *dlm,
286 __dlm_print_one_lock_resource(res); 286 __dlm_print_one_lock_resource(res);
287 mlog(ML_ERROR, "converting a remote lock that is already " 287 mlog(ML_ERROR, "converting a remote lock that is already "
288 "converting! (cookie=%u:%llu, conv=%d)\n", 288 "converting! (cookie=%u:%llu, conv=%d)\n",
289 dlm_get_lock_cookie_node(lock->ml.cookie), 289 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
290 dlm_get_lock_cookie_seq(lock->ml.cookie), 290 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
291 lock->ml.convert_type); 291 lock->ml.convert_type);
292 status = DLM_DENIED; 292 status = DLM_DENIED;
293 goto bail; 293 goto bail;
@@ -418,7 +418,8 @@ static enum dlm_status dlm_send_remote_convert_request(struct dlm_ctxt *dlm,
418 * returns: DLM_NORMAL, DLM_IVLOCKID, DLM_BADARGS, 418 * returns: DLM_NORMAL, DLM_IVLOCKID, DLM_BADARGS,
419 * status from __dlmconvert_master 419 * status from __dlmconvert_master
420 */ 420 */
421int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data) 421int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data,
422 void **ret_data)
422{ 423{
423 struct dlm_ctxt *dlm = data; 424 struct dlm_ctxt *dlm = data;
424 struct dlm_convert_lock *cnv = (struct dlm_convert_lock *)msg->buf; 425 struct dlm_convert_lock *cnv = (struct dlm_convert_lock *)msg->buf;
@@ -428,7 +429,7 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
428 struct dlm_lockstatus *lksb; 429 struct dlm_lockstatus *lksb;
429 enum dlm_status status = DLM_NORMAL; 430 enum dlm_status status = DLM_NORMAL;
430 u32 flags; 431 u32 flags;
431 int call_ast = 0, kick_thread = 0, ast_reserved = 0; 432 int call_ast = 0, kick_thread = 0, ast_reserved = 0, wake = 0;
432 433
433 if (!dlm_grab(dlm)) { 434 if (!dlm_grab(dlm)) {
434 dlm_error(DLM_REJECTED); 435 dlm_error(DLM_REJECTED);
@@ -479,25 +480,14 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
479 } 480 }
480 lock = NULL; 481 lock = NULL;
481 } 482 }
482 if (!lock) {
483 __dlm_print_one_lock_resource(res);
484 list_for_each(iter, &res->granted) {
485 lock = list_entry(iter, struct dlm_lock, list);
486 if (lock->ml.node == cnv->node_idx) {
487 mlog(ML_ERROR, "There is something here "
488 "for node %u, lock->ml.cookie=%llu, "
489 "cnv->cookie=%llu\n", cnv->node_idx,
490 (unsigned long long)lock->ml.cookie,
491 (unsigned long long)cnv->cookie);
492 break;
493 }
494 }
495 lock = NULL;
496 }
497 spin_unlock(&res->spinlock); 483 spin_unlock(&res->spinlock);
498 if (!lock) { 484 if (!lock) {
499 status = DLM_IVLOCKID; 485 status = DLM_IVLOCKID;
500 dlm_error(status); 486 mlog(ML_ERROR, "did not find lock to convert on grant queue! "
487 "cookie=%u:%llu\n",
488 dlm_get_lock_cookie_node(be64_to_cpu(cnv->cookie)),
489 dlm_get_lock_cookie_seq(be64_to_cpu(cnv->cookie)));
490 __dlm_print_one_lock_resource(res);
501 goto leave; 491 goto leave;
502 } 492 }
503 493
@@ -524,8 +514,11 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
524 cnv->requested_type, 514 cnv->requested_type,
525 &call_ast, &kick_thread); 515 &call_ast, &kick_thread);
526 res->state &= ~DLM_LOCK_RES_IN_PROGRESS; 516 res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
517 wake = 1;
527 } 518 }
528 spin_unlock(&res->spinlock); 519 spin_unlock(&res->spinlock);
520 if (wake)
521 wake_up(&res->wq);
529 522
530 if (status != DLM_NORMAL) { 523 if (status != DLM_NORMAL) {
531 if (status != DLM_NOTQUEUED) 524 if (status != DLM_NOTQUEUED)
@@ -534,12 +527,7 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
534 } 527 }
535 528
536leave: 529leave:
537 if (!lock) 530 if (lock)
538 mlog(ML_ERROR, "did not find lock to convert on grant queue! "
539 "cookie=%u:%llu\n",
540 dlm_get_lock_cookie_node(cnv->cookie),
541 dlm_get_lock_cookie_seq(cnv->cookie));
542 else
543 dlm_lock_put(lock); 531 dlm_lock_put(lock);
544 532
545 /* either queue the ast or release it, if reserved */ 533 /* either queue the ast or release it, if reserved */
diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c
index 3f6c8d88f7af..64239b37e5d4 100644
--- a/fs/ocfs2/dlm/dlmdebug.c
+++ b/fs/ocfs2/dlm/dlmdebug.c
@@ -53,6 +53,23 @@ void dlm_print_one_lock_resource(struct dlm_lock_resource *res)
53 spin_unlock(&res->spinlock); 53 spin_unlock(&res->spinlock);
54} 54}
55 55
56static void dlm_print_lockres_refmap(struct dlm_lock_resource *res)
57{
58 int bit;
59 assert_spin_locked(&res->spinlock);
60
61 mlog(ML_NOTICE, " refmap nodes: [ ");
62 bit = 0;
63 while (1) {
64 bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
65 if (bit >= O2NM_MAX_NODES)
66 break;
67 printk("%u ", bit);
68 bit++;
69 }
70 printk("], inflight=%u\n", res->inflight_locks);
71}
72
56void __dlm_print_one_lock_resource(struct dlm_lock_resource *res) 73void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
57{ 74{
58 struct list_head *iter2; 75 struct list_head *iter2;
@@ -65,6 +82,7 @@ void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
65 res->owner, res->state); 82 res->owner, res->state);
66 mlog(ML_NOTICE, " last used: %lu, on purge list: %s\n", 83 mlog(ML_NOTICE, " last used: %lu, on purge list: %s\n",
67 res->last_used, list_empty(&res->purge) ? "no" : "yes"); 84 res->last_used, list_empty(&res->purge) ? "no" : "yes");
85 dlm_print_lockres_refmap(res);
68 mlog(ML_NOTICE, " granted queue: \n"); 86 mlog(ML_NOTICE, " granted queue: \n");
69 list_for_each(iter2, &res->granted) { 87 list_for_each(iter2, &res->granted) {
70 lock = list_entry(iter2, struct dlm_lock, list); 88 lock = list_entry(iter2, struct dlm_lock, list);
@@ -72,8 +90,8 @@ void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
72 mlog(ML_NOTICE, " type=%d, conv=%d, node=%u, " 90 mlog(ML_NOTICE, " type=%d, conv=%d, node=%u, "
73 "cookie=%u:%llu, ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n", 91 "cookie=%u:%llu, ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n",
74 lock->ml.type, lock->ml.convert_type, lock->ml.node, 92 lock->ml.type, lock->ml.convert_type, lock->ml.node,
75 dlm_get_lock_cookie_node(lock->ml.cookie), 93 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
76 dlm_get_lock_cookie_seq(lock->ml.cookie), 94 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
77 list_empty(&lock->ast_list) ? 'y' : 'n', 95 list_empty(&lock->ast_list) ? 'y' : 'n',
78 lock->ast_pending ? 'y' : 'n', 96 lock->ast_pending ? 'y' : 'n',
79 list_empty(&lock->bast_list) ? 'y' : 'n', 97 list_empty(&lock->bast_list) ? 'y' : 'n',
@@ -87,8 +105,8 @@ void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
87 mlog(ML_NOTICE, " type=%d, conv=%d, node=%u, " 105 mlog(ML_NOTICE, " type=%d, conv=%d, node=%u, "
88 "cookie=%u:%llu, ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n", 106 "cookie=%u:%llu, ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n",
89 lock->ml.type, lock->ml.convert_type, lock->ml.node, 107 lock->ml.type, lock->ml.convert_type, lock->ml.node,
90 dlm_get_lock_cookie_node(lock->ml.cookie), 108 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
91 dlm_get_lock_cookie_seq(lock->ml.cookie), 109 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
92 list_empty(&lock->ast_list) ? 'y' : 'n', 110 list_empty(&lock->ast_list) ? 'y' : 'n',
93 lock->ast_pending ? 'y' : 'n', 111 lock->ast_pending ? 'y' : 'n',
94 list_empty(&lock->bast_list) ? 'y' : 'n', 112 list_empty(&lock->bast_list) ? 'y' : 'n',
@@ -102,8 +120,8 @@ void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
102 mlog(ML_NOTICE, " type=%d, conv=%d, node=%u, " 120 mlog(ML_NOTICE, " type=%d, conv=%d, node=%u, "
103 "cookie=%u:%llu, ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n", 121 "cookie=%u:%llu, ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n",
104 lock->ml.type, lock->ml.convert_type, lock->ml.node, 122 lock->ml.type, lock->ml.convert_type, lock->ml.node,
105 dlm_get_lock_cookie_node(lock->ml.cookie), 123 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
106 dlm_get_lock_cookie_seq(lock->ml.cookie), 124 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
107 list_empty(&lock->ast_list) ? 'y' : 'n', 125 list_empty(&lock->ast_list) ? 'y' : 'n',
108 lock->ast_pending ? 'y' : 'n', 126 lock->ast_pending ? 'y' : 'n',
109 list_empty(&lock->bast_list) ? 'y' : 'n', 127 list_empty(&lock->bast_list) ? 'y' : 'n',
diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c
index f0b25f2dd205..6087c4749fee 100644
--- a/fs/ocfs2/dlm/dlmdomain.c
+++ b/fs/ocfs2/dlm/dlmdomain.c
@@ -48,6 +48,36 @@
48#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN) 48#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
49#include "cluster/masklog.h" 49#include "cluster/masklog.h"
50 50
51/*
52 * ocfs2 node maps are array of long int, which limits to send them freely
53 * across the wire due to endianness issues. To workaround this, we convert
54 * long ints to byte arrays. Following 3 routines are helper functions to
55 * set/test/copy bits within those array of bytes
56 */
57static inline void byte_set_bit(u8 nr, u8 map[])
58{
59 map[nr >> 3] |= (1UL << (nr & 7));
60}
61
62static inline int byte_test_bit(u8 nr, u8 map[])
63{
64 return ((1UL << (nr & 7)) & (map[nr >> 3])) != 0;
65}
66
67static inline void byte_copymap(u8 dmap[], unsigned long smap[],
68 unsigned int sz)
69{
70 unsigned int nn;
71
72 if (!sz)
73 return;
74
75 memset(dmap, 0, ((sz + 7) >> 3));
76 for (nn = 0 ; nn < sz; nn++)
77 if (test_bit(nn, smap))
78 byte_set_bit(nn, dmap);
79}
80
51static void dlm_free_pagevec(void **vec, int pages) 81static void dlm_free_pagevec(void **vec, int pages)
52{ 82{
53 while (pages--) 83 while (pages--)
@@ -95,10 +125,14 @@ static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
95 125
96#define DLM_DOMAIN_BACKOFF_MS 200 126#define DLM_DOMAIN_BACKOFF_MS 200
97 127
98static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data); 128static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
99static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data); 129 void **ret_data);
100static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data); 130static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
101static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data); 131 void **ret_data);
132static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
133 void **ret_data);
134static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
135 void **ret_data);
102 136
103static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm); 137static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
104 138
@@ -125,10 +159,10 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm,
125 hlist_add_head(&res->hash_node, bucket); 159 hlist_add_head(&res->hash_node, bucket);
126} 160}
127 161
128struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, 162struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
129 const char *name, 163 const char *name,
130 unsigned int len, 164 unsigned int len,
131 unsigned int hash) 165 unsigned int hash)
132{ 166{
133 struct hlist_head *bucket; 167 struct hlist_head *bucket;
134 struct hlist_node *list; 168 struct hlist_node *list;
@@ -154,6 +188,37 @@ struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
154 return NULL; 188 return NULL;
155} 189}
156 190
191/* intended to be called by functions which do not care about lock
192 * resources which are being purged (most net _handler functions).
193 * this will return NULL for any lock resource which is found but
194 * currently in the process of dropping its mastery reference.
195 * use __dlm_lookup_lockres_full when you need the lock resource
196 * regardless (e.g. dlm_get_lock_resource) */
197struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
198 const char *name,
199 unsigned int len,
200 unsigned int hash)
201{
202 struct dlm_lock_resource *res = NULL;
203
204 mlog_entry("%.*s\n", len, name);
205
206 assert_spin_locked(&dlm->spinlock);
207
208 res = __dlm_lookup_lockres_full(dlm, name, len, hash);
209 if (res) {
210 spin_lock(&res->spinlock);
211 if (res->state & DLM_LOCK_RES_DROPPING_REF) {
212 spin_unlock(&res->spinlock);
213 dlm_lockres_put(res);
214 return NULL;
215 }
216 spin_unlock(&res->spinlock);
217 }
218
219 return res;
220}
221
157struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm, 222struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
158 const char *name, 223 const char *name,
159 unsigned int len) 224 unsigned int len)
@@ -330,43 +395,60 @@ static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
330 wake_up(&dlm_domain_events); 395 wake_up(&dlm_domain_events);
331} 396}
332 397
333static void dlm_migrate_all_locks(struct dlm_ctxt *dlm) 398static int dlm_migrate_all_locks(struct dlm_ctxt *dlm)
334{ 399{
335 int i; 400 int i, num, n, ret = 0;
336 struct dlm_lock_resource *res; 401 struct dlm_lock_resource *res;
402 struct hlist_node *iter;
403 struct hlist_head *bucket;
404 int dropped;
337 405
338 mlog(0, "Migrating locks from domain %s\n", dlm->name); 406 mlog(0, "Migrating locks from domain %s\n", dlm->name);
339restart: 407
408 num = 0;
340 spin_lock(&dlm->spinlock); 409 spin_lock(&dlm->spinlock);
341 for (i = 0; i < DLM_HASH_BUCKETS; i++) { 410 for (i = 0; i < DLM_HASH_BUCKETS; i++) {
342 while (!hlist_empty(dlm_lockres_hash(dlm, i))) { 411redo_bucket:
343 res = hlist_entry(dlm_lockres_hash(dlm, i)->first, 412 n = 0;
344 struct dlm_lock_resource, hash_node); 413 bucket = dlm_lockres_hash(dlm, i);
345 /* need reference when manually grabbing lockres */ 414 iter = bucket->first;
415 while (iter) {
416 n++;
417 res = hlist_entry(iter, struct dlm_lock_resource,
418 hash_node);
346 dlm_lockres_get(res); 419 dlm_lockres_get(res);
347 /* this should unhash the lockres 420 /* migrate, if necessary. this will drop the dlm
348 * and exit with dlm->spinlock */ 421 * spinlock and retake it if it does migration. */
349 mlog(0, "purging res=%p\n", res); 422 dropped = dlm_empty_lockres(dlm, res);
350 if (dlm_lockres_is_dirty(dlm, res)) { 423
351 /* HACK! this should absolutely go. 424 spin_lock(&res->spinlock);
352 * need to figure out why some empty 425 __dlm_lockres_calc_usage(dlm, res);
353 * lockreses are still marked dirty */ 426 iter = res->hash_node.next;
354 mlog(ML_ERROR, "lockres %.*s dirty!\n", 427 spin_unlock(&res->spinlock);
355 res->lockname.len, res->lockname.name); 428
356
357 spin_unlock(&dlm->spinlock);
358 dlm_kick_thread(dlm, res);
359 wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
360 dlm_lockres_put(res);
361 goto restart;
362 }
363 dlm_purge_lockres(dlm, res);
364 dlm_lockres_put(res); 429 dlm_lockres_put(res);
430
431 cond_resched_lock(&dlm->spinlock);
432
433 if (dropped)
434 goto redo_bucket;
365 } 435 }
436 num += n;
437 mlog(0, "%s: touched %d lockreses in bucket %d "
438 "(tot=%d)\n", dlm->name, n, i, num);
366 } 439 }
367 spin_unlock(&dlm->spinlock); 440 spin_unlock(&dlm->spinlock);
368 441 wake_up(&dlm->dlm_thread_wq);
442
443 /* let the dlm thread take care of purging, keep scanning until
444 * nothing remains in the hash */
445 if (num) {
446 mlog(0, "%s: %d lock resources in hash last pass\n",
447 dlm->name, num);
448 ret = -EAGAIN;
449 }
369 mlog(0, "DONE Migrating locks from domain %s\n", dlm->name); 450 mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
451 return ret;
370} 452}
371 453
372static int dlm_no_joining_node(struct dlm_ctxt *dlm) 454static int dlm_no_joining_node(struct dlm_ctxt *dlm)
@@ -418,7 +500,8 @@ static void __dlm_print_nodes(struct dlm_ctxt *dlm)
418 printk("\n"); 500 printk("\n");
419} 501}
420 502
421static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data) 503static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
504 void **ret_data)
422{ 505{
423 struct dlm_ctxt *dlm = data; 506 struct dlm_ctxt *dlm = data;
424 unsigned int node; 507 unsigned int node;
@@ -571,7 +654,9 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm)
571 /* We changed dlm state, notify the thread */ 654 /* We changed dlm state, notify the thread */
572 dlm_kick_thread(dlm, NULL); 655 dlm_kick_thread(dlm, NULL);
573 656
574 dlm_migrate_all_locks(dlm); 657 while (dlm_migrate_all_locks(dlm)) {
658 mlog(0, "%s: more migration to do\n", dlm->name);
659 }
575 dlm_mark_domain_leaving(dlm); 660 dlm_mark_domain_leaving(dlm);
576 dlm_leave_domain(dlm); 661 dlm_leave_domain(dlm);
577 dlm_complete_dlm_shutdown(dlm); 662 dlm_complete_dlm_shutdown(dlm);
@@ -580,11 +665,13 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm)
580} 665}
581EXPORT_SYMBOL_GPL(dlm_unregister_domain); 666EXPORT_SYMBOL_GPL(dlm_unregister_domain);
582 667
583static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data) 668static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
669 void **ret_data)
584{ 670{
585 struct dlm_query_join_request *query; 671 struct dlm_query_join_request *query;
586 enum dlm_query_join_response response; 672 enum dlm_query_join_response response;
587 struct dlm_ctxt *dlm = NULL; 673 struct dlm_ctxt *dlm = NULL;
674 u8 nodenum;
588 675
589 query = (struct dlm_query_join_request *) msg->buf; 676 query = (struct dlm_query_join_request *) msg->buf;
590 677
@@ -608,6 +695,28 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
608 695
609 spin_lock(&dlm_domain_lock); 696 spin_lock(&dlm_domain_lock);
610 dlm = __dlm_lookup_domain_full(query->domain, query->name_len); 697 dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
698 if (!dlm)
699 goto unlock_respond;
700
701 /*
702 * There is a small window where the joining node may not see the
703 * node(s) that just left but still part of the cluster. DISALLOW
704 * join request if joining node has different node map.
705 */
706 nodenum=0;
707 while (nodenum < O2NM_MAX_NODES) {
708 if (test_bit(nodenum, dlm->domain_map)) {
709 if (!byte_test_bit(nodenum, query->node_map)) {
710 mlog(0, "disallow join as node %u does not "
711 "have node %u in its nodemap\n",
712 query->node_idx, nodenum);
713 response = JOIN_DISALLOW;
714 goto unlock_respond;
715 }
716 }
717 nodenum++;
718 }
719
611 /* Once the dlm ctxt is marked as leaving then we don't want 720 /* Once the dlm ctxt is marked as leaving then we don't want
612 * to be put in someone's domain map. 721 * to be put in someone's domain map.
613 * Also, explicitly disallow joining at certain troublesome 722 * Also, explicitly disallow joining at certain troublesome
@@ -626,15 +735,15 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
626 /* Disallow parallel joins. */ 735 /* Disallow parallel joins. */
627 response = JOIN_DISALLOW; 736 response = JOIN_DISALLOW;
628 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) { 737 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
629 mlog(ML_NOTICE, "node %u trying to join, but recovery " 738 mlog(0, "node %u trying to join, but recovery "
630 "is ongoing.\n", bit); 739 "is ongoing.\n", bit);
631 response = JOIN_DISALLOW; 740 response = JOIN_DISALLOW;
632 } else if (test_bit(bit, dlm->recovery_map)) { 741 } else if (test_bit(bit, dlm->recovery_map)) {
633 mlog(ML_NOTICE, "node %u trying to join, but it " 742 mlog(0, "node %u trying to join, but it "
634 "still needs recovery.\n", bit); 743 "still needs recovery.\n", bit);
635 response = JOIN_DISALLOW; 744 response = JOIN_DISALLOW;
636 } else if (test_bit(bit, dlm->domain_map)) { 745 } else if (test_bit(bit, dlm->domain_map)) {
637 mlog(ML_NOTICE, "node %u trying to join, but it " 746 mlog(0, "node %u trying to join, but it "
638 "is still in the domain! needs recovery?\n", 747 "is still in the domain! needs recovery?\n",
639 bit); 748 bit);
640 response = JOIN_DISALLOW; 749 response = JOIN_DISALLOW;
@@ -649,6 +758,7 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
649 758
650 spin_unlock(&dlm->spinlock); 759 spin_unlock(&dlm->spinlock);
651 } 760 }
761unlock_respond:
652 spin_unlock(&dlm_domain_lock); 762 spin_unlock(&dlm_domain_lock);
653 763
654respond: 764respond:
@@ -657,7 +767,8 @@ respond:
657 return response; 767 return response;
658} 768}
659 769
660static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data) 770static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
771 void **ret_data)
661{ 772{
662 struct dlm_assert_joined *assert; 773 struct dlm_assert_joined *assert;
663 struct dlm_ctxt *dlm = NULL; 774 struct dlm_ctxt *dlm = NULL;
@@ -694,7 +805,8 @@ static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data)
694 return 0; 805 return 0;
695} 806}
696 807
697static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data) 808static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
809 void **ret_data)
698{ 810{
699 struct dlm_cancel_join *cancel; 811 struct dlm_cancel_join *cancel;
700 struct dlm_ctxt *dlm = NULL; 812 struct dlm_ctxt *dlm = NULL;
@@ -796,6 +908,9 @@ static int dlm_request_join(struct dlm_ctxt *dlm,
796 join_msg.name_len = strlen(dlm->name); 908 join_msg.name_len = strlen(dlm->name);
797 memcpy(join_msg.domain, dlm->name, join_msg.name_len); 909 memcpy(join_msg.domain, dlm->name, join_msg.name_len);
798 910
911 /* copy live node map to join message */
912 byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES);
913
799 status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg, 914 status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
800 sizeof(join_msg), node, &retval); 915 sizeof(join_msg), node, &retval);
801 if (status < 0 && status != -ENOPROTOOPT) { 916 if (status < 0 && status != -ENOPROTOOPT) {
@@ -1036,98 +1151,106 @@ static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
1036 status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key, 1151 status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
1037 sizeof(struct dlm_master_request), 1152 sizeof(struct dlm_master_request),
1038 dlm_master_request_handler, 1153 dlm_master_request_handler,
1039 dlm, &dlm->dlm_domain_handlers); 1154 dlm, NULL, &dlm->dlm_domain_handlers);
1040 if (status) 1155 if (status)
1041 goto bail; 1156 goto bail;
1042 1157
1043 status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key, 1158 status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
1044 sizeof(struct dlm_assert_master), 1159 sizeof(struct dlm_assert_master),
1045 dlm_assert_master_handler, 1160 dlm_assert_master_handler,
1046 dlm, &dlm->dlm_domain_handlers); 1161 dlm, dlm_assert_master_post_handler,
1162 &dlm->dlm_domain_handlers);
1047 if (status) 1163 if (status)
1048 goto bail; 1164 goto bail;
1049 1165
1050 status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key, 1166 status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
1051 sizeof(struct dlm_create_lock), 1167 sizeof(struct dlm_create_lock),
1052 dlm_create_lock_handler, 1168 dlm_create_lock_handler,
1053 dlm, &dlm->dlm_domain_handlers); 1169 dlm, NULL, &dlm->dlm_domain_handlers);
1054 if (status) 1170 if (status)
1055 goto bail; 1171 goto bail;
1056 1172
1057 status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key, 1173 status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key,
1058 DLM_CONVERT_LOCK_MAX_LEN, 1174 DLM_CONVERT_LOCK_MAX_LEN,
1059 dlm_convert_lock_handler, 1175 dlm_convert_lock_handler,
1060 dlm, &dlm->dlm_domain_handlers); 1176 dlm, NULL, &dlm->dlm_domain_handlers);
1061 if (status) 1177 if (status)
1062 goto bail; 1178 goto bail;
1063 1179
1064 status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key, 1180 status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key,
1065 DLM_UNLOCK_LOCK_MAX_LEN, 1181 DLM_UNLOCK_LOCK_MAX_LEN,
1066 dlm_unlock_lock_handler, 1182 dlm_unlock_lock_handler,
1067 dlm, &dlm->dlm_domain_handlers); 1183 dlm, NULL, &dlm->dlm_domain_handlers);
1068 if (status) 1184 if (status)
1069 goto bail; 1185 goto bail;
1070 1186
1071 status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key, 1187 status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key,
1072 DLM_PROXY_AST_MAX_LEN, 1188 DLM_PROXY_AST_MAX_LEN,
1073 dlm_proxy_ast_handler, 1189 dlm_proxy_ast_handler,
1074 dlm, &dlm->dlm_domain_handlers); 1190 dlm, NULL, &dlm->dlm_domain_handlers);
1075 if (status) 1191 if (status)
1076 goto bail; 1192 goto bail;
1077 1193
1078 status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key, 1194 status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key,
1079 sizeof(struct dlm_exit_domain), 1195 sizeof(struct dlm_exit_domain),
1080 dlm_exit_domain_handler, 1196 dlm_exit_domain_handler,
1081 dlm, &dlm->dlm_domain_handlers); 1197 dlm, NULL, &dlm->dlm_domain_handlers);
1198 if (status)
1199 goto bail;
1200
1201 status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key,
1202 sizeof(struct dlm_deref_lockres),
1203 dlm_deref_lockres_handler,
1204 dlm, NULL, &dlm->dlm_domain_handlers);
1082 if (status) 1205 if (status)
1083 goto bail; 1206 goto bail;
1084 1207
1085 status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key, 1208 status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
1086 sizeof(struct dlm_migrate_request), 1209 sizeof(struct dlm_migrate_request),
1087 dlm_migrate_request_handler, 1210 dlm_migrate_request_handler,
1088 dlm, &dlm->dlm_domain_handlers); 1211 dlm, NULL, &dlm->dlm_domain_handlers);
1089 if (status) 1212 if (status)
1090 goto bail; 1213 goto bail;
1091 1214
1092 status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key, 1215 status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key,
1093 DLM_MIG_LOCKRES_MAX_LEN, 1216 DLM_MIG_LOCKRES_MAX_LEN,
1094 dlm_mig_lockres_handler, 1217 dlm_mig_lockres_handler,
1095 dlm, &dlm->dlm_domain_handlers); 1218 dlm, NULL, &dlm->dlm_domain_handlers);
1096 if (status) 1219 if (status)
1097 goto bail; 1220 goto bail;
1098 1221
1099 status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key, 1222 status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
1100 sizeof(struct dlm_master_requery), 1223 sizeof(struct dlm_master_requery),
1101 dlm_master_requery_handler, 1224 dlm_master_requery_handler,
1102 dlm, &dlm->dlm_domain_handlers); 1225 dlm, NULL, &dlm->dlm_domain_handlers);
1103 if (status) 1226 if (status)
1104 goto bail; 1227 goto bail;
1105 1228
1106 status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key, 1229 status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
1107 sizeof(struct dlm_lock_request), 1230 sizeof(struct dlm_lock_request),
1108 dlm_request_all_locks_handler, 1231 dlm_request_all_locks_handler,
1109 dlm, &dlm->dlm_domain_handlers); 1232 dlm, NULL, &dlm->dlm_domain_handlers);
1110 if (status) 1233 if (status)
1111 goto bail; 1234 goto bail;
1112 1235
1113 status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key, 1236 status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
1114 sizeof(struct dlm_reco_data_done), 1237 sizeof(struct dlm_reco_data_done),
1115 dlm_reco_data_done_handler, 1238 dlm_reco_data_done_handler,
1116 dlm, &dlm->dlm_domain_handlers); 1239 dlm, NULL, &dlm->dlm_domain_handlers);
1117 if (status) 1240 if (status)
1118 goto bail; 1241 goto bail;
1119 1242
1120 status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key, 1243 status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
1121 sizeof(struct dlm_begin_reco), 1244 sizeof(struct dlm_begin_reco),
1122 dlm_begin_reco_handler, 1245 dlm_begin_reco_handler,
1123 dlm, &dlm->dlm_domain_handlers); 1246 dlm, NULL, &dlm->dlm_domain_handlers);
1124 if (status) 1247 if (status)
1125 goto bail; 1248 goto bail;
1126 1249
1127 status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key, 1250 status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
1128 sizeof(struct dlm_finalize_reco), 1251 sizeof(struct dlm_finalize_reco),
1129 dlm_finalize_reco_handler, 1252 dlm_finalize_reco_handler,
1130 dlm, &dlm->dlm_domain_handlers); 1253 dlm, NULL, &dlm->dlm_domain_handlers);
1131 if (status) 1254 if (status)
1132 goto bail; 1255 goto bail;
1133 1256
@@ -1141,6 +1264,8 @@ bail:
1141static int dlm_join_domain(struct dlm_ctxt *dlm) 1264static int dlm_join_domain(struct dlm_ctxt *dlm)
1142{ 1265{
1143 int status; 1266 int status;
1267 unsigned int backoff;
1268 unsigned int total_backoff = 0;
1144 1269
1145 BUG_ON(!dlm); 1270 BUG_ON(!dlm);
1146 1271
@@ -1172,18 +1297,27 @@ static int dlm_join_domain(struct dlm_ctxt *dlm)
1172 } 1297 }
1173 1298
1174 do { 1299 do {
1175 unsigned int backoff;
1176 status = dlm_try_to_join_domain(dlm); 1300 status = dlm_try_to_join_domain(dlm);
1177 1301
1178 /* If we're racing another node to the join, then we 1302 /* If we're racing another node to the join, then we
1179 * need to back off temporarily and let them 1303 * need to back off temporarily and let them
1180 * complete. */ 1304 * complete. */
1305#define DLM_JOIN_TIMEOUT_MSECS 90000
1181 if (status == -EAGAIN) { 1306 if (status == -EAGAIN) {
1182 if (signal_pending(current)) { 1307 if (signal_pending(current)) {
1183 status = -ERESTARTSYS; 1308 status = -ERESTARTSYS;
1184 goto bail; 1309 goto bail;
1185 } 1310 }
1186 1311
1312 if (total_backoff >
1313 msecs_to_jiffies(DLM_JOIN_TIMEOUT_MSECS)) {
1314 status = -ERESTARTSYS;
1315 mlog(ML_NOTICE, "Timed out joining dlm domain "
1316 "%s after %u msecs\n", dlm->name,
1317 jiffies_to_msecs(total_backoff));
1318 goto bail;
1319 }
1320
1187 /* 1321 /*
1188 * <chip> After you! 1322 * <chip> After you!
1189 * <dale> No, after you! 1323 * <dale> No, after you!
@@ -1193,6 +1327,7 @@ static int dlm_join_domain(struct dlm_ctxt *dlm)
1193 */ 1327 */
1194 backoff = (unsigned int)(jiffies & 0x3); 1328 backoff = (unsigned int)(jiffies & 0x3);
1195 backoff *= DLM_DOMAIN_BACKOFF_MS; 1329 backoff *= DLM_DOMAIN_BACKOFF_MS;
1330 total_backoff += backoff;
1196 mlog(0, "backoff %d\n", backoff); 1331 mlog(0, "backoff %d\n", backoff);
1197 msleep(backoff); 1332 msleep(backoff);
1198 } 1333 }
@@ -1421,21 +1556,21 @@ static int dlm_register_net_handlers(void)
1421 status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, 1556 status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
1422 sizeof(struct dlm_query_join_request), 1557 sizeof(struct dlm_query_join_request),
1423 dlm_query_join_handler, 1558 dlm_query_join_handler,
1424 NULL, &dlm_join_handlers); 1559 NULL, NULL, &dlm_join_handlers);
1425 if (status) 1560 if (status)
1426 goto bail; 1561 goto bail;
1427 1562
1428 status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY, 1563 status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1429 sizeof(struct dlm_assert_joined), 1564 sizeof(struct dlm_assert_joined),
1430 dlm_assert_joined_handler, 1565 dlm_assert_joined_handler,
1431 NULL, &dlm_join_handlers); 1566 NULL, NULL, &dlm_join_handlers);
1432 if (status) 1567 if (status)
1433 goto bail; 1568 goto bail;
1434 1569
1435 status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY, 1570 status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1436 sizeof(struct dlm_cancel_join), 1571 sizeof(struct dlm_cancel_join),
1437 dlm_cancel_join_handler, 1572 dlm_cancel_join_handler,
1438 NULL, &dlm_join_handlers); 1573 NULL, NULL, &dlm_join_handlers);
1439 1574
1440bail: 1575bail:
1441 if (status < 0) 1576 if (status < 0)
diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c
index e5ca3db197f6..52578d907d9a 100644
--- a/fs/ocfs2/dlm/dlmlock.c
+++ b/fs/ocfs2/dlm/dlmlock.c
@@ -163,6 +163,10 @@ static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
163 kick_thread = 1; 163 kick_thread = 1;
164 } 164 }
165 } 165 }
166 /* reduce the inflight count, this may result in the lockres
167 * being purged below during calc_usage */
168 if (lock->ml.node == dlm->node_num)
169 dlm_lockres_drop_inflight_ref(dlm, res);
166 170
167 spin_unlock(&res->spinlock); 171 spin_unlock(&res->spinlock);
168 wake_up(&res->wq); 172 wake_up(&res->wq);
@@ -437,7 +441,8 @@ struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie,
437 * held on exit: none 441 * held on exit: none
438 * returns: DLM_NORMAL, DLM_SYSERR, DLM_IVLOCKID, DLM_NOTQUEUED 442 * returns: DLM_NORMAL, DLM_SYSERR, DLM_IVLOCKID, DLM_NOTQUEUED
439 */ 443 */
440int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data) 444int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data,
445 void **ret_data)
441{ 446{
442 struct dlm_ctxt *dlm = data; 447 struct dlm_ctxt *dlm = data;
443 struct dlm_create_lock *create = (struct dlm_create_lock *)msg->buf; 448 struct dlm_create_lock *create = (struct dlm_create_lock *)msg->buf;
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 0ad872055cb3..77e4e6169a0d 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -99,9 +99,10 @@ static void dlm_mle_node_up(struct dlm_ctxt *dlm,
99 int idx); 99 int idx);
100 100
101static void dlm_assert_master_worker(struct dlm_work_item *item, void *data); 101static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
102static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname, 102static int dlm_do_assert_master(struct dlm_ctxt *dlm,
103 unsigned int namelen, void *nodemap, 103 struct dlm_lock_resource *res,
104 u32 flags); 104 void *nodemap, u32 flags);
105static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);
105 106
106static inline int dlm_mle_equal(struct dlm_ctxt *dlm, 107static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
107 struct dlm_master_list_entry *mle, 108 struct dlm_master_list_entry *mle,
@@ -237,7 +238,8 @@ static int dlm_find_mle(struct dlm_ctxt *dlm,
237 struct dlm_master_list_entry **mle, 238 struct dlm_master_list_entry **mle,
238 char *name, unsigned int namelen); 239 char *name, unsigned int namelen);
239 240
240static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to); 241static int dlm_do_master_request(struct dlm_lock_resource *res,
242 struct dlm_master_list_entry *mle, int to);
241 243
242 244
243static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm, 245static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
@@ -687,6 +689,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
687 INIT_LIST_HEAD(&res->purge); 689 INIT_LIST_HEAD(&res->purge);
688 atomic_set(&res->asts_reserved, 0); 690 atomic_set(&res->asts_reserved, 0);
689 res->migration_pending = 0; 691 res->migration_pending = 0;
692 res->inflight_locks = 0;
690 693
691 kref_init(&res->refs); 694 kref_init(&res->refs);
692 695
@@ -700,6 +703,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
700 res->last_used = 0; 703 res->last_used = 0;
701 704
702 memset(res->lvb, 0, DLM_LVB_LEN); 705 memset(res->lvb, 0, DLM_LVB_LEN);
706 memset(res->refmap, 0, sizeof(res->refmap));
703} 707}
704 708
705struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm, 709struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
@@ -722,6 +726,42 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
722 return res; 726 return res;
723} 727}
724 728
729void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
730 struct dlm_lock_resource *res,
731 int new_lockres,
732 const char *file,
733 int line)
734{
735 if (!new_lockres)
736 assert_spin_locked(&res->spinlock);
737
738 if (!test_bit(dlm->node_num, res->refmap)) {
739 BUG_ON(res->inflight_locks != 0);
740 dlm_lockres_set_refmap_bit(dlm->node_num, res);
741 }
742 res->inflight_locks++;
743 mlog(0, "%s:%.*s: inflight++: now %u\n",
744 dlm->name, res->lockname.len, res->lockname.name,
745 res->inflight_locks);
746}
747
748void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
749 struct dlm_lock_resource *res,
750 const char *file,
751 int line)
752{
753 assert_spin_locked(&res->spinlock);
754
755 BUG_ON(res->inflight_locks == 0);
756 res->inflight_locks--;
757 mlog(0, "%s:%.*s: inflight--: now %u\n",
758 dlm->name, res->lockname.len, res->lockname.name,
759 res->inflight_locks);
760 if (res->inflight_locks == 0)
761 dlm_lockres_clear_refmap_bit(dlm->node_num, res);
762 wake_up(&res->wq);
763}
764
725/* 765/*
726 * lookup a lock resource by name. 766 * lookup a lock resource by name.
727 * may already exist in the hashtable. 767 * may already exist in the hashtable.
@@ -752,6 +792,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
752 unsigned int hash; 792 unsigned int hash;
753 int tries = 0; 793 int tries = 0;
754 int bit, wait_on_recovery = 0; 794 int bit, wait_on_recovery = 0;
795 int drop_inflight_if_nonlocal = 0;
755 796
756 BUG_ON(!lockid); 797 BUG_ON(!lockid);
757 798
@@ -761,9 +802,30 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
761 802
762lookup: 803lookup:
763 spin_lock(&dlm->spinlock); 804 spin_lock(&dlm->spinlock);
764 tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash); 805 tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
765 if (tmpres) { 806 if (tmpres) {
807 int dropping_ref = 0;
808
809 spin_lock(&tmpres->spinlock);
810 if (tmpres->owner == dlm->node_num) {
811 BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF);
812 dlm_lockres_grab_inflight_ref(dlm, tmpres);
813 } else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF)
814 dropping_ref = 1;
815 spin_unlock(&tmpres->spinlock);
766 spin_unlock(&dlm->spinlock); 816 spin_unlock(&dlm->spinlock);
817
818 /* wait until done messaging the master, drop our ref to allow
819 * the lockres to be purged, start over. */
820 if (dropping_ref) {
821 spin_lock(&tmpres->spinlock);
822 __dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF);
823 spin_unlock(&tmpres->spinlock);
824 dlm_lockres_put(tmpres);
825 tmpres = NULL;
826 goto lookup;
827 }
828
767 mlog(0, "found in hash!\n"); 829 mlog(0, "found in hash!\n");
768 if (res) 830 if (res)
769 dlm_lockres_put(res); 831 dlm_lockres_put(res);
@@ -793,6 +855,7 @@ lookup:
793 spin_lock(&res->spinlock); 855 spin_lock(&res->spinlock);
794 dlm_change_lockres_owner(dlm, res, dlm->node_num); 856 dlm_change_lockres_owner(dlm, res, dlm->node_num);
795 __dlm_insert_lockres(dlm, res); 857 __dlm_insert_lockres(dlm, res);
858 dlm_lockres_grab_inflight_ref(dlm, res);
796 spin_unlock(&res->spinlock); 859 spin_unlock(&res->spinlock);
797 spin_unlock(&dlm->spinlock); 860 spin_unlock(&dlm->spinlock);
798 /* lockres still marked IN_PROGRESS */ 861 /* lockres still marked IN_PROGRESS */
@@ -805,29 +868,40 @@ lookup:
805 /* if we found a block, wait for lock to be mastered by another node */ 868 /* if we found a block, wait for lock to be mastered by another node */
806 blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen); 869 blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
807 if (blocked) { 870 if (blocked) {
871 int mig;
808 if (mle->type == DLM_MLE_MASTER) { 872 if (mle->type == DLM_MLE_MASTER) {
809 mlog(ML_ERROR, "master entry for nonexistent lock!\n"); 873 mlog(ML_ERROR, "master entry for nonexistent lock!\n");
810 BUG(); 874 BUG();
811 } else if (mle->type == DLM_MLE_MIGRATION) { 875 }
812 /* migration is in progress! */ 876 mig = (mle->type == DLM_MLE_MIGRATION);
813 /* the good news is that we now know the 877 /* if there is a migration in progress, let the migration
814 * "current" master (mle->master). */ 878 * finish before continuing. we can wait for the absence
815 879 * of the MIGRATION mle: either the migrate finished or
880 * one of the nodes died and the mle was cleaned up.
881 * if there is a BLOCK here, but it already has a master
882 * set, we are too late. the master does not have a ref
883 * for us in the refmap. detach the mle and drop it.
884 * either way, go back to the top and start over. */
885 if (mig || mle->master != O2NM_MAX_NODES) {
886 BUG_ON(mig && mle->master == dlm->node_num);
887 /* we arrived too late. the master does not
888 * have a ref for us. retry. */
889 mlog(0, "%s:%.*s: late on %s\n",
890 dlm->name, namelen, lockid,
891 mig ? "MIGRATION" : "BLOCK");
816 spin_unlock(&dlm->master_lock); 892 spin_unlock(&dlm->master_lock);
817 assert_spin_locked(&dlm->spinlock);
818
819 /* set the lockres owner and hash it */
820 spin_lock(&res->spinlock);
821 dlm_set_lockres_owner(dlm, res, mle->master);
822 __dlm_insert_lockres(dlm, res);
823 spin_unlock(&res->spinlock);
824 spin_unlock(&dlm->spinlock); 893 spin_unlock(&dlm->spinlock);
825 894
826 /* master is known, detach */ 895 /* master is known, detach */
827 dlm_mle_detach_hb_events(dlm, mle); 896 if (!mig)
897 dlm_mle_detach_hb_events(dlm, mle);
828 dlm_put_mle(mle); 898 dlm_put_mle(mle);
829 mle = NULL; 899 mle = NULL;
830 goto wake_waiters; 900 /* this is lame, but we cant wait on either
901 * the mle or lockres waitqueue here */
902 if (mig)
903 msleep(100);
904 goto lookup;
831 } 905 }
832 } else { 906 } else {
833 /* go ahead and try to master lock on this node */ 907 /* go ahead and try to master lock on this node */
@@ -858,6 +932,13 @@ lookup:
858 932
859 /* finally add the lockres to its hash bucket */ 933 /* finally add the lockres to its hash bucket */
860 __dlm_insert_lockres(dlm, res); 934 __dlm_insert_lockres(dlm, res);
935 /* since this lockres is new it doesnt not require the spinlock */
936 dlm_lockres_grab_inflight_ref_new(dlm, res);
937
938 /* if this node does not become the master make sure to drop
939 * this inflight reference below */
940 drop_inflight_if_nonlocal = 1;
941
861 /* get an extra ref on the mle in case this is a BLOCK 942 /* get an extra ref on the mle in case this is a BLOCK
862 * if so, the creator of the BLOCK may try to put the last 943 * if so, the creator of the BLOCK may try to put the last
863 * ref at this time in the assert master handler, so we 944 * ref at this time in the assert master handler, so we
@@ -910,7 +991,7 @@ redo_request:
910 ret = -EINVAL; 991 ret = -EINVAL;
911 dlm_node_iter_init(mle->vote_map, &iter); 992 dlm_node_iter_init(mle->vote_map, &iter);
912 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { 993 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
913 ret = dlm_do_master_request(mle, nodenum); 994 ret = dlm_do_master_request(res, mle, nodenum);
914 if (ret < 0) 995 if (ret < 0)
915 mlog_errno(ret); 996 mlog_errno(ret);
916 if (mle->master != O2NM_MAX_NODES) { 997 if (mle->master != O2NM_MAX_NODES) {
@@ -960,6 +1041,8 @@ wait:
960 1041
961wake_waiters: 1042wake_waiters:
962 spin_lock(&res->spinlock); 1043 spin_lock(&res->spinlock);
1044 if (res->owner != dlm->node_num && drop_inflight_if_nonlocal)
1045 dlm_lockres_drop_inflight_ref(dlm, res);
963 res->state &= ~DLM_LOCK_RES_IN_PROGRESS; 1046 res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
964 spin_unlock(&res->spinlock); 1047 spin_unlock(&res->spinlock);
965 wake_up(&res->wq); 1048 wake_up(&res->wq);
@@ -998,7 +1081,7 @@ recheck:
998 /* this will cause the master to re-assert across 1081 /* this will cause the master to re-assert across
999 * the whole cluster, freeing up mles */ 1082 * the whole cluster, freeing up mles */
1000 if (res->owner != dlm->node_num) { 1083 if (res->owner != dlm->node_num) {
1001 ret = dlm_do_master_request(mle, res->owner); 1084 ret = dlm_do_master_request(res, mle, res->owner);
1002 if (ret < 0) { 1085 if (ret < 0) {
1003 /* give recovery a chance to run */ 1086 /* give recovery a chance to run */
1004 mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret); 1087 mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
@@ -1062,6 +1145,8 @@ recheck:
1062 * now tell other nodes that I am 1145 * now tell other nodes that I am
1063 * mastering this. */ 1146 * mastering this. */
1064 mle->master = dlm->node_num; 1147 mle->master = dlm->node_num;
1148 /* ref was grabbed in get_lock_resource
1149 * will be dropped in dlmlock_master */
1065 assert = 1; 1150 assert = 1;
1066 sleep = 0; 1151 sleep = 0;
1067 } 1152 }
@@ -1087,7 +1172,8 @@ recheck:
1087 (atomic_read(&mle->woken) == 1), 1172 (atomic_read(&mle->woken) == 1),
1088 timeo); 1173 timeo);
1089 if (res->owner == O2NM_MAX_NODES) { 1174 if (res->owner == O2NM_MAX_NODES) {
1090 mlog(0, "waiting again\n"); 1175 mlog(0, "%s:%.*s: waiting again\n", dlm->name,
1176 res->lockname.len, res->lockname.name);
1091 goto recheck; 1177 goto recheck;
1092 } 1178 }
1093 mlog(0, "done waiting, master is %u\n", res->owner); 1179 mlog(0, "done waiting, master is %u\n", res->owner);
@@ -1100,8 +1186,7 @@ recheck:
1100 m = dlm->node_num; 1186 m = dlm->node_num;
1101 mlog(0, "about to master %.*s here, this=%u\n", 1187 mlog(0, "about to master %.*s here, this=%u\n",
1102 res->lockname.len, res->lockname.name, m); 1188 res->lockname.len, res->lockname.name, m);
1103 ret = dlm_do_assert_master(dlm, res->lockname.name, 1189 ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
1104 res->lockname.len, mle->vote_map, 0);
1105 if (ret) { 1190 if (ret) {
1106 /* This is a failure in the network path, 1191 /* This is a failure in the network path,
1107 * not in the response to the assert_master 1192 * not in the response to the assert_master
@@ -1117,6 +1202,8 @@ recheck:
1117 1202
1118 /* set the lockres owner */ 1203 /* set the lockres owner */
1119 spin_lock(&res->spinlock); 1204 spin_lock(&res->spinlock);
1205 /* mastery reference obtained either during
1206 * assert_master_handler or in get_lock_resource */
1120 dlm_change_lockres_owner(dlm, res, m); 1207 dlm_change_lockres_owner(dlm, res, m);
1121 spin_unlock(&res->spinlock); 1208 spin_unlock(&res->spinlock);
1122 1209
@@ -1283,7 +1370,8 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1283 * 1370 *
1284 */ 1371 */
1285 1372
1286static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to) 1373static int dlm_do_master_request(struct dlm_lock_resource *res,
1374 struct dlm_master_list_entry *mle, int to)
1287{ 1375{
1288 struct dlm_ctxt *dlm = mle->dlm; 1376 struct dlm_ctxt *dlm = mle->dlm;
1289 struct dlm_master_request request; 1377 struct dlm_master_request request;
@@ -1339,6 +1427,9 @@ again:
1339 case DLM_MASTER_RESP_YES: 1427 case DLM_MASTER_RESP_YES:
1340 set_bit(to, mle->response_map); 1428 set_bit(to, mle->response_map);
1341 mlog(0, "node %u is the master, response=YES\n", to); 1429 mlog(0, "node %u is the master, response=YES\n", to);
1430 mlog(0, "%s:%.*s: master node %u now knows I have a "
1431 "reference\n", dlm->name, res->lockname.len,
1432 res->lockname.name, to);
1342 mle->master = to; 1433 mle->master = to;
1343 break; 1434 break;
1344 case DLM_MASTER_RESP_NO: 1435 case DLM_MASTER_RESP_NO:
@@ -1379,7 +1470,8 @@ out:
1379 * 1470 *
1380 * if possible, TRIM THIS DOWN!!! 1471 * if possible, TRIM THIS DOWN!!!
1381 */ 1472 */
1382int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data) 1473int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
1474 void **ret_data)
1383{ 1475{
1384 u8 response = DLM_MASTER_RESP_MAYBE; 1476 u8 response = DLM_MASTER_RESP_MAYBE;
1385 struct dlm_ctxt *dlm = data; 1477 struct dlm_ctxt *dlm = data;
@@ -1417,10 +1509,11 @@ way_up_top:
1417 1509
1418 /* take care of the easy cases up front */ 1510 /* take care of the easy cases up front */
1419 spin_lock(&res->spinlock); 1511 spin_lock(&res->spinlock);
1420 if (res->state & DLM_LOCK_RES_RECOVERING) { 1512 if (res->state & (DLM_LOCK_RES_RECOVERING|
1513 DLM_LOCK_RES_MIGRATING)) {
1421 spin_unlock(&res->spinlock); 1514 spin_unlock(&res->spinlock);
1422 mlog(0, "returning DLM_MASTER_RESP_ERROR since res is " 1515 mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
1423 "being recovered\n"); 1516 "being recovered/migrated\n");
1424 response = DLM_MASTER_RESP_ERROR; 1517 response = DLM_MASTER_RESP_ERROR;
1425 if (mle) 1518 if (mle)
1426 kmem_cache_free(dlm_mle_cache, mle); 1519 kmem_cache_free(dlm_mle_cache, mle);
@@ -1428,8 +1521,10 @@ way_up_top:
1428 } 1521 }
1429 1522
1430 if (res->owner == dlm->node_num) { 1523 if (res->owner == dlm->node_num) {
1524 mlog(0, "%s:%.*s: setting bit %u in refmap\n",
1525 dlm->name, namelen, name, request->node_idx);
1526 dlm_lockres_set_refmap_bit(request->node_idx, res);
1431 spin_unlock(&res->spinlock); 1527 spin_unlock(&res->spinlock);
1432 // mlog(0, "this node is the master\n");
1433 response = DLM_MASTER_RESP_YES; 1528 response = DLM_MASTER_RESP_YES;
1434 if (mle) 1529 if (mle)
1435 kmem_cache_free(dlm_mle_cache, mle); 1530 kmem_cache_free(dlm_mle_cache, mle);
@@ -1477,7 +1572,6 @@ way_up_top:
1477 mlog(0, "node %u is master, but trying to migrate to " 1572 mlog(0, "node %u is master, but trying to migrate to "
1478 "node %u.\n", tmpmle->master, tmpmle->new_master); 1573 "node %u.\n", tmpmle->master, tmpmle->new_master);
1479 if (tmpmle->master == dlm->node_num) { 1574 if (tmpmle->master == dlm->node_num) {
1480 response = DLM_MASTER_RESP_YES;
1481 mlog(ML_ERROR, "no owner on lockres, but this " 1575 mlog(ML_ERROR, "no owner on lockres, but this "
1482 "node is trying to migrate it to %u?!\n", 1576 "node is trying to migrate it to %u?!\n",
1483 tmpmle->new_master); 1577 tmpmle->new_master);
@@ -1494,6 +1588,10 @@ way_up_top:
1494 * go back and clean the mles on any 1588 * go back and clean the mles on any
1495 * other nodes */ 1589 * other nodes */
1496 dispatch_assert = 1; 1590 dispatch_assert = 1;
1591 dlm_lockres_set_refmap_bit(request->node_idx, res);
1592 mlog(0, "%s:%.*s: setting bit %u in refmap\n",
1593 dlm->name, namelen, name,
1594 request->node_idx);
1497 } else 1595 } else
1498 response = DLM_MASTER_RESP_NO; 1596 response = DLM_MASTER_RESP_NO;
1499 } else { 1597 } else {
@@ -1607,17 +1705,24 @@ send_response:
1607 * can periodically run all locks owned by this node 1705 * can periodically run all locks owned by this node
1608 * and re-assert across the cluster... 1706 * and re-assert across the cluster...
1609 */ 1707 */
1610static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname, 1708int dlm_do_assert_master(struct dlm_ctxt *dlm,
1611 unsigned int namelen, void *nodemap, 1709 struct dlm_lock_resource *res,
1612 u32 flags) 1710 void *nodemap, u32 flags)
1613{ 1711{
1614 struct dlm_assert_master assert; 1712 struct dlm_assert_master assert;
1615 int to, tmpret; 1713 int to, tmpret;
1616 struct dlm_node_iter iter; 1714 struct dlm_node_iter iter;
1617 int ret = 0; 1715 int ret = 0;
1618 int reassert; 1716 int reassert;
1717 const char *lockname = res->lockname.name;
1718 unsigned int namelen = res->lockname.len;
1619 1719
1620 BUG_ON(namelen > O2NM_MAX_NAME_LEN); 1720 BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1721
1722 spin_lock(&res->spinlock);
1723 res->state |= DLM_LOCK_RES_SETREF_INPROG;
1724 spin_unlock(&res->spinlock);
1725
1621again: 1726again:
1622 reassert = 0; 1727 reassert = 0;
1623 1728
@@ -1647,6 +1752,7 @@ again:
1647 mlog(0, "link to %d went down!\n", to); 1752 mlog(0, "link to %d went down!\n", to);
1648 /* any nonzero status return will do */ 1753 /* any nonzero status return will do */
1649 ret = tmpret; 1754 ret = tmpret;
1755 r = 0;
1650 } else if (r < 0) { 1756 } else if (r < 0) {
1651 /* ok, something horribly messed. kill thyself. */ 1757 /* ok, something horribly messed. kill thyself. */
1652 mlog(ML_ERROR,"during assert master of %.*s to %u, " 1758 mlog(ML_ERROR,"during assert master of %.*s to %u, "
@@ -1661,17 +1767,39 @@ again:
1661 spin_unlock(&dlm->master_lock); 1767 spin_unlock(&dlm->master_lock);
1662 spin_unlock(&dlm->spinlock); 1768 spin_unlock(&dlm->spinlock);
1663 BUG(); 1769 BUG();
1664 } else if (r == EAGAIN) { 1770 }
1771
1772 if (r & DLM_ASSERT_RESPONSE_REASSERT &&
1773 !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
1774 mlog(ML_ERROR, "%.*s: very strange, "
1775 "master MLE but no lockres on %u\n",
1776 namelen, lockname, to);
1777 }
1778
1779 if (r & DLM_ASSERT_RESPONSE_REASSERT) {
1665 mlog(0, "%.*s: node %u create mles on other " 1780 mlog(0, "%.*s: node %u create mles on other "
1666 "nodes and requests a re-assert\n", 1781 "nodes and requests a re-assert\n",
1667 namelen, lockname, to); 1782 namelen, lockname, to);
1668 reassert = 1; 1783 reassert = 1;
1669 } 1784 }
1785 if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
1786 mlog(0, "%.*s: node %u has a reference to this "
1787 "lockres, set the bit in the refmap\n",
1788 namelen, lockname, to);
1789 spin_lock(&res->spinlock);
1790 dlm_lockres_set_refmap_bit(to, res);
1791 spin_unlock(&res->spinlock);
1792 }
1670 } 1793 }
1671 1794
1672 if (reassert) 1795 if (reassert)
1673 goto again; 1796 goto again;
1674 1797
1798 spin_lock(&res->spinlock);
1799 res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
1800 spin_unlock(&res->spinlock);
1801 wake_up(&res->wq);
1802
1675 return ret; 1803 return ret;
1676} 1804}
1677 1805
@@ -1684,7 +1812,8 @@ again:
1684 * 1812 *
1685 * if possible, TRIM THIS DOWN!!! 1813 * if possible, TRIM THIS DOWN!!!
1686 */ 1814 */
1687int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) 1815int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
1816 void **ret_data)
1688{ 1817{
1689 struct dlm_ctxt *dlm = data; 1818 struct dlm_ctxt *dlm = data;
1690 struct dlm_master_list_entry *mle = NULL; 1819 struct dlm_master_list_entry *mle = NULL;
@@ -1693,7 +1822,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
1693 char *name; 1822 char *name;
1694 unsigned int namelen, hash; 1823 unsigned int namelen, hash;
1695 u32 flags; 1824 u32 flags;
1696 int master_request = 0; 1825 int master_request = 0, have_lockres_ref = 0;
1697 int ret = 0; 1826 int ret = 0;
1698 1827
1699 if (!dlm_grab(dlm)) 1828 if (!dlm_grab(dlm))
@@ -1851,6 +1980,7 @@ ok:
1851 spin_unlock(&mle->spinlock); 1980 spin_unlock(&mle->spinlock);
1852 1981
1853 if (res) { 1982 if (res) {
1983 int wake = 0;
1854 spin_lock(&res->spinlock); 1984 spin_lock(&res->spinlock);
1855 if (mle->type == DLM_MLE_MIGRATION) { 1985 if (mle->type == DLM_MLE_MIGRATION) {
1856 mlog(0, "finishing off migration of lockres %.*s, " 1986 mlog(0, "finishing off migration of lockres %.*s, "
@@ -1858,12 +1988,16 @@ ok:
1858 res->lockname.len, res->lockname.name, 1988 res->lockname.len, res->lockname.name,
1859 dlm->node_num, mle->new_master); 1989 dlm->node_num, mle->new_master);
1860 res->state &= ~DLM_LOCK_RES_MIGRATING; 1990 res->state &= ~DLM_LOCK_RES_MIGRATING;
1991 wake = 1;
1861 dlm_change_lockres_owner(dlm, res, mle->new_master); 1992 dlm_change_lockres_owner(dlm, res, mle->new_master);
1862 BUG_ON(res->state & DLM_LOCK_RES_DIRTY); 1993 BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1863 } else { 1994 } else {
1864 dlm_change_lockres_owner(dlm, res, mle->master); 1995 dlm_change_lockres_owner(dlm, res, mle->master);
1865 } 1996 }
1866 spin_unlock(&res->spinlock); 1997 spin_unlock(&res->spinlock);
1998 have_lockres_ref = 1;
1999 if (wake)
2000 wake_up(&res->wq);
1867 } 2001 }
1868 2002
1869 /* master is known, detach if not already detached. 2003 /* master is known, detach if not already detached.
@@ -1913,12 +2047,28 @@ ok:
1913 2047
1914done: 2048done:
1915 ret = 0; 2049 ret = 0;
1916 if (res) 2050 if (res) {
1917 dlm_lockres_put(res); 2051 spin_lock(&res->spinlock);
2052 res->state |= DLM_LOCK_RES_SETREF_INPROG;
2053 spin_unlock(&res->spinlock);
2054 *ret_data = (void *)res;
2055 }
1918 dlm_put(dlm); 2056 dlm_put(dlm);
1919 if (master_request) { 2057 if (master_request) {
1920 mlog(0, "need to tell master to reassert\n"); 2058 mlog(0, "need to tell master to reassert\n");
1921 ret = EAGAIN; // positive. negative would shoot down the node. 2059 /* positive. negative would shoot down the node. */
2060 ret |= DLM_ASSERT_RESPONSE_REASSERT;
2061 if (!have_lockres_ref) {
2062 mlog(ML_ERROR, "strange, got assert from %u, MASTER "
2063 "mle present here for %s:%.*s, but no lockres!\n",
2064 assert->node_idx, dlm->name, namelen, name);
2065 }
2066 }
2067 if (have_lockres_ref) {
2068 /* let the master know we have a reference to the lockres */
2069 ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
2070 mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
2071 dlm->name, namelen, name, assert->node_idx);
1922 } 2072 }
1923 return ret; 2073 return ret;
1924 2074
@@ -1929,11 +2079,25 @@ kill:
1929 __dlm_print_one_lock_resource(res); 2079 __dlm_print_one_lock_resource(res);
1930 spin_unlock(&res->spinlock); 2080 spin_unlock(&res->spinlock);
1931 spin_unlock(&dlm->spinlock); 2081 spin_unlock(&dlm->spinlock);
1932 dlm_lockres_put(res); 2082 *ret_data = (void *)res;
1933 dlm_put(dlm); 2083 dlm_put(dlm);
1934 return -EINVAL; 2084 return -EINVAL;
1935} 2085}
1936 2086
2087void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
2088{
2089 struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
2090
2091 if (ret_data) {
2092 spin_lock(&res->spinlock);
2093 res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
2094 spin_unlock(&res->spinlock);
2095 wake_up(&res->wq);
2096 dlm_lockres_put(res);
2097 }
2098 return;
2099}
2100
1937int dlm_dispatch_assert_master(struct dlm_ctxt *dlm, 2101int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
1938 struct dlm_lock_resource *res, 2102 struct dlm_lock_resource *res,
1939 int ignore_higher, u8 request_from, u32 flags) 2103 int ignore_higher, u8 request_from, u32 flags)
@@ -2023,9 +2187,7 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
2023 * even if one or more nodes die */ 2187 * even if one or more nodes die */
2024 mlog(0, "worker about to master %.*s here, this=%u\n", 2188 mlog(0, "worker about to master %.*s here, this=%u\n",
2025 res->lockname.len, res->lockname.name, dlm->node_num); 2189 res->lockname.len, res->lockname.name, dlm->node_num);
2026 ret = dlm_do_assert_master(dlm, res->lockname.name, 2190 ret = dlm_do_assert_master(dlm, res, nodemap, flags);
2027 res->lockname.len,
2028 nodemap, flags);
2029 if (ret < 0) { 2191 if (ret < 0) {
2030 /* no need to restart, we are done */ 2192 /* no need to restart, we are done */
2031 if (!dlm_is_host_down(ret)) 2193 if (!dlm_is_host_down(ret))
@@ -2097,14 +2259,180 @@ static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
2097 return ret; 2259 return ret;
2098} 2260}
2099 2261
2262/*
2263 * DLM_DEREF_LOCKRES_MSG
2264 */
2265
2266int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2267{
2268 struct dlm_deref_lockres deref;
2269 int ret = 0, r;
2270 const char *lockname;
2271 unsigned int namelen;
2272
2273 lockname = res->lockname.name;
2274 namelen = res->lockname.len;
2275 BUG_ON(namelen > O2NM_MAX_NAME_LEN);
2276
2277 mlog(0, "%s:%.*s: sending deref to %d\n",
2278 dlm->name, namelen, lockname, res->owner);
2279 memset(&deref, 0, sizeof(deref));
2280 deref.node_idx = dlm->node_num;
2281 deref.namelen = namelen;
2282 memcpy(deref.name, lockname, namelen);
2283
2284 ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
2285 &deref, sizeof(deref), res->owner, &r);
2286 if (ret < 0)
2287 mlog_errno(ret);
2288 else if (r < 0) {
2289 /* BAD. other node says I did not have a ref. */
2290 mlog(ML_ERROR,"while dropping ref on %s:%.*s "
2291 "(master=%u) got %d.\n", dlm->name, namelen,
2292 lockname, res->owner, r);
2293 dlm_print_one_lock_resource(res);
2294 BUG();
2295 }
2296 return ret;
2297}
2298
2299int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
2300 void **ret_data)
2301{
2302 struct dlm_ctxt *dlm = data;
2303 struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
2304 struct dlm_lock_resource *res = NULL;
2305 char *name;
2306 unsigned int namelen;
2307 int ret = -EINVAL;
2308 u8 node;
2309 unsigned int hash;
2310 struct dlm_work_item *item;
2311 int cleared = 0;
2312 int dispatch = 0;
2313
2314 if (!dlm_grab(dlm))
2315 return 0;
2316
2317 name = deref->name;
2318 namelen = deref->namelen;
2319 node = deref->node_idx;
2320
2321 if (namelen > DLM_LOCKID_NAME_MAX) {
2322 mlog(ML_ERROR, "Invalid name length!");
2323 goto done;
2324 }
2325 if (deref->node_idx >= O2NM_MAX_NODES) {
2326 mlog(ML_ERROR, "Invalid node number: %u\n", node);
2327 goto done;
2328 }
2329
2330 hash = dlm_lockid_hash(name, namelen);
2331
2332 spin_lock(&dlm->spinlock);
2333 res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2334 if (!res) {
2335 spin_unlock(&dlm->spinlock);
2336 mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
2337 dlm->name, namelen, name);
2338 goto done;
2339 }
2340 spin_unlock(&dlm->spinlock);
2341
2342 spin_lock(&res->spinlock);
2343 if (res->state & DLM_LOCK_RES_SETREF_INPROG)
2344 dispatch = 1;
2345 else {
2346 BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2347 if (test_bit(node, res->refmap)) {
2348 dlm_lockres_clear_refmap_bit(node, res);
2349 cleared = 1;
2350 }
2351 }
2352 spin_unlock(&res->spinlock);
2353
2354 if (!dispatch) {
2355 if (cleared)
2356 dlm_lockres_calc_usage(dlm, res);
2357 else {
2358 mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2359 "but it is already dropped!\n", dlm->name,
2360 res->lockname.len, res->lockname.name, node);
2361 __dlm_print_one_lock_resource(res);
2362 }
2363 ret = 0;
2364 goto done;
2365 }
2366
2367 item = kzalloc(sizeof(*item), GFP_NOFS);
2368 if (!item) {
2369 ret = -ENOMEM;
2370 mlog_errno(ret);
2371 goto done;
2372 }
2373
2374 dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
2375 item->u.dl.deref_res = res;
2376 item->u.dl.deref_node = node;
2377
2378 spin_lock(&dlm->work_lock);
2379 list_add_tail(&item->list, &dlm->work_list);
2380 spin_unlock(&dlm->work_lock);
2381
2382 queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2383 return 0;
2384
2385done:
2386 if (res)
2387 dlm_lockres_put(res);
2388 dlm_put(dlm);
2389
2390 return ret;
2391}
2392
2393static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
2394{
2395 struct dlm_ctxt *dlm;
2396 struct dlm_lock_resource *res;
2397 u8 node;
2398 u8 cleared = 0;
2399
2400 dlm = item->dlm;
2401 res = item->u.dl.deref_res;
2402 node = item->u.dl.deref_node;
2403
2404 spin_lock(&res->spinlock);
2405 BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2406 if (test_bit(node, res->refmap)) {
2407 __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
2408 dlm_lockres_clear_refmap_bit(node, res);
2409 cleared = 1;
2410 }
2411 spin_unlock(&res->spinlock);
2412
2413 if (cleared) {
2414 mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
2415 dlm->name, res->lockname.len, res->lockname.name, node);
2416 dlm_lockres_calc_usage(dlm, res);
2417 } else {
2418 mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2419 "but it is already dropped!\n", dlm->name,
2420 res->lockname.len, res->lockname.name, node);
2421 __dlm_print_one_lock_resource(res);
2422 }
2423
2424 dlm_lockres_put(res);
2425}
2426
2100 2427
2101/* 2428/*
2102 * DLM_MIGRATE_LOCKRES 2429 * DLM_MIGRATE_LOCKRES
2103 */ 2430 */
2104 2431
2105 2432
2106int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, 2433static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
2107 u8 target) 2434 struct dlm_lock_resource *res,
2435 u8 target)
2108{ 2436{
2109 struct dlm_master_list_entry *mle = NULL; 2437 struct dlm_master_list_entry *mle = NULL;
2110 struct dlm_master_list_entry *oldmle = NULL; 2438 struct dlm_master_list_entry *oldmle = NULL;
@@ -2116,7 +2444,7 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2116 struct list_head *queue, *iter; 2444 struct list_head *queue, *iter;
2117 int i; 2445 int i;
2118 struct dlm_lock *lock; 2446 struct dlm_lock *lock;
2119 int empty = 1; 2447 int empty = 1, wake = 0;
2120 2448
2121 if (!dlm_grab(dlm)) 2449 if (!dlm_grab(dlm))
2122 return -EINVAL; 2450 return -EINVAL;
@@ -2241,6 +2569,7 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2241 res->lockname.name, target); 2569 res->lockname.name, target);
2242 spin_lock(&res->spinlock); 2570 spin_lock(&res->spinlock);
2243 res->state &= ~DLM_LOCK_RES_MIGRATING; 2571 res->state &= ~DLM_LOCK_RES_MIGRATING;
2572 wake = 1;
2244 spin_unlock(&res->spinlock); 2573 spin_unlock(&res->spinlock);
2245 ret = -EINVAL; 2574 ret = -EINVAL;
2246 } 2575 }
@@ -2268,6 +2597,9 @@ fail:
2268 * the lockres 2597 * the lockres
2269 */ 2598 */
2270 2599
2600 /* now that remote nodes are spinning on the MIGRATING flag,
2601 * ensure that all assert_master work is flushed. */
2602 flush_workqueue(dlm->dlm_worker);
2271 2603
2272 /* get an extra reference on the mle. 2604 /* get an extra reference on the mle.
2273 * otherwise the assert_master from the new 2605 * otherwise the assert_master from the new
@@ -2296,6 +2628,7 @@ fail:
2296 dlm_put_mle_inuse(mle); 2628 dlm_put_mle_inuse(mle);
2297 spin_lock(&res->spinlock); 2629 spin_lock(&res->spinlock);
2298 res->state &= ~DLM_LOCK_RES_MIGRATING; 2630 res->state &= ~DLM_LOCK_RES_MIGRATING;
2631 wake = 1;
2299 spin_unlock(&res->spinlock); 2632 spin_unlock(&res->spinlock);
2300 goto leave; 2633 goto leave;
2301 } 2634 }
@@ -2322,7 +2655,8 @@ fail:
2322 res->owner == target) 2655 res->owner == target)
2323 break; 2656 break;
2324 2657
2325 mlog(0, "timed out during migration\n"); 2658 mlog(0, "%s:%.*s: timed out during migration\n",
2659 dlm->name, res->lockname.len, res->lockname.name);
2326 /* avoid hang during shutdown when migrating lockres 2660 /* avoid hang during shutdown when migrating lockres
2327 * to a node which also goes down */ 2661 * to a node which also goes down */
2328 if (dlm_is_node_dead(dlm, target)) { 2662 if (dlm_is_node_dead(dlm, target)) {
@@ -2330,20 +2664,20 @@ fail:
2330 "target %u is no longer up, restarting\n", 2664 "target %u is no longer up, restarting\n",
2331 dlm->name, res->lockname.len, 2665 dlm->name, res->lockname.len,
2332 res->lockname.name, target); 2666 res->lockname.name, target);
2333 ret = -ERESTARTSYS; 2667 ret = -EINVAL;
2668 /* migration failed, detach and clean up mle */
2669 dlm_mle_detach_hb_events(dlm, mle);
2670 dlm_put_mle(mle);
2671 dlm_put_mle_inuse(mle);
2672 spin_lock(&res->spinlock);
2673 res->state &= ~DLM_LOCK_RES_MIGRATING;
2674 wake = 1;
2675 spin_unlock(&res->spinlock);
2676 goto leave;
2334 } 2677 }
2335 } 2678 } else
2336 if (ret == -ERESTARTSYS) { 2679 mlog(0, "%s:%.*s: caught signal during migration\n",
2337 /* migration failed, detach and clean up mle */ 2680 dlm->name, res->lockname.len, res->lockname.name);
2338 dlm_mle_detach_hb_events(dlm, mle);
2339 dlm_put_mle(mle);
2340 dlm_put_mle_inuse(mle);
2341 spin_lock(&res->spinlock);
2342 res->state &= ~DLM_LOCK_RES_MIGRATING;
2343 spin_unlock(&res->spinlock);
2344 goto leave;
2345 }
2346 /* TODO: if node died: stop, clean up, return error */
2347 } 2681 }
2348 2682
2349 /* all done, set the owner, clear the flag */ 2683 /* all done, set the owner, clear the flag */
@@ -2366,6 +2700,11 @@ leave:
2366 if (ret < 0) 2700 if (ret < 0)
2367 dlm_kick_thread(dlm, res); 2701 dlm_kick_thread(dlm, res);
2368 2702
2703 /* wake up waiters if the MIGRATING flag got set
2704 * but migration failed */
2705 if (wake)
2706 wake_up(&res->wq);
2707
2369 /* TODO: cleanup */ 2708 /* TODO: cleanup */
2370 if (mres) 2709 if (mres)
2371 free_page((unsigned long)mres); 2710 free_page((unsigned long)mres);
@@ -2376,6 +2715,53 @@ leave:
2376 return ret; 2715 return ret;
2377} 2716}
2378 2717
2718#define DLM_MIGRATION_RETRY_MS 100
2719
2720/* Should be called only after beginning the domain leave process.
2721 * There should not be any remaining locks on nonlocal lock resources,
2722 * and there should be no local locks left on locally mastered resources.
2723 *
2724 * Called with the dlm spinlock held, may drop it to do migration, but
2725 * will re-acquire before exit.
2726 *
2727 * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped */
2728int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2729{
2730 int ret;
2731 int lock_dropped = 0;
2732
2733 if (res->owner != dlm->node_num) {
2734 if (!__dlm_lockres_unused(res)) {
2735 mlog(ML_ERROR, "%s:%.*s: this node is not master, "
2736 "trying to free this but locks remain\n",
2737 dlm->name, res->lockname.len, res->lockname.name);
2738 }
2739 goto leave;
2740 }
2741
2742 /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
2743 spin_unlock(&dlm->spinlock);
2744 lock_dropped = 1;
2745 while (1) {
2746 ret = dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES);
2747 if (ret >= 0)
2748 break;
2749 if (ret == -ENOTEMPTY) {
2750 mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
2751 res->lockname.len, res->lockname.name);
2752 BUG();
2753 }
2754
2755 mlog(0, "lockres %.*s: migrate failed, "
2756 "retrying\n", res->lockname.len,
2757 res->lockname.name);
2758 msleep(DLM_MIGRATION_RETRY_MS);
2759 }
2760 spin_lock(&dlm->spinlock);
2761leave:
2762 return lock_dropped;
2763}
2764
2379int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock) 2765int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2380{ 2766{
2381 int ret; 2767 int ret;
@@ -2405,7 +2791,8 @@ static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2405 return can_proceed; 2791 return can_proceed;
2406} 2792}
2407 2793
2408int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) 2794static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
2795 struct dlm_lock_resource *res)
2409{ 2796{
2410 int ret; 2797 int ret;
2411 spin_lock(&res->spinlock); 2798 spin_lock(&res->spinlock);
@@ -2434,8 +2821,15 @@ static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2434 __dlm_lockres_reserve_ast(res); 2821 __dlm_lockres_reserve_ast(res);
2435 spin_unlock(&res->spinlock); 2822 spin_unlock(&res->spinlock);
2436 2823
2437 /* now flush all the pending asts.. hang out for a bit */ 2824 /* now flush all the pending asts */
2438 dlm_kick_thread(dlm, res); 2825 dlm_kick_thread(dlm, res);
2826 /* before waiting on DIRTY, block processes which may
2827 * try to dirty the lockres before MIGRATING is set */
2828 spin_lock(&res->spinlock);
2829 BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
2830 res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
2831 spin_unlock(&res->spinlock);
2832 /* now wait on any pending asts and the DIRTY state */
2439 wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res)); 2833 wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2440 dlm_lockres_release_ast(dlm, res); 2834 dlm_lockres_release_ast(dlm, res);
2441 2835
@@ -2461,6 +2855,13 @@ again:
2461 mlog(0, "trying again...\n"); 2855 mlog(0, "trying again...\n");
2462 goto again; 2856 goto again;
2463 } 2857 }
2858 /* now that we are sure the MIGRATING state is there, drop
2859 * the unneded state which blocked threads trying to DIRTY */
2860 spin_lock(&res->spinlock);
2861 BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
2862 BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
2863 res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
2864 spin_unlock(&res->spinlock);
2464 2865
2465 /* did the target go down or die? */ 2866 /* did the target go down or die? */
2466 spin_lock(&dlm->spinlock); 2867 spin_lock(&dlm->spinlock);
@@ -2490,7 +2891,7 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2490{ 2891{
2491 struct list_head *iter, *iter2; 2892 struct list_head *iter, *iter2;
2492 struct list_head *queue = &res->granted; 2893 struct list_head *queue = &res->granted;
2493 int i; 2894 int i, bit;
2494 struct dlm_lock *lock; 2895 struct dlm_lock *lock;
2495 2896
2496 assert_spin_locked(&res->spinlock); 2897 assert_spin_locked(&res->spinlock);
@@ -2508,12 +2909,28 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2508 BUG_ON(!list_empty(&lock->bast_list)); 2909 BUG_ON(!list_empty(&lock->bast_list));
2509 BUG_ON(lock->ast_pending); 2910 BUG_ON(lock->ast_pending);
2510 BUG_ON(lock->bast_pending); 2911 BUG_ON(lock->bast_pending);
2912 dlm_lockres_clear_refmap_bit(lock->ml.node, res);
2511 list_del_init(&lock->list); 2913 list_del_init(&lock->list);
2512 dlm_lock_put(lock); 2914 dlm_lock_put(lock);
2513 } 2915 }
2514 } 2916 }
2515 queue++; 2917 queue++;
2516 } 2918 }
2919 bit = 0;
2920 while (1) {
2921 bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
2922 if (bit >= O2NM_MAX_NODES)
2923 break;
2924 /* do not clear the local node reference, if there is a
2925 * process holding this, let it drop the ref itself */
2926 if (bit != dlm->node_num) {
2927 mlog(0, "%s:%.*s: node %u had a ref to this "
2928 "migrating lockres, clearing\n", dlm->name,
2929 res->lockname.len, res->lockname.name, bit);
2930 dlm_lockres_clear_refmap_bit(bit, res);
2931 }
2932 bit++;
2933 }
2517} 2934}
2518 2935
2519/* for now this is not too intelligent. we will 2936/* for now this is not too intelligent. we will
@@ -2601,6 +3018,16 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
2601 mlog(0, "migrate request (node %u) returned %d!\n", 3018 mlog(0, "migrate request (node %u) returned %d!\n",
2602 nodenum, status); 3019 nodenum, status);
2603 ret = status; 3020 ret = status;
3021 } else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
3022 /* during the migration request we short-circuited
3023 * the mastery of the lockres. make sure we have
3024 * a mastery ref for nodenum */
3025 mlog(0, "%s:%.*s: need ref for node %u\n",
3026 dlm->name, res->lockname.len, res->lockname.name,
3027 nodenum);
3028 spin_lock(&res->spinlock);
3029 dlm_lockres_set_refmap_bit(nodenum, res);
3030 spin_unlock(&res->spinlock);
2604 } 3031 }
2605 } 3032 }
2606 3033
@@ -2619,7 +3046,8 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
2619 * we will have no mle in the list to start with. now we can add an mle for 3046 * we will have no mle in the list to start with. now we can add an mle for
2620 * the migration and this should be the only one found for those scanning the 3047 * the migration and this should be the only one found for those scanning the
2621 * list. */ 3048 * list. */
2622int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data) 3049int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
3050 void **ret_data)
2623{ 3051{
2624 struct dlm_ctxt *dlm = data; 3052 struct dlm_ctxt *dlm = data;
2625 struct dlm_lock_resource *res = NULL; 3053 struct dlm_lock_resource *res = NULL;
@@ -2745,7 +3173,13 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
2745 /* remove it from the list so that only one 3173 /* remove it from the list so that only one
2746 * mle will be found */ 3174 * mle will be found */
2747 list_del_init(&tmp->list); 3175 list_del_init(&tmp->list);
2748 __dlm_mle_detach_hb_events(dlm, mle); 3176 /* this was obviously WRONG. mle is uninited here. should be tmp. */
3177 __dlm_mle_detach_hb_events(dlm, tmp);
3178 ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
3179 mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
3180 "telling master to get ref for cleared out mle "
3181 "during migration\n", dlm->name, namelen, name,
3182 master, new_master);
2749 } 3183 }
2750 spin_unlock(&tmp->spinlock); 3184 spin_unlock(&tmp->spinlock);
2751 } 3185 }
@@ -2753,6 +3187,8 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
2753 /* now add a migration mle to the tail of the list */ 3187 /* now add a migration mle to the tail of the list */
2754 dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen); 3188 dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
2755 mle->new_master = new_master; 3189 mle->new_master = new_master;
3190 /* the new master will be sending an assert master for this.
3191 * at that point we will get the refmap reference */
2756 mle->master = master; 3192 mle->master = master;
2757 /* do this for consistency with other mle types */ 3193 /* do this for consistency with other mle types */
2758 set_bit(new_master, mle->maybe_map); 3194 set_bit(new_master, mle->maybe_map);
@@ -2902,6 +3338,13 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2902 clear_bit(dlm->node_num, iter.node_map); 3338 clear_bit(dlm->node_num, iter.node_map);
2903 spin_unlock(&dlm->spinlock); 3339 spin_unlock(&dlm->spinlock);
2904 3340
3341 /* ownership of the lockres is changing. account for the
3342 * mastery reference here since old_master will briefly have
3343 * a reference after the migration completes */
3344 spin_lock(&res->spinlock);
3345 dlm_lockres_set_refmap_bit(old_master, res);
3346 spin_unlock(&res->spinlock);
3347
2905 mlog(0, "now time to do a migrate request to other nodes\n"); 3348 mlog(0, "now time to do a migrate request to other nodes\n");
2906 ret = dlm_do_migrate_request(dlm, res, old_master, 3349 ret = dlm_do_migrate_request(dlm, res, old_master,
2907 dlm->node_num, &iter); 3350 dlm->node_num, &iter);
@@ -2914,8 +3357,7 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2914 res->lockname.len, res->lockname.name); 3357 res->lockname.len, res->lockname.name);
2915 /* this call now finishes out the nodemap 3358 /* this call now finishes out the nodemap
2916 * even if one or more nodes die */ 3359 * even if one or more nodes die */
2917 ret = dlm_do_assert_master(dlm, res->lockname.name, 3360 ret = dlm_do_assert_master(dlm, res, iter.node_map,
2918 res->lockname.len, iter.node_map,
2919 DLM_ASSERT_MASTER_FINISH_MIGRATION); 3361 DLM_ASSERT_MASTER_FINISH_MIGRATION);
2920 if (ret < 0) { 3362 if (ret < 0) {
2921 /* no longer need to retry. all living nodes contacted. */ 3363 /* no longer need to retry. all living nodes contacted. */
@@ -2927,8 +3369,7 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2927 set_bit(old_master, iter.node_map); 3369 set_bit(old_master, iter.node_map);
2928 mlog(0, "doing assert master of %.*s back to %u\n", 3370 mlog(0, "doing assert master of %.*s back to %u\n",
2929 res->lockname.len, res->lockname.name, old_master); 3371 res->lockname.len, res->lockname.name, old_master);
2930 ret = dlm_do_assert_master(dlm, res->lockname.name, 3372 ret = dlm_do_assert_master(dlm, res, iter.node_map,
2931 res->lockname.len, iter.node_map,
2932 DLM_ASSERT_MASTER_FINISH_MIGRATION); 3373 DLM_ASSERT_MASTER_FINISH_MIGRATION);
2933 if (ret < 0) { 3374 if (ret < 0) {
2934 mlog(0, "assert master to original master failed " 3375 mlog(0, "assert master to original master failed "
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 367a11e9e2ed..6d4a83d50152 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -163,9 +163,6 @@ void dlm_dispatch_work(struct work_struct *work)
163 dlm_workfunc_t *workfunc; 163 dlm_workfunc_t *workfunc;
164 int tot=0; 164 int tot=0;
165 165
166 if (!dlm_joined(dlm))
167 return;
168
169 spin_lock(&dlm->work_lock); 166 spin_lock(&dlm->work_lock);
170 list_splice_init(&dlm->work_list, &tmp_list); 167 list_splice_init(&dlm->work_list, &tmp_list);
171 spin_unlock(&dlm->work_lock); 168 spin_unlock(&dlm->work_lock);
@@ -821,7 +818,8 @@ static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from,
821 818
822} 819}
823 820
824int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data) 821int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data,
822 void **ret_data)
825{ 823{
826 struct dlm_ctxt *dlm = data; 824 struct dlm_ctxt *dlm = data;
827 struct dlm_lock_request *lr = (struct dlm_lock_request *)msg->buf; 825 struct dlm_lock_request *lr = (struct dlm_lock_request *)msg->buf;
@@ -978,7 +976,8 @@ static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to)
978} 976}
979 977
980 978
981int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data) 979int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data,
980 void **ret_data)
982{ 981{
983 struct dlm_ctxt *dlm = data; 982 struct dlm_ctxt *dlm = data;
984 struct dlm_reco_data_done *done = (struct dlm_reco_data_done *)msg->buf; 983 struct dlm_reco_data_done *done = (struct dlm_reco_data_done *)msg->buf;
@@ -1129,6 +1128,11 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
1129 if (total_locks == mres_total_locks) 1128 if (total_locks == mres_total_locks)
1130 mres->flags |= DLM_MRES_ALL_DONE; 1129 mres->flags |= DLM_MRES_ALL_DONE;
1131 1130
1131 mlog(0, "%s:%.*s: sending mig lockres (%s) to %u\n",
1132 dlm->name, res->lockname.len, res->lockname.name,
1133 orig_flags & DLM_MRES_MIGRATION ? "migrate" : "recovery",
1134 send_to);
1135
1132 /* send it */ 1136 /* send it */
1133 ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres, 1137 ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres,
1134 sz, send_to, &status); 1138 sz, send_to, &status);
@@ -1213,6 +1217,34 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock,
1213 return 0; 1217 return 0;
1214} 1218}
1215 1219
1220static void dlm_add_dummy_lock(struct dlm_ctxt *dlm,
1221 struct dlm_migratable_lockres *mres)
1222{
1223 struct dlm_lock dummy;
1224 memset(&dummy, 0, sizeof(dummy));
1225 dummy.ml.cookie = 0;
1226 dummy.ml.type = LKM_IVMODE;
1227 dummy.ml.convert_type = LKM_IVMODE;
1228 dummy.ml.highest_blocked = LKM_IVMODE;
1229 dummy.lksb = NULL;
1230 dummy.ml.node = dlm->node_num;
1231 dlm_add_lock_to_array(&dummy, mres, DLM_BLOCKED_LIST);
1232}
1233
1234static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm,
1235 struct dlm_migratable_lock *ml,
1236 u8 *nodenum)
1237{
1238 if (unlikely(ml->cookie == 0 &&
1239 ml->type == LKM_IVMODE &&
1240 ml->convert_type == LKM_IVMODE &&
1241 ml->highest_blocked == LKM_IVMODE &&
1242 ml->list == DLM_BLOCKED_LIST)) {
1243 *nodenum = ml->node;
1244 return 1;
1245 }
1246 return 0;
1247}
1216 1248
1217int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, 1249int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
1218 struct dlm_migratable_lockres *mres, 1250 struct dlm_migratable_lockres *mres,
@@ -1260,6 +1292,14 @@ int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
1260 goto error; 1292 goto error;
1261 } 1293 }
1262 } 1294 }
1295 if (total_locks == 0) {
1296 /* send a dummy lock to indicate a mastery reference only */
1297 mlog(0, "%s:%.*s: sending dummy lock to %u, %s\n",
1298 dlm->name, res->lockname.len, res->lockname.name,
1299 send_to, flags & DLM_MRES_RECOVERY ? "recovery" :
1300 "migration");
1301 dlm_add_dummy_lock(dlm, mres);
1302 }
1263 /* flush any remaining locks */ 1303 /* flush any remaining locks */
1264 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks); 1304 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
1265 if (ret < 0) 1305 if (ret < 0)
@@ -1293,7 +1333,8 @@ error:
1293 * do we spin? returning an error only delays the problem really 1333 * do we spin? returning an error only delays the problem really
1294 */ 1334 */
1295 1335
1296int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data) 1336int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
1337 void **ret_data)
1297{ 1338{
1298 struct dlm_ctxt *dlm = data; 1339 struct dlm_ctxt *dlm = data;
1299 struct dlm_migratable_lockres *mres = 1340 struct dlm_migratable_lockres *mres =
@@ -1382,17 +1423,21 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
1382 spin_lock(&res->spinlock); 1423 spin_lock(&res->spinlock);
1383 res->state &= ~DLM_LOCK_RES_IN_PROGRESS; 1424 res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
1384 spin_unlock(&res->spinlock); 1425 spin_unlock(&res->spinlock);
1426 wake_up(&res->wq);
1385 1427
1386 /* add an extra ref for just-allocated lockres 1428 /* add an extra ref for just-allocated lockres
1387 * otherwise the lockres will be purged immediately */ 1429 * otherwise the lockres will be purged immediately */
1388 dlm_lockres_get(res); 1430 dlm_lockres_get(res);
1389
1390 } 1431 }
1391 1432
1392 /* at this point we have allocated everything we need, 1433 /* at this point we have allocated everything we need,
1393 * and we have a hashed lockres with an extra ref and 1434 * and we have a hashed lockres with an extra ref and
1394 * the proper res->state flags. */ 1435 * the proper res->state flags. */
1395 ret = 0; 1436 ret = 0;
1437 spin_lock(&res->spinlock);
1438 /* drop this either when master requery finds a different master
1439 * or when a lock is added by the recovery worker */
1440 dlm_lockres_grab_inflight_ref(dlm, res);
1396 if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) { 1441 if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) {
1397 /* migration cannot have an unknown master */ 1442 /* migration cannot have an unknown master */
1398 BUG_ON(!(mres->flags & DLM_MRES_RECOVERY)); 1443 BUG_ON(!(mres->flags & DLM_MRES_RECOVERY));
@@ -1400,10 +1445,11 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
1400 "unknown owner.. will need to requery: " 1445 "unknown owner.. will need to requery: "
1401 "%.*s\n", mres->lockname_len, mres->lockname); 1446 "%.*s\n", mres->lockname_len, mres->lockname);
1402 } else { 1447 } else {
1403 spin_lock(&res->spinlock); 1448 /* take a reference now to pin the lockres, drop it
1449 * when locks are added in the worker */
1404 dlm_change_lockres_owner(dlm, res, dlm->node_num); 1450 dlm_change_lockres_owner(dlm, res, dlm->node_num);
1405 spin_unlock(&res->spinlock);
1406 } 1451 }
1452 spin_unlock(&res->spinlock);
1407 1453
1408 /* queue up work for dlm_mig_lockres_worker */ 1454 /* queue up work for dlm_mig_lockres_worker */
1409 dlm_grab(dlm); /* get an extra ref for the work item */ 1455 dlm_grab(dlm); /* get an extra ref for the work item */
@@ -1459,6 +1505,9 @@ again:
1459 "this node will take it.\n", 1505 "this node will take it.\n",
1460 res->lockname.len, res->lockname.name); 1506 res->lockname.len, res->lockname.name);
1461 } else { 1507 } else {
1508 spin_lock(&res->spinlock);
1509 dlm_lockres_drop_inflight_ref(dlm, res);
1510 spin_unlock(&res->spinlock);
1462 mlog(0, "master needs to respond to sender " 1511 mlog(0, "master needs to respond to sender "
1463 "that node %u still owns %.*s\n", 1512 "that node %u still owns %.*s\n",
1464 real_master, res->lockname.len, 1513 real_master, res->lockname.len,
@@ -1578,7 +1627,8 @@ int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
1578/* this function cannot error, so unless the sending 1627/* this function cannot error, so unless the sending
1579 * or receiving of the message failed, the owner can 1628 * or receiving of the message failed, the owner can
1580 * be trusted */ 1629 * be trusted */
1581int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data) 1630int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data,
1631 void **ret_data)
1582{ 1632{
1583 struct dlm_ctxt *dlm = data; 1633 struct dlm_ctxt *dlm = data;
1584 struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf; 1634 struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf;
@@ -1660,21 +1710,38 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1660{ 1710{
1661 struct dlm_migratable_lock *ml; 1711 struct dlm_migratable_lock *ml;
1662 struct list_head *queue; 1712 struct list_head *queue;
1713 struct list_head *tmpq = NULL;
1663 struct dlm_lock *newlock = NULL; 1714 struct dlm_lock *newlock = NULL;
1664 struct dlm_lockstatus *lksb = NULL; 1715 struct dlm_lockstatus *lksb = NULL;
1665 int ret = 0; 1716 int ret = 0;
1666 int i, bad; 1717 int i, j, bad;
1667 struct list_head *iter; 1718 struct list_head *iter;
1668 struct dlm_lock *lock = NULL; 1719 struct dlm_lock *lock = NULL;
1720 u8 from = O2NM_MAX_NODES;
1721 unsigned int added = 0;
1669 1722
1670 mlog(0, "running %d locks for this lockres\n", mres->num_locks); 1723 mlog(0, "running %d locks for this lockres\n", mres->num_locks);
1671 for (i=0; i<mres->num_locks; i++) { 1724 for (i=0; i<mres->num_locks; i++) {
1672 ml = &(mres->ml[i]); 1725 ml = &(mres->ml[i]);
1726
1727 if (dlm_is_dummy_lock(dlm, ml, &from)) {
1728 /* placeholder, just need to set the refmap bit */
1729 BUG_ON(mres->num_locks != 1);
1730 mlog(0, "%s:%.*s: dummy lock for %u\n",
1731 dlm->name, mres->lockname_len, mres->lockname,
1732 from);
1733 spin_lock(&res->spinlock);
1734 dlm_lockres_set_refmap_bit(from, res);
1735 spin_unlock(&res->spinlock);
1736 added++;
1737 break;
1738 }
1673 BUG_ON(ml->highest_blocked != LKM_IVMODE); 1739 BUG_ON(ml->highest_blocked != LKM_IVMODE);
1674 newlock = NULL; 1740 newlock = NULL;
1675 lksb = NULL; 1741 lksb = NULL;
1676 1742
1677 queue = dlm_list_num_to_pointer(res, ml->list); 1743 queue = dlm_list_num_to_pointer(res, ml->list);
1744 tmpq = NULL;
1678 1745
1679 /* if the lock is for the local node it needs to 1746 /* if the lock is for the local node it needs to
1680 * be moved to the proper location within the queue. 1747 * be moved to the proper location within the queue.
@@ -1684,11 +1751,16 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1684 BUG_ON(!(mres->flags & DLM_MRES_MIGRATION)); 1751 BUG_ON(!(mres->flags & DLM_MRES_MIGRATION));
1685 1752
1686 spin_lock(&res->spinlock); 1753 spin_lock(&res->spinlock);
1687 list_for_each(iter, queue) { 1754 for (j = DLM_GRANTED_LIST; j <= DLM_BLOCKED_LIST; j++) {
1688 lock = list_entry (iter, struct dlm_lock, list); 1755 tmpq = dlm_list_idx_to_ptr(res, j);
1689 if (lock->ml.cookie != ml->cookie) 1756 list_for_each(iter, tmpq) {
1690 lock = NULL; 1757 lock = list_entry (iter, struct dlm_lock, list);
1691 else 1758 if (lock->ml.cookie != ml->cookie)
1759 lock = NULL;
1760 else
1761 break;
1762 }
1763 if (lock)
1692 break; 1764 break;
1693 } 1765 }
1694 1766
@@ -1698,12 +1770,20 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1698 u64 c = ml->cookie; 1770 u64 c = ml->cookie;
1699 mlog(ML_ERROR, "could not find local lock " 1771 mlog(ML_ERROR, "could not find local lock "
1700 "with cookie %u:%llu!\n", 1772 "with cookie %u:%llu!\n",
1701 dlm_get_lock_cookie_node(c), 1773 dlm_get_lock_cookie_node(be64_to_cpu(c)),
1702 dlm_get_lock_cookie_seq(c)); 1774 dlm_get_lock_cookie_seq(be64_to_cpu(c)));
1775 __dlm_print_one_lock_resource(res);
1703 BUG(); 1776 BUG();
1704 } 1777 }
1705 BUG_ON(lock->ml.node != ml->node); 1778 BUG_ON(lock->ml.node != ml->node);
1706 1779
1780 if (tmpq != queue) {
1781 mlog(0, "lock was on %u instead of %u for %.*s\n",
1782 j, ml->list, res->lockname.len, res->lockname.name);
1783 spin_unlock(&res->spinlock);
1784 continue;
1785 }
1786
1707 /* see NOTE above about why we do not update 1787 /* see NOTE above about why we do not update
1708 * to match the master here */ 1788 * to match the master here */
1709 1789
@@ -1711,6 +1791,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1711 /* do not alter lock refcount. switching lists. */ 1791 /* do not alter lock refcount. switching lists. */
1712 list_move_tail(&lock->list, queue); 1792 list_move_tail(&lock->list, queue);
1713 spin_unlock(&res->spinlock); 1793 spin_unlock(&res->spinlock);
1794 added++;
1714 1795
1715 mlog(0, "just reordered a local lock!\n"); 1796 mlog(0, "just reordered a local lock!\n");
1716 continue; 1797 continue;
@@ -1799,14 +1880,14 @@ skip_lvb:
1799 mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already " 1880 mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
1800 "exists on this lockres!\n", dlm->name, 1881 "exists on this lockres!\n", dlm->name,
1801 res->lockname.len, res->lockname.name, 1882 res->lockname.len, res->lockname.name,
1802 dlm_get_lock_cookie_node(c), 1883 dlm_get_lock_cookie_node(be64_to_cpu(c)),
1803 dlm_get_lock_cookie_seq(c)); 1884 dlm_get_lock_cookie_seq(be64_to_cpu(c)));
1804 1885
1805 mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, " 1886 mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, "
1806 "node=%u, cookie=%u:%llu, queue=%d\n", 1887 "node=%u, cookie=%u:%llu, queue=%d\n",
1807 ml->type, ml->convert_type, ml->node, 1888 ml->type, ml->convert_type, ml->node,
1808 dlm_get_lock_cookie_node(ml->cookie), 1889 dlm_get_lock_cookie_node(be64_to_cpu(ml->cookie)),
1809 dlm_get_lock_cookie_seq(ml->cookie), 1890 dlm_get_lock_cookie_seq(be64_to_cpu(ml->cookie)),
1810 ml->list); 1891 ml->list);
1811 1892
1812 __dlm_print_one_lock_resource(res); 1893 __dlm_print_one_lock_resource(res);
@@ -1817,12 +1898,22 @@ skip_lvb:
1817 if (!bad) { 1898 if (!bad) {
1818 dlm_lock_get(newlock); 1899 dlm_lock_get(newlock);
1819 list_add_tail(&newlock->list, queue); 1900 list_add_tail(&newlock->list, queue);
1901 mlog(0, "%s:%.*s: added lock for node %u, "
1902 "setting refmap bit\n", dlm->name,
1903 res->lockname.len, res->lockname.name, ml->node);
1904 dlm_lockres_set_refmap_bit(ml->node, res);
1905 added++;
1820 } 1906 }
1821 spin_unlock(&res->spinlock); 1907 spin_unlock(&res->spinlock);
1822 } 1908 }
1823 mlog(0, "done running all the locks\n"); 1909 mlog(0, "done running all the locks\n");
1824 1910
1825leave: 1911leave:
1912 /* balance the ref taken when the work was queued */
1913 spin_lock(&res->spinlock);
1914 dlm_lockres_drop_inflight_ref(dlm, res);
1915 spin_unlock(&res->spinlock);
1916
1826 if (ret < 0) { 1917 if (ret < 0) {
1827 mlog_errno(ret); 1918 mlog_errno(ret);
1828 if (newlock) 1919 if (newlock)
@@ -1935,9 +2026,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
1935 if (res->owner == dead_node) { 2026 if (res->owner == dead_node) {
1936 list_del_init(&res->recovering); 2027 list_del_init(&res->recovering);
1937 spin_lock(&res->spinlock); 2028 spin_lock(&res->spinlock);
2029 /* new_master has our reference from
2030 * the lock state sent during recovery */
1938 dlm_change_lockres_owner(dlm, res, new_master); 2031 dlm_change_lockres_owner(dlm, res, new_master);
1939 res->state &= ~DLM_LOCK_RES_RECOVERING; 2032 res->state &= ~DLM_LOCK_RES_RECOVERING;
1940 if (!__dlm_lockres_unused(res)) 2033 if (__dlm_lockres_has_locks(res))
1941 __dlm_dirty_lockres(dlm, res); 2034 __dlm_dirty_lockres(dlm, res);
1942 spin_unlock(&res->spinlock); 2035 spin_unlock(&res->spinlock);
1943 wake_up(&res->wq); 2036 wake_up(&res->wq);
@@ -1977,9 +2070,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
1977 dlm_lockres_put(res); 2070 dlm_lockres_put(res);
1978 } 2071 }
1979 spin_lock(&res->spinlock); 2072 spin_lock(&res->spinlock);
2073 /* new_master has our reference from
2074 * the lock state sent during recovery */
1980 dlm_change_lockres_owner(dlm, res, new_master); 2075 dlm_change_lockres_owner(dlm, res, new_master);
1981 res->state &= ~DLM_LOCK_RES_RECOVERING; 2076 res->state &= ~DLM_LOCK_RES_RECOVERING;
1982 if (!__dlm_lockres_unused(res)) 2077 if (__dlm_lockres_has_locks(res))
1983 __dlm_dirty_lockres(dlm, res); 2078 __dlm_dirty_lockres(dlm, res);
1984 spin_unlock(&res->spinlock); 2079 spin_unlock(&res->spinlock);
1985 wake_up(&res->wq); 2080 wake_up(&res->wq);
@@ -2048,6 +2143,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
2048{ 2143{
2049 struct list_head *iter, *tmpiter; 2144 struct list_head *iter, *tmpiter;
2050 struct dlm_lock *lock; 2145 struct dlm_lock *lock;
2146 unsigned int freed = 0;
2051 2147
2052 /* this node is the lockres master: 2148 /* this node is the lockres master:
2053 * 1) remove any stale locks for the dead node 2149 * 1) remove any stale locks for the dead node
@@ -2062,6 +2158,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
2062 if (lock->ml.node == dead_node) { 2158 if (lock->ml.node == dead_node) {
2063 list_del_init(&lock->list); 2159 list_del_init(&lock->list);
2064 dlm_lock_put(lock); 2160 dlm_lock_put(lock);
2161 freed++;
2065 } 2162 }
2066 } 2163 }
2067 list_for_each_safe(iter, tmpiter, &res->converting) { 2164 list_for_each_safe(iter, tmpiter, &res->converting) {
@@ -2069,6 +2166,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
2069 if (lock->ml.node == dead_node) { 2166 if (lock->ml.node == dead_node) {
2070 list_del_init(&lock->list); 2167 list_del_init(&lock->list);
2071 dlm_lock_put(lock); 2168 dlm_lock_put(lock);
2169 freed++;
2072 } 2170 }
2073 } 2171 }
2074 list_for_each_safe(iter, tmpiter, &res->blocked) { 2172 list_for_each_safe(iter, tmpiter, &res->blocked) {
@@ -2076,9 +2174,23 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
2076 if (lock->ml.node == dead_node) { 2174 if (lock->ml.node == dead_node) {
2077 list_del_init(&lock->list); 2175 list_del_init(&lock->list);
2078 dlm_lock_put(lock); 2176 dlm_lock_put(lock);
2177 freed++;
2079 } 2178 }
2080 } 2179 }
2081 2180
2181 if (freed) {
2182 mlog(0, "%s:%.*s: freed %u locks for dead node %u, "
2183 "dropping ref from lockres\n", dlm->name,
2184 res->lockname.len, res->lockname.name, freed, dead_node);
2185 BUG_ON(!test_bit(dead_node, res->refmap));
2186 dlm_lockres_clear_refmap_bit(dead_node, res);
2187 } else if (test_bit(dead_node, res->refmap)) {
2188 mlog(0, "%s:%.*s: dead node %u had a ref, but had "
2189 "no locks and had not purged before dying\n", dlm->name,
2190 res->lockname.len, res->lockname.name, dead_node);
2191 dlm_lockres_clear_refmap_bit(dead_node, res);
2192 }
2193
2082 /* do not kick thread yet */ 2194 /* do not kick thread yet */
2083 __dlm_dirty_lockres(dlm, res); 2195 __dlm_dirty_lockres(dlm, res);
2084} 2196}
@@ -2141,9 +2253,21 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
2141 spin_lock(&res->spinlock); 2253 spin_lock(&res->spinlock);
2142 /* zero the lvb if necessary */ 2254 /* zero the lvb if necessary */
2143 dlm_revalidate_lvb(dlm, res, dead_node); 2255 dlm_revalidate_lvb(dlm, res, dead_node);
2144 if (res->owner == dead_node) 2256 if (res->owner == dead_node) {
2257 if (res->state & DLM_LOCK_RES_DROPPING_REF)
2258 mlog(0, "%s:%.*s: owned by "
2259 "dead node %u, this node was "
2260 "dropping its ref when it died. "
2261 "continue, dropping the flag.\n",
2262 dlm->name, res->lockname.len,
2263 res->lockname.name, dead_node);
2264
2265 /* the wake_up for this will happen when the
2266 * RECOVERING flag is dropped later */
2267 res->state &= ~DLM_LOCK_RES_DROPPING_REF;
2268
2145 dlm_move_lockres_to_recovery_list(dlm, res); 2269 dlm_move_lockres_to_recovery_list(dlm, res);
2146 else if (res->owner == dlm->node_num) { 2270 } else if (res->owner == dlm->node_num) {
2147 dlm_free_dead_locks(dlm, res, dead_node); 2271 dlm_free_dead_locks(dlm, res, dead_node);
2148 __dlm_lockres_calc_usage(dlm, res); 2272 __dlm_lockres_calc_usage(dlm, res);
2149 } 2273 }
@@ -2480,7 +2604,8 @@ retry:
2480 return ret; 2604 return ret;
2481} 2605}
2482 2606
2483int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data) 2607int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data,
2608 void **ret_data)
2484{ 2609{
2485 struct dlm_ctxt *dlm = data; 2610 struct dlm_ctxt *dlm = data;
2486 struct dlm_begin_reco *br = (struct dlm_begin_reco *)msg->buf; 2611 struct dlm_begin_reco *br = (struct dlm_begin_reco *)msg->buf;
@@ -2608,7 +2733,8 @@ stage2:
2608 return ret; 2733 return ret;
2609} 2734}
2610 2735
2611int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data) 2736int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data,
2737 void **ret_data)
2612{ 2738{
2613 struct dlm_ctxt *dlm = data; 2739 struct dlm_ctxt *dlm = data;
2614 struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf; 2740 struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf;
diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c
index 0c822f3ffb05..8ffa0916eb86 100644
--- a/fs/ocfs2/dlm/dlmthread.c
+++ b/fs/ocfs2/dlm/dlmthread.c
@@ -54,9 +54,6 @@
54#include "cluster/masklog.h" 54#include "cluster/masklog.h"
55 55
56static int dlm_thread(void *data); 56static int dlm_thread(void *data);
57static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
58 struct dlm_lock_resource *lockres);
59
60static void dlm_flush_asts(struct dlm_ctxt *dlm); 57static void dlm_flush_asts(struct dlm_ctxt *dlm);
61 58
62#define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->node_num) 59#define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->node_num)
@@ -82,14 +79,33 @@ repeat:
82 current->state = TASK_RUNNING; 79 current->state = TASK_RUNNING;
83} 80}
84 81
85 82int __dlm_lockres_has_locks(struct dlm_lock_resource *res)
86int __dlm_lockres_unused(struct dlm_lock_resource *res)
87{ 83{
88 if (list_empty(&res->granted) && 84 if (list_empty(&res->granted) &&
89 list_empty(&res->converting) && 85 list_empty(&res->converting) &&
90 list_empty(&res->blocked) && 86 list_empty(&res->blocked))
91 list_empty(&res->dirty)) 87 return 0;
92 return 1; 88 return 1;
89}
90
91/* "unused": the lockres has no locks, is not on the dirty list,
92 * has no inflight locks (in the gap between mastery and acquiring
93 * the first lock), and has no bits in its refmap.
94 * truly ready to be freed. */
95int __dlm_lockres_unused(struct dlm_lock_resource *res)
96{
97 if (!__dlm_lockres_has_locks(res) &&
98 (list_empty(&res->dirty) && !(res->state & DLM_LOCK_RES_DIRTY))) {
99 /* try not to scan the bitmap unless the first two
100 * conditions are already true */
101 int bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
102 if (bit >= O2NM_MAX_NODES) {
103 /* since the bit for dlm->node_num is not
104 * set, inflight_locks better be zero */
105 BUG_ON(res->inflight_locks != 0);
106 return 1;
107 }
108 }
93 return 0; 109 return 0;
94} 110}
95 111
@@ -106,46 +122,21 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
106 assert_spin_locked(&res->spinlock); 122 assert_spin_locked(&res->spinlock);
107 123
108 if (__dlm_lockres_unused(res)){ 124 if (__dlm_lockres_unused(res)){
109 /* For now, just keep any resource we master */
110 if (res->owner == dlm->node_num)
111 {
112 if (!list_empty(&res->purge)) {
113 mlog(0, "we master %s:%.*s, but it is on "
114 "the purge list. Removing\n",
115 dlm->name, res->lockname.len,
116 res->lockname.name);
117 list_del_init(&res->purge);
118 dlm->purge_count--;
119 }
120 return;
121 }
122
123 if (list_empty(&res->purge)) { 125 if (list_empty(&res->purge)) {
124 mlog(0, "putting lockres %.*s from purge list\n", 126 mlog(0, "putting lockres %.*s:%p onto purge list\n",
125 res->lockname.len, res->lockname.name); 127 res->lockname.len, res->lockname.name, res);
126 128
127 res->last_used = jiffies; 129 res->last_used = jiffies;
130 dlm_lockres_get(res);
128 list_add_tail(&res->purge, &dlm->purge_list); 131 list_add_tail(&res->purge, &dlm->purge_list);
129 dlm->purge_count++; 132 dlm->purge_count++;
130
131 /* if this node is not the owner, there is
132 * no way to keep track of who the owner could be.
133 * unhash it to avoid serious problems. */
134 if (res->owner != dlm->node_num) {
135 mlog(0, "%s:%.*s: doing immediate "
136 "purge of lockres owned by %u\n",
137 dlm->name, res->lockname.len,
138 res->lockname.name, res->owner);
139
140 dlm_purge_lockres_now(dlm, res);
141 }
142 } 133 }
143 } else if (!list_empty(&res->purge)) { 134 } else if (!list_empty(&res->purge)) {
144 mlog(0, "removing lockres %.*s from purge list, " 135 mlog(0, "removing lockres %.*s:%p from purge list, owner=%u\n",
145 "owner=%u\n", res->lockname.len, res->lockname.name, 136 res->lockname.len, res->lockname.name, res, res->owner);
146 res->owner);
147 137
148 list_del_init(&res->purge); 138 list_del_init(&res->purge);
139 dlm_lockres_put(res);
149 dlm->purge_count--; 140 dlm->purge_count--;
150 } 141 }
151} 142}
@@ -163,68 +154,65 @@ void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
163 spin_unlock(&dlm->spinlock); 154 spin_unlock(&dlm->spinlock);
164} 155}
165 156
166/* TODO: Eventual API: Called with the dlm spinlock held, may drop it 157static int dlm_purge_lockres(struct dlm_ctxt *dlm,
167 * to do migration, but will re-acquire before exit. */ 158 struct dlm_lock_resource *res)
168void dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *lockres)
169{ 159{
170 int master; 160 int master;
171 int ret; 161 int ret = 0;
172
173 spin_lock(&lockres->spinlock);
174 master = lockres->owner == dlm->node_num;
175 spin_unlock(&lockres->spinlock);
176 162
177 mlog(0, "purging lockres %.*s, master = %d\n", lockres->lockname.len, 163 spin_lock(&res->spinlock);
178 lockres->lockname.name, master); 164 if (!__dlm_lockres_unused(res)) {
179 165 spin_unlock(&res->spinlock);
180 /* Non master is the easy case -- no migration required, just 166 mlog(0, "%s:%.*s: tried to purge but not unused\n",
181 * quit. */ 167 dlm->name, res->lockname.len, res->lockname.name);
168 return -ENOTEMPTY;
169 }
170 master = (res->owner == dlm->node_num);
182 if (!master) 171 if (!master)
183 goto finish; 172 res->state |= DLM_LOCK_RES_DROPPING_REF;
184 173 spin_unlock(&res->spinlock);
185 /* Wheee! Migrate lockres here! */
186 spin_unlock(&dlm->spinlock);
187again:
188 174
189 ret = dlm_migrate_lockres(dlm, lockres, O2NM_MAX_NODES); 175 mlog(0, "purging lockres %.*s, master = %d\n", res->lockname.len,
190 if (ret == -ENOTEMPTY) { 176 res->lockname.name, master);
191 mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
192 lockres->lockname.len, lockres->lockname.name);
193 177
194 BUG(); 178 if (!master) {
195 } else if (ret < 0) { 179 spin_lock(&res->spinlock);
196 mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n", 180 /* This ensures that clear refmap is sent after the set */
197 lockres->lockname.len, lockres->lockname.name); 181 __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
198 msleep(100); 182 spin_unlock(&res->spinlock);
199 goto again; 183 /* drop spinlock to do messaging, retake below */
184 spin_unlock(&dlm->spinlock);
185 /* clear our bit from the master's refmap, ignore errors */
186 ret = dlm_drop_lockres_ref(dlm, res);
187 if (ret < 0) {
188 mlog_errno(ret);
189 if (!dlm_is_host_down(ret))
190 BUG();
191 }
192 mlog(0, "%s:%.*s: dlm_deref_lockres returned %d\n",
193 dlm->name, res->lockname.len, res->lockname.name, ret);
194 spin_lock(&dlm->spinlock);
200 } 195 }
201 196
202 spin_lock(&dlm->spinlock); 197 if (!list_empty(&res->purge)) {
203 198 mlog(0, "removing lockres %.*s:%p from purgelist, "
204finish: 199 "master = %d\n", res->lockname.len, res->lockname.name,
205 if (!list_empty(&lockres->purge)) { 200 res, master);
206 list_del_init(&lockres->purge); 201 list_del_init(&res->purge);
202 dlm_lockres_put(res);
207 dlm->purge_count--; 203 dlm->purge_count--;
208 } 204 }
209 __dlm_unhash_lockres(lockres); 205 __dlm_unhash_lockres(res);
210}
211
212/* make an unused lockres go away immediately.
213 * as soon as the dlm spinlock is dropped, this lockres
214 * will not be found. kfree still happens on last put. */
215static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
216 struct dlm_lock_resource *lockres)
217{
218 assert_spin_locked(&dlm->spinlock);
219 assert_spin_locked(&lockres->spinlock);
220 206
221 BUG_ON(!__dlm_lockres_unused(lockres)); 207 /* lockres is not in the hash now. drop the flag and wake up
222 208 * any processes waiting in dlm_get_lock_resource. */
223 if (!list_empty(&lockres->purge)) { 209 if (!master) {
224 list_del_init(&lockres->purge); 210 spin_lock(&res->spinlock);
225 dlm->purge_count--; 211 res->state &= ~DLM_LOCK_RES_DROPPING_REF;
212 spin_unlock(&res->spinlock);
213 wake_up(&res->wq);
226 } 214 }
227 __dlm_unhash_lockres(lockres); 215 return 0;
228} 216}
229 217
230static void dlm_run_purge_list(struct dlm_ctxt *dlm, 218static void dlm_run_purge_list(struct dlm_ctxt *dlm,
@@ -268,13 +256,17 @@ static void dlm_run_purge_list(struct dlm_ctxt *dlm,
268 break; 256 break;
269 } 257 }
270 258
259 mlog(0, "removing lockres %.*s:%p from purgelist\n",
260 lockres->lockname.len, lockres->lockname.name, lockres);
271 list_del_init(&lockres->purge); 261 list_del_init(&lockres->purge);
262 dlm_lockres_put(lockres);
272 dlm->purge_count--; 263 dlm->purge_count--;
273 264
274 /* This may drop and reacquire the dlm spinlock if it 265 /* This may drop and reacquire the dlm spinlock if it
275 * has to do migration. */ 266 * has to do migration. */
276 mlog(0, "calling dlm_purge_lockres!\n"); 267 mlog(0, "calling dlm_purge_lockres!\n");
277 dlm_purge_lockres(dlm, lockres); 268 if (dlm_purge_lockres(dlm, lockres))
269 BUG();
278 mlog(0, "DONE calling dlm_purge_lockres!\n"); 270 mlog(0, "DONE calling dlm_purge_lockres!\n");
279 271
280 /* Avoid adding any scheduling latencies */ 272 /* Avoid adding any scheduling latencies */
@@ -467,12 +459,17 @@ void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
467 assert_spin_locked(&res->spinlock); 459 assert_spin_locked(&res->spinlock);
468 460
469 /* don't shuffle secondary queues */ 461 /* don't shuffle secondary queues */
470 if ((res->owner == dlm->node_num) && 462 if ((res->owner == dlm->node_num)) {
471 !(res->state & DLM_LOCK_RES_DIRTY)) { 463 if (res->state & (DLM_LOCK_RES_MIGRATING |
472 /* ref for dirty_list */ 464 DLM_LOCK_RES_BLOCK_DIRTY))
473 dlm_lockres_get(res); 465 return;
474 list_add_tail(&res->dirty, &dlm->dirty_list); 466
475 res->state |= DLM_LOCK_RES_DIRTY; 467 if (list_empty(&res->dirty)) {
468 /* ref for dirty_list */
469 dlm_lockres_get(res);
470 list_add_tail(&res->dirty, &dlm->dirty_list);
471 res->state |= DLM_LOCK_RES_DIRTY;
472 }
476 } 473 }
477} 474}
478 475
@@ -651,7 +648,7 @@ static int dlm_thread(void *data)
651 dlm_lockres_get(res); 648 dlm_lockres_get(res);
652 649
653 spin_lock(&res->spinlock); 650 spin_lock(&res->spinlock);
654 res->state &= ~DLM_LOCK_RES_DIRTY; 651 /* We clear the DLM_LOCK_RES_DIRTY state once we shuffle lists below */
655 list_del_init(&res->dirty); 652 list_del_init(&res->dirty);
656 spin_unlock(&res->spinlock); 653 spin_unlock(&res->spinlock);
657 spin_unlock(&dlm->spinlock); 654 spin_unlock(&dlm->spinlock);
@@ -675,10 +672,11 @@ static int dlm_thread(void *data)
675 /* it is now ok to move lockreses in these states 672 /* it is now ok to move lockreses in these states
676 * to the dirty list, assuming that they will only be 673 * to the dirty list, assuming that they will only be
677 * dirty for a short while. */ 674 * dirty for a short while. */
675 BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
678 if (res->state & (DLM_LOCK_RES_IN_PROGRESS | 676 if (res->state & (DLM_LOCK_RES_IN_PROGRESS |
679 DLM_LOCK_RES_MIGRATING |
680 DLM_LOCK_RES_RECOVERING)) { 677 DLM_LOCK_RES_RECOVERING)) {
681 /* move it to the tail and keep going */ 678 /* move it to the tail and keep going */
679 res->state &= ~DLM_LOCK_RES_DIRTY;
682 spin_unlock(&res->spinlock); 680 spin_unlock(&res->spinlock);
683 mlog(0, "delaying list shuffling for in-" 681 mlog(0, "delaying list shuffling for in-"
684 "progress lockres %.*s, state=%d\n", 682 "progress lockres %.*s, state=%d\n",
@@ -699,6 +697,7 @@ static int dlm_thread(void *data)
699 697
700 /* called while holding lockres lock */ 698 /* called while holding lockres lock */
701 dlm_shuffle_lists(dlm, res); 699 dlm_shuffle_lists(dlm, res);
700 res->state &= ~DLM_LOCK_RES_DIRTY;
702 spin_unlock(&res->spinlock); 701 spin_unlock(&res->spinlock);
703 702
704 dlm_lockres_calc_usage(dlm, res); 703 dlm_lockres_calc_usage(dlm, res);
@@ -709,11 +708,8 @@ in_progress:
709 /* if the lock was in-progress, stick 708 /* if the lock was in-progress, stick
710 * it on the back of the list */ 709 * it on the back of the list */
711 if (delay) { 710 if (delay) {
712 /* ref for dirty_list */
713 dlm_lockres_get(res);
714 spin_lock(&res->spinlock); 711 spin_lock(&res->spinlock);
715 list_add_tail(&res->dirty, &dlm->dirty_list); 712 __dlm_dirty_lockres(dlm, res);
716 res->state |= DLM_LOCK_RES_DIRTY;
717 spin_unlock(&res->spinlock); 713 spin_unlock(&res->spinlock);
718 } 714 }
719 dlm_lockres_put(res); 715 dlm_lockres_put(res);
diff --git a/fs/ocfs2/dlm/dlmunlock.c b/fs/ocfs2/dlm/dlmunlock.c
index 37be4b2e0d4a..86ca085ef324 100644
--- a/fs/ocfs2/dlm/dlmunlock.c
+++ b/fs/ocfs2/dlm/dlmunlock.c
@@ -147,6 +147,10 @@ static enum dlm_status dlmunlock_common(struct dlm_ctxt *dlm,
147 goto leave; 147 goto leave;
148 } 148 }
149 149
150 if (res->state & DLM_LOCK_RES_MIGRATING) {
151 status = DLM_MIGRATING;
152 goto leave;
153 }
150 154
151 /* see above for what the spec says about 155 /* see above for what the spec says about
152 * LKM_CANCEL and the lock queue state */ 156 * LKM_CANCEL and the lock queue state */
@@ -244,8 +248,8 @@ leave:
244 /* this should always be coupled with list removal */ 248 /* this should always be coupled with list removal */
245 BUG_ON(!(actions & DLM_UNLOCK_REMOVE_LOCK)); 249 BUG_ON(!(actions & DLM_UNLOCK_REMOVE_LOCK));
246 mlog(0, "lock %u:%llu should be gone now! refs=%d\n", 250 mlog(0, "lock %u:%llu should be gone now! refs=%d\n",
247 dlm_get_lock_cookie_node(lock->ml.cookie), 251 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
248 dlm_get_lock_cookie_seq(lock->ml.cookie), 252 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
249 atomic_read(&lock->lock_refs.refcount)-1); 253 atomic_read(&lock->lock_refs.refcount)-1);
250 dlm_lock_put(lock); 254 dlm_lock_put(lock);
251 } 255 }
@@ -379,7 +383,8 @@ static enum dlm_status dlm_send_remote_unlock_request(struct dlm_ctxt *dlm,
379 * returns: DLM_NORMAL, DLM_BADARGS, DLM_IVLOCKID, 383 * returns: DLM_NORMAL, DLM_BADARGS, DLM_IVLOCKID,
380 * return value from dlmunlock_master 384 * return value from dlmunlock_master
381 */ 385 */
382int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data) 386int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data,
387 void **ret_data)
383{ 388{
384 struct dlm_ctxt *dlm = data; 389 struct dlm_ctxt *dlm = data;
385 struct dlm_unlock_lock *unlock = (struct dlm_unlock_lock *)msg->buf; 390 struct dlm_unlock_lock *unlock = (struct dlm_unlock_lock *)msg->buf;
@@ -502,8 +507,8 @@ not_found:
502 if (!found) 507 if (!found)
503 mlog(ML_ERROR, "failed to find lock to unlock! " 508 mlog(ML_ERROR, "failed to find lock to unlock! "
504 "cookie=%u:%llu\n", 509 "cookie=%u:%llu\n",
505 dlm_get_lock_cookie_node(unlock->cookie), 510 dlm_get_lock_cookie_node(be64_to_cpu(unlock->cookie)),
506 dlm_get_lock_cookie_seq(unlock->cookie)); 511 dlm_get_lock_cookie_seq(be64_to_cpu(unlock->cookie)));
507 else 512 else
508 dlm_lock_put(lock); 513 dlm_lock_put(lock);
509 514
diff --git a/fs/ocfs2/vote.c b/fs/ocfs2/vote.c
index 0afd8b9af70f..f30e63b9910c 100644
--- a/fs/ocfs2/vote.c
+++ b/fs/ocfs2/vote.c
@@ -887,7 +887,7 @@ static inline int ocfs2_translate_response(int response)
887 887
888static int ocfs2_handle_response_message(struct o2net_msg *msg, 888static int ocfs2_handle_response_message(struct o2net_msg *msg,
889 u32 len, 889 u32 len,
890 void *data) 890 void *data, void **ret_data)
891{ 891{
892 unsigned int response_id, node_num; 892 unsigned int response_id, node_num;
893 int response_status; 893 int response_status;
@@ -943,7 +943,7 @@ bail:
943 943
944static int ocfs2_handle_vote_message(struct o2net_msg *msg, 944static int ocfs2_handle_vote_message(struct o2net_msg *msg,
945 u32 len, 945 u32 len,
946 void *data) 946 void *data, void **ret_data)
947{ 947{
948 int status; 948 int status;
949 struct ocfs2_super *osb = data; 949 struct ocfs2_super *osb = data;
@@ -1007,7 +1007,7 @@ int ocfs2_register_net_handlers(struct ocfs2_super *osb)
1007 osb->net_key, 1007 osb->net_key,
1008 sizeof(struct ocfs2_response_msg), 1008 sizeof(struct ocfs2_response_msg),
1009 ocfs2_handle_response_message, 1009 ocfs2_handle_response_message,
1010 osb, &osb->osb_net_handlers); 1010 osb, NULL, &osb->osb_net_handlers);
1011 if (status) { 1011 if (status) {
1012 mlog_errno(status); 1012 mlog_errno(status);
1013 goto bail; 1013 goto bail;
@@ -1017,7 +1017,7 @@ int ocfs2_register_net_handlers(struct ocfs2_super *osb)
1017 osb->net_key, 1017 osb->net_key,
1018 sizeof(struct ocfs2_vote_msg), 1018 sizeof(struct ocfs2_vote_msg),
1019 ocfs2_handle_vote_message, 1019 ocfs2_handle_vote_message,
1020 osb, &osb->osb_net_handlers); 1020 osb, NULL, &osb->osb_net_handlers);
1021 if (status) { 1021 if (status) {
1022 mlog_errno(status); 1022 mlog_errno(status);
1023 goto bail; 1023 goto bail;