aboutsummaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@woody.linux-foundation.org>2007-02-08 13:37:22 -0500
committerLinus Torvalds <torvalds@woody.linux-foundation.org>2007-02-08 13:37:22 -0500
commit5986a2ec35836a878350c54af4bd91b1de6abc59 (patch)
tree2efe068e124071ca30a5f1886402b890d7ba429e /fs
parent43187902cbfafe73ede0144166b741fb0f7d04e1 (diff)
parentff05d1c4643dd4260eb699396043d7e8009c0de4 (diff)
Merge branch 'upstream-linus' of master.kernel.org:/pub/scm/linux/kernel/git/mfasheh/ocfs2
* 'upstream-linus' of master.kernel.org:/pub/scm/linux/kernel/git/mfasheh/ocfs2: (22 commits) configfs: Zero terminate data in configfs attribute writes. [PATCH] ocfs2 heartbeat: clean up bio submission code ocfs2: introduce sc->sc_send_lock to protect outbound outbound messages [PATCH] ocfs2: drop INET from Kconfig, not needed ocfs2_dlm: Add timeout to dlm join domain ocfs2_dlm: Silence some messages during join domain ocfs2_dlm: disallow a domain join if node maps mismatch ocfs2_dlm: Ensure correct ordering of set/clear refmap bit on lockres ocfs2: Binds listener to the configured ip address ocfs2_dlm: Calling post handler function in assert master handler ocfs2: Added post handler callable function in o2net message handler ocfs2_dlm: Cookies in locks not being printed correctly in error messages ocfs2_dlm: Silence a failed convert ocfs2_dlm: wake up sleepers on the lockres waitqueue ocfs2_dlm: Dlm dispatch was stopping too early ocfs2_dlm: Drop inflight refmap even if no locks found on the lockres ocfs2_dlm: Flush dlm workqueue before starting to migrate ocfs2_dlm: Fix migrate lockres handler queue scanning ocfs2_dlm: Make dlmunlock() wait for migration to complete ocfs2_dlm: Fixes race between migrate and dirty ...
Diffstat (limited to 'fs')
-rw-r--r--fs/Kconfig1
-rw-r--r--fs/configfs/file.c9
-rw-r--r--fs/ocfs2/cluster/heartbeat.c158
-rw-r--r--fs/ocfs2/cluster/tcp.c35
-rw-r--r--fs/ocfs2/cluster/tcp.h6
-rw-r--r--fs/ocfs2/cluster/tcp_internal.h12
-rw-r--r--fs/ocfs2/dlm/dlmast.c14
-rw-r--r--fs/ocfs2/dlm/dlmcommon.h130
-rw-r--r--fs/ocfs2/dlm/dlmconvert.c40
-rw-r--r--fs/ocfs2/dlm/dlmdebug.c30
-rw-r--r--fs/ocfs2/dlm/dlmdomain.c253
-rw-r--r--fs/ocfs2/dlm/dlmlock.c7
-rw-r--r--fs/ocfs2/dlm/dlmmaster.c579
-rw-r--r--fs/ocfs2/dlm/dlmrecovery.c182
-rw-r--r--fs/ocfs2/dlm/dlmthread.c200
-rw-r--r--fs/ocfs2/dlm/dlmunlock.c15
-rw-r--r--fs/ocfs2/vote.c8
17 files changed, 1211 insertions, 468 deletions
diff --git a/fs/Kconfig b/fs/Kconfig
index 8cd2417a14db..5e8e9d9ccb33 100644
--- a/fs/Kconfig
+++ b/fs/Kconfig
@@ -426,7 +426,6 @@ config OCFS2_FS
426 select CONFIGFS_FS 426 select CONFIGFS_FS
427 select JBD 427 select JBD
428 select CRC32 428 select CRC32
429 select INET
430 help 429 help
431 OCFS2 is a general purpose extent based shared disk cluster file 430 OCFS2 is a general purpose extent based shared disk cluster file
432 system with many similarities to ext3. It supports 64 bit inode 431 system with many similarities to ext3. It supports 64 bit inode
diff --git a/fs/configfs/file.c b/fs/configfs/file.c
index 2a7cb086e80c..d98be5e01328 100644
--- a/fs/configfs/file.c
+++ b/fs/configfs/file.c
@@ -162,14 +162,17 @@ fill_write_buffer(struct configfs_buffer * buffer, const char __user * buf, size
162 int error; 162 int error;
163 163
164 if (!buffer->page) 164 if (!buffer->page)
165 buffer->page = (char *)get_zeroed_page(GFP_KERNEL); 165 buffer->page = (char *)__get_free_pages(GFP_KERNEL, 0);
166 if (!buffer->page) 166 if (!buffer->page)
167 return -ENOMEM; 167 return -ENOMEM;
168 168
169 if (count > PAGE_SIZE) 169 if (count >= PAGE_SIZE)
170 count = PAGE_SIZE; 170 count = PAGE_SIZE - 1;
171 error = copy_from_user(buffer->page,buf,count); 171 error = copy_from_user(buffer->page,buf,count);
172 buffer->needs_read_fill = 1; 172 buffer->needs_read_fill = 1;
173 /* if buf is assumed to contain a string, terminate it by \0,
174 * so e.g. sscanf() can scan the string easily */
175 buffer->page[count] = 0;
173 return error ? -EFAULT : count; 176 return error ? -EFAULT : count;
174} 177}
175 178
diff --git a/fs/ocfs2/cluster/heartbeat.c b/fs/ocfs2/cluster/heartbeat.c
index 277ca67a2ad6..5a9779bb9236 100644
--- a/fs/ocfs2/cluster/heartbeat.c
+++ b/fs/ocfs2/cluster/heartbeat.c
@@ -184,10 +184,9 @@ static void o2hb_disarm_write_timeout(struct o2hb_region *reg)
184 flush_scheduled_work(); 184 flush_scheduled_work();
185} 185}
186 186
187static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc, 187static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
188 unsigned int num_ios)
189{ 188{
190 atomic_set(&wc->wc_num_reqs, num_ios); 189 atomic_set(&wc->wc_num_reqs, 1);
191 init_completion(&wc->wc_io_complete); 190 init_completion(&wc->wc_io_complete);
192 wc->wc_error = 0; 191 wc->wc_error = 0;
193} 192}
@@ -212,6 +211,7 @@ static void o2hb_wait_on_io(struct o2hb_region *reg,
212 struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping; 211 struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping;
213 212
214 blk_run_address_space(mapping); 213 blk_run_address_space(mapping);
214 o2hb_bio_wait_dec(wc, 1);
215 215
216 wait_for_completion(&wc->wc_io_complete); 216 wait_for_completion(&wc->wc_io_complete);
217} 217}
@@ -231,6 +231,7 @@ static int o2hb_bio_end_io(struct bio *bio,
231 return 1; 231 return 1;
232 232
233 o2hb_bio_wait_dec(wc, 1); 233 o2hb_bio_wait_dec(wc, 1);
234 bio_put(bio);
234 return 0; 235 return 0;
235} 236}
236 237
@@ -238,23 +239,22 @@ static int o2hb_bio_end_io(struct bio *bio,
238 * start_slot. */ 239 * start_slot. */
239static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg, 240static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
240 struct o2hb_bio_wait_ctxt *wc, 241 struct o2hb_bio_wait_ctxt *wc,
241 unsigned int start_slot, 242 unsigned int *current_slot,
242 unsigned int num_slots) 243 unsigned int max_slots)
243{ 244{
244 int i, nr_vecs, len, first_page, last_page; 245 int len, current_page;
245 unsigned int vec_len, vec_start; 246 unsigned int vec_len, vec_start;
246 unsigned int bits = reg->hr_block_bits; 247 unsigned int bits = reg->hr_block_bits;
247 unsigned int spp = reg->hr_slots_per_page; 248 unsigned int spp = reg->hr_slots_per_page;
249 unsigned int cs = *current_slot;
248 struct bio *bio; 250 struct bio *bio;
249 struct page *page; 251 struct page *page;
250 252
251 nr_vecs = (num_slots + spp - 1) / spp;
252
253 /* Testing has shown this allocation to take long enough under 253 /* Testing has shown this allocation to take long enough under
254 * GFP_KERNEL that the local node can get fenced. It would be 254 * GFP_KERNEL that the local node can get fenced. It would be
255 * nicest if we could pre-allocate these bios and avoid this 255 * nicest if we could pre-allocate these bios and avoid this
256 * all together. */ 256 * all together. */
257 bio = bio_alloc(GFP_ATOMIC, nr_vecs); 257 bio = bio_alloc(GFP_ATOMIC, 16);
258 if (!bio) { 258 if (!bio) {
259 mlog(ML_ERROR, "Could not alloc slots BIO!\n"); 259 mlog(ML_ERROR, "Could not alloc slots BIO!\n");
260 bio = ERR_PTR(-ENOMEM); 260 bio = ERR_PTR(-ENOMEM);
@@ -262,137 +262,53 @@ static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
262 } 262 }
263 263
264 /* Must put everything in 512 byte sectors for the bio... */ 264 /* Must put everything in 512 byte sectors for the bio... */
265 bio->bi_sector = (reg->hr_start_block + start_slot) << (bits - 9); 265 bio->bi_sector = (reg->hr_start_block + cs) << (bits - 9);
266 bio->bi_bdev = reg->hr_bdev; 266 bio->bi_bdev = reg->hr_bdev;
267 bio->bi_private = wc; 267 bio->bi_private = wc;
268 bio->bi_end_io = o2hb_bio_end_io; 268 bio->bi_end_io = o2hb_bio_end_io;
269 269
270 first_page = start_slot / spp; 270 vec_start = (cs << bits) % PAGE_CACHE_SIZE;
271 last_page = first_page + nr_vecs; 271 while(cs < max_slots) {
272 vec_start = (start_slot << bits) % PAGE_CACHE_SIZE; 272 current_page = cs / spp;
273 for(i = first_page; i < last_page; i++) { 273 page = reg->hr_slot_data[current_page];
274 page = reg->hr_slot_data[i];
275 274
276 vec_len = PAGE_CACHE_SIZE; 275 vec_len = min(PAGE_CACHE_SIZE,
277 /* last page might be short */ 276 (max_slots-cs) * (PAGE_CACHE_SIZE/spp) );
278 if (((i + 1) * spp) > (start_slot + num_slots))
279 vec_len = ((num_slots + start_slot) % spp) << bits;
280 vec_len -= vec_start;
281 277
282 mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n", 278 mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
283 i, vec_len, vec_start); 279 current_page, vec_len, vec_start);
284 280
285 len = bio_add_page(bio, page, vec_len, vec_start); 281 len = bio_add_page(bio, page, vec_len, vec_start);
286 if (len != vec_len) { 282 if (len != vec_len) break;
287 bio_put(bio);
288 bio = ERR_PTR(-EIO);
289
290 mlog(ML_ERROR, "Error adding page to bio i = %d, "
291 "vec_len = %u, len = %d\n, start = %u\n",
292 i, vec_len, len, vec_start);
293 goto bail;
294 }
295 283
284 cs += vec_len / (PAGE_CACHE_SIZE/spp);
296 vec_start = 0; 285 vec_start = 0;
297 } 286 }
298 287
299bail: 288bail:
289 *current_slot = cs;
300 return bio; 290 return bio;
301} 291}
302 292
303/*
304 * Compute the maximum number of sectors the bdev can handle in one bio,
305 * as a power of two.
306 *
307 * Stolen from oracleasm, thanks Joel!
308 */
309static int compute_max_sectors(struct block_device *bdev)
310{
311 int max_pages, max_sectors, pow_two_sectors;
312
313 struct request_queue *q;
314
315 q = bdev_get_queue(bdev);
316 max_pages = q->max_sectors >> (PAGE_SHIFT - 9);
317 if (max_pages > BIO_MAX_PAGES)
318 max_pages = BIO_MAX_PAGES;
319 if (max_pages > q->max_phys_segments)
320 max_pages = q->max_phys_segments;
321 if (max_pages > q->max_hw_segments)
322 max_pages = q->max_hw_segments;
323 max_pages--; /* Handle I/Os that straddle a page */
324
325 if (max_pages) {
326 max_sectors = max_pages << (PAGE_SHIFT - 9);
327 } else {
328 /* If BIO contains 1 or less than 1 page. */
329 max_sectors = q->max_sectors;
330 }
331 /* Why is fls() 1-based???? */
332 pow_two_sectors = 1 << (fls(max_sectors) - 1);
333
334 return pow_two_sectors;
335}
336
337static inline void o2hb_compute_request_limits(struct o2hb_region *reg,
338 unsigned int num_slots,
339 unsigned int *num_bios,
340 unsigned int *slots_per_bio)
341{
342 unsigned int max_sectors, io_sectors;
343
344 max_sectors = compute_max_sectors(reg->hr_bdev);
345
346 io_sectors = num_slots << (reg->hr_block_bits - 9);
347
348 *num_bios = (io_sectors + max_sectors - 1) / max_sectors;
349 *slots_per_bio = max_sectors >> (reg->hr_block_bits - 9);
350
351 mlog(ML_HB_BIO, "My io size is %u sectors for %u slots. This "
352 "device can handle %u sectors of I/O\n", io_sectors, num_slots,
353 max_sectors);
354 mlog(ML_HB_BIO, "Will need %u bios holding %u slots each\n",
355 *num_bios, *slots_per_bio);
356}
357
358static int o2hb_read_slots(struct o2hb_region *reg, 293static int o2hb_read_slots(struct o2hb_region *reg,
359 unsigned int max_slots) 294 unsigned int max_slots)
360{ 295{
361 unsigned int num_bios, slots_per_bio, start_slot, num_slots; 296 unsigned int current_slot=0;
362 int i, status; 297 int status;
363 struct o2hb_bio_wait_ctxt wc; 298 struct o2hb_bio_wait_ctxt wc;
364 struct bio **bios;
365 struct bio *bio; 299 struct bio *bio;
366 300
367 o2hb_compute_request_limits(reg, max_slots, &num_bios, &slots_per_bio); 301 o2hb_bio_wait_init(&wc);
368 302
369 bios = kcalloc(num_bios, sizeof(struct bio *), GFP_KERNEL); 303 while(current_slot < max_slots) {
370 if (!bios) { 304 bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots);
371 status = -ENOMEM;
372 mlog_errno(status);
373 return status;
374 }
375
376 o2hb_bio_wait_init(&wc, num_bios);
377
378 num_slots = slots_per_bio;
379 for(i = 0; i < num_bios; i++) {
380 start_slot = i * slots_per_bio;
381
382 /* adjust num_slots at last bio */
383 if (max_slots < (start_slot + num_slots))
384 num_slots = max_slots - start_slot;
385
386 bio = o2hb_setup_one_bio(reg, &wc, start_slot, num_slots);
387 if (IS_ERR(bio)) { 305 if (IS_ERR(bio)) {
388 o2hb_bio_wait_dec(&wc, num_bios - i);
389
390 status = PTR_ERR(bio); 306 status = PTR_ERR(bio);
391 mlog_errno(status); 307 mlog_errno(status);
392 goto bail_and_wait; 308 goto bail_and_wait;
393 } 309 }
394 bios[i] = bio;
395 310
311 atomic_inc(&wc.wc_num_reqs);
396 submit_bio(READ, bio); 312 submit_bio(READ, bio);
397 } 313 }
398 314
@@ -403,38 +319,30 @@ bail_and_wait:
403 if (wc.wc_error && !status) 319 if (wc.wc_error && !status)
404 status = wc.wc_error; 320 status = wc.wc_error;
405 321
406 if (bios) {
407 for(i = 0; i < num_bios; i++)
408 if (bios[i])
409 bio_put(bios[i]);
410 kfree(bios);
411 }
412
413 return status; 322 return status;
414} 323}
415 324
416static int o2hb_issue_node_write(struct o2hb_region *reg, 325static int o2hb_issue_node_write(struct o2hb_region *reg,
417 struct bio **write_bio,
418 struct o2hb_bio_wait_ctxt *write_wc) 326 struct o2hb_bio_wait_ctxt *write_wc)
419{ 327{
420 int status; 328 int status;
421 unsigned int slot; 329 unsigned int slot;
422 struct bio *bio; 330 struct bio *bio;
423 331
424 o2hb_bio_wait_init(write_wc, 1); 332 o2hb_bio_wait_init(write_wc);
425 333
426 slot = o2nm_this_node(); 334 slot = o2nm_this_node();
427 335
428 bio = o2hb_setup_one_bio(reg, write_wc, slot, 1); 336 bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1);
429 if (IS_ERR(bio)) { 337 if (IS_ERR(bio)) {
430 status = PTR_ERR(bio); 338 status = PTR_ERR(bio);
431 mlog_errno(status); 339 mlog_errno(status);
432 goto bail; 340 goto bail;
433 } 341 }
434 342
343 atomic_inc(&write_wc->wc_num_reqs);
435 submit_bio(WRITE, bio); 344 submit_bio(WRITE, bio);
436 345
437 *write_bio = bio;
438 status = 0; 346 status = 0;
439bail: 347bail:
440 return status; 348 return status;
@@ -826,7 +734,6 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
826{ 734{
827 int i, ret, highest_node, change = 0; 735 int i, ret, highest_node, change = 0;
828 unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)]; 736 unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
829 struct bio *write_bio;
830 struct o2hb_bio_wait_ctxt write_wc; 737 struct o2hb_bio_wait_ctxt write_wc;
831 738
832 ret = o2nm_configured_node_map(configured_nodes, 739 ret = o2nm_configured_node_map(configured_nodes,
@@ -864,7 +771,7 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
864 771
865 /* And fire off the write. Note that we don't wait on this I/O 772 /* And fire off the write. Note that we don't wait on this I/O
866 * until later. */ 773 * until later. */
867 ret = o2hb_issue_node_write(reg, &write_bio, &write_wc); 774 ret = o2hb_issue_node_write(reg, &write_wc);
868 if (ret < 0) { 775 if (ret < 0) {
869 mlog_errno(ret); 776 mlog_errno(ret);
870 return ret; 777 return ret;
@@ -882,7 +789,6 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
882 * people we find in our steady state have seen us. 789 * people we find in our steady state have seen us.
883 */ 790 */
884 o2hb_wait_on_io(reg, &write_wc); 791 o2hb_wait_on_io(reg, &write_wc);
885 bio_put(write_bio);
886 if (write_wc.wc_error) { 792 if (write_wc.wc_error) {
887 /* Do not re-arm the write timeout on I/O error - we 793 /* Do not re-arm the write timeout on I/O error - we
888 * can't be sure that the new block ever made it to 794 * can't be sure that the new block ever made it to
@@ -943,7 +849,6 @@ static int o2hb_thread(void *data)
943{ 849{
944 int i, ret; 850 int i, ret;
945 struct o2hb_region *reg = data; 851 struct o2hb_region *reg = data;
946 struct bio *write_bio;
947 struct o2hb_bio_wait_ctxt write_wc; 852 struct o2hb_bio_wait_ctxt write_wc;
948 struct timeval before_hb, after_hb; 853 struct timeval before_hb, after_hb;
949 unsigned int elapsed_msec; 854 unsigned int elapsed_msec;
@@ -993,10 +898,9 @@ static int o2hb_thread(void *data)
993 * 898 *
994 * XXX: Should we skip this on unclean_stop? */ 899 * XXX: Should we skip this on unclean_stop? */
995 o2hb_prepare_block(reg, 0); 900 o2hb_prepare_block(reg, 0);
996 ret = o2hb_issue_node_write(reg, &write_bio, &write_wc); 901 ret = o2hb_issue_node_write(reg, &write_wc);
997 if (ret == 0) { 902 if (ret == 0) {
998 o2hb_wait_on_io(reg, &write_wc); 903 o2hb_wait_on_io(reg, &write_wc);
999 bio_put(write_bio);
1000 } else { 904 } else {
1001 mlog_errno(ret); 905 mlog_errno(ret);
1002 } 906 }
diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c
index ae4ff4a6636b..1718215fc018 100644
--- a/fs/ocfs2/cluster/tcp.c
+++ b/fs/ocfs2/cluster/tcp.c
@@ -556,6 +556,8 @@ static void o2net_register_callbacks(struct sock *sk,
556 sk->sk_data_ready = o2net_data_ready; 556 sk->sk_data_ready = o2net_data_ready;
557 sk->sk_state_change = o2net_state_change; 557 sk->sk_state_change = o2net_state_change;
558 558
559 mutex_init(&sc->sc_send_lock);
560
559 write_unlock_bh(&sk->sk_callback_lock); 561 write_unlock_bh(&sk->sk_callback_lock);
560} 562}
561 563
@@ -688,6 +690,7 @@ static void o2net_handler_put(struct o2net_msg_handler *nmh)
688 * be given to the handler if their payload is longer than the max. */ 690 * be given to the handler if their payload is longer than the max. */
689int o2net_register_handler(u32 msg_type, u32 key, u32 max_len, 691int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
690 o2net_msg_handler_func *func, void *data, 692 o2net_msg_handler_func *func, void *data,
693 o2net_post_msg_handler_func *post_func,
691 struct list_head *unreg_list) 694 struct list_head *unreg_list)
692{ 695{
693 struct o2net_msg_handler *nmh = NULL; 696 struct o2net_msg_handler *nmh = NULL;
@@ -722,6 +725,7 @@ int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
722 725
723 nmh->nh_func = func; 726 nmh->nh_func = func;
724 nmh->nh_func_data = data; 727 nmh->nh_func_data = data;
728 nmh->nh_post_func = post_func;
725 nmh->nh_msg_type = msg_type; 729 nmh->nh_msg_type = msg_type;
726 nmh->nh_max_len = max_len; 730 nmh->nh_max_len = max_len;
727 nmh->nh_key = key; 731 nmh->nh_key = key;
@@ -856,10 +860,12 @@ static void o2net_sendpage(struct o2net_sock_container *sc,
856 ssize_t ret; 860 ssize_t ret;
857 861
858 862
863 mutex_lock(&sc->sc_send_lock);
859 ret = sc->sc_sock->ops->sendpage(sc->sc_sock, 864 ret = sc->sc_sock->ops->sendpage(sc->sc_sock,
860 virt_to_page(kmalloced_virt), 865 virt_to_page(kmalloced_virt),
861 (long)kmalloced_virt & ~PAGE_MASK, 866 (long)kmalloced_virt & ~PAGE_MASK,
862 size, MSG_DONTWAIT); 867 size, MSG_DONTWAIT);
868 mutex_unlock(&sc->sc_send_lock);
863 if (ret != size) { 869 if (ret != size) {
864 mlog(ML_ERROR, "sendpage of size %zu to " SC_NODEF_FMT 870 mlog(ML_ERROR, "sendpage of size %zu to " SC_NODEF_FMT
865 " failed with %zd\n", size, SC_NODEF_ARGS(sc), ret); 871 " failed with %zd\n", size, SC_NODEF_ARGS(sc), ret);
@@ -974,8 +980,10 @@ int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *caller_vec,
974 980
975 /* finally, convert the message header to network byte-order 981 /* finally, convert the message header to network byte-order
976 * and send */ 982 * and send */
983 mutex_lock(&sc->sc_send_lock);
977 ret = o2net_send_tcp_msg(sc->sc_sock, vec, veclen, 984 ret = o2net_send_tcp_msg(sc->sc_sock, vec, veclen,
978 sizeof(struct o2net_msg) + caller_bytes); 985 sizeof(struct o2net_msg) + caller_bytes);
986 mutex_unlock(&sc->sc_send_lock);
979 msglog(msg, "sending returned %d\n", ret); 987 msglog(msg, "sending returned %d\n", ret);
980 if (ret < 0) { 988 if (ret < 0) {
981 mlog(0, "error returned from o2net_send_tcp_msg=%d\n", ret); 989 mlog(0, "error returned from o2net_send_tcp_msg=%d\n", ret);
@@ -1049,6 +1057,7 @@ static int o2net_process_message(struct o2net_sock_container *sc,
1049 int ret = 0, handler_status; 1057 int ret = 0, handler_status;
1050 enum o2net_system_error syserr; 1058 enum o2net_system_error syserr;
1051 struct o2net_msg_handler *nmh = NULL; 1059 struct o2net_msg_handler *nmh = NULL;
1060 void *ret_data = NULL;
1052 1061
1053 msglog(hdr, "processing message\n"); 1062 msglog(hdr, "processing message\n");
1054 1063
@@ -1101,17 +1110,26 @@ static int o2net_process_message(struct o2net_sock_container *sc,
1101 sc->sc_msg_type = be16_to_cpu(hdr->msg_type); 1110 sc->sc_msg_type = be16_to_cpu(hdr->msg_type);
1102 handler_status = (nmh->nh_func)(hdr, sizeof(struct o2net_msg) + 1111 handler_status = (nmh->nh_func)(hdr, sizeof(struct o2net_msg) +
1103 be16_to_cpu(hdr->data_len), 1112 be16_to_cpu(hdr->data_len),
1104 nmh->nh_func_data); 1113 nmh->nh_func_data, &ret_data);
1105 do_gettimeofday(&sc->sc_tv_func_stop); 1114 do_gettimeofday(&sc->sc_tv_func_stop);
1106 1115
1107out_respond: 1116out_respond:
1108 /* this destroys the hdr, so don't use it after this */ 1117 /* this destroys the hdr, so don't use it after this */
1118 mutex_lock(&sc->sc_send_lock);
1109 ret = o2net_send_status_magic(sc->sc_sock, hdr, syserr, 1119 ret = o2net_send_status_magic(sc->sc_sock, hdr, syserr,
1110 handler_status); 1120 handler_status);
1121 mutex_unlock(&sc->sc_send_lock);
1111 hdr = NULL; 1122 hdr = NULL;
1112 mlog(0, "sending handler status %d, syserr %d returned %d\n", 1123 mlog(0, "sending handler status %d, syserr %d returned %d\n",
1113 handler_status, syserr, ret); 1124 handler_status, syserr, ret);
1114 1125
1126 if (nmh) {
1127 BUG_ON(ret_data != NULL && nmh->nh_post_func == NULL);
1128 if (nmh->nh_post_func)
1129 (nmh->nh_post_func)(handler_status, nmh->nh_func_data,
1130 ret_data);
1131 }
1132
1115out: 1133out:
1116 if (nmh) 1134 if (nmh)
1117 o2net_handler_put(nmh); 1135 o2net_handler_put(nmh);
@@ -1795,13 +1813,13 @@ out:
1795 ready(sk, bytes); 1813 ready(sk, bytes);
1796} 1814}
1797 1815
1798static int o2net_open_listening_sock(__be16 port) 1816static int o2net_open_listening_sock(__be32 addr, __be16 port)
1799{ 1817{
1800 struct socket *sock = NULL; 1818 struct socket *sock = NULL;
1801 int ret; 1819 int ret;
1802 struct sockaddr_in sin = { 1820 struct sockaddr_in sin = {
1803 .sin_family = PF_INET, 1821 .sin_family = PF_INET,
1804 .sin_addr = { .s_addr = (__force u32)htonl(INADDR_ANY) }, 1822 .sin_addr = { .s_addr = (__force u32)addr },
1805 .sin_port = (__force u16)port, 1823 .sin_port = (__force u16)port,
1806 }; 1824 };
1807 1825
@@ -1824,15 +1842,15 @@ static int o2net_open_listening_sock(__be16 port)
1824 sock->sk->sk_reuse = 1; 1842 sock->sk->sk_reuse = 1;
1825 ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin)); 1843 ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin));
1826 if (ret < 0) { 1844 if (ret < 0) {
1827 mlog(ML_ERROR, "unable to bind socket to port %d, ret=%d\n", 1845 mlog(ML_ERROR, "unable to bind socket at %u.%u.%u.%u:%u, "
1828 ntohs(port), ret); 1846 "ret=%d\n", NIPQUAD(addr), ntohs(port), ret);
1829 goto out; 1847 goto out;
1830 } 1848 }
1831 1849
1832 ret = sock->ops->listen(sock, 64); 1850 ret = sock->ops->listen(sock, 64);
1833 if (ret < 0) { 1851 if (ret < 0) {
1834 mlog(ML_ERROR, "unable to listen on port %d, ret=%d\n", 1852 mlog(ML_ERROR, "unable to listen on %u.%u.%u.%u:%u, ret=%d\n",
1835 ntohs(port), ret); 1853 NIPQUAD(addr), ntohs(port), ret);
1836 } 1854 }
1837 1855
1838out: 1856out:
@@ -1865,7 +1883,8 @@ int o2net_start_listening(struct o2nm_node *node)
1865 return -ENOMEM; /* ? */ 1883 return -ENOMEM; /* ? */
1866 } 1884 }
1867 1885
1868 ret = o2net_open_listening_sock(node->nd_ipv4_port); 1886 ret = o2net_open_listening_sock(node->nd_ipv4_address,
1887 node->nd_ipv4_port);
1869 if (ret) { 1888 if (ret) {
1870 destroy_workqueue(o2net_wq); 1889 destroy_workqueue(o2net_wq);
1871 o2net_wq = NULL; 1890 o2net_wq = NULL;
diff --git a/fs/ocfs2/cluster/tcp.h b/fs/ocfs2/cluster/tcp.h
index 21a4e43df836..da880fc215f0 100644
--- a/fs/ocfs2/cluster/tcp.h
+++ b/fs/ocfs2/cluster/tcp.h
@@ -50,7 +50,10 @@ struct o2net_msg
50 __u8 buf[0]; 50 __u8 buf[0];
51}; 51};
52 52
53typedef int (o2net_msg_handler_func)(struct o2net_msg *msg, u32 len, void *data); 53typedef int (o2net_msg_handler_func)(struct o2net_msg *msg, u32 len, void *data,
54 void **ret_data);
55typedef void (o2net_post_msg_handler_func)(int status, void *data,
56 void *ret_data);
54 57
55#define O2NET_MAX_PAYLOAD_BYTES (4096 - sizeof(struct o2net_msg)) 58#define O2NET_MAX_PAYLOAD_BYTES (4096 - sizeof(struct o2net_msg))
56 59
@@ -99,6 +102,7 @@ int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *vec,
99 102
100int o2net_register_handler(u32 msg_type, u32 key, u32 max_len, 103int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
101 o2net_msg_handler_func *func, void *data, 104 o2net_msg_handler_func *func, void *data,
105 o2net_post_msg_handler_func *post_func,
102 struct list_head *unreg_list); 106 struct list_head *unreg_list);
103void o2net_unregister_handler_list(struct list_head *list); 107void o2net_unregister_handler_list(struct list_head *list);
104 108
diff --git a/fs/ocfs2/cluster/tcp_internal.h b/fs/ocfs2/cluster/tcp_internal.h
index b700dc9624d1..4dae5df5e467 100644
--- a/fs/ocfs2/cluster/tcp_internal.h
+++ b/fs/ocfs2/cluster/tcp_internal.h
@@ -38,6 +38,12 @@
38 * locking semantics of the file system using the protocol. It should 38 * locking semantics of the file system using the protocol. It should
39 * be somewhere else, I'm sure, but right now it isn't. 39 * be somewhere else, I'm sure, but right now it isn't.
40 * 40 *
41 * New in version 7:
42 * - DLM join domain includes the live nodemap
43 *
44 * New in version 6:
45 * - DLM lockres remote refcount fixes.
46 *
41 * New in version 5: 47 * New in version 5:
42 * - Network timeout checking protocol 48 * - Network timeout checking protocol
43 * 49 *
@@ -51,7 +57,7 @@
51 * - full 64 bit i_size in the metadata lock lvbs 57 * - full 64 bit i_size in the metadata lock lvbs
52 * - introduction of "rw" lock and pushing meta/data locking down 58 * - introduction of "rw" lock and pushing meta/data locking down
53 */ 59 */
54#define O2NET_PROTOCOL_VERSION 5ULL 60#define O2NET_PROTOCOL_VERSION 7ULL
55struct o2net_handshake { 61struct o2net_handshake {
56 __be64 protocol_version; 62 __be64 protocol_version;
57 __be64 connector_id; 63 __be64 connector_id;
@@ -149,6 +155,8 @@ struct o2net_sock_container {
149 struct timeval sc_tv_func_stop; 155 struct timeval sc_tv_func_stop;
150 u32 sc_msg_key; 156 u32 sc_msg_key;
151 u16 sc_msg_type; 157 u16 sc_msg_type;
158
159 struct mutex sc_send_lock;
152}; 160};
153 161
154struct o2net_msg_handler { 162struct o2net_msg_handler {
@@ -158,6 +166,8 @@ struct o2net_msg_handler {
158 u32 nh_key; 166 u32 nh_key;
159 o2net_msg_handler_func *nh_func; 167 o2net_msg_handler_func *nh_func;
160 o2net_msg_handler_func *nh_func_data; 168 o2net_msg_handler_func *nh_func_data;
169 o2net_post_msg_handler_func
170 *nh_post_func;
161 struct kref nh_kref; 171 struct kref nh_kref;
162 struct list_head nh_unregister_item; 172 struct list_head nh_unregister_item;
163}; 173};
diff --git a/fs/ocfs2/dlm/dlmast.c b/fs/ocfs2/dlm/dlmast.c
index 681046d51393..241cad342a48 100644
--- a/fs/ocfs2/dlm/dlmast.c
+++ b/fs/ocfs2/dlm/dlmast.c
@@ -263,7 +263,8 @@ void dlm_do_local_bast(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
263 263
264 264
265 265
266int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data) 266int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data,
267 void **ret_data)
267{ 268{
268 int ret; 269 int ret;
269 unsigned int locklen; 270 unsigned int locklen;
@@ -311,8 +312,8 @@ int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data)
311 past->type != DLM_BAST) { 312 past->type != DLM_BAST) {
312 mlog(ML_ERROR, "Unknown ast type! %d, cookie=%u:%llu" 313 mlog(ML_ERROR, "Unknown ast type! %d, cookie=%u:%llu"
313 "name=%.*s\n", past->type, 314 "name=%.*s\n", past->type,
314 dlm_get_lock_cookie_node(cookie), 315 dlm_get_lock_cookie_node(be64_to_cpu(cookie)),
315 dlm_get_lock_cookie_seq(cookie), 316 dlm_get_lock_cookie_seq(be64_to_cpu(cookie)),
316 locklen, name); 317 locklen, name);
317 ret = DLM_IVLOCKID; 318 ret = DLM_IVLOCKID;
318 goto leave; 319 goto leave;
@@ -323,8 +324,8 @@ int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data)
323 mlog(0, "got %sast for unknown lockres! " 324 mlog(0, "got %sast for unknown lockres! "
324 "cookie=%u:%llu, name=%.*s, namelen=%u\n", 325 "cookie=%u:%llu, name=%.*s, namelen=%u\n",
325 past->type == DLM_AST ? "" : "b", 326 past->type == DLM_AST ? "" : "b",
326 dlm_get_lock_cookie_node(cookie), 327 dlm_get_lock_cookie_node(be64_to_cpu(cookie)),
327 dlm_get_lock_cookie_seq(cookie), 328 dlm_get_lock_cookie_seq(be64_to_cpu(cookie)),
328 locklen, name, locklen); 329 locklen, name, locklen);
329 ret = DLM_IVLOCKID; 330 ret = DLM_IVLOCKID;
330 goto leave; 331 goto leave;
@@ -369,7 +370,8 @@ int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data)
369 370
370 mlog(0, "got %sast for unknown lock! cookie=%u:%llu, " 371 mlog(0, "got %sast for unknown lock! cookie=%u:%llu, "
371 "name=%.*s, namelen=%u\n", past->type == DLM_AST ? "" : "b", 372 "name=%.*s, namelen=%u\n", past->type == DLM_AST ? "" : "b",
372 dlm_get_lock_cookie_node(cookie), dlm_get_lock_cookie_seq(cookie), 373 dlm_get_lock_cookie_node(be64_to_cpu(cookie)),
374 dlm_get_lock_cookie_seq(be64_to_cpu(cookie)),
373 locklen, name, locklen); 375 locklen, name, locklen);
374 376
375 ret = DLM_NORMAL; 377 ret = DLM_NORMAL;
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h
index 6b6ff76538c5..e90b92f9ece1 100644
--- a/fs/ocfs2/dlm/dlmcommon.h
+++ b/fs/ocfs2/dlm/dlmcommon.h
@@ -180,6 +180,11 @@ struct dlm_assert_master_priv
180 unsigned ignore_higher:1; 180 unsigned ignore_higher:1;
181}; 181};
182 182
183struct dlm_deref_lockres_priv
184{
185 struct dlm_lock_resource *deref_res;
186 u8 deref_node;
187};
183 188
184struct dlm_work_item 189struct dlm_work_item
185{ 190{
@@ -191,6 +196,7 @@ struct dlm_work_item
191 struct dlm_request_all_locks_priv ral; 196 struct dlm_request_all_locks_priv ral;
192 struct dlm_mig_lockres_priv ml; 197 struct dlm_mig_lockres_priv ml;
193 struct dlm_assert_master_priv am; 198 struct dlm_assert_master_priv am;
199 struct dlm_deref_lockres_priv dl;
194 } u; 200 } u;
195}; 201};
196 202
@@ -222,6 +228,9 @@ static inline void __dlm_set_joining_node(struct dlm_ctxt *dlm,
222#define DLM_LOCK_RES_DIRTY 0x00000008 228#define DLM_LOCK_RES_DIRTY 0x00000008
223#define DLM_LOCK_RES_IN_PROGRESS 0x00000010 229#define DLM_LOCK_RES_IN_PROGRESS 0x00000010
224#define DLM_LOCK_RES_MIGRATING 0x00000020 230#define DLM_LOCK_RES_MIGRATING 0x00000020
231#define DLM_LOCK_RES_DROPPING_REF 0x00000040
232#define DLM_LOCK_RES_BLOCK_DIRTY 0x00001000
233#define DLM_LOCK_RES_SETREF_INPROG 0x00002000
225 234
226/* max milliseconds to wait to sync up a network failure with a node death */ 235/* max milliseconds to wait to sync up a network failure with a node death */
227#define DLM_NODE_DEATH_WAIT_MAX (5 * 1000) 236#define DLM_NODE_DEATH_WAIT_MAX (5 * 1000)
@@ -265,6 +274,8 @@ struct dlm_lock_resource
265 u8 owner; //node which owns the lock resource, or unknown 274 u8 owner; //node which owns the lock resource, or unknown
266 u16 state; 275 u16 state;
267 char lvb[DLM_LVB_LEN]; 276 char lvb[DLM_LVB_LEN];
277 unsigned int inflight_locks;
278 unsigned long refmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
268}; 279};
269 280
270struct dlm_migratable_lock 281struct dlm_migratable_lock
@@ -367,7 +378,7 @@ enum {
367 DLM_CONVERT_LOCK_MSG, /* 504 */ 378 DLM_CONVERT_LOCK_MSG, /* 504 */
368 DLM_PROXY_AST_MSG, /* 505 */ 379 DLM_PROXY_AST_MSG, /* 505 */
369 DLM_UNLOCK_LOCK_MSG, /* 506 */ 380 DLM_UNLOCK_LOCK_MSG, /* 506 */
370 DLM_UNUSED_MSG2, /* 507 */ 381 DLM_DEREF_LOCKRES_MSG, /* 507 */
371 DLM_MIGRATE_REQUEST_MSG, /* 508 */ 382 DLM_MIGRATE_REQUEST_MSG, /* 508 */
372 DLM_MIG_LOCKRES_MSG, /* 509 */ 383 DLM_MIG_LOCKRES_MSG, /* 509 */
373 DLM_QUERY_JOIN_MSG, /* 510 */ 384 DLM_QUERY_JOIN_MSG, /* 510 */
@@ -417,6 +428,9 @@ struct dlm_master_request
417 u8 name[O2NM_MAX_NAME_LEN]; 428 u8 name[O2NM_MAX_NAME_LEN];
418}; 429};
419 430
431#define DLM_ASSERT_RESPONSE_REASSERT 0x00000001
432#define DLM_ASSERT_RESPONSE_MASTERY_REF 0x00000002
433
420#define DLM_ASSERT_MASTER_MLE_CLEANUP 0x00000001 434#define DLM_ASSERT_MASTER_MLE_CLEANUP 0x00000001
421#define DLM_ASSERT_MASTER_REQUERY 0x00000002 435#define DLM_ASSERT_MASTER_REQUERY 0x00000002
422#define DLM_ASSERT_MASTER_FINISH_MIGRATION 0x00000004 436#define DLM_ASSERT_MASTER_FINISH_MIGRATION 0x00000004
@@ -430,6 +444,8 @@ struct dlm_assert_master
430 u8 name[O2NM_MAX_NAME_LEN]; 444 u8 name[O2NM_MAX_NAME_LEN];
431}; 445};
432 446
447#define DLM_MIGRATE_RESPONSE_MASTERY_REF 0x00000001
448
433struct dlm_migrate_request 449struct dlm_migrate_request
434{ 450{
435 u8 master; 451 u8 master;
@@ -609,12 +625,16 @@ struct dlm_begin_reco
609}; 625};
610 626
611 627
628#define BITS_PER_BYTE 8
629#define BITS_TO_BYTES(bits) (((bits)+BITS_PER_BYTE-1)/BITS_PER_BYTE)
630
612struct dlm_query_join_request 631struct dlm_query_join_request
613{ 632{
614 u8 node_idx; 633 u8 node_idx;
615 u8 pad1[2]; 634 u8 pad1[2];
616 u8 name_len; 635 u8 name_len;
617 u8 domain[O2NM_MAX_NAME_LEN]; 636 u8 domain[O2NM_MAX_NAME_LEN];
637 u8 node_map[BITS_TO_BYTES(O2NM_MAX_NODES)];
618}; 638};
619 639
620struct dlm_assert_joined 640struct dlm_assert_joined
@@ -648,6 +668,16 @@ struct dlm_finalize_reco
648 __be32 pad2; 668 __be32 pad2;
649}; 669};
650 670
671struct dlm_deref_lockres
672{
673 u32 pad1;
674 u16 pad2;
675 u8 node_idx;
676 u8 namelen;
677
678 u8 name[O2NM_MAX_NAME_LEN];
679};
680
651static inline enum dlm_status 681static inline enum dlm_status
652__dlm_lockres_state_to_status(struct dlm_lock_resource *res) 682__dlm_lockres_state_to_status(struct dlm_lock_resource *res)
653{ 683{
@@ -688,16 +718,20 @@ void dlm_lock_put(struct dlm_lock *lock);
688void dlm_lock_attach_lockres(struct dlm_lock *lock, 718void dlm_lock_attach_lockres(struct dlm_lock *lock,
689 struct dlm_lock_resource *res); 719 struct dlm_lock_resource *res);
690 720
691int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data); 721int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data,
692int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data); 722 void **ret_data);
693int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data); 723int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data,
724 void **ret_data);
725int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data,
726 void **ret_data);
694 727
695void dlm_revert_pending_convert(struct dlm_lock_resource *res, 728void dlm_revert_pending_convert(struct dlm_lock_resource *res,
696 struct dlm_lock *lock); 729 struct dlm_lock *lock);
697void dlm_revert_pending_lock(struct dlm_lock_resource *res, 730void dlm_revert_pending_lock(struct dlm_lock_resource *res,
698 struct dlm_lock *lock); 731 struct dlm_lock *lock);
699 732
700int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data); 733int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data,
734 void **ret_data);
701void dlm_commit_pending_cancel(struct dlm_lock_resource *res, 735void dlm_commit_pending_cancel(struct dlm_lock_resource *res,
702 struct dlm_lock *lock); 736 struct dlm_lock *lock);
703void dlm_commit_pending_unlock(struct dlm_lock_resource *res, 737void dlm_commit_pending_unlock(struct dlm_lock_resource *res,
@@ -721,8 +755,6 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
721 struct dlm_lock_resource *res); 755 struct dlm_lock_resource *res);
722void dlm_lockres_calc_usage(struct dlm_ctxt *dlm, 756void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
723 struct dlm_lock_resource *res); 757 struct dlm_lock_resource *res);
724void dlm_purge_lockres(struct dlm_ctxt *dlm,
725 struct dlm_lock_resource *lockres);
726static inline void dlm_lockres_get(struct dlm_lock_resource *res) 758static inline void dlm_lockres_get(struct dlm_lock_resource *res)
727{ 759{
728 /* This is called on every lookup, so it might be worth 760 /* This is called on every lookup, so it might be worth
@@ -733,6 +765,10 @@ void dlm_lockres_put(struct dlm_lock_resource *res);
733void __dlm_unhash_lockres(struct dlm_lock_resource *res); 765void __dlm_unhash_lockres(struct dlm_lock_resource *res);
734void __dlm_insert_lockres(struct dlm_ctxt *dlm, 766void __dlm_insert_lockres(struct dlm_ctxt *dlm,
735 struct dlm_lock_resource *res); 767 struct dlm_lock_resource *res);
768struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
769 const char *name,
770 unsigned int len,
771 unsigned int hash);
736struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, 772struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
737 const char *name, 773 const char *name,
738 unsigned int len, 774 unsigned int len,
@@ -753,6 +789,47 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
753 const char *name, 789 const char *name,
754 unsigned int namelen); 790 unsigned int namelen);
755 791
792#define dlm_lockres_set_refmap_bit(bit,res) \
793 __dlm_lockres_set_refmap_bit(bit,res,__FILE__,__LINE__)
794#define dlm_lockres_clear_refmap_bit(bit,res) \
795 __dlm_lockres_clear_refmap_bit(bit,res,__FILE__,__LINE__)
796
797static inline void __dlm_lockres_set_refmap_bit(int bit,
798 struct dlm_lock_resource *res,
799 const char *file,
800 int line)
801{
802 //printk("%s:%d:%.*s: setting bit %d\n", file, line,
803 // res->lockname.len, res->lockname.name, bit);
804 set_bit(bit, res->refmap);
805}
806
807static inline void __dlm_lockres_clear_refmap_bit(int bit,
808 struct dlm_lock_resource *res,
809 const char *file,
810 int line)
811{
812 //printk("%s:%d:%.*s: clearing bit %d\n", file, line,
813 // res->lockname.len, res->lockname.name, bit);
814 clear_bit(bit, res->refmap);
815}
816
817void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
818 struct dlm_lock_resource *res,
819 const char *file,
820 int line);
821void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
822 struct dlm_lock_resource *res,
823 int new_lockres,
824 const char *file,
825 int line);
826#define dlm_lockres_drop_inflight_ref(d,r) \
827 __dlm_lockres_drop_inflight_ref(d,r,__FILE__,__LINE__)
828#define dlm_lockres_grab_inflight_ref(d,r) \
829 __dlm_lockres_grab_inflight_ref(d,r,0,__FILE__,__LINE__)
830#define dlm_lockres_grab_inflight_ref_new(d,r) \
831 __dlm_lockres_grab_inflight_ref(d,r,1,__FILE__,__LINE__)
832
756void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock); 833void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
757void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock); 834void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
758void dlm_do_local_ast(struct dlm_ctxt *dlm, 835void dlm_do_local_ast(struct dlm_ctxt *dlm,
@@ -801,10 +878,7 @@ int dlm_heartbeat_init(struct dlm_ctxt *dlm);
801void dlm_hb_node_down_cb(struct o2nm_node *node, int idx, void *data); 878void dlm_hb_node_down_cb(struct o2nm_node *node, int idx, void *data);
802void dlm_hb_node_up_cb(struct o2nm_node *node, int idx, void *data); 879void dlm_hb_node_up_cb(struct o2nm_node *node, int idx, void *data);
803 880
804int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); 881int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res);
805int dlm_migrate_lockres(struct dlm_ctxt *dlm,
806 struct dlm_lock_resource *res,
807 u8 target);
808int dlm_finish_migration(struct dlm_ctxt *dlm, 882int dlm_finish_migration(struct dlm_ctxt *dlm,
809 struct dlm_lock_resource *res, 883 struct dlm_lock_resource *res,
810 u8 old_master); 884 u8 old_master);
@@ -812,15 +886,27 @@ void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
812 struct dlm_lock_resource *res); 886 struct dlm_lock_resource *res);
813void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res); 887void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res);
814 888
815int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data); 889int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
816int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data); 890 void **ret_data);
817int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data); 891int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
818int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data); 892 void **ret_data);
819int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data); 893void dlm_assert_master_post_handler(int status, void *data, void *ret_data);
820int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data); 894int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
821int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data); 895 void **ret_data);
822int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data); 896int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
823int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data); 897 void **ret_data);
898int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
899 void **ret_data);
900int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data,
901 void **ret_data);
902int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data,
903 void **ret_data);
904int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data,
905 void **ret_data);
906int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data,
907 void **ret_data);
908int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data,
909 void **ret_data);
824int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, 910int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
825 u8 nodenum, u8 *real_master); 911 u8 nodenum, u8 *real_master);
826 912
@@ -856,10 +942,12 @@ static inline void __dlm_wait_on_lockres(struct dlm_lock_resource *res)
856int dlm_init_mle_cache(void); 942int dlm_init_mle_cache(void);
857void dlm_destroy_mle_cache(void); 943void dlm_destroy_mle_cache(void);
858void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up); 944void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up);
945int dlm_drop_lockres_ref(struct dlm_ctxt *dlm,
946 struct dlm_lock_resource *res);
859void dlm_clean_master_list(struct dlm_ctxt *dlm, 947void dlm_clean_master_list(struct dlm_ctxt *dlm,
860 u8 dead_node); 948 u8 dead_node);
861int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock); 949int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock);
862 950int __dlm_lockres_has_locks(struct dlm_lock_resource *res);
863int __dlm_lockres_unused(struct dlm_lock_resource *res); 951int __dlm_lockres_unused(struct dlm_lock_resource *res);
864 952
865static inline const char * dlm_lock_mode_name(int mode) 953static inline const char * dlm_lock_mode_name(int mode)
diff --git a/fs/ocfs2/dlm/dlmconvert.c b/fs/ocfs2/dlm/dlmconvert.c
index c764dc8e40a2..ecb4d997221e 100644
--- a/fs/ocfs2/dlm/dlmconvert.c
+++ b/fs/ocfs2/dlm/dlmconvert.c
@@ -286,8 +286,8 @@ enum dlm_status dlmconvert_remote(struct dlm_ctxt *dlm,
286 __dlm_print_one_lock_resource(res); 286 __dlm_print_one_lock_resource(res);
287 mlog(ML_ERROR, "converting a remote lock that is already " 287 mlog(ML_ERROR, "converting a remote lock that is already "
288 "converting! (cookie=%u:%llu, conv=%d)\n", 288 "converting! (cookie=%u:%llu, conv=%d)\n",
289 dlm_get_lock_cookie_node(lock->ml.cookie), 289 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
290 dlm_get_lock_cookie_seq(lock->ml.cookie), 290 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
291 lock->ml.convert_type); 291 lock->ml.convert_type);
292 status = DLM_DENIED; 292 status = DLM_DENIED;
293 goto bail; 293 goto bail;
@@ -418,7 +418,8 @@ static enum dlm_status dlm_send_remote_convert_request(struct dlm_ctxt *dlm,
418 * returns: DLM_NORMAL, DLM_IVLOCKID, DLM_BADARGS, 418 * returns: DLM_NORMAL, DLM_IVLOCKID, DLM_BADARGS,
419 * status from __dlmconvert_master 419 * status from __dlmconvert_master
420 */ 420 */
421int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data) 421int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data,
422 void **ret_data)
422{ 423{
423 struct dlm_ctxt *dlm = data; 424 struct dlm_ctxt *dlm = data;
424 struct dlm_convert_lock *cnv = (struct dlm_convert_lock *)msg->buf; 425 struct dlm_convert_lock *cnv = (struct dlm_convert_lock *)msg->buf;
@@ -428,7 +429,7 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
428 struct dlm_lockstatus *lksb; 429 struct dlm_lockstatus *lksb;
429 enum dlm_status status = DLM_NORMAL; 430 enum dlm_status status = DLM_NORMAL;
430 u32 flags; 431 u32 flags;
431 int call_ast = 0, kick_thread = 0, ast_reserved = 0; 432 int call_ast = 0, kick_thread = 0, ast_reserved = 0, wake = 0;
432 433
433 if (!dlm_grab(dlm)) { 434 if (!dlm_grab(dlm)) {
434 dlm_error(DLM_REJECTED); 435 dlm_error(DLM_REJECTED);
@@ -479,25 +480,14 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
479 } 480 }
480 lock = NULL; 481 lock = NULL;
481 } 482 }
482 if (!lock) {
483 __dlm_print_one_lock_resource(res);
484 list_for_each(iter, &res->granted) {
485 lock = list_entry(iter, struct dlm_lock, list);
486 if (lock->ml.node == cnv->node_idx) {
487 mlog(ML_ERROR, "There is something here "
488 "for node %u, lock->ml.cookie=%llu, "
489 "cnv->cookie=%llu\n", cnv->node_idx,
490 (unsigned long long)lock->ml.cookie,
491 (unsigned long long)cnv->cookie);
492 break;
493 }
494 }
495 lock = NULL;
496 }
497 spin_unlock(&res->spinlock); 483 spin_unlock(&res->spinlock);
498 if (!lock) { 484 if (!lock) {
499 status = DLM_IVLOCKID; 485 status = DLM_IVLOCKID;
500 dlm_error(status); 486 mlog(ML_ERROR, "did not find lock to convert on grant queue! "
487 "cookie=%u:%llu\n",
488 dlm_get_lock_cookie_node(be64_to_cpu(cnv->cookie)),
489 dlm_get_lock_cookie_seq(be64_to_cpu(cnv->cookie)));
490 __dlm_print_one_lock_resource(res);
501 goto leave; 491 goto leave;
502 } 492 }
503 493
@@ -524,8 +514,11 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
524 cnv->requested_type, 514 cnv->requested_type,
525 &call_ast, &kick_thread); 515 &call_ast, &kick_thread);
526 res->state &= ~DLM_LOCK_RES_IN_PROGRESS; 516 res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
517 wake = 1;
527 } 518 }
528 spin_unlock(&res->spinlock); 519 spin_unlock(&res->spinlock);
520 if (wake)
521 wake_up(&res->wq);
529 522
530 if (status != DLM_NORMAL) { 523 if (status != DLM_NORMAL) {
531 if (status != DLM_NOTQUEUED) 524 if (status != DLM_NOTQUEUED)
@@ -534,12 +527,7 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
534 } 527 }
535 528
536leave: 529leave:
537 if (!lock) 530 if (lock)
538 mlog(ML_ERROR, "did not find lock to convert on grant queue! "
539 "cookie=%u:%llu\n",
540 dlm_get_lock_cookie_node(cnv->cookie),
541 dlm_get_lock_cookie_seq(cnv->cookie));
542 else
543 dlm_lock_put(lock); 531 dlm_lock_put(lock);
544 532
545 /* either queue the ast or release it, if reserved */ 533 /* either queue the ast or release it, if reserved */
diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c
index 3f6c8d88f7af..64239b37e5d4 100644
--- a/fs/ocfs2/dlm/dlmdebug.c
+++ b/fs/ocfs2/dlm/dlmdebug.c
@@ -53,6 +53,23 @@ void dlm_print_one_lock_resource(struct dlm_lock_resource *res)
53 spin_unlock(&res->spinlock); 53 spin_unlock(&res->spinlock);
54} 54}
55 55
56static void dlm_print_lockres_refmap(struct dlm_lock_resource *res)
57{
58 int bit;
59 assert_spin_locked(&res->spinlock);
60
61 mlog(ML_NOTICE, " refmap nodes: [ ");
62 bit = 0;
63 while (1) {
64 bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
65 if (bit >= O2NM_MAX_NODES)
66 break;
67 printk("%u ", bit);
68 bit++;
69 }
70 printk("], inflight=%u\n", res->inflight_locks);
71}
72
56void __dlm_print_one_lock_resource(struct dlm_lock_resource *res) 73void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
57{ 74{
58 struct list_head *iter2; 75 struct list_head *iter2;
@@ -65,6 +82,7 @@ void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
65 res->owner, res->state); 82 res->owner, res->state);
66 mlog(ML_NOTICE, " last used: %lu, on purge list: %s\n", 83 mlog(ML_NOTICE, " last used: %lu, on purge list: %s\n",
67 res->last_used, list_empty(&res->purge) ? "no" : "yes"); 84 res->last_used, list_empty(&res->purge) ? "no" : "yes");
85 dlm_print_lockres_refmap(res);
68 mlog(ML_NOTICE, " granted queue: \n"); 86 mlog(ML_NOTICE, " granted queue: \n");
69 list_for_each(iter2, &res->granted) { 87 list_for_each(iter2, &res->granted) {
70 lock = list_entry(iter2, struct dlm_lock, list); 88 lock = list_entry(iter2, struct dlm_lock, list);
@@ -72,8 +90,8 @@ void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
72 mlog(ML_NOTICE, " type=%d, conv=%d, node=%u, " 90 mlog(ML_NOTICE, " type=%d, conv=%d, node=%u, "
73 "cookie=%u:%llu, ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n", 91 "cookie=%u:%llu, ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n",
74 lock->ml.type, lock->ml.convert_type, lock->ml.node, 92 lock->ml.type, lock->ml.convert_type, lock->ml.node,
75 dlm_get_lock_cookie_node(lock->ml.cookie), 93 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
76 dlm_get_lock_cookie_seq(lock->ml.cookie), 94 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
77 list_empty(&lock->ast_list) ? 'y' : 'n', 95 list_empty(&lock->ast_list) ? 'y' : 'n',
78 lock->ast_pending ? 'y' : 'n', 96 lock->ast_pending ? 'y' : 'n',
79 list_empty(&lock->bast_list) ? 'y' : 'n', 97 list_empty(&lock->bast_list) ? 'y' : 'n',
@@ -87,8 +105,8 @@ void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
87 mlog(ML_NOTICE, " type=%d, conv=%d, node=%u, " 105 mlog(ML_NOTICE, " type=%d, conv=%d, node=%u, "
88 "cookie=%u:%llu, ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n", 106 "cookie=%u:%llu, ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n",
89 lock->ml.type, lock->ml.convert_type, lock->ml.node, 107 lock->ml.type, lock->ml.convert_type, lock->ml.node,
90 dlm_get_lock_cookie_node(lock->ml.cookie), 108 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
91 dlm_get_lock_cookie_seq(lock->ml.cookie), 109 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
92 list_empty(&lock->ast_list) ? 'y' : 'n', 110 list_empty(&lock->ast_list) ? 'y' : 'n',
93 lock->ast_pending ? 'y' : 'n', 111 lock->ast_pending ? 'y' : 'n',
94 list_empty(&lock->bast_list) ? 'y' : 'n', 112 list_empty(&lock->bast_list) ? 'y' : 'n',
@@ -102,8 +120,8 @@ void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
102 mlog(ML_NOTICE, " type=%d, conv=%d, node=%u, " 120 mlog(ML_NOTICE, " type=%d, conv=%d, node=%u, "
103 "cookie=%u:%llu, ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n", 121 "cookie=%u:%llu, ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n",
104 lock->ml.type, lock->ml.convert_type, lock->ml.node, 122 lock->ml.type, lock->ml.convert_type, lock->ml.node,
105 dlm_get_lock_cookie_node(lock->ml.cookie), 123 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
106 dlm_get_lock_cookie_seq(lock->ml.cookie), 124 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
107 list_empty(&lock->ast_list) ? 'y' : 'n', 125 list_empty(&lock->ast_list) ? 'y' : 'n',
108 lock->ast_pending ? 'y' : 'n', 126 lock->ast_pending ? 'y' : 'n',
109 list_empty(&lock->bast_list) ? 'y' : 'n', 127 list_empty(&lock->bast_list) ? 'y' : 'n',
diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c
index f0b25f2dd205..6087c4749fee 100644
--- a/fs/ocfs2/dlm/dlmdomain.c
+++ b/fs/ocfs2/dlm/dlmdomain.c
@@ -48,6 +48,36 @@
48#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN) 48#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
49#include "cluster/masklog.h" 49#include "cluster/masklog.h"
50 50
51/*
52 * ocfs2 node maps are array of long int, which limits to send them freely
53 * across the wire due to endianness issues. To workaround this, we convert
54 * long ints to byte arrays. Following 3 routines are helper functions to
55 * set/test/copy bits within those array of bytes
56 */
57static inline void byte_set_bit(u8 nr, u8 map[])
58{
59 map[nr >> 3] |= (1UL << (nr & 7));
60}
61
62static inline int byte_test_bit(u8 nr, u8 map[])
63{
64 return ((1UL << (nr & 7)) & (map[nr >> 3])) != 0;
65}
66
67static inline void byte_copymap(u8 dmap[], unsigned long smap[],
68 unsigned int sz)
69{
70 unsigned int nn;
71
72 if (!sz)
73 return;
74
75 memset(dmap, 0, ((sz + 7) >> 3));
76 for (nn = 0 ; nn < sz; nn++)
77 if (test_bit(nn, smap))
78 byte_set_bit(nn, dmap);
79}
80
51static void dlm_free_pagevec(void **vec, int pages) 81static void dlm_free_pagevec(void **vec, int pages)
52{ 82{
53 while (pages--) 83 while (pages--)
@@ -95,10 +125,14 @@ static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
95 125
96#define DLM_DOMAIN_BACKOFF_MS 200 126#define DLM_DOMAIN_BACKOFF_MS 200
97 127
98static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data); 128static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
99static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data); 129 void **ret_data);
100static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data); 130static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
101static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data); 131 void **ret_data);
132static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
133 void **ret_data);
134static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
135 void **ret_data);
102 136
103static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm); 137static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
104 138
@@ -125,10 +159,10 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm,
125 hlist_add_head(&res->hash_node, bucket); 159 hlist_add_head(&res->hash_node, bucket);
126} 160}
127 161
128struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, 162struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
129 const char *name, 163 const char *name,
130 unsigned int len, 164 unsigned int len,
131 unsigned int hash) 165 unsigned int hash)
132{ 166{
133 struct hlist_head *bucket; 167 struct hlist_head *bucket;
134 struct hlist_node *list; 168 struct hlist_node *list;
@@ -154,6 +188,37 @@ struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
154 return NULL; 188 return NULL;
155} 189}
156 190
191/* intended to be called by functions which do not care about lock
192 * resources which are being purged (most net _handler functions).
193 * this will return NULL for any lock resource which is found but
194 * currently in the process of dropping its mastery reference.
195 * use __dlm_lookup_lockres_full when you need the lock resource
196 * regardless (e.g. dlm_get_lock_resource) */
197struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
198 const char *name,
199 unsigned int len,
200 unsigned int hash)
201{
202 struct dlm_lock_resource *res = NULL;
203
204 mlog_entry("%.*s\n", len, name);
205
206 assert_spin_locked(&dlm->spinlock);
207
208 res = __dlm_lookup_lockres_full(dlm, name, len, hash);
209 if (res) {
210 spin_lock(&res->spinlock);
211 if (res->state & DLM_LOCK_RES_DROPPING_REF) {
212 spin_unlock(&res->spinlock);
213 dlm_lockres_put(res);
214 return NULL;
215 }
216 spin_unlock(&res->spinlock);
217 }
218
219 return res;
220}
221
157struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm, 222struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
158 const char *name, 223 const char *name,
159 unsigned int len) 224 unsigned int len)
@@ -330,43 +395,60 @@ static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
330 wake_up(&dlm_domain_events); 395 wake_up(&dlm_domain_events);
331} 396}
332 397
333static void dlm_migrate_all_locks(struct dlm_ctxt *dlm) 398static int dlm_migrate_all_locks(struct dlm_ctxt *dlm)
334{ 399{
335 int i; 400 int i, num, n, ret = 0;
336 struct dlm_lock_resource *res; 401 struct dlm_lock_resource *res;
402 struct hlist_node *iter;
403 struct hlist_head *bucket;
404 int dropped;
337 405
338 mlog(0, "Migrating locks from domain %s\n", dlm->name); 406 mlog(0, "Migrating locks from domain %s\n", dlm->name);
339restart: 407
408 num = 0;
340 spin_lock(&dlm->spinlock); 409 spin_lock(&dlm->spinlock);
341 for (i = 0; i < DLM_HASH_BUCKETS; i++) { 410 for (i = 0; i < DLM_HASH_BUCKETS; i++) {
342 while (!hlist_empty(dlm_lockres_hash(dlm, i))) { 411redo_bucket:
343 res = hlist_entry(dlm_lockres_hash(dlm, i)->first, 412 n = 0;
344 struct dlm_lock_resource, hash_node); 413 bucket = dlm_lockres_hash(dlm, i);
345 /* need reference when manually grabbing lockres */ 414 iter = bucket->first;
415 while (iter) {
416 n++;
417 res = hlist_entry(iter, struct dlm_lock_resource,
418 hash_node);
346 dlm_lockres_get(res); 419 dlm_lockres_get(res);
347 /* this should unhash the lockres 420 /* migrate, if necessary. this will drop the dlm
348 * and exit with dlm->spinlock */ 421 * spinlock and retake it if it does migration. */
349 mlog(0, "purging res=%p\n", res); 422 dropped = dlm_empty_lockres(dlm, res);
350 if (dlm_lockres_is_dirty(dlm, res)) { 423
351 /* HACK! this should absolutely go. 424 spin_lock(&res->spinlock);
352 * need to figure out why some empty 425 __dlm_lockres_calc_usage(dlm, res);
353 * lockreses are still marked dirty */ 426 iter = res->hash_node.next;
354 mlog(ML_ERROR, "lockres %.*s dirty!\n", 427 spin_unlock(&res->spinlock);
355 res->lockname.len, res->lockname.name); 428
356
357 spin_unlock(&dlm->spinlock);
358 dlm_kick_thread(dlm, res);
359 wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
360 dlm_lockres_put(res);
361 goto restart;
362 }
363 dlm_purge_lockres(dlm, res);
364 dlm_lockres_put(res); 429 dlm_lockres_put(res);
430
431 cond_resched_lock(&dlm->spinlock);
432
433 if (dropped)
434 goto redo_bucket;
365 } 435 }
436 num += n;
437 mlog(0, "%s: touched %d lockreses in bucket %d "
438 "(tot=%d)\n", dlm->name, n, i, num);
366 } 439 }
367 spin_unlock(&dlm->spinlock); 440 spin_unlock(&dlm->spinlock);
368 441 wake_up(&dlm->dlm_thread_wq);
442
443 /* let the dlm thread take care of purging, keep scanning until
444 * nothing remains in the hash */
445 if (num) {
446 mlog(0, "%s: %d lock resources in hash last pass\n",
447 dlm->name, num);
448 ret = -EAGAIN;
449 }
369 mlog(0, "DONE Migrating locks from domain %s\n", dlm->name); 450 mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
451 return ret;
370} 452}
371 453
372static int dlm_no_joining_node(struct dlm_ctxt *dlm) 454static int dlm_no_joining_node(struct dlm_ctxt *dlm)
@@ -418,7 +500,8 @@ static void __dlm_print_nodes(struct dlm_ctxt *dlm)
418 printk("\n"); 500 printk("\n");
419} 501}
420 502
421static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data) 503static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
504 void **ret_data)
422{ 505{
423 struct dlm_ctxt *dlm = data; 506 struct dlm_ctxt *dlm = data;
424 unsigned int node; 507 unsigned int node;
@@ -571,7 +654,9 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm)
571 /* We changed dlm state, notify the thread */ 654 /* We changed dlm state, notify the thread */
572 dlm_kick_thread(dlm, NULL); 655 dlm_kick_thread(dlm, NULL);
573 656
574 dlm_migrate_all_locks(dlm); 657 while (dlm_migrate_all_locks(dlm)) {
658 mlog(0, "%s: more migration to do\n", dlm->name);
659 }
575 dlm_mark_domain_leaving(dlm); 660 dlm_mark_domain_leaving(dlm);
576 dlm_leave_domain(dlm); 661 dlm_leave_domain(dlm);
577 dlm_complete_dlm_shutdown(dlm); 662 dlm_complete_dlm_shutdown(dlm);
@@ -580,11 +665,13 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm)
580} 665}
581EXPORT_SYMBOL_GPL(dlm_unregister_domain); 666EXPORT_SYMBOL_GPL(dlm_unregister_domain);
582 667
583static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data) 668static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
669 void **ret_data)
584{ 670{
585 struct dlm_query_join_request *query; 671 struct dlm_query_join_request *query;
586 enum dlm_query_join_response response; 672 enum dlm_query_join_response response;
587 struct dlm_ctxt *dlm = NULL; 673 struct dlm_ctxt *dlm = NULL;
674 u8 nodenum;
588 675
589 query = (struct dlm_query_join_request *) msg->buf; 676 query = (struct dlm_query_join_request *) msg->buf;
590 677
@@ -608,6 +695,28 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
608 695
609 spin_lock(&dlm_domain_lock); 696 spin_lock(&dlm_domain_lock);
610 dlm = __dlm_lookup_domain_full(query->domain, query->name_len); 697 dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
698 if (!dlm)
699 goto unlock_respond;
700
701 /*
702 * There is a small window where the joining node may not see the
703 * node(s) that just left but still part of the cluster. DISALLOW
704 * join request if joining node has different node map.
705 */
706 nodenum=0;
707 while (nodenum < O2NM_MAX_NODES) {
708 if (test_bit(nodenum, dlm->domain_map)) {
709 if (!byte_test_bit(nodenum, query->node_map)) {
710 mlog(0, "disallow join as node %u does not "
711 "have node %u in its nodemap\n",
712 query->node_idx, nodenum);
713 response = JOIN_DISALLOW;
714 goto unlock_respond;
715 }
716 }
717 nodenum++;
718 }
719
611 /* Once the dlm ctxt is marked as leaving then we don't want 720 /* Once the dlm ctxt is marked as leaving then we don't want
612 * to be put in someone's domain map. 721 * to be put in someone's domain map.
613 * Also, explicitly disallow joining at certain troublesome 722 * Also, explicitly disallow joining at certain troublesome
@@ -626,15 +735,15 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
626 /* Disallow parallel joins. */ 735 /* Disallow parallel joins. */
627 response = JOIN_DISALLOW; 736 response = JOIN_DISALLOW;
628 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) { 737 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
629 mlog(ML_NOTICE, "node %u trying to join, but recovery " 738 mlog(0, "node %u trying to join, but recovery "
630 "is ongoing.\n", bit); 739 "is ongoing.\n", bit);
631 response = JOIN_DISALLOW; 740 response = JOIN_DISALLOW;
632 } else if (test_bit(bit, dlm->recovery_map)) { 741 } else if (test_bit(bit, dlm->recovery_map)) {
633 mlog(ML_NOTICE, "node %u trying to join, but it " 742 mlog(0, "node %u trying to join, but it "
634 "still needs recovery.\n", bit); 743 "still needs recovery.\n", bit);
635 response = JOIN_DISALLOW; 744 response = JOIN_DISALLOW;
636 } else if (test_bit(bit, dlm->domain_map)) { 745 } else if (test_bit(bit, dlm->domain_map)) {
637 mlog(ML_NOTICE, "node %u trying to join, but it " 746 mlog(0, "node %u trying to join, but it "
638 "is still in the domain! needs recovery?\n", 747 "is still in the domain! needs recovery?\n",
639 bit); 748 bit);
640 response = JOIN_DISALLOW; 749 response = JOIN_DISALLOW;
@@ -649,6 +758,7 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
649 758
650 spin_unlock(&dlm->spinlock); 759 spin_unlock(&dlm->spinlock);
651 } 760 }
761unlock_respond:
652 spin_unlock(&dlm_domain_lock); 762 spin_unlock(&dlm_domain_lock);
653 763
654respond: 764respond:
@@ -657,7 +767,8 @@ respond:
657 return response; 767 return response;
658} 768}
659 769
660static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data) 770static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
771 void **ret_data)
661{ 772{
662 struct dlm_assert_joined *assert; 773 struct dlm_assert_joined *assert;
663 struct dlm_ctxt *dlm = NULL; 774 struct dlm_ctxt *dlm = NULL;
@@ -694,7 +805,8 @@ static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data)
694 return 0; 805 return 0;
695} 806}
696 807
697static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data) 808static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
809 void **ret_data)
698{ 810{
699 struct dlm_cancel_join *cancel; 811 struct dlm_cancel_join *cancel;
700 struct dlm_ctxt *dlm = NULL; 812 struct dlm_ctxt *dlm = NULL;
@@ -796,6 +908,9 @@ static int dlm_request_join(struct dlm_ctxt *dlm,
796 join_msg.name_len = strlen(dlm->name); 908 join_msg.name_len = strlen(dlm->name);
797 memcpy(join_msg.domain, dlm->name, join_msg.name_len); 909 memcpy(join_msg.domain, dlm->name, join_msg.name_len);
798 910
911 /* copy live node map to join message */
912 byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES);
913
799 status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg, 914 status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
800 sizeof(join_msg), node, &retval); 915 sizeof(join_msg), node, &retval);
801 if (status < 0 && status != -ENOPROTOOPT) { 916 if (status < 0 && status != -ENOPROTOOPT) {
@@ -1036,98 +1151,106 @@ static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
1036 status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key, 1151 status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
1037 sizeof(struct dlm_master_request), 1152 sizeof(struct dlm_master_request),
1038 dlm_master_request_handler, 1153 dlm_master_request_handler,
1039 dlm, &dlm->dlm_domain_handlers); 1154 dlm, NULL, &dlm->dlm_domain_handlers);
1040 if (status) 1155 if (status)
1041 goto bail; 1156 goto bail;
1042 1157
1043 status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key, 1158 status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
1044 sizeof(struct dlm_assert_master), 1159 sizeof(struct dlm_assert_master),
1045 dlm_assert_master_handler, 1160 dlm_assert_master_handler,
1046 dlm, &dlm->dlm_domain_handlers); 1161 dlm, dlm_assert_master_post_handler,
1162 &dlm->dlm_domain_handlers);
1047 if (status) 1163 if (status)
1048 goto bail; 1164 goto bail;
1049 1165
1050 status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key, 1166 status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
1051 sizeof(struct dlm_create_lock), 1167 sizeof(struct dlm_create_lock),
1052 dlm_create_lock_handler, 1168 dlm_create_lock_handler,
1053 dlm, &dlm->dlm_domain_handlers); 1169 dlm, NULL, &dlm->dlm_domain_handlers);
1054 if (status) 1170 if (status)
1055 goto bail; 1171 goto bail;
1056 1172
1057 status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key, 1173 status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key,
1058 DLM_CONVERT_LOCK_MAX_LEN, 1174 DLM_CONVERT_LOCK_MAX_LEN,
1059 dlm_convert_lock_handler, 1175 dlm_convert_lock_handler,
1060 dlm, &dlm->dlm_domain_handlers); 1176 dlm, NULL, &dlm->dlm_domain_handlers);
1061 if (status) 1177 if (status)
1062 goto bail; 1178 goto bail;
1063 1179
1064 status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key, 1180 status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key,
1065 DLM_UNLOCK_LOCK_MAX_LEN, 1181 DLM_UNLOCK_LOCK_MAX_LEN,
1066 dlm_unlock_lock_handler, 1182 dlm_unlock_lock_handler,
1067 dlm, &dlm->dlm_domain_handlers); 1183 dlm, NULL, &dlm->dlm_domain_handlers);
1068 if (status) 1184 if (status)
1069 goto bail; 1185 goto bail;
1070 1186
1071 status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key, 1187 status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key,
1072 DLM_PROXY_AST_MAX_LEN, 1188 DLM_PROXY_AST_MAX_LEN,
1073 dlm_proxy_ast_handler, 1189 dlm_proxy_ast_handler,
1074 dlm, &dlm->dlm_domain_handlers); 1190 dlm, NULL, &dlm->dlm_domain_handlers);
1075 if (status) 1191 if (status)
1076 goto bail; 1192 goto bail;
1077 1193
1078 status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key, 1194 status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key,
1079 sizeof(struct dlm_exit_domain), 1195 sizeof(struct dlm_exit_domain),
1080 dlm_exit_domain_handler, 1196 dlm_exit_domain_handler,
1081 dlm, &dlm->dlm_domain_handlers); 1197 dlm, NULL, &dlm->dlm_domain_handlers);
1198 if (status)
1199 goto bail;
1200
1201 status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key,
1202 sizeof(struct dlm_deref_lockres),
1203 dlm_deref_lockres_handler,
1204 dlm, NULL, &dlm->dlm_domain_handlers);
1082 if (status) 1205 if (status)
1083 goto bail; 1206 goto bail;
1084 1207
1085 status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key, 1208 status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
1086 sizeof(struct dlm_migrate_request), 1209 sizeof(struct dlm_migrate_request),
1087 dlm_migrate_request_handler, 1210 dlm_migrate_request_handler,
1088 dlm, &dlm->dlm_domain_handlers); 1211 dlm, NULL, &dlm->dlm_domain_handlers);
1089 if (status) 1212 if (status)
1090 goto bail; 1213 goto bail;
1091 1214
1092 status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key, 1215 status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key,
1093 DLM_MIG_LOCKRES_MAX_LEN, 1216 DLM_MIG_LOCKRES_MAX_LEN,
1094 dlm_mig_lockres_handler, 1217 dlm_mig_lockres_handler,
1095 dlm, &dlm->dlm_domain_handlers); 1218 dlm, NULL, &dlm->dlm_domain_handlers);
1096 if (status) 1219 if (status)
1097 goto bail; 1220 goto bail;
1098 1221
1099 status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key, 1222 status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
1100 sizeof(struct dlm_master_requery), 1223 sizeof(struct dlm_master_requery),
1101 dlm_master_requery_handler, 1224 dlm_master_requery_handler,
1102 dlm, &dlm->dlm_domain_handlers); 1225 dlm, NULL, &dlm->dlm_domain_handlers);
1103 if (status) 1226 if (status)
1104 goto bail; 1227 goto bail;
1105 1228
1106 status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key, 1229 status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
1107 sizeof(struct dlm_lock_request), 1230 sizeof(struct dlm_lock_request),
1108 dlm_request_all_locks_handler, 1231 dlm_request_all_locks_handler,
1109 dlm, &dlm->dlm_domain_handlers); 1232 dlm, NULL, &dlm->dlm_domain_handlers);
1110 if (status) 1233 if (status)
1111 goto bail; 1234 goto bail;
1112 1235
1113 status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key, 1236 status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
1114 sizeof(struct dlm_reco_data_done), 1237 sizeof(struct dlm_reco_data_done),
1115 dlm_reco_data_done_handler, 1238 dlm_reco_data_done_handler,
1116 dlm, &dlm->dlm_domain_handlers); 1239 dlm, NULL, &dlm->dlm_domain_handlers);
1117 if (status) 1240 if (status)
1118 goto bail; 1241 goto bail;
1119 1242
1120 status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key, 1243 status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
1121 sizeof(struct dlm_begin_reco), 1244 sizeof(struct dlm_begin_reco),
1122 dlm_begin_reco_handler, 1245 dlm_begin_reco_handler,
1123 dlm, &dlm->dlm_domain_handlers); 1246 dlm, NULL, &dlm->dlm_domain_handlers);
1124 if (status) 1247 if (status)
1125 goto bail; 1248 goto bail;
1126 1249
1127 status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key, 1250 status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
1128 sizeof(struct dlm_finalize_reco), 1251 sizeof(struct dlm_finalize_reco),
1129 dlm_finalize_reco_handler, 1252 dlm_finalize_reco_handler,
1130 dlm, &dlm->dlm_domain_handlers); 1253 dlm, NULL, &dlm->dlm_domain_handlers);
1131 if (status) 1254 if (status)
1132 goto bail; 1255 goto bail;
1133 1256
@@ -1141,6 +1264,8 @@ bail:
1141static int dlm_join_domain(struct dlm_ctxt *dlm) 1264static int dlm_join_domain(struct dlm_ctxt *dlm)
1142{ 1265{
1143 int status; 1266 int status;
1267 unsigned int backoff;
1268 unsigned int total_backoff = 0;
1144 1269
1145 BUG_ON(!dlm); 1270 BUG_ON(!dlm);
1146 1271
@@ -1172,18 +1297,27 @@ static int dlm_join_domain(struct dlm_ctxt *dlm)
1172 } 1297 }
1173 1298
1174 do { 1299 do {
1175 unsigned int backoff;
1176 status = dlm_try_to_join_domain(dlm); 1300 status = dlm_try_to_join_domain(dlm);
1177 1301
1178 /* If we're racing another node to the join, then we 1302 /* If we're racing another node to the join, then we
1179 * need to back off temporarily and let them 1303 * need to back off temporarily and let them
1180 * complete. */ 1304 * complete. */
1305#define DLM_JOIN_TIMEOUT_MSECS 90000
1181 if (status == -EAGAIN) { 1306 if (status == -EAGAIN) {
1182 if (signal_pending(current)) { 1307 if (signal_pending(current)) {
1183 status = -ERESTARTSYS; 1308 status = -ERESTARTSYS;
1184 goto bail; 1309 goto bail;
1185 } 1310 }
1186 1311
1312 if (total_backoff >
1313 msecs_to_jiffies(DLM_JOIN_TIMEOUT_MSECS)) {
1314 status = -ERESTARTSYS;
1315 mlog(ML_NOTICE, "Timed out joining dlm domain "
1316 "%s after %u msecs\n", dlm->name,
1317 jiffies_to_msecs(total_backoff));
1318 goto bail;
1319 }
1320
1187 /* 1321 /*
1188 * <chip> After you! 1322 * <chip> After you!
1189 * <dale> No, after you! 1323 * <dale> No, after you!
@@ -1193,6 +1327,7 @@ static int dlm_join_domain(struct dlm_ctxt *dlm)
1193 */ 1327 */
1194 backoff = (unsigned int)(jiffies & 0x3); 1328 backoff = (unsigned int)(jiffies & 0x3);
1195 backoff *= DLM_DOMAIN_BACKOFF_MS; 1329 backoff *= DLM_DOMAIN_BACKOFF_MS;
1330 total_backoff += backoff;
1196 mlog(0, "backoff %d\n", backoff); 1331 mlog(0, "backoff %d\n", backoff);
1197 msleep(backoff); 1332 msleep(backoff);
1198 } 1333 }
@@ -1421,21 +1556,21 @@ static int dlm_register_net_handlers(void)
1421 status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, 1556 status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
1422 sizeof(struct dlm_query_join_request), 1557 sizeof(struct dlm_query_join_request),
1423 dlm_query_join_handler, 1558 dlm_query_join_handler,
1424 NULL, &dlm_join_handlers); 1559 NULL, NULL, &dlm_join_handlers);
1425 if (status) 1560 if (status)
1426 goto bail; 1561 goto bail;
1427 1562
1428 status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY, 1563 status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1429 sizeof(struct dlm_assert_joined), 1564 sizeof(struct dlm_assert_joined),
1430 dlm_assert_joined_handler, 1565 dlm_assert_joined_handler,
1431 NULL, &dlm_join_handlers); 1566 NULL, NULL, &dlm_join_handlers);
1432 if (status) 1567 if (status)
1433 goto bail; 1568 goto bail;
1434 1569
1435 status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY, 1570 status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1436 sizeof(struct dlm_cancel_join), 1571 sizeof(struct dlm_cancel_join),
1437 dlm_cancel_join_handler, 1572 dlm_cancel_join_handler,
1438 NULL, &dlm_join_handlers); 1573 NULL, NULL, &dlm_join_handlers);
1439 1574
1440bail: 1575bail:
1441 if (status < 0) 1576 if (status < 0)
diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c
index e5ca3db197f6..52578d907d9a 100644
--- a/fs/ocfs2/dlm/dlmlock.c
+++ b/fs/ocfs2/dlm/dlmlock.c
@@ -163,6 +163,10 @@ static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
163 kick_thread = 1; 163 kick_thread = 1;
164 } 164 }
165 } 165 }
166 /* reduce the inflight count, this may result in the lockres
167 * being purged below during calc_usage */
168 if (lock->ml.node == dlm->node_num)
169 dlm_lockres_drop_inflight_ref(dlm, res);
166 170
167 spin_unlock(&res->spinlock); 171 spin_unlock(&res->spinlock);
168 wake_up(&res->wq); 172 wake_up(&res->wq);
@@ -437,7 +441,8 @@ struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie,
437 * held on exit: none 441 * held on exit: none
438 * returns: DLM_NORMAL, DLM_SYSERR, DLM_IVLOCKID, DLM_NOTQUEUED 442 * returns: DLM_NORMAL, DLM_SYSERR, DLM_IVLOCKID, DLM_NOTQUEUED
439 */ 443 */
440int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data) 444int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data,
445 void **ret_data)
441{ 446{
442 struct dlm_ctxt *dlm = data; 447 struct dlm_ctxt *dlm = data;
443 struct dlm_create_lock *create = (struct dlm_create_lock *)msg->buf; 448 struct dlm_create_lock *create = (struct dlm_create_lock *)msg->buf;
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 0ad872055cb3..77e4e6169a0d 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -99,9 +99,10 @@ static void dlm_mle_node_up(struct dlm_ctxt *dlm,
99 int idx); 99 int idx);
100 100
101static void dlm_assert_master_worker(struct dlm_work_item *item, void *data); 101static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
102static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname, 102static int dlm_do_assert_master(struct dlm_ctxt *dlm,
103 unsigned int namelen, void *nodemap, 103 struct dlm_lock_resource *res,
104 u32 flags); 104 void *nodemap, u32 flags);
105static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);
105 106
106static inline int dlm_mle_equal(struct dlm_ctxt *dlm, 107static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
107 struct dlm_master_list_entry *mle, 108 struct dlm_master_list_entry *mle,
@@ -237,7 +238,8 @@ static int dlm_find_mle(struct dlm_ctxt *dlm,
237 struct dlm_master_list_entry **mle, 238 struct dlm_master_list_entry **mle,
238 char *name, unsigned int namelen); 239 char *name, unsigned int namelen);
239 240
240static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to); 241static int dlm_do_master_request(struct dlm_lock_resource *res,
242 struct dlm_master_list_entry *mle, int to);
241 243
242 244
243static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm, 245static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
@@ -687,6 +689,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
687 INIT_LIST_HEAD(&res->purge); 689 INIT_LIST_HEAD(&res->purge);
688 atomic_set(&res->asts_reserved, 0); 690 atomic_set(&res->asts_reserved, 0);
689 res->migration_pending = 0; 691 res->migration_pending = 0;
692 res->inflight_locks = 0;
690 693
691 kref_init(&res->refs); 694 kref_init(&res->refs);
692 695
@@ -700,6 +703,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
700 res->last_used = 0; 703 res->last_used = 0;
701 704
702 memset(res->lvb, 0, DLM_LVB_LEN); 705 memset(res->lvb, 0, DLM_LVB_LEN);
706 memset(res->refmap, 0, sizeof(res->refmap));
703} 707}
704 708
705struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm, 709struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
@@ -722,6 +726,42 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
722 return res; 726 return res;
723} 727}
724 728
729void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
730 struct dlm_lock_resource *res,
731 int new_lockres,
732 const char *file,
733 int line)
734{
735 if (!new_lockres)
736 assert_spin_locked(&res->spinlock);
737
738 if (!test_bit(dlm->node_num, res->refmap)) {
739 BUG_ON(res->inflight_locks != 0);
740 dlm_lockres_set_refmap_bit(dlm->node_num, res);
741 }
742 res->inflight_locks++;
743 mlog(0, "%s:%.*s: inflight++: now %u\n",
744 dlm->name, res->lockname.len, res->lockname.name,
745 res->inflight_locks);
746}
747
748void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
749 struct dlm_lock_resource *res,
750 const char *file,
751 int line)
752{
753 assert_spin_locked(&res->spinlock);
754
755 BUG_ON(res->inflight_locks == 0);
756 res->inflight_locks--;
757 mlog(0, "%s:%.*s: inflight--: now %u\n",
758 dlm->name, res->lockname.len, res->lockname.name,
759 res->inflight_locks);
760 if (res->inflight_locks == 0)
761 dlm_lockres_clear_refmap_bit(dlm->node_num, res);
762 wake_up(&res->wq);
763}
764
725/* 765/*
726 * lookup a lock resource by name. 766 * lookup a lock resource by name.
727 * may already exist in the hashtable. 767 * may already exist in the hashtable.
@@ -752,6 +792,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
752 unsigned int hash; 792 unsigned int hash;
753 int tries = 0; 793 int tries = 0;
754 int bit, wait_on_recovery = 0; 794 int bit, wait_on_recovery = 0;
795 int drop_inflight_if_nonlocal = 0;
755 796
756 BUG_ON(!lockid); 797 BUG_ON(!lockid);
757 798
@@ -761,9 +802,30 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
761 802
762lookup: 803lookup:
763 spin_lock(&dlm->spinlock); 804 spin_lock(&dlm->spinlock);
764 tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash); 805 tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
765 if (tmpres) { 806 if (tmpres) {
807 int dropping_ref = 0;
808
809 spin_lock(&tmpres->spinlock);
810 if (tmpres->owner == dlm->node_num) {
811 BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF);
812 dlm_lockres_grab_inflight_ref(dlm, tmpres);
813 } else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF)
814 dropping_ref = 1;
815 spin_unlock(&tmpres->spinlock);
766 spin_unlock(&dlm->spinlock); 816 spin_unlock(&dlm->spinlock);
817
818 /* wait until done messaging the master, drop our ref to allow
819 * the lockres to be purged, start over. */
820 if (dropping_ref) {
821 spin_lock(&tmpres->spinlock);
822 __dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF);
823 spin_unlock(&tmpres->spinlock);
824 dlm_lockres_put(tmpres);
825 tmpres = NULL;
826 goto lookup;
827 }
828
767 mlog(0, "found in hash!\n"); 829 mlog(0, "found in hash!\n");
768 if (res) 830 if (res)
769 dlm_lockres_put(res); 831 dlm_lockres_put(res);
@@ -793,6 +855,7 @@ lookup:
793 spin_lock(&res->spinlock); 855 spin_lock(&res->spinlock);
794 dlm_change_lockres_owner(dlm, res, dlm->node_num); 856 dlm_change_lockres_owner(dlm, res, dlm->node_num);
795 __dlm_insert_lockres(dlm, res); 857 __dlm_insert_lockres(dlm, res);
858 dlm_lockres_grab_inflight_ref(dlm, res);
796 spin_unlock(&res->spinlock); 859 spin_unlock(&res->spinlock);
797 spin_unlock(&dlm->spinlock); 860 spin_unlock(&dlm->spinlock);
798 /* lockres still marked IN_PROGRESS */ 861 /* lockres still marked IN_PROGRESS */
@@ -805,29 +868,40 @@ lookup:
805 /* if we found a block, wait for lock to be mastered by another node */ 868 /* if we found a block, wait for lock to be mastered by another node */
806 blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen); 869 blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
807 if (blocked) { 870 if (blocked) {
871 int mig;
808 if (mle->type == DLM_MLE_MASTER) { 872 if (mle->type == DLM_MLE_MASTER) {
809 mlog(ML_ERROR, "master entry for nonexistent lock!\n"); 873 mlog(ML_ERROR, "master entry for nonexistent lock!\n");
810 BUG(); 874 BUG();
811 } else if (mle->type == DLM_MLE_MIGRATION) { 875 }
812 /* migration is in progress! */ 876 mig = (mle->type == DLM_MLE_MIGRATION);
813 /* the good news is that we now know the 877 /* if there is a migration in progress, let the migration
814 * "current" master (mle->master). */ 878 * finish before continuing. we can wait for the absence
815 879 * of the MIGRATION mle: either the migrate finished or
880 * one of the nodes died and the mle was cleaned up.
881 * if there is a BLOCK here, but it already has a master
882 * set, we are too late. the master does not have a ref
883 * for us in the refmap. detach the mle and drop it.
884 * either way, go back to the top and start over. */
885 if (mig || mle->master != O2NM_MAX_NODES) {
886 BUG_ON(mig && mle->master == dlm->node_num);
887 /* we arrived too late. the master does not
888 * have a ref for us. retry. */
889 mlog(0, "%s:%.*s: late on %s\n",
890 dlm->name, namelen, lockid,
891 mig ? "MIGRATION" : "BLOCK");
816 spin_unlock(&dlm->master_lock); 892 spin_unlock(&dlm->master_lock);
817 assert_spin_locked(&dlm->spinlock);
818
819 /* set the lockres owner and hash it */
820 spin_lock(&res->spinlock);
821 dlm_set_lockres_owner(dlm, res, mle->master);
822 __dlm_insert_lockres(dlm, res);
823 spin_unlock(&res->spinlock);
824 spin_unlock(&dlm->spinlock); 893 spin_unlock(&dlm->spinlock);
825 894
826 /* master is known, detach */ 895 /* master is known, detach */
827 dlm_mle_detach_hb_events(dlm, mle); 896 if (!mig)
897 dlm_mle_detach_hb_events(dlm, mle);
828 dlm_put_mle(mle); 898 dlm_put_mle(mle);
829 mle = NULL; 899 mle = NULL;
830 goto wake_waiters; 900 /* this is lame, but we cant wait on either
901 * the mle or lockres waitqueue here */
902 if (mig)
903 msleep(100);
904 goto lookup;
831 } 905 }
832 } else { 906 } else {
833 /* go ahead and try to master lock on this node */ 907 /* go ahead and try to master lock on this node */
@@ -858,6 +932,13 @@ lookup:
858 932
859 /* finally add the lockres to its hash bucket */ 933 /* finally add the lockres to its hash bucket */
860 __dlm_insert_lockres(dlm, res); 934 __dlm_insert_lockres(dlm, res);
935 /* since this lockres is new it doesnt not require the spinlock */
936 dlm_lockres_grab_inflight_ref_new(dlm, res);
937
938 /* if this node does not become the master make sure to drop
939 * this inflight reference below */
940 drop_inflight_if_nonlocal = 1;
941
861 /* get an extra ref on the mle in case this is a BLOCK 942 /* get an extra ref on the mle in case this is a BLOCK
862 * if so, the creator of the BLOCK may try to put the last 943 * if so, the creator of the BLOCK may try to put the last
863 * ref at this time in the assert master handler, so we 944 * ref at this time in the assert master handler, so we
@@ -910,7 +991,7 @@ redo_request:
910 ret = -EINVAL; 991 ret = -EINVAL;
911 dlm_node_iter_init(mle->vote_map, &iter); 992 dlm_node_iter_init(mle->vote_map, &iter);
912 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { 993 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
913 ret = dlm_do_master_request(mle, nodenum); 994 ret = dlm_do_master_request(res, mle, nodenum);
914 if (ret < 0) 995 if (ret < 0)
915 mlog_errno(ret); 996 mlog_errno(ret);
916 if (mle->master != O2NM_MAX_NODES) { 997 if (mle->master != O2NM_MAX_NODES) {
@@ -960,6 +1041,8 @@ wait:
960 1041
961wake_waiters: 1042wake_waiters:
962 spin_lock(&res->spinlock); 1043 spin_lock(&res->spinlock);
1044 if (res->owner != dlm->node_num && drop_inflight_if_nonlocal)
1045 dlm_lockres_drop_inflight_ref(dlm, res);
963 res->state &= ~DLM_LOCK_RES_IN_PROGRESS; 1046 res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
964 spin_unlock(&res->spinlock); 1047 spin_unlock(&res->spinlock);
965 wake_up(&res->wq); 1048 wake_up(&res->wq);
@@ -998,7 +1081,7 @@ recheck:
998 /* this will cause the master to re-assert across 1081 /* this will cause the master to re-assert across
999 * the whole cluster, freeing up mles */ 1082 * the whole cluster, freeing up mles */
1000 if (res->owner != dlm->node_num) { 1083 if (res->owner != dlm->node_num) {
1001 ret = dlm_do_master_request(mle, res->owner); 1084 ret = dlm_do_master_request(res, mle, res->owner);
1002 if (ret < 0) { 1085 if (ret < 0) {
1003 /* give recovery a chance to run */ 1086 /* give recovery a chance to run */
1004 mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret); 1087 mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
@@ -1062,6 +1145,8 @@ recheck:
1062 * now tell other nodes that I am 1145 * now tell other nodes that I am
1063 * mastering this. */ 1146 * mastering this. */
1064 mle->master = dlm->node_num; 1147 mle->master = dlm->node_num;
1148 /* ref was grabbed in get_lock_resource
1149 * will be dropped in dlmlock_master */
1065 assert = 1; 1150 assert = 1;
1066 sleep = 0; 1151 sleep = 0;
1067 } 1152 }
@@ -1087,7 +1172,8 @@ recheck:
1087 (atomic_read(&mle->woken) == 1), 1172 (atomic_read(&mle->woken) == 1),
1088 timeo); 1173 timeo);
1089 if (res->owner == O2NM_MAX_NODES) { 1174 if (res->owner == O2NM_MAX_NODES) {
1090 mlog(0, "waiting again\n"); 1175 mlog(0, "%s:%.*s: waiting again\n", dlm->name,
1176 res->lockname.len, res->lockname.name);
1091 goto recheck; 1177 goto recheck;
1092 } 1178 }
1093 mlog(0, "done waiting, master is %u\n", res->owner); 1179 mlog(0, "done waiting, master is %u\n", res->owner);
@@ -1100,8 +1186,7 @@ recheck:
1100 m = dlm->node_num; 1186 m = dlm->node_num;
1101 mlog(0, "about to master %.*s here, this=%u\n", 1187 mlog(0, "about to master %.*s here, this=%u\n",
1102 res->lockname.len, res->lockname.name, m); 1188 res->lockname.len, res->lockname.name, m);
1103 ret = dlm_do_assert_master(dlm, res->lockname.name, 1189 ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
1104 res->lockname.len, mle->vote_map, 0);
1105 if (ret) { 1190 if (ret) {
1106 /* This is a failure in the network path, 1191 /* This is a failure in the network path,
1107 * not in the response to the assert_master 1192 * not in the response to the assert_master
@@ -1117,6 +1202,8 @@ recheck:
1117 1202
1118 /* set the lockres owner */ 1203 /* set the lockres owner */
1119 spin_lock(&res->spinlock); 1204 spin_lock(&res->spinlock);
1205 /* mastery reference obtained either during
1206 * assert_master_handler or in get_lock_resource */
1120 dlm_change_lockres_owner(dlm, res, m); 1207 dlm_change_lockres_owner(dlm, res, m);
1121 spin_unlock(&res->spinlock); 1208 spin_unlock(&res->spinlock);
1122 1209
@@ -1283,7 +1370,8 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1283 * 1370 *
1284 */ 1371 */
1285 1372
1286static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to) 1373static int dlm_do_master_request(struct dlm_lock_resource *res,
1374 struct dlm_master_list_entry *mle, int to)
1287{ 1375{
1288 struct dlm_ctxt *dlm = mle->dlm; 1376 struct dlm_ctxt *dlm = mle->dlm;
1289 struct dlm_master_request request; 1377 struct dlm_master_request request;
@@ -1339,6 +1427,9 @@ again:
1339 case DLM_MASTER_RESP_YES: 1427 case DLM_MASTER_RESP_YES:
1340 set_bit(to, mle->response_map); 1428 set_bit(to, mle->response_map);
1341 mlog(0, "node %u is the master, response=YES\n", to); 1429 mlog(0, "node %u is the master, response=YES\n", to);
1430 mlog(0, "%s:%.*s: master node %u now knows I have a "
1431 "reference\n", dlm->name, res->lockname.len,
1432 res->lockname.name, to);
1342 mle->master = to; 1433 mle->master = to;
1343 break; 1434 break;
1344 case DLM_MASTER_RESP_NO: 1435 case DLM_MASTER_RESP_NO:
@@ -1379,7 +1470,8 @@ out:
1379 * 1470 *
1380 * if possible, TRIM THIS DOWN!!! 1471 * if possible, TRIM THIS DOWN!!!
1381 */ 1472 */
1382int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data) 1473int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
1474 void **ret_data)
1383{ 1475{
1384 u8 response = DLM_MASTER_RESP_MAYBE; 1476 u8 response = DLM_MASTER_RESP_MAYBE;
1385 struct dlm_ctxt *dlm = data; 1477 struct dlm_ctxt *dlm = data;
@@ -1417,10 +1509,11 @@ way_up_top:
1417 1509
1418 /* take care of the easy cases up front */ 1510 /* take care of the easy cases up front */
1419 spin_lock(&res->spinlock); 1511 spin_lock(&res->spinlock);
1420 if (res->state & DLM_LOCK_RES_RECOVERING) { 1512 if (res->state & (DLM_LOCK_RES_RECOVERING|
1513 DLM_LOCK_RES_MIGRATING)) {
1421 spin_unlock(&res->spinlock); 1514 spin_unlock(&res->spinlock);
1422 mlog(0, "returning DLM_MASTER_RESP_ERROR since res is " 1515 mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
1423 "being recovered\n"); 1516 "being recovered/migrated\n");
1424 response = DLM_MASTER_RESP_ERROR; 1517 response = DLM_MASTER_RESP_ERROR;
1425 if (mle) 1518 if (mle)
1426 kmem_cache_free(dlm_mle_cache, mle); 1519 kmem_cache_free(dlm_mle_cache, mle);
@@ -1428,8 +1521,10 @@ way_up_top:
1428 } 1521 }
1429 1522
1430 if (res->owner == dlm->node_num) { 1523 if (res->owner == dlm->node_num) {
1524 mlog(0, "%s:%.*s: setting bit %u in refmap\n",
1525 dlm->name, namelen, name, request->node_idx);
1526 dlm_lockres_set_refmap_bit(request->node_idx, res);
1431 spin_unlock(&res->spinlock); 1527 spin_unlock(&res->spinlock);
1432 // mlog(0, "this node is the master\n");
1433 response = DLM_MASTER_RESP_YES; 1528 response = DLM_MASTER_RESP_YES;
1434 if (mle) 1529 if (mle)
1435 kmem_cache_free(dlm_mle_cache, mle); 1530 kmem_cache_free(dlm_mle_cache, mle);
@@ -1477,7 +1572,6 @@ way_up_top:
1477 mlog(0, "node %u is master, but trying to migrate to " 1572 mlog(0, "node %u is master, but trying to migrate to "
1478 "node %u.\n", tmpmle->master, tmpmle->new_master); 1573 "node %u.\n", tmpmle->master, tmpmle->new_master);
1479 if (tmpmle->master == dlm->node_num) { 1574 if (tmpmle->master == dlm->node_num) {
1480 response = DLM_MASTER_RESP_YES;
1481 mlog(ML_ERROR, "no owner on lockres, but this " 1575 mlog(ML_ERROR, "no owner on lockres, but this "
1482 "node is trying to migrate it to %u?!\n", 1576 "node is trying to migrate it to %u?!\n",
1483 tmpmle->new_master); 1577 tmpmle->new_master);
@@ -1494,6 +1588,10 @@ way_up_top:
1494 * go back and clean the mles on any 1588 * go back and clean the mles on any
1495 * other nodes */ 1589 * other nodes */
1496 dispatch_assert = 1; 1590 dispatch_assert = 1;
1591 dlm_lockres_set_refmap_bit(request->node_idx, res);
1592 mlog(0, "%s:%.*s: setting bit %u in refmap\n",
1593 dlm->name, namelen, name,
1594 request->node_idx);
1497 } else 1595 } else
1498 response = DLM_MASTER_RESP_NO; 1596 response = DLM_MASTER_RESP_NO;
1499 } else { 1597 } else {
@@ -1607,17 +1705,24 @@ send_response:
1607 * can periodically run all locks owned by this node 1705 * can periodically run all locks owned by this node
1608 * and re-assert across the cluster... 1706 * and re-assert across the cluster...
1609 */ 1707 */
1610static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname, 1708int dlm_do_assert_master(struct dlm_ctxt *dlm,
1611 unsigned int namelen, void *nodemap, 1709 struct dlm_lock_resource *res,
1612 u32 flags) 1710 void *nodemap, u32 flags)
1613{ 1711{
1614 struct dlm_assert_master assert; 1712 struct dlm_assert_master assert;
1615 int to, tmpret; 1713 int to, tmpret;
1616 struct dlm_node_iter iter; 1714 struct dlm_node_iter iter;
1617 int ret = 0; 1715 int ret = 0;
1618 int reassert; 1716 int reassert;
1717 const char *lockname = res->lockname.name;
1718 unsigned int namelen = res->lockname.len;
1619 1719
1620 BUG_ON(namelen > O2NM_MAX_NAME_LEN); 1720 BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1721
1722 spin_lock(&res->spinlock);
1723 res->state |= DLM_LOCK_RES_SETREF_INPROG;
1724 spin_unlock(&res->spinlock);
1725
1621again: 1726again:
1622 reassert = 0; 1727 reassert = 0;
1623 1728
@@ -1647,6 +1752,7 @@ again:
1647 mlog(0, "link to %d went down!\n", to); 1752 mlog(0, "link to %d went down!\n", to);
1648 /* any nonzero status return will do */ 1753 /* any nonzero status return will do */
1649 ret = tmpret; 1754 ret = tmpret;
1755 r = 0;
1650 } else if (r < 0) { 1756 } else if (r < 0) {
1651 /* ok, something horribly messed. kill thyself. */ 1757 /* ok, something horribly messed. kill thyself. */
1652 mlog(ML_ERROR,"during assert master of %.*s to %u, " 1758 mlog(ML_ERROR,"during assert master of %.*s to %u, "
@@ -1661,17 +1767,39 @@ again:
1661 spin_unlock(&dlm->master_lock); 1767 spin_unlock(&dlm->master_lock);
1662 spin_unlock(&dlm->spinlock); 1768 spin_unlock(&dlm->spinlock);
1663 BUG(); 1769 BUG();
1664 } else if (r == EAGAIN) { 1770 }
1771
1772 if (r & DLM_ASSERT_RESPONSE_REASSERT &&
1773 !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
1774 mlog(ML_ERROR, "%.*s: very strange, "
1775 "master MLE but no lockres on %u\n",
1776 namelen, lockname, to);
1777 }
1778
1779 if (r & DLM_ASSERT_RESPONSE_REASSERT) {
1665 mlog(0, "%.*s: node %u create mles on other " 1780 mlog(0, "%.*s: node %u create mles on other "
1666 "nodes and requests a re-assert\n", 1781 "nodes and requests a re-assert\n",
1667 namelen, lockname, to); 1782 namelen, lockname, to);
1668 reassert = 1; 1783 reassert = 1;
1669 } 1784 }
1785 if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
1786 mlog(0, "%.*s: node %u has a reference to this "
1787 "lockres, set the bit in the refmap\n",
1788 namelen, lockname, to);
1789 spin_lock(&res->spinlock);
1790 dlm_lockres_set_refmap_bit(to, res);
1791 spin_unlock(&res->spinlock);
1792 }
1670 } 1793 }
1671 1794
1672 if (reassert) 1795 if (reassert)
1673 goto again; 1796 goto again;
1674 1797
1798 spin_lock(&res->spinlock);
1799 res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
1800 spin_unlock(&res->spinlock);
1801 wake_up(&res->wq);
1802
1675 return ret; 1803 return ret;
1676} 1804}
1677 1805
@@ -1684,7 +1812,8 @@ again:
1684 * 1812 *
1685 * if possible, TRIM THIS DOWN!!! 1813 * if possible, TRIM THIS DOWN!!!
1686 */ 1814 */
1687int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) 1815int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
1816 void **ret_data)
1688{ 1817{
1689 struct dlm_ctxt *dlm = data; 1818 struct dlm_ctxt *dlm = data;
1690 struct dlm_master_list_entry *mle = NULL; 1819 struct dlm_master_list_entry *mle = NULL;
@@ -1693,7 +1822,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
1693 char *name; 1822 char *name;
1694 unsigned int namelen, hash; 1823 unsigned int namelen, hash;
1695 u32 flags; 1824 u32 flags;
1696 int master_request = 0; 1825 int master_request = 0, have_lockres_ref = 0;
1697 int ret = 0; 1826 int ret = 0;
1698 1827
1699 if (!dlm_grab(dlm)) 1828 if (!dlm_grab(dlm))
@@ -1851,6 +1980,7 @@ ok:
1851 spin_unlock(&mle->spinlock); 1980 spin_unlock(&mle->spinlock);
1852 1981
1853 if (res) { 1982 if (res) {
1983 int wake = 0;
1854 spin_lock(&res->spinlock); 1984 spin_lock(&res->spinlock);
1855 if (mle->type == DLM_MLE_MIGRATION) { 1985 if (mle->type == DLM_MLE_MIGRATION) {
1856 mlog(0, "finishing off migration of lockres %.*s, " 1986 mlog(0, "finishing off migration of lockres %.*s, "
@@ -1858,12 +1988,16 @@ ok:
1858 res->lockname.len, res->lockname.name, 1988 res->lockname.len, res->lockname.name,
1859 dlm->node_num, mle->new_master); 1989 dlm->node_num, mle->new_master);
1860 res->state &= ~DLM_LOCK_RES_MIGRATING; 1990 res->state &= ~DLM_LOCK_RES_MIGRATING;
1991 wake = 1;
1861 dlm_change_lockres_owner(dlm, res, mle->new_master); 1992 dlm_change_lockres_owner(dlm, res, mle->new_master);
1862 BUG_ON(res->state & DLM_LOCK_RES_DIRTY); 1993 BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1863 } else { 1994 } else {
1864 dlm_change_lockres_owner(dlm, res, mle->master); 1995 dlm_change_lockres_owner(dlm, res, mle->master);
1865 } 1996 }
1866 spin_unlock(&res->spinlock); 1997 spin_unlock(&res->spinlock);
1998 have_lockres_ref = 1;
1999 if (wake)
2000 wake_up(&res->wq);
1867 } 2001 }
1868 2002
1869 /* master is known, detach if not already detached. 2003 /* master is known, detach if not already detached.
@@ -1913,12 +2047,28 @@ ok:
1913 2047
1914done: 2048done:
1915 ret = 0; 2049 ret = 0;
1916 if (res) 2050 if (res) {
1917 dlm_lockres_put(res); 2051 spin_lock(&res->spinlock);
2052 res->state |= DLM_LOCK_RES_SETREF_INPROG;
2053 spin_unlock(&res->spinlock);
2054 *ret_data = (void *)res;
2055 }
1918 dlm_put(dlm); 2056 dlm_put(dlm);
1919 if (master_request) { 2057 if (master_request) {
1920 mlog(0, "need to tell master to reassert\n"); 2058 mlog(0, "need to tell master to reassert\n");
1921 ret = EAGAIN; // positive. negative would shoot down the node. 2059 /* positive. negative would shoot down the node. */
2060 ret |= DLM_ASSERT_RESPONSE_REASSERT;
2061 if (!have_lockres_ref) {
2062 mlog(ML_ERROR, "strange, got assert from %u, MASTER "
2063 "mle present here for %s:%.*s, but no lockres!\n",
2064 assert->node_idx, dlm->name, namelen, name);
2065 }
2066 }
2067 if (have_lockres_ref) {
2068 /* let the master know we have a reference to the lockres */
2069 ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
2070 mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
2071 dlm->name, namelen, name, assert->node_idx);
1922 } 2072 }
1923 return ret; 2073 return ret;
1924 2074
@@ -1929,11 +2079,25 @@ kill:
1929 __dlm_print_one_lock_resource(res); 2079 __dlm_print_one_lock_resource(res);
1930 spin_unlock(&res->spinlock); 2080 spin_unlock(&res->spinlock);
1931 spin_unlock(&dlm->spinlock); 2081 spin_unlock(&dlm->spinlock);
1932 dlm_lockres_put(res); 2082 *ret_data = (void *)res;
1933 dlm_put(dlm); 2083 dlm_put(dlm);
1934 return -EINVAL; 2084 return -EINVAL;
1935} 2085}
1936 2086
2087void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
2088{
2089 struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
2090
2091 if (ret_data) {
2092 spin_lock(&res->spinlock);
2093 res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
2094 spin_unlock(&res->spinlock);
2095 wake_up(&res->wq);
2096 dlm_lockres_put(res);
2097 }
2098 return;
2099}
2100
1937int dlm_dispatch_assert_master(struct dlm_ctxt *dlm, 2101int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
1938 struct dlm_lock_resource *res, 2102 struct dlm_lock_resource *res,
1939 int ignore_higher, u8 request_from, u32 flags) 2103 int ignore_higher, u8 request_from, u32 flags)
@@ -2023,9 +2187,7 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
2023 * even if one or more nodes die */ 2187 * even if one or more nodes die */
2024 mlog(0, "worker about to master %.*s here, this=%u\n", 2188 mlog(0, "worker about to master %.*s here, this=%u\n",
2025 res->lockname.len, res->lockname.name, dlm->node_num); 2189 res->lockname.len, res->lockname.name, dlm->node_num);
2026 ret = dlm_do_assert_master(dlm, res->lockname.name, 2190 ret = dlm_do_assert_master(dlm, res, nodemap, flags);
2027 res->lockname.len,
2028 nodemap, flags);
2029 if (ret < 0) { 2191 if (ret < 0) {
2030 /* no need to restart, we are done */ 2192 /* no need to restart, we are done */
2031 if (!dlm_is_host_down(ret)) 2193 if (!dlm_is_host_down(ret))
@@ -2097,14 +2259,180 @@ static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
2097 return ret; 2259 return ret;
2098} 2260}
2099 2261
2262/*
2263 * DLM_DEREF_LOCKRES_MSG
2264 */
2265
2266int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2267{
2268 struct dlm_deref_lockres deref;
2269 int ret = 0, r;
2270 const char *lockname;
2271 unsigned int namelen;
2272
2273 lockname = res->lockname.name;
2274 namelen = res->lockname.len;
2275 BUG_ON(namelen > O2NM_MAX_NAME_LEN);
2276
2277 mlog(0, "%s:%.*s: sending deref to %d\n",
2278 dlm->name, namelen, lockname, res->owner);
2279 memset(&deref, 0, sizeof(deref));
2280 deref.node_idx = dlm->node_num;
2281 deref.namelen = namelen;
2282 memcpy(deref.name, lockname, namelen);
2283
2284 ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
2285 &deref, sizeof(deref), res->owner, &r);
2286 if (ret < 0)
2287 mlog_errno(ret);
2288 else if (r < 0) {
2289 /* BAD. other node says I did not have a ref. */
2290 mlog(ML_ERROR,"while dropping ref on %s:%.*s "
2291 "(master=%u) got %d.\n", dlm->name, namelen,
2292 lockname, res->owner, r);
2293 dlm_print_one_lock_resource(res);
2294 BUG();
2295 }
2296 return ret;
2297}
2298
2299int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
2300 void **ret_data)
2301{
2302 struct dlm_ctxt *dlm = data;
2303 struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
2304 struct dlm_lock_resource *res = NULL;
2305 char *name;
2306 unsigned int namelen;
2307 int ret = -EINVAL;
2308 u8 node;
2309 unsigned int hash;
2310 struct dlm_work_item *item;
2311 int cleared = 0;
2312 int dispatch = 0;
2313
2314 if (!dlm_grab(dlm))
2315 return 0;
2316
2317 name = deref->name;
2318 namelen = deref->namelen;
2319 node = deref->node_idx;
2320
2321 if (namelen > DLM_LOCKID_NAME_MAX) {
2322 mlog(ML_ERROR, "Invalid name length!");
2323 goto done;
2324 }
2325 if (deref->node_idx >= O2NM_MAX_NODES) {
2326 mlog(ML_ERROR, "Invalid node number: %u\n", node);
2327 goto done;
2328 }
2329
2330 hash = dlm_lockid_hash(name, namelen);
2331
2332 spin_lock(&dlm->spinlock);
2333 res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2334 if (!res) {
2335 spin_unlock(&dlm->spinlock);
2336 mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
2337 dlm->name, namelen, name);
2338 goto done;
2339 }
2340 spin_unlock(&dlm->spinlock);
2341
2342 spin_lock(&res->spinlock);
2343 if (res->state & DLM_LOCK_RES_SETREF_INPROG)
2344 dispatch = 1;
2345 else {
2346 BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2347 if (test_bit(node, res->refmap)) {
2348 dlm_lockres_clear_refmap_bit(node, res);
2349 cleared = 1;
2350 }
2351 }
2352 spin_unlock(&res->spinlock);
2353
2354 if (!dispatch) {
2355 if (cleared)
2356 dlm_lockres_calc_usage(dlm, res);
2357 else {
2358 mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2359 "but it is already dropped!\n", dlm->name,
2360 res->lockname.len, res->lockname.name, node);
2361 __dlm_print_one_lock_resource(res);
2362 }
2363 ret = 0;
2364 goto done;
2365 }
2366
2367 item = kzalloc(sizeof(*item), GFP_NOFS);
2368 if (!item) {
2369 ret = -ENOMEM;
2370 mlog_errno(ret);
2371 goto done;
2372 }
2373
2374 dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
2375 item->u.dl.deref_res = res;
2376 item->u.dl.deref_node = node;
2377
2378 spin_lock(&dlm->work_lock);
2379 list_add_tail(&item->list, &dlm->work_list);
2380 spin_unlock(&dlm->work_lock);
2381
2382 queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2383 return 0;
2384
2385done:
2386 if (res)
2387 dlm_lockres_put(res);
2388 dlm_put(dlm);
2389
2390 return ret;
2391}
2392
2393static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
2394{
2395 struct dlm_ctxt *dlm;
2396 struct dlm_lock_resource *res;
2397 u8 node;
2398 u8 cleared = 0;
2399
2400 dlm = item->dlm;
2401 res = item->u.dl.deref_res;
2402 node = item->u.dl.deref_node;
2403
2404 spin_lock(&res->spinlock);
2405 BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2406 if (test_bit(node, res->refmap)) {
2407 __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
2408 dlm_lockres_clear_refmap_bit(node, res);
2409 cleared = 1;
2410 }
2411 spin_unlock(&res->spinlock);
2412
2413 if (cleared) {
2414 mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
2415 dlm->name, res->lockname.len, res->lockname.name, node);
2416 dlm_lockres_calc_usage(dlm, res);
2417 } else {
2418 mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2419 "but it is already dropped!\n", dlm->name,
2420 res->lockname.len, res->lockname.name, node);
2421 __dlm_print_one_lock_resource(res);
2422 }
2423
2424 dlm_lockres_put(res);
2425}
2426
2100 2427
2101/* 2428/*
2102 * DLM_MIGRATE_LOCKRES 2429 * DLM_MIGRATE_LOCKRES
2103 */ 2430 */
2104 2431
2105 2432
2106int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, 2433static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
2107 u8 target) 2434 struct dlm_lock_resource *res,
2435 u8 target)
2108{ 2436{
2109 struct dlm_master_list_entry *mle = NULL; 2437 struct dlm_master_list_entry *mle = NULL;
2110 struct dlm_master_list_entry *oldmle = NULL; 2438 struct dlm_master_list_entry *oldmle = NULL;
@@ -2116,7 +2444,7 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2116 struct list_head *queue, *iter; 2444 struct list_head *queue, *iter;
2117 int i; 2445 int i;
2118 struct dlm_lock *lock; 2446 struct dlm_lock *lock;
2119 int empty = 1; 2447 int empty = 1, wake = 0;
2120 2448
2121 if (!dlm_grab(dlm)) 2449 if (!dlm_grab(dlm))
2122 return -EINVAL; 2450 return -EINVAL;
@@ -2241,6 +2569,7 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2241 res->lockname.name, target); 2569 res->lockname.name, target);
2242 spin_lock(&res->spinlock); 2570 spin_lock(&res->spinlock);
2243 res->state &= ~DLM_LOCK_RES_MIGRATING; 2571 res->state &= ~DLM_LOCK_RES_MIGRATING;
2572 wake = 1;
2244 spin_unlock(&res->spinlock); 2573 spin_unlock(&res->spinlock);
2245 ret = -EINVAL; 2574 ret = -EINVAL;
2246 } 2575 }
@@ -2268,6 +2597,9 @@ fail:
2268 * the lockres 2597 * the lockres
2269 */ 2598 */
2270 2599
2600 /* now that remote nodes are spinning on the MIGRATING flag,
2601 * ensure that all assert_master work is flushed. */
2602 flush_workqueue(dlm->dlm_worker);
2271 2603
2272 /* get an extra reference on the mle. 2604 /* get an extra reference on the mle.
2273 * otherwise the assert_master from the new 2605 * otherwise the assert_master from the new
@@ -2296,6 +2628,7 @@ fail:
2296 dlm_put_mle_inuse(mle); 2628 dlm_put_mle_inuse(mle);
2297 spin_lock(&res->spinlock); 2629 spin_lock(&res->spinlock);
2298 res->state &= ~DLM_LOCK_RES_MIGRATING; 2630 res->state &= ~DLM_LOCK_RES_MIGRATING;
2631 wake = 1;
2299 spin_unlock(&res->spinlock); 2632 spin_unlock(&res->spinlock);
2300 goto leave; 2633 goto leave;
2301 } 2634 }
@@ -2322,7 +2655,8 @@ fail:
2322 res->owner == target) 2655 res->owner == target)
2323 break; 2656 break;
2324 2657
2325 mlog(0, "timed out during migration\n"); 2658 mlog(0, "%s:%.*s: timed out during migration\n",
2659 dlm->name, res->lockname.len, res->lockname.name);
2326 /* avoid hang during shutdown when migrating lockres 2660 /* avoid hang during shutdown when migrating lockres
2327 * to a node which also goes down */ 2661 * to a node which also goes down */
2328 if (dlm_is_node_dead(dlm, target)) { 2662 if (dlm_is_node_dead(dlm, target)) {
@@ -2330,20 +2664,20 @@ fail:
2330 "target %u is no longer up, restarting\n", 2664 "target %u is no longer up, restarting\n",
2331 dlm->name, res->lockname.len, 2665 dlm->name, res->lockname.len,
2332 res->lockname.name, target); 2666 res->lockname.name, target);
2333 ret = -ERESTARTSYS; 2667 ret = -EINVAL;
2668 /* migration failed, detach and clean up mle */
2669 dlm_mle_detach_hb_events(dlm, mle);
2670 dlm_put_mle(mle);
2671 dlm_put_mle_inuse(mle);
2672 spin_lock(&res->spinlock);
2673 res->state &= ~DLM_LOCK_RES_MIGRATING;
2674 wake = 1;
2675 spin_unlock(&res->spinlock);
2676 goto leave;
2334 } 2677 }
2335 } 2678 } else
2336 if (ret == -ERESTARTSYS) { 2679 mlog(0, "%s:%.*s: caught signal during migration\n",
2337 /* migration failed, detach and clean up mle */ 2680 dlm->name, res->lockname.len, res->lockname.name);
2338 dlm_mle_detach_hb_events(dlm, mle);
2339 dlm_put_mle(mle);
2340 dlm_put_mle_inuse(mle);
2341 spin_lock(&res->spinlock);
2342 res->state &= ~DLM_LOCK_RES_MIGRATING;
2343 spin_unlock(&res->spinlock);
2344 goto leave;
2345 }
2346 /* TODO: if node died: stop, clean up, return error */
2347 } 2681 }
2348 2682
2349 /* all done, set the owner, clear the flag */ 2683 /* all done, set the owner, clear the flag */
@@ -2366,6 +2700,11 @@ leave:
2366 if (ret < 0) 2700 if (ret < 0)
2367 dlm_kick_thread(dlm, res); 2701 dlm_kick_thread(dlm, res);
2368 2702
2703 /* wake up waiters if the MIGRATING flag got set
2704 * but migration failed */
2705 if (wake)
2706 wake_up(&res->wq);
2707
2369 /* TODO: cleanup */ 2708 /* TODO: cleanup */
2370 if (mres) 2709 if (mres)
2371 free_page((unsigned long)mres); 2710 free_page((unsigned long)mres);
@@ -2376,6 +2715,53 @@ leave:
2376 return ret; 2715 return ret;
2377} 2716}
2378 2717
2718#define DLM_MIGRATION_RETRY_MS 100
2719
2720/* Should be called only after beginning the domain leave process.
2721 * There should not be any remaining locks on nonlocal lock resources,
2722 * and there should be no local locks left on locally mastered resources.
2723 *
2724 * Called with the dlm spinlock held, may drop it to do migration, but
2725 * will re-acquire before exit.
2726 *
2727 * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped */
2728int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2729{
2730 int ret;
2731 int lock_dropped = 0;
2732
2733 if (res->owner != dlm->node_num) {
2734 if (!__dlm_lockres_unused(res)) {
2735 mlog(ML_ERROR, "%s:%.*s: this node is not master, "
2736 "trying to free this but locks remain\n",
2737 dlm->name, res->lockname.len, res->lockname.name);
2738 }
2739 goto leave;
2740 }
2741
2742 /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
2743 spin_unlock(&dlm->spinlock);
2744 lock_dropped = 1;
2745 while (1) {
2746 ret = dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES);
2747 if (ret >= 0)
2748 break;
2749 if (ret == -ENOTEMPTY) {
2750 mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
2751 res->lockname.len, res->lockname.name);
2752 BUG();
2753 }
2754
2755 mlog(0, "lockres %.*s: migrate failed, "
2756 "retrying\n", res->lockname.len,
2757 res->lockname.name);
2758 msleep(DLM_MIGRATION_RETRY_MS);
2759 }
2760 spin_lock(&dlm->spinlock);
2761leave:
2762 return lock_dropped;
2763}
2764
2379int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock) 2765int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2380{ 2766{
2381 int ret; 2767 int ret;
@@ -2405,7 +2791,8 @@ static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2405 return can_proceed; 2791 return can_proceed;
2406} 2792}
2407 2793
2408int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) 2794static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
2795 struct dlm_lock_resource *res)
2409{ 2796{
2410 int ret; 2797 int ret;
2411 spin_lock(&res->spinlock); 2798 spin_lock(&res->spinlock);
@@ -2434,8 +2821,15 @@ static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2434 __dlm_lockres_reserve_ast(res); 2821 __dlm_lockres_reserve_ast(res);
2435 spin_unlock(&res->spinlock); 2822 spin_unlock(&res->spinlock);
2436 2823
2437 /* now flush all the pending asts.. hang out for a bit */ 2824 /* now flush all the pending asts */
2438 dlm_kick_thread(dlm, res); 2825 dlm_kick_thread(dlm, res);
2826 /* before waiting on DIRTY, block processes which may
2827 * try to dirty the lockres before MIGRATING is set */
2828 spin_lock(&res->spinlock);
2829 BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
2830 res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
2831 spin_unlock(&res->spinlock);
2832 /* now wait on any pending asts and the DIRTY state */
2439 wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res)); 2833 wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2440 dlm_lockres_release_ast(dlm, res); 2834 dlm_lockres_release_ast(dlm, res);
2441 2835
@@ -2461,6 +2855,13 @@ again:
2461 mlog(0, "trying again...\n"); 2855 mlog(0, "trying again...\n");
2462 goto again; 2856 goto again;
2463 } 2857 }
2858 /* now that we are sure the MIGRATING state is there, drop
2859 * the unneded state which blocked threads trying to DIRTY */
2860 spin_lock(&res->spinlock);
2861 BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
2862 BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
2863 res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
2864 spin_unlock(&res->spinlock);
2464 2865
2465 /* did the target go down or die? */ 2866 /* did the target go down or die? */
2466 spin_lock(&dlm->spinlock); 2867 spin_lock(&dlm->spinlock);
@@ -2490,7 +2891,7 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2490{ 2891{
2491 struct list_head *iter, *iter2; 2892 struct list_head *iter, *iter2;
2492 struct list_head *queue = &res->granted; 2893 struct list_head *queue = &res->granted;
2493 int i; 2894 int i, bit;
2494 struct dlm_lock *lock; 2895 struct dlm_lock *lock;
2495 2896
2496 assert_spin_locked(&res->spinlock); 2897 assert_spin_locked(&res->spinlock);
@@ -2508,12 +2909,28 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2508 BUG_ON(!list_empty(&lock->bast_list)); 2909 BUG_ON(!list_empty(&lock->bast_list));
2509 BUG_ON(lock->ast_pending); 2910 BUG_ON(lock->ast_pending);
2510 BUG_ON(lock->bast_pending); 2911 BUG_ON(lock->bast_pending);
2912 dlm_lockres_clear_refmap_bit(lock->ml.node, res);
2511 list_del_init(&lock->list); 2913 list_del_init(&lock->list);
2512 dlm_lock_put(lock); 2914 dlm_lock_put(lock);
2513 } 2915 }
2514 } 2916 }
2515 queue++; 2917 queue++;
2516 } 2918 }
2919 bit = 0;
2920 while (1) {
2921 bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
2922 if (bit >= O2NM_MAX_NODES)
2923 break;
2924 /* do not clear the local node reference, if there is a
2925 * process holding this, let it drop the ref itself */
2926 if (bit != dlm->node_num) {
2927 mlog(0, "%s:%.*s: node %u had a ref to this "
2928 "migrating lockres, clearing\n", dlm->name,
2929 res->lockname.len, res->lockname.name, bit);
2930 dlm_lockres_clear_refmap_bit(bit, res);
2931 }
2932 bit++;
2933 }
2517} 2934}
2518 2935
2519/* for now this is not too intelligent. we will 2936/* for now this is not too intelligent. we will
@@ -2601,6 +3018,16 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
2601 mlog(0, "migrate request (node %u) returned %d!\n", 3018 mlog(0, "migrate request (node %u) returned %d!\n",
2602 nodenum, status); 3019 nodenum, status);
2603 ret = status; 3020 ret = status;
3021 } else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
3022 /* during the migration request we short-circuited
3023 * the mastery of the lockres. make sure we have
3024 * a mastery ref for nodenum */
3025 mlog(0, "%s:%.*s: need ref for node %u\n",
3026 dlm->name, res->lockname.len, res->lockname.name,
3027 nodenum);
3028 spin_lock(&res->spinlock);
3029 dlm_lockres_set_refmap_bit(nodenum, res);
3030 spin_unlock(&res->spinlock);
2604 } 3031 }
2605 } 3032 }
2606 3033
@@ -2619,7 +3046,8 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
2619 * we will have no mle in the list to start with. now we can add an mle for 3046 * we will have no mle in the list to start with. now we can add an mle for
2620 * the migration and this should be the only one found for those scanning the 3047 * the migration and this should be the only one found for those scanning the
2621 * list. */ 3048 * list. */
2622int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data) 3049int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
3050 void **ret_data)
2623{ 3051{
2624 struct dlm_ctxt *dlm = data; 3052 struct dlm_ctxt *dlm = data;
2625 struct dlm_lock_resource *res = NULL; 3053 struct dlm_lock_resource *res = NULL;
@@ -2745,7 +3173,13 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
2745 /* remove it from the list so that only one 3173 /* remove it from the list so that only one
2746 * mle will be found */ 3174 * mle will be found */
2747 list_del_init(&tmp->list); 3175 list_del_init(&tmp->list);
2748 __dlm_mle_detach_hb_events(dlm, mle); 3176 /* this was obviously WRONG. mle is uninited here. should be tmp. */
3177 __dlm_mle_detach_hb_events(dlm, tmp);
3178 ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
3179 mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
3180 "telling master to get ref for cleared out mle "
3181 "during migration\n", dlm->name, namelen, name,
3182 master, new_master);
2749 } 3183 }
2750 spin_unlock(&tmp->spinlock); 3184 spin_unlock(&tmp->spinlock);
2751 } 3185 }
@@ -2753,6 +3187,8 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
2753 /* now add a migration mle to the tail of the list */ 3187 /* now add a migration mle to the tail of the list */
2754 dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen); 3188 dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
2755 mle->new_master = new_master; 3189 mle->new_master = new_master;
3190 /* the new master will be sending an assert master for this.
3191 * at that point we will get the refmap reference */
2756 mle->master = master; 3192 mle->master = master;
2757 /* do this for consistency with other mle types */ 3193 /* do this for consistency with other mle types */
2758 set_bit(new_master, mle->maybe_map); 3194 set_bit(new_master, mle->maybe_map);
@@ -2902,6 +3338,13 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2902 clear_bit(dlm->node_num, iter.node_map); 3338 clear_bit(dlm->node_num, iter.node_map);
2903 spin_unlock(&dlm->spinlock); 3339 spin_unlock(&dlm->spinlock);
2904 3340
3341 /* ownership of the lockres is changing. account for the
3342 * mastery reference here since old_master will briefly have
3343 * a reference after the migration completes */
3344 spin_lock(&res->spinlock);
3345 dlm_lockres_set_refmap_bit(old_master, res);
3346 spin_unlock(&res->spinlock);
3347
2905 mlog(0, "now time to do a migrate request to other nodes\n"); 3348 mlog(0, "now time to do a migrate request to other nodes\n");
2906 ret = dlm_do_migrate_request(dlm, res, old_master, 3349 ret = dlm_do_migrate_request(dlm, res, old_master,
2907 dlm->node_num, &iter); 3350 dlm->node_num, &iter);
@@ -2914,8 +3357,7 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2914 res->lockname.len, res->lockname.name); 3357 res->lockname.len, res->lockname.name);
2915 /* this call now finishes out the nodemap 3358 /* this call now finishes out the nodemap
2916 * even if one or more nodes die */ 3359 * even if one or more nodes die */
2917 ret = dlm_do_assert_master(dlm, res->lockname.name, 3360 ret = dlm_do_assert_master(dlm, res, iter.node_map,
2918 res->lockname.len, iter.node_map,
2919 DLM_ASSERT_MASTER_FINISH_MIGRATION); 3361 DLM_ASSERT_MASTER_FINISH_MIGRATION);
2920 if (ret < 0) { 3362 if (ret < 0) {
2921 /* no longer need to retry. all living nodes contacted. */ 3363 /* no longer need to retry. all living nodes contacted. */
@@ -2927,8 +3369,7 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2927 set_bit(old_master, iter.node_map); 3369 set_bit(old_master, iter.node_map);
2928 mlog(0, "doing assert master of %.*s back to %u\n", 3370 mlog(0, "doing assert master of %.*s back to %u\n",
2929 res->lockname.len, res->lockname.name, old_master); 3371 res->lockname.len, res->lockname.name, old_master);
2930 ret = dlm_do_assert_master(dlm, res->lockname.name, 3372 ret = dlm_do_assert_master(dlm, res, iter.node_map,
2931 res->lockname.len, iter.node_map,
2932 DLM_ASSERT_MASTER_FINISH_MIGRATION); 3373 DLM_ASSERT_MASTER_FINISH_MIGRATION);
2933 if (ret < 0) { 3374 if (ret < 0) {
2934 mlog(0, "assert master to original master failed " 3375 mlog(0, "assert master to original master failed "
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 367a11e9e2ed..6d4a83d50152 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -163,9 +163,6 @@ void dlm_dispatch_work(struct work_struct *work)
163 dlm_workfunc_t *workfunc; 163 dlm_workfunc_t *workfunc;
164 int tot=0; 164 int tot=0;
165 165
166 if (!dlm_joined(dlm))
167 return;
168
169 spin_lock(&dlm->work_lock); 166 spin_lock(&dlm->work_lock);
170 list_splice_init(&dlm->work_list, &tmp_list); 167 list_splice_init(&dlm->work_list, &tmp_list);
171 spin_unlock(&dlm->work_lock); 168 spin_unlock(&dlm->work_lock);
@@ -821,7 +818,8 @@ static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from,
821 818
822} 819}
823 820
824int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data) 821int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data,
822 void **ret_data)
825{ 823{
826 struct dlm_ctxt *dlm = data; 824 struct dlm_ctxt *dlm = data;
827 struct dlm_lock_request *lr = (struct dlm_lock_request *)msg->buf; 825 struct dlm_lock_request *lr = (struct dlm_lock_request *)msg->buf;
@@ -978,7 +976,8 @@ static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to)
978} 976}
979 977
980 978
981int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data) 979int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data,
980 void **ret_data)
982{ 981{
983 struct dlm_ctxt *dlm = data; 982 struct dlm_ctxt *dlm = data;
984 struct dlm_reco_data_done *done = (struct dlm_reco_data_done *)msg->buf; 983 struct dlm_reco_data_done *done = (struct dlm_reco_data_done *)msg->buf;
@@ -1129,6 +1128,11 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
1129 if (total_locks == mres_total_locks) 1128 if (total_locks == mres_total_locks)
1130 mres->flags |= DLM_MRES_ALL_DONE; 1129 mres->flags |= DLM_MRES_ALL_DONE;
1131 1130
1131 mlog(0, "%s:%.*s: sending mig lockres (%s) to %u\n",
1132 dlm->name, res->lockname.len, res->lockname.name,
1133 orig_flags & DLM_MRES_MIGRATION ? "migrate" : "recovery",
1134 send_to);
1135
1132 /* send it */ 1136 /* send it */
1133 ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres, 1137 ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres,
1134 sz, send_to, &status); 1138 sz, send_to, &status);
@@ -1213,6 +1217,34 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock,
1213 return 0; 1217 return 0;
1214} 1218}
1215 1219
1220static void dlm_add_dummy_lock(struct dlm_ctxt *dlm,
1221 struct dlm_migratable_lockres *mres)
1222{
1223 struct dlm_lock dummy;
1224 memset(&dummy, 0, sizeof(dummy));
1225 dummy.ml.cookie = 0;
1226 dummy.ml.type = LKM_IVMODE;
1227 dummy.ml.convert_type = LKM_IVMODE;
1228 dummy.ml.highest_blocked = LKM_IVMODE;
1229 dummy.lksb = NULL;
1230 dummy.ml.node = dlm->node_num;
1231 dlm_add_lock_to_array(&dummy, mres, DLM_BLOCKED_LIST);
1232}
1233
1234static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm,
1235 struct dlm_migratable_lock *ml,
1236 u8 *nodenum)
1237{
1238 if (unlikely(ml->cookie == 0 &&
1239 ml->type == LKM_IVMODE &&
1240 ml->convert_type == LKM_IVMODE &&
1241 ml->highest_blocked == LKM_IVMODE &&
1242 ml->list == DLM_BLOCKED_LIST)) {
1243 *nodenum = ml->node;
1244 return 1;
1245 }
1246 return 0;
1247}
1216 1248
1217int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, 1249int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
1218 struct dlm_migratable_lockres *mres, 1250 struct dlm_migratable_lockres *mres,
@@ -1260,6 +1292,14 @@ int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
1260 goto error; 1292 goto error;
1261 } 1293 }
1262 } 1294 }
1295 if (total_locks == 0) {
1296 /* send a dummy lock to indicate a mastery reference only */
1297 mlog(0, "%s:%.*s: sending dummy lock to %u, %s\n",
1298 dlm->name, res->lockname.len, res->lockname.name,
1299 send_to, flags & DLM_MRES_RECOVERY ? "recovery" :
1300 "migration");
1301 dlm_add_dummy_lock(dlm, mres);
1302 }
1263 /* flush any remaining locks */ 1303 /* flush any remaining locks */
1264 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks); 1304 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
1265 if (ret < 0) 1305 if (ret < 0)
@@ -1293,7 +1333,8 @@ error:
1293 * do we spin? returning an error only delays the problem really 1333 * do we spin? returning an error only delays the problem really
1294 */ 1334 */
1295 1335
1296int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data) 1336int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
1337 void **ret_data)
1297{ 1338{
1298 struct dlm_ctxt *dlm = data; 1339 struct dlm_ctxt *dlm = data;
1299 struct dlm_migratable_lockres *mres = 1340 struct dlm_migratable_lockres *mres =
@@ -1382,17 +1423,21 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
1382 spin_lock(&res->spinlock); 1423 spin_lock(&res->spinlock);
1383 res->state &= ~DLM_LOCK_RES_IN_PROGRESS; 1424 res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
1384 spin_unlock(&res->spinlock); 1425 spin_unlock(&res->spinlock);
1426 wake_up(&res->wq);
1385 1427
1386 /* add an extra ref for just-allocated lockres 1428 /* add an extra ref for just-allocated lockres
1387 * otherwise the lockres will be purged immediately */ 1429 * otherwise the lockres will be purged immediately */
1388 dlm_lockres_get(res); 1430 dlm_lockres_get(res);
1389
1390 } 1431 }
1391 1432
1392 /* at this point we have allocated everything we need, 1433 /* at this point we have allocated everything we need,
1393 * and we have a hashed lockres with an extra ref and 1434 * and we have a hashed lockres with an extra ref and
1394 * the proper res->state flags. */ 1435 * the proper res->state flags. */
1395 ret = 0; 1436 ret = 0;
1437 spin_lock(&res->spinlock);
1438 /* drop this either when master requery finds a different master
1439 * or when a lock is added by the recovery worker */
1440 dlm_lockres_grab_inflight_ref(dlm, res);
1396 if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) { 1441 if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) {
1397 /* migration cannot have an unknown master */ 1442 /* migration cannot have an unknown master */
1398 BUG_ON(!(mres->flags & DLM_MRES_RECOVERY)); 1443 BUG_ON(!(mres->flags & DLM_MRES_RECOVERY));
@@ -1400,10 +1445,11 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
1400 "unknown owner.. will need to requery: " 1445 "unknown owner.. will need to requery: "
1401 "%.*s\n", mres->lockname_len, mres->lockname); 1446 "%.*s\n", mres->lockname_len, mres->lockname);
1402 } else { 1447 } else {
1403 spin_lock(&res->spinlock); 1448 /* take a reference now to pin the lockres, drop it
1449 * when locks are added in the worker */
1404 dlm_change_lockres_owner(dlm, res, dlm->node_num); 1450 dlm_change_lockres_owner(dlm, res, dlm->node_num);
1405 spin_unlock(&res->spinlock);
1406 } 1451 }
1452 spin_unlock(&res->spinlock);
1407 1453
1408 /* queue up work for dlm_mig_lockres_worker */ 1454 /* queue up work for dlm_mig_lockres_worker */
1409 dlm_grab(dlm); /* get an extra ref for the work item */ 1455 dlm_grab(dlm); /* get an extra ref for the work item */
@@ -1459,6 +1505,9 @@ again:
1459 "this node will take it.\n", 1505 "this node will take it.\n",
1460 res->lockname.len, res->lockname.name); 1506 res->lockname.len, res->lockname.name);
1461 } else { 1507 } else {
1508 spin_lock(&res->spinlock);
1509 dlm_lockres_drop_inflight_ref(dlm, res);
1510 spin_unlock(&res->spinlock);
1462 mlog(0, "master needs to respond to sender " 1511 mlog(0, "master needs to respond to sender "
1463 "that node %u still owns %.*s\n", 1512 "that node %u still owns %.*s\n",
1464 real_master, res->lockname.len, 1513 real_master, res->lockname.len,
@@ -1578,7 +1627,8 @@ int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
1578/* this function cannot error, so unless the sending 1627/* this function cannot error, so unless the sending
1579 * or receiving of the message failed, the owner can 1628 * or receiving of the message failed, the owner can
1580 * be trusted */ 1629 * be trusted */
1581int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data) 1630int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data,
1631 void **ret_data)
1582{ 1632{
1583 struct dlm_ctxt *dlm = data; 1633 struct dlm_ctxt *dlm = data;
1584 struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf; 1634 struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf;
@@ -1660,21 +1710,38 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1660{ 1710{
1661 struct dlm_migratable_lock *ml; 1711 struct dlm_migratable_lock *ml;
1662 struct list_head *queue; 1712 struct list_head *queue;
1713 struct list_head *tmpq = NULL;
1663 struct dlm_lock *newlock = NULL; 1714 struct dlm_lock *newlock = NULL;
1664 struct dlm_lockstatus *lksb = NULL; 1715 struct dlm_lockstatus *lksb = NULL;
1665 int ret = 0; 1716 int ret = 0;
1666 int i, bad; 1717 int i, j, bad;
1667 struct list_head *iter; 1718 struct list_head *iter;
1668 struct dlm_lock *lock = NULL; 1719 struct dlm_lock *lock = NULL;
1720 u8 from = O2NM_MAX_NODES;
1721 unsigned int added = 0;
1669 1722
1670 mlog(0, "running %d locks for this lockres\n", mres->num_locks); 1723 mlog(0, "running %d locks for this lockres\n", mres->num_locks);
1671 for (i=0; i<mres->num_locks; i++) { 1724 for (i=0; i<mres->num_locks; i++) {
1672 ml = &(mres->ml[i]); 1725 ml = &(mres->ml[i]);
1726
1727 if (dlm_is_dummy_lock(dlm, ml, &from)) {
1728 /* placeholder, just need to set the refmap bit */
1729 BUG_ON(mres->num_locks != 1);
1730 mlog(0, "%s:%.*s: dummy lock for %u\n",
1731 dlm->name, mres->lockname_len, mres->lockname,
1732 from);
1733 spin_lock(&res->spinlock);
1734 dlm_lockres_set_refmap_bit(from, res);
1735 spin_unlock(&res->spinlock);
1736 added++;
1737 break;
1738 }
1673 BUG_ON(ml->highest_blocked != LKM_IVMODE); 1739 BUG_ON(ml->highest_blocked != LKM_IVMODE);
1674 newlock = NULL; 1740 newlock = NULL;
1675 lksb = NULL; 1741 lksb = NULL;
1676 1742
1677 queue = dlm_list_num_to_pointer(res, ml->list); 1743 queue = dlm_list_num_to_pointer(res, ml->list);
1744 tmpq = NULL;
1678 1745
1679 /* if the lock is for the local node it needs to 1746 /* if the lock is for the local node it needs to
1680 * be moved to the proper location within the queue. 1747 * be moved to the proper location within the queue.
@@ -1684,11 +1751,16 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1684 BUG_ON(!(mres->flags & DLM_MRES_MIGRATION)); 1751 BUG_ON(!(mres->flags & DLM_MRES_MIGRATION));
1685 1752
1686 spin_lock(&res->spinlock); 1753 spin_lock(&res->spinlock);
1687 list_for_each(iter, queue) { 1754 for (j = DLM_GRANTED_LIST; j <= DLM_BLOCKED_LIST; j++) {
1688 lock = list_entry (iter, struct dlm_lock, list); 1755 tmpq = dlm_list_idx_to_ptr(res, j);
1689 if (lock->ml.cookie != ml->cookie) 1756 list_for_each(iter, tmpq) {
1690 lock = NULL; 1757 lock = list_entry (iter, struct dlm_lock, list);
1691 else 1758 if (lock->ml.cookie != ml->cookie)
1759 lock = NULL;
1760 else
1761 break;
1762 }
1763 if (lock)
1692 break; 1764 break;
1693 } 1765 }
1694 1766
@@ -1698,12 +1770,20 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1698 u64 c = ml->cookie; 1770 u64 c = ml->cookie;
1699 mlog(ML_ERROR, "could not find local lock " 1771 mlog(ML_ERROR, "could not find local lock "
1700 "with cookie %u:%llu!\n", 1772 "with cookie %u:%llu!\n",
1701 dlm_get_lock_cookie_node(c), 1773 dlm_get_lock_cookie_node(be64_to_cpu(c)),
1702 dlm_get_lock_cookie_seq(c)); 1774 dlm_get_lock_cookie_seq(be64_to_cpu(c)));
1775 __dlm_print_one_lock_resource(res);
1703 BUG(); 1776 BUG();
1704 } 1777 }
1705 BUG_ON(lock->ml.node != ml->node); 1778 BUG_ON(lock->ml.node != ml->node);
1706 1779
1780 if (tmpq != queue) {
1781 mlog(0, "lock was on %u instead of %u for %.*s\n",
1782 j, ml->list, res->lockname.len, res->lockname.name);
1783 spin_unlock(&res->spinlock);
1784 continue;
1785 }
1786
1707 /* see NOTE above about why we do not update 1787 /* see NOTE above about why we do not update
1708 * to match the master here */ 1788 * to match the master here */
1709 1789
@@ -1711,6 +1791,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1711 /* do not alter lock refcount. switching lists. */ 1791 /* do not alter lock refcount. switching lists. */
1712 list_move_tail(&lock->list, queue); 1792 list_move_tail(&lock->list, queue);
1713 spin_unlock(&res->spinlock); 1793 spin_unlock(&res->spinlock);
1794 added++;
1714 1795
1715 mlog(0, "just reordered a local lock!\n"); 1796 mlog(0, "just reordered a local lock!\n");
1716 continue; 1797 continue;
@@ -1799,14 +1880,14 @@ skip_lvb:
1799 mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already " 1880 mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
1800 "exists on this lockres!\n", dlm->name, 1881 "exists on this lockres!\n", dlm->name,
1801 res->lockname.len, res->lockname.name, 1882 res->lockname.len, res->lockname.name,
1802 dlm_get_lock_cookie_node(c), 1883 dlm_get_lock_cookie_node(be64_to_cpu(c)),
1803 dlm_get_lock_cookie_seq(c)); 1884 dlm_get_lock_cookie_seq(be64_to_cpu(c)));
1804 1885
1805 mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, " 1886 mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, "
1806 "node=%u, cookie=%u:%llu, queue=%d\n", 1887 "node=%u, cookie=%u:%llu, queue=%d\n",
1807 ml->type, ml->convert_type, ml->node, 1888 ml->type, ml->convert_type, ml->node,
1808 dlm_get_lock_cookie_node(ml->cookie), 1889 dlm_get_lock_cookie_node(be64_to_cpu(ml->cookie)),
1809 dlm_get_lock_cookie_seq(ml->cookie), 1890 dlm_get_lock_cookie_seq(be64_to_cpu(ml->cookie)),
1810 ml->list); 1891 ml->list);
1811 1892
1812 __dlm_print_one_lock_resource(res); 1893 __dlm_print_one_lock_resource(res);
@@ -1817,12 +1898,22 @@ skip_lvb:
1817 if (!bad) { 1898 if (!bad) {
1818 dlm_lock_get(newlock); 1899 dlm_lock_get(newlock);
1819 list_add_tail(&newlock->list, queue); 1900 list_add_tail(&newlock->list, queue);
1901 mlog(0, "%s:%.*s: added lock for node %u, "
1902 "setting refmap bit\n", dlm->name,
1903 res->lockname.len, res->lockname.name, ml->node);
1904 dlm_lockres_set_refmap_bit(ml->node, res);
1905 added++;
1820 } 1906 }
1821 spin_unlock(&res->spinlock); 1907 spin_unlock(&res->spinlock);
1822 } 1908 }
1823 mlog(0, "done running all the locks\n"); 1909 mlog(0, "done running all the locks\n");
1824 1910
1825leave: 1911leave:
1912 /* balance the ref taken when the work was queued */
1913 spin_lock(&res->spinlock);
1914 dlm_lockres_drop_inflight_ref(dlm, res);
1915 spin_unlock(&res->spinlock);
1916
1826 if (ret < 0) { 1917 if (ret < 0) {
1827 mlog_errno(ret); 1918 mlog_errno(ret);
1828 if (newlock) 1919 if (newlock)
@@ -1935,9 +2026,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
1935 if (res->owner == dead_node) { 2026 if (res->owner == dead_node) {
1936 list_del_init(&res->recovering); 2027 list_del_init(&res->recovering);
1937 spin_lock(&res->spinlock); 2028 spin_lock(&res->spinlock);
2029 /* new_master has our reference from
2030 * the lock state sent during recovery */
1938 dlm_change_lockres_owner(dlm, res, new_master); 2031 dlm_change_lockres_owner(dlm, res, new_master);
1939 res->state &= ~DLM_LOCK_RES_RECOVERING; 2032 res->state &= ~DLM_LOCK_RES_RECOVERING;
1940 if (!__dlm_lockres_unused(res)) 2033 if (__dlm_lockres_has_locks(res))
1941 __dlm_dirty_lockres(dlm, res); 2034 __dlm_dirty_lockres(dlm, res);
1942 spin_unlock(&res->spinlock); 2035 spin_unlock(&res->spinlock);
1943 wake_up(&res->wq); 2036 wake_up(&res->wq);
@@ -1977,9 +2070,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
1977 dlm_lockres_put(res); 2070 dlm_lockres_put(res);
1978 } 2071 }
1979 spin_lock(&res->spinlock); 2072 spin_lock(&res->spinlock);
2073 /* new_master has our reference from
2074 * the lock state sent during recovery */
1980 dlm_change_lockres_owner(dlm, res, new_master); 2075 dlm_change_lockres_owner(dlm, res, new_master);
1981 res->state &= ~DLM_LOCK_RES_RECOVERING; 2076 res->state &= ~DLM_LOCK_RES_RECOVERING;
1982 if (!__dlm_lockres_unused(res)) 2077 if (__dlm_lockres_has_locks(res))
1983 __dlm_dirty_lockres(dlm, res); 2078 __dlm_dirty_lockres(dlm, res);
1984 spin_unlock(&res->spinlock); 2079 spin_unlock(&res->spinlock);
1985 wake_up(&res->wq); 2080 wake_up(&res->wq);
@@ -2048,6 +2143,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
2048{ 2143{
2049 struct list_head *iter, *tmpiter; 2144 struct list_head *iter, *tmpiter;
2050 struct dlm_lock *lock; 2145 struct dlm_lock *lock;
2146 unsigned int freed = 0;
2051 2147
2052 /* this node is the lockres master: 2148 /* this node is the lockres master:
2053 * 1) remove any stale locks for the dead node 2149 * 1) remove any stale locks for the dead node
@@ -2062,6 +2158,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
2062 if (lock->ml.node == dead_node) { 2158 if (lock->ml.node == dead_node) {
2063 list_del_init(&lock->list); 2159 list_del_init(&lock->list);
2064 dlm_lock_put(lock); 2160 dlm_lock_put(lock);
2161 freed++;
2065 } 2162 }
2066 } 2163 }
2067 list_for_each_safe(iter, tmpiter, &res->converting) { 2164 list_for_each_safe(iter, tmpiter, &res->converting) {
@@ -2069,6 +2166,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
2069 if (lock->ml.node == dead_node) { 2166 if (lock->ml.node == dead_node) {
2070 list_del_init(&lock->list); 2167 list_del_init(&lock->list);
2071 dlm_lock_put(lock); 2168 dlm_lock_put(lock);
2169 freed++;
2072 } 2170 }
2073 } 2171 }
2074 list_for_each_safe(iter, tmpiter, &res->blocked) { 2172 list_for_each_safe(iter, tmpiter, &res->blocked) {
@@ -2076,9 +2174,23 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
2076 if (lock->ml.node == dead_node) { 2174 if (lock->ml.node == dead_node) {
2077 list_del_init(&lock->list); 2175 list_del_init(&lock->list);
2078 dlm_lock_put(lock); 2176 dlm_lock_put(lock);
2177 freed++;
2079 } 2178 }
2080 } 2179 }
2081 2180
2181 if (freed) {
2182 mlog(0, "%s:%.*s: freed %u locks for dead node %u, "
2183 "dropping ref from lockres\n", dlm->name,
2184 res->lockname.len, res->lockname.name, freed, dead_node);
2185 BUG_ON(!test_bit(dead_node, res->refmap));
2186 dlm_lockres_clear_refmap_bit(dead_node, res);
2187 } else if (test_bit(dead_node, res->refmap)) {
2188 mlog(0, "%s:%.*s: dead node %u had a ref, but had "
2189 "no locks and had not purged before dying\n", dlm->name,
2190 res->lockname.len, res->lockname.name, dead_node);
2191 dlm_lockres_clear_refmap_bit(dead_node, res);
2192 }
2193
2082 /* do not kick thread yet */ 2194 /* do not kick thread yet */
2083 __dlm_dirty_lockres(dlm, res); 2195 __dlm_dirty_lockres(dlm, res);
2084} 2196}
@@ -2141,9 +2253,21 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
2141 spin_lock(&res->spinlock); 2253 spin_lock(&res->spinlock);
2142 /* zero the lvb if necessary */ 2254 /* zero the lvb if necessary */
2143 dlm_revalidate_lvb(dlm, res, dead_node); 2255 dlm_revalidate_lvb(dlm, res, dead_node);
2144 if (res->owner == dead_node) 2256 if (res->owner == dead_node) {
2257 if (res->state & DLM_LOCK_RES_DROPPING_REF)
2258 mlog(0, "%s:%.*s: owned by "
2259 "dead node %u, this node was "
2260 "dropping its ref when it died. "
2261 "continue, dropping the flag.\n",
2262 dlm->name, res->lockname.len,
2263 res->lockname.name, dead_node);
2264
2265 /* the wake_up for this will happen when the
2266 * RECOVERING flag is dropped later */
2267 res->state &= ~DLM_LOCK_RES_DROPPING_REF;
2268
2145 dlm_move_lockres_to_recovery_list(dlm, res); 2269 dlm_move_lockres_to_recovery_list(dlm, res);
2146 else if (res->owner == dlm->node_num) { 2270 } else if (res->owner == dlm->node_num) {
2147 dlm_free_dead_locks(dlm, res, dead_node); 2271 dlm_free_dead_locks(dlm, res, dead_node);
2148 __dlm_lockres_calc_usage(dlm, res); 2272 __dlm_lockres_calc_usage(dlm, res);
2149 } 2273 }
@@ -2480,7 +2604,8 @@ retry:
2480 return ret; 2604 return ret;
2481} 2605}
2482 2606
2483int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data) 2607int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data,
2608 void **ret_data)
2484{ 2609{
2485 struct dlm_ctxt *dlm = data; 2610 struct dlm_ctxt *dlm = data;
2486 struct dlm_begin_reco *br = (struct dlm_begin_reco *)msg->buf; 2611 struct dlm_begin_reco *br = (struct dlm_begin_reco *)msg->buf;
@@ -2608,7 +2733,8 @@ stage2:
2608 return ret; 2733 return ret;
2609} 2734}
2610 2735
2611int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data) 2736int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data,
2737 void **ret_data)
2612{ 2738{
2613 struct dlm_ctxt *dlm = data; 2739 struct dlm_ctxt *dlm = data;
2614 struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf; 2740 struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf;
diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c
index 0c822f3ffb05..8ffa0916eb86 100644
--- a/fs/ocfs2/dlm/dlmthread.c
+++ b/fs/ocfs2/dlm/dlmthread.c
@@ -54,9 +54,6 @@
54#include "cluster/masklog.h" 54#include "cluster/masklog.h"
55 55
56static int dlm_thread(void *data); 56static int dlm_thread(void *data);
57static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
58 struct dlm_lock_resource *lockres);
59
60static void dlm_flush_asts(struct dlm_ctxt *dlm); 57static void dlm_flush_asts(struct dlm_ctxt *dlm);
61 58
62#define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->node_num) 59#define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->node_num)
@@ -82,14 +79,33 @@ repeat:
82 current->state = TASK_RUNNING; 79 current->state = TASK_RUNNING;
83} 80}
84 81
85 82int __dlm_lockres_has_locks(struct dlm_lock_resource *res)
86int __dlm_lockres_unused(struct dlm_lock_resource *res)
87{ 83{
88 if (list_empty(&res->granted) && 84 if (list_empty(&res->granted) &&
89 list_empty(&res->converting) && 85 list_empty(&res->converting) &&
90 list_empty(&res->blocked) && 86 list_empty(&res->blocked))
91 list_empty(&res->dirty)) 87 return 0;
92 return 1; 88 return 1;
89}
90
91/* "unused": the lockres has no locks, is not on the dirty list,
92 * has no inflight locks (in the gap between mastery and acquiring
93 * the first lock), and has no bits in its refmap.
94 * truly ready to be freed. */
95int __dlm_lockres_unused(struct dlm_lock_resource *res)
96{
97 if (!__dlm_lockres_has_locks(res) &&
98 (list_empty(&res->dirty) && !(res->state & DLM_LOCK_RES_DIRTY))) {
99 /* try not to scan the bitmap unless the first two
100 * conditions are already true */
101 int bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
102 if (bit >= O2NM_MAX_NODES) {
103 /* since the bit for dlm->node_num is not
104 * set, inflight_locks better be zero */
105 BUG_ON(res->inflight_locks != 0);
106 return 1;
107 }
108 }
93 return 0; 109 return 0;
94} 110}
95 111
@@ -106,46 +122,21 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
106 assert_spin_locked(&res->spinlock); 122 assert_spin_locked(&res->spinlock);
107 123
108 if (__dlm_lockres_unused(res)){ 124 if (__dlm_lockres_unused(res)){
109 /* For now, just keep any resource we master */
110 if (res->owner == dlm->node_num)
111 {
112 if (!list_empty(&res->purge)) {
113 mlog(0, "we master %s:%.*s, but it is on "
114 "the purge list. Removing\n",
115 dlm->name, res->lockname.len,
116 res->lockname.name);
117 list_del_init(&res->purge);
118 dlm->purge_count--;
119 }
120 return;
121 }
122
123 if (list_empty(&res->purge)) { 125 if (list_empty(&res->purge)) {
124 mlog(0, "putting lockres %.*s from purge list\n", 126 mlog(0, "putting lockres %.*s:%p onto purge list\n",
125 res->lockname.len, res->lockname.name); 127 res->lockname.len, res->lockname.name, res);
126 128
127 res->last_used = jiffies; 129 res->last_used = jiffies;
130 dlm_lockres_get(res);
128 list_add_tail(&res->purge, &dlm->purge_list); 131 list_add_tail(&res->purge, &dlm->purge_list);
129 dlm->purge_count++; 132 dlm->purge_count++;
130
131 /* if this node is not the owner, there is
132 * no way to keep track of who the owner could be.
133 * unhash it to avoid serious problems. */
134 if (res->owner != dlm->node_num) {
135 mlog(0, "%s:%.*s: doing immediate "
136 "purge of lockres owned by %u\n",
137 dlm->name, res->lockname.len,
138 res->lockname.name, res->owner);
139
140 dlm_purge_lockres_now(dlm, res);
141 }
142 } 133 }
143 } else if (!list_empty(&res->purge)) { 134 } else if (!list_empty(&res->purge)) {
144 mlog(0, "removing lockres %.*s from purge list, " 135 mlog(0, "removing lockres %.*s:%p from purge list, owner=%u\n",
145 "owner=%u\n", res->lockname.len, res->lockname.name, 136 res->lockname.len, res->lockname.name, res, res->owner);
146 res->owner);
147 137
148 list_del_init(&res->purge); 138 list_del_init(&res->purge);
139 dlm_lockres_put(res);
149 dlm->purge_count--; 140 dlm->purge_count--;
150 } 141 }
151} 142}
@@ -163,68 +154,65 @@ void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
163 spin_unlock(&dlm->spinlock); 154 spin_unlock(&dlm->spinlock);
164} 155}
165 156
166/* TODO: Eventual API: Called with the dlm spinlock held, may drop it 157static int dlm_purge_lockres(struct dlm_ctxt *dlm,
167 * to do migration, but will re-acquire before exit. */ 158 struct dlm_lock_resource *res)
168void dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *lockres)
169{ 159{
170 int master; 160 int master;
171 int ret; 161 int ret = 0;
172
173 spin_lock(&lockres->spinlock);
174 master = lockres->owner == dlm->node_num;
175 spin_unlock(&lockres->spinlock);
176 162
177 mlog(0, "purging lockres %.*s, master = %d\n", lockres->lockname.len, 163 spin_lock(&res->spinlock);
178 lockres->lockname.name, master); 164 if (!__dlm_lockres_unused(res)) {
179 165 spin_unlock(&res->spinlock);
180 /* Non master is the easy case -- no migration required, just 166 mlog(0, "%s:%.*s: tried to purge but not unused\n",
181 * quit. */ 167 dlm->name, res->lockname.len, res->lockname.name);
168 return -ENOTEMPTY;
169 }
170 master = (res->owner == dlm->node_num);
182 if (!master) 171 if (!master)
183 goto finish; 172 res->state |= DLM_LOCK_RES_DROPPING_REF;
184 173 spin_unlock(&res->spinlock);
185 /* Wheee! Migrate lockres here! */
186 spin_unlock(&dlm->spinlock);
187again:
188 174
189 ret = dlm_migrate_lockres(dlm, lockres, O2NM_MAX_NODES); 175 mlog(0, "purging lockres %.*s, master = %d\n", res->lockname.len,
190 if (ret == -ENOTEMPTY) { 176 res->lockname.name, master);
191 mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
192 lockres->lockname.len, lockres->lockname.name);
193 177
194 BUG(); 178 if (!master) {
195 } else if (ret < 0) { 179 spin_lock(&res->spinlock);
196 mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n", 180 /* This ensures that clear refmap is sent after the set */
197 lockres->lockname.len, lockres->lockname.name); 181 __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
198 msleep(100); 182 spin_unlock(&res->spinlock);
199 goto again; 183 /* drop spinlock to do messaging, retake below */
184 spin_unlock(&dlm->spinlock);
185 /* clear our bit from the master's refmap, ignore errors */
186 ret = dlm_drop_lockres_ref(dlm, res);
187 if (ret < 0) {
188 mlog_errno(ret);
189 if (!dlm_is_host_down(ret))
190 BUG();
191 }
192 mlog(0, "%s:%.*s: dlm_deref_lockres returned %d\n",
193 dlm->name, res->lockname.len, res->lockname.name, ret);
194 spin_lock(&dlm->spinlock);
200 } 195 }
201 196
202 spin_lock(&dlm->spinlock); 197 if (!list_empty(&res->purge)) {
203 198 mlog(0, "removing lockres %.*s:%p from purgelist, "
204finish: 199 "master = %d\n", res->lockname.len, res->lockname.name,
205 if (!list_empty(&lockres->purge)) { 200 res, master);
206 list_del_init(&lockres->purge); 201 list_del_init(&res->purge);
202 dlm_lockres_put(res);
207 dlm->purge_count--; 203 dlm->purge_count--;
208 } 204 }
209 __dlm_unhash_lockres(lockres); 205 __dlm_unhash_lockres(res);
210}
211
212/* make an unused lockres go away immediately.
213 * as soon as the dlm spinlock is dropped, this lockres
214 * will not be found. kfree still happens on last put. */
215static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
216 struct dlm_lock_resource *lockres)
217{
218 assert_spin_locked(&dlm->spinlock);
219 assert_spin_locked(&lockres->spinlock);
220 206
221 BUG_ON(!__dlm_lockres_unused(lockres)); 207 /* lockres is not in the hash now. drop the flag and wake up
222 208 * any processes waiting in dlm_get_lock_resource. */
223 if (!list_empty(&lockres->purge)) { 209 if (!master) {
224 list_del_init(&lockres->purge); 210 spin_lock(&res->spinlock);
225 dlm->purge_count--; 211 res->state &= ~DLM_LOCK_RES_DROPPING_REF;
212 spin_unlock(&res->spinlock);
213 wake_up(&res->wq);
226 } 214 }
227 __dlm_unhash_lockres(lockres); 215 return 0;
228} 216}
229 217
230static void dlm_run_purge_list(struct dlm_ctxt *dlm, 218static void dlm_run_purge_list(struct dlm_ctxt *dlm,
@@ -268,13 +256,17 @@ static void dlm_run_purge_list(struct dlm_ctxt *dlm,
268 break; 256 break;
269 } 257 }
270 258
259 mlog(0, "removing lockres %.*s:%p from purgelist\n",
260 lockres->lockname.len, lockres->lockname.name, lockres);
271 list_del_init(&lockres->purge); 261 list_del_init(&lockres->purge);
262 dlm_lockres_put(lockres);
272 dlm->purge_count--; 263 dlm->purge_count--;
273 264
274 /* This may drop and reacquire the dlm spinlock if it 265 /* This may drop and reacquire the dlm spinlock if it
275 * has to do migration. */ 266 * has to do migration. */
276 mlog(0, "calling dlm_purge_lockres!\n"); 267 mlog(0, "calling dlm_purge_lockres!\n");
277 dlm_purge_lockres(dlm, lockres); 268 if (dlm_purge_lockres(dlm, lockres))
269 BUG();
278 mlog(0, "DONE calling dlm_purge_lockres!\n"); 270 mlog(0, "DONE calling dlm_purge_lockres!\n");
279 271
280 /* Avoid adding any scheduling latencies */ 272 /* Avoid adding any scheduling latencies */
@@ -467,12 +459,17 @@ void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
467 assert_spin_locked(&res->spinlock); 459 assert_spin_locked(&res->spinlock);
468 460
469 /* don't shuffle secondary queues */ 461 /* don't shuffle secondary queues */
470 if ((res->owner == dlm->node_num) && 462 if ((res->owner == dlm->node_num)) {
471 !(res->state & DLM_LOCK_RES_DIRTY)) { 463 if (res->state & (DLM_LOCK_RES_MIGRATING |
472 /* ref for dirty_list */ 464 DLM_LOCK_RES_BLOCK_DIRTY))
473 dlm_lockres_get(res); 465 return;
474 list_add_tail(&res->dirty, &dlm->dirty_list); 466
475 res->state |= DLM_LOCK_RES_DIRTY; 467 if (list_empty(&res->dirty)) {
468 /* ref for dirty_list */
469 dlm_lockres_get(res);
470 list_add_tail(&res->dirty, &dlm->dirty_list);
471 res->state |= DLM_LOCK_RES_DIRTY;
472 }
476 } 473 }
477} 474}
478 475
@@ -651,7 +648,7 @@ static int dlm_thread(void *data)
651 dlm_lockres_get(res); 648 dlm_lockres_get(res);
652 649
653 spin_lock(&res->spinlock); 650 spin_lock(&res->spinlock);
654 res->state &= ~DLM_LOCK_RES_DIRTY; 651 /* We clear the DLM_LOCK_RES_DIRTY state once we shuffle lists below */
655 list_del_init(&res->dirty); 652 list_del_init(&res->dirty);
656 spin_unlock(&res->spinlock); 653 spin_unlock(&res->spinlock);
657 spin_unlock(&dlm->spinlock); 654 spin_unlock(&dlm->spinlock);
@@ -675,10 +672,11 @@ static int dlm_thread(void *data)
675 /* it is now ok to move lockreses in these states 672 /* it is now ok to move lockreses in these states
676 * to the dirty list, assuming that they will only be 673 * to the dirty list, assuming that they will only be
677 * dirty for a short while. */ 674 * dirty for a short while. */
675 BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
678 if (res->state & (DLM_LOCK_RES_IN_PROGRESS | 676 if (res->state & (DLM_LOCK_RES_IN_PROGRESS |
679 DLM_LOCK_RES_MIGRATING |
680 DLM_LOCK_RES_RECOVERING)) { 677 DLM_LOCK_RES_RECOVERING)) {
681 /* move it to the tail and keep going */ 678 /* move it to the tail and keep going */
679 res->state &= ~DLM_LOCK_RES_DIRTY;
682 spin_unlock(&res->spinlock); 680 spin_unlock(&res->spinlock);
683 mlog(0, "delaying list shuffling for in-" 681 mlog(0, "delaying list shuffling for in-"
684 "progress lockres %.*s, state=%d\n", 682 "progress lockres %.*s, state=%d\n",
@@ -699,6 +697,7 @@ static int dlm_thread(void *data)
699 697
700 /* called while holding lockres lock */ 698 /* called while holding lockres lock */
701 dlm_shuffle_lists(dlm, res); 699 dlm_shuffle_lists(dlm, res);
700 res->state &= ~DLM_LOCK_RES_DIRTY;
702 spin_unlock(&res->spinlock); 701 spin_unlock(&res->spinlock);
703 702
704 dlm_lockres_calc_usage(dlm, res); 703 dlm_lockres_calc_usage(dlm, res);
@@ -709,11 +708,8 @@ in_progress:
709 /* if the lock was in-progress, stick 708 /* if the lock was in-progress, stick
710 * it on the back of the list */ 709 * it on the back of the list */
711 if (delay) { 710 if (delay) {
712 /* ref for dirty_list */
713 dlm_lockres_get(res);
714 spin_lock(&res->spinlock); 711 spin_lock(&res->spinlock);
715 list_add_tail(&res->dirty, &dlm->dirty_list); 712 __dlm_dirty_lockres(dlm, res);
716 res->state |= DLM_LOCK_RES_DIRTY;
717 spin_unlock(&res->spinlock); 713 spin_unlock(&res->spinlock);
718 } 714 }
719 dlm_lockres_put(res); 715 dlm_lockres_put(res);
diff --git a/fs/ocfs2/dlm/dlmunlock.c b/fs/ocfs2/dlm/dlmunlock.c
index 37be4b2e0d4a..86ca085ef324 100644
--- a/fs/ocfs2/dlm/dlmunlock.c
+++ b/fs/ocfs2/dlm/dlmunlock.c
@@ -147,6 +147,10 @@ static enum dlm_status dlmunlock_common(struct dlm_ctxt *dlm,
147 goto leave; 147 goto leave;
148 } 148 }
149 149
150 if (res->state & DLM_LOCK_RES_MIGRATING) {
151 status = DLM_MIGRATING;
152 goto leave;
153 }
150 154
151 /* see above for what the spec says about 155 /* see above for what the spec says about
152 * LKM_CANCEL and the lock queue state */ 156 * LKM_CANCEL and the lock queue state */
@@ -244,8 +248,8 @@ leave:
244 /* this should always be coupled with list removal */ 248 /* this should always be coupled with list removal */
245 BUG_ON(!(actions & DLM_UNLOCK_REMOVE_LOCK)); 249 BUG_ON(!(actions & DLM_UNLOCK_REMOVE_LOCK));
246 mlog(0, "lock %u:%llu should be gone now! refs=%d\n", 250 mlog(0, "lock %u:%llu should be gone now! refs=%d\n",
247 dlm_get_lock_cookie_node(lock->ml.cookie), 251 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
248 dlm_get_lock_cookie_seq(lock->ml.cookie), 252 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
249 atomic_read(&lock->lock_refs.refcount)-1); 253 atomic_read(&lock->lock_refs.refcount)-1);
250 dlm_lock_put(lock); 254 dlm_lock_put(lock);
251 } 255 }
@@ -379,7 +383,8 @@ static enum dlm_status dlm_send_remote_unlock_request(struct dlm_ctxt *dlm,
379 * returns: DLM_NORMAL, DLM_BADARGS, DLM_IVLOCKID, 383 * returns: DLM_NORMAL, DLM_BADARGS, DLM_IVLOCKID,
380 * return value from dlmunlock_master 384 * return value from dlmunlock_master
381 */ 385 */
382int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data) 386int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data,
387 void **ret_data)
383{ 388{
384 struct dlm_ctxt *dlm = data; 389 struct dlm_ctxt *dlm = data;
385 struct dlm_unlock_lock *unlock = (struct dlm_unlock_lock *)msg->buf; 390 struct dlm_unlock_lock *unlock = (struct dlm_unlock_lock *)msg->buf;
@@ -502,8 +507,8 @@ not_found:
502 if (!found) 507 if (!found)
503 mlog(ML_ERROR, "failed to find lock to unlock! " 508 mlog(ML_ERROR, "failed to find lock to unlock! "
504 "cookie=%u:%llu\n", 509 "cookie=%u:%llu\n",
505 dlm_get_lock_cookie_node(unlock->cookie), 510 dlm_get_lock_cookie_node(be64_to_cpu(unlock->cookie)),
506 dlm_get_lock_cookie_seq(unlock->cookie)); 511 dlm_get_lock_cookie_seq(be64_to_cpu(unlock->cookie)));
507 else 512 else
508 dlm_lock_put(lock); 513 dlm_lock_put(lock);
509 514
diff --git a/fs/ocfs2/vote.c b/fs/ocfs2/vote.c
index 0afd8b9af70f..f30e63b9910c 100644
--- a/fs/ocfs2/vote.c
+++ b/fs/ocfs2/vote.c
@@ -887,7 +887,7 @@ static inline int ocfs2_translate_response(int response)
887 887
888static int ocfs2_handle_response_message(struct o2net_msg *msg, 888static int ocfs2_handle_response_message(struct o2net_msg *msg,
889 u32 len, 889 u32 len,
890 void *data) 890 void *data, void **ret_data)
891{ 891{
892 unsigned int response_id, node_num; 892 unsigned int response_id, node_num;
893 int response_status; 893 int response_status;
@@ -943,7 +943,7 @@ bail:
943 943
944static int ocfs2_handle_vote_message(struct o2net_msg *msg, 944static int ocfs2_handle_vote_message(struct o2net_msg *msg,
945 u32 len, 945 u32 len,
946 void *data) 946 void *data, void **ret_data)
947{ 947{
948 int status; 948 int status;
949 struct ocfs2_super *osb = data; 949 struct ocfs2_super *osb = data;
@@ -1007,7 +1007,7 @@ int ocfs2_register_net_handlers(struct ocfs2_super *osb)
1007 osb->net_key, 1007 osb->net_key,
1008 sizeof(struct ocfs2_response_msg), 1008 sizeof(struct ocfs2_response_msg),
1009 ocfs2_handle_response_message, 1009 ocfs2_handle_response_message,
1010 osb, &osb->osb_net_handlers); 1010 osb, NULL, &osb->osb_net_handlers);
1011 if (status) { 1011 if (status) {
1012 mlog_errno(status); 1012 mlog_errno(status);
1013 goto bail; 1013 goto bail;
@@ -1017,7 +1017,7 @@ int ocfs2_register_net_handlers(struct ocfs2_super *osb)
1017 osb->net_key, 1017 osb->net_key,
1018 sizeof(struct ocfs2_vote_msg), 1018 sizeof(struct ocfs2_vote_msg),
1019 ocfs2_handle_vote_message, 1019 ocfs2_handle_vote_message,
1020 osb, &osb->osb_net_handlers); 1020 osb, NULL, &osb->osb_net_handlers);
1021 if (status) { 1021 if (status) {
1022 mlog_errno(status); 1022 mlog_errno(status);
1023 goto bail; 1023 goto bail;