path: root/fs/ocfs2
author	Dave Kleikamp <shaggy@austin.ibm.com>	2006-03-14 18:05:45 -0500
committer	Dave Kleikamp <shaggy@austin.ibm.com>	2006-03-14 18:05:45 -0500
commit	c5111f504d2a9b0d258d7c4752b4093523315989 (patch)
tree	6a52864aff79691689aea21cb0cb928327d5de5b /fs/ocfs2
parent	69eb66d7da7dba2696281981347698e1693c2340 (diff)
parent	a488edc914aa1d766a4e2c982b5ae03d5657ec1b (diff)
Merge with /home/shaggy/git/linus-clean/
Diffstat (limited to 'fs/ocfs2')
-rw-r--r--	fs/ocfs2/buffer_head_io.c	10
-rw-r--r--	fs/ocfs2/cluster/heartbeat.c	5
-rw-r--r--	fs/ocfs2/cluster/masklog.c	1
-rw-r--r--	fs/ocfs2/cluster/masklog.h	2
-rw-r--r--	fs/ocfs2/cluster/nodemanager.c	4
-rw-r--r--	fs/ocfs2/cluster/tcp.c	30
-rw-r--r--	fs/ocfs2/cluster/tcp.h	5
-rw-r--r--	fs/ocfs2/dlm/dlmcommon.h	13
-rw-r--r--	fs/ocfs2/dlm/dlmconvert.c	12
-rw-r--r--	fs/ocfs2/dlm/dlmdebug.c	12
-rw-r--r--	fs/ocfs2/dlm/dlmdomain.c	57
-rw-r--r--	fs/ocfs2/dlm/dlmlock.c	25
-rw-r--r--	fs/ocfs2/dlm/dlmmaster.c	35
-rw-r--r--	fs/ocfs2/dlm/dlmrecovery.c	315
-rw-r--r--	fs/ocfs2/dlm/dlmunlock.c	13
-rw-r--r--	fs/ocfs2/dlm/userdlm.c	2
-rw-r--r--	fs/ocfs2/extent_map.c	50
-rw-r--r--	fs/ocfs2/file.c	61
-rw-r--r--	fs/ocfs2/heartbeat.c	1
-rw-r--r--	fs/ocfs2/inode.c	52
-rw-r--r--	fs/ocfs2/inode.h	4
-rw-r--r--	fs/ocfs2/journal.c	163
-rw-r--r--	fs/ocfs2/journal.h	2
-rw-r--r--	fs/ocfs2/ocfs2.h	10
-rw-r--r--	fs/ocfs2/ocfs2_fs.h	1
-rw-r--r--	fs/ocfs2/super.c	22
-rw-r--r--	fs/ocfs2/sysfile.c	6
-rw-r--r--	fs/ocfs2/uptodate.c	12
-rw-r--r--	fs/ocfs2/uptodate.h	2
29 files changed, 660 insertions(+), 267 deletions(-)
diff --git a/fs/ocfs2/buffer_head_io.c b/fs/ocfs2/buffer_head_io.c
index d424041b38e9..bae3d7548bea 100644
--- a/fs/ocfs2/buffer_head_io.c
+++ b/fs/ocfs2/buffer_head_io.c
@@ -58,7 +58,7 @@ int ocfs2_write_block(struct ocfs2_super *osb, struct buffer_head *bh,
 		goto out;
 	}
 
-	down(&OCFS2_I(inode)->ip_io_sem);
+	mutex_lock(&OCFS2_I(inode)->ip_io_mutex);
 
 	lock_buffer(bh);
 	set_buffer_uptodate(bh);
@@ -82,7 +82,7 @@ int ocfs2_write_block(struct ocfs2_super *osb, struct buffer_head *bh,
 		brelse(bh);
 	}
 
-	up(&OCFS2_I(inode)->ip_io_sem);
+	mutex_unlock(&OCFS2_I(inode)->ip_io_mutex);
 out:
 	mlog_exit(ret);
 	return ret;
@@ -125,13 +125,13 @@ int ocfs2_read_blocks(struct ocfs2_super *osb, u64 block, int nr,
 	flags &= ~OCFS2_BH_CACHED;
 
 	if (inode)
-		down(&OCFS2_I(inode)->ip_io_sem);
+		mutex_lock(&OCFS2_I(inode)->ip_io_mutex);
 	for (i = 0 ; i < nr ; i++) {
 		if (bhs[i] == NULL) {
 			bhs[i] = sb_getblk(sb, block++);
 			if (bhs[i] == NULL) {
 				if (inode)
-					up(&OCFS2_I(inode)->ip_io_sem);
+					mutex_unlock(&OCFS2_I(inode)->ip_io_mutex);
 				status = -EIO;
 				mlog_errno(status);
 				goto bail;
@@ -220,7 +220,7 @@ int ocfs2_read_blocks(struct ocfs2_super *osb, u64 block, int nr,
 			ocfs2_set_buffer_uptodate(inode, bh);
 	}
 	if (inode)
-		up(&OCFS2_I(inode)->ip_io_sem);
+		mutex_unlock(&OCFS2_I(inode)->ip_io_mutex);
 
 	mlog(ML_BH_IO, "block=(%"MLFu64"), nr=(%d), cached=%s\n", block, nr,
 	     (!(flags & OCFS2_BH_CACHED) || ignore_cache) ? "no" : "yes");
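The buffer_head_io.c hunks above are part of this merge's conversion of the per-inode I/O semaphore (ip_io_sem) to a mutex (ip_io_mutex). A minimal sketch of the locking pattern the new code uses, assuming the stock kernel mutex API of this era; the demo_* names are illustrative stand-ins, not ocfs2 symbols:

	#include <linux/mutex.h>

	struct demo_inode_info {
		struct mutex ip_io_mutex;	/* was: struct semaphore ip_io_sem */
	};

	static void demo_init(struct demo_inode_info *oi)
	{
		mutex_init(&oi->ip_io_mutex);	/* was: init_MUTEX(&oi->ip_io_sem) */
	}

	static void demo_buffer_io(struct demo_inode_info *oi)
	{
		/* serialize buffer head I/O against this inode */
		mutex_lock(&oi->ip_io_mutex);	/* was: down(&oi->ip_io_sem) */
		/* ... lock_buffer(), submit, wait on the buffer heads ... */
		mutex_unlock(&oi->ip_io_mutex);	/* was: up(&oi->ip_io_sem) */
	}

The mutex gives the same mutual exclusion as a semaphore used as a binary lock, with stricter owner semantics and debug checking, which is why every down()/up() pair on ip_io_sem above becomes mutex_lock()/mutex_unlock().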
diff --git a/fs/ocfs2/cluster/heartbeat.c b/fs/ocfs2/cluster/heartbeat.c
index 7307ba528913..d08971d29b63 100644
--- a/fs/ocfs2/cluster/heartbeat.c
+++ b/fs/ocfs2/cluster/heartbeat.c
@@ -917,8 +917,9 @@ static int o2hb_thread(void *data)
 	elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb);
 
 	mlog(0, "start = %lu.%lu, end = %lu.%lu, msec = %u\n",
-	     before_hb.tv_sec, before_hb.tv_usec,
-	     after_hb.tv_sec, after_hb.tv_usec, elapsed_msec);
+	     before_hb.tv_sec, (unsigned long) before_hb.tv_usec,
+	     after_hb.tv_sec, (unsigned long) after_hb.tv_usec,
+	     elapsed_msec);
 
 	if (elapsed_msec < reg->hr_timeout_ms) {
 		/* the kthread api has blocked signals for us so no
diff --git a/fs/ocfs2/cluster/masklog.c b/fs/ocfs2/cluster/masklog.c
index fd741cea5705..636593bf4d17 100644
--- a/fs/ocfs2/cluster/masklog.c
+++ b/fs/ocfs2/cluster/masklog.c
@@ -74,6 +74,7 @@ struct mlog_attribute {
 #define define_mask(_name) {			\
 	.attr = {				\
 		.name = #_name,			\
+		.owner = THIS_MODULE,		\
 		.mode = S_IRUGO | S_IWUSR,	\
 	},					\
 	.mask = ML_##_name,			\
diff --git a/fs/ocfs2/cluster/masklog.h b/fs/ocfs2/cluster/masklog.h
index e8c56a3d9c64..2cadc3009c83 100644
--- a/fs/ocfs2/cluster/masklog.h
+++ b/fs/ocfs2/cluster/masklog.h
@@ -256,7 +256,7 @@ extern struct mlog_bits mlog_and_bits, mlog_not_bits;
 	}						\
 } while (0)
 
-#if (BITS_PER_LONG == 32) || defined(CONFIG_X86_64)
+#if (BITS_PER_LONG == 32) || defined(CONFIG_X86_64) || (defined(CONFIG_UML_X86) && defined(CONFIG_64BIT))
 #define MLFi64 "lld"
 #define MLFu64 "llu"
 #define MLFx64 "llx"
diff --git a/fs/ocfs2/cluster/nodemanager.c b/fs/ocfs2/cluster/nodemanager.c
index cf7828f23361..e1fceb8aa32d 100644
--- a/fs/ocfs2/cluster/nodemanager.c
+++ b/fs/ocfs2/cluster/nodemanager.c
@@ -756,7 +756,7 @@ static int __init init_o2nm(void)
 	if (!ocfs2_table_header) {
 		printk(KERN_ERR "nodemanager: unable to register sysctl\n");
 		ret = -ENOMEM; /* or something. */
-		goto out;
+		goto out_o2net;
 	}
 
 	ret = o2net_register_hb_callbacks();
@@ -780,6 +780,8 @@ out_callbacks:
 	o2net_unregister_hb_callbacks();
 out_sysctl:
 	unregister_sysctl_table(ocfs2_table_header);
+out_o2net:
+	o2net_exit();
 out:
 	return ret;
 }
diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c
index 35d92c01a972..0f60cc0d3985 100644
--- a/fs/ocfs2/cluster/tcp.c
+++ b/fs/ocfs2/cluster/tcp.c
@@ -1285,14 +1285,16 @@ static void o2net_idle_timer(unsigned long data)
 	mlog(ML_NOTICE, "here are some times that might help debug the "
 	     "situation: (tmr %ld.%ld now %ld.%ld dr %ld.%ld adv "
 	     "%ld.%ld:%ld.%ld func (%08x:%u) %ld.%ld:%ld.%ld)\n",
-	     sc->sc_tv_timer.tv_sec, sc->sc_tv_timer.tv_usec,
-	     now.tv_sec, now.tv_usec,
-	     sc->sc_tv_data_ready.tv_sec, sc->sc_tv_data_ready.tv_usec,
-	     sc->sc_tv_advance_start.tv_sec, sc->sc_tv_advance_start.tv_usec,
-	     sc->sc_tv_advance_stop.tv_sec, sc->sc_tv_advance_stop.tv_usec,
+	     sc->sc_tv_timer.tv_sec, (long) sc->sc_tv_timer.tv_usec,
+	     now.tv_sec, (long) now.tv_usec,
+	     sc->sc_tv_data_ready.tv_sec, (long) sc->sc_tv_data_ready.tv_usec,
+	     sc->sc_tv_advance_start.tv_sec,
+	     (long) sc->sc_tv_advance_start.tv_usec,
+	     sc->sc_tv_advance_stop.tv_sec,
+	     (long) sc->sc_tv_advance_stop.tv_usec,
 	     sc->sc_msg_key, sc->sc_msg_type,
-	     sc->sc_tv_func_start.tv_sec, sc->sc_tv_func_start.tv_usec,
-	     sc->sc_tv_func_stop.tv_sec, sc->sc_tv_func_stop.tv_usec);
+	     sc->sc_tv_func_start.tv_sec, (long) sc->sc_tv_func_start.tv_usec,
+	     sc->sc_tv_func_stop.tv_sec, (long) sc->sc_tv_func_stop.tv_usec);
 
 	o2net_sc_queue_work(sc, &sc->sc_shutdown_work);
 }
@@ -1316,7 +1318,7 @@ static void o2net_start_connect(void *arg)
 {
 	struct o2net_node *nn = arg;
 	struct o2net_sock_container *sc = NULL;
-	struct o2nm_node *node = NULL;
+	struct o2nm_node *node = NULL, *mynode = NULL;
 	struct socket *sock = NULL;
 	struct sockaddr_in myaddr = {0, }, remoteaddr = {0, };
 	int ret = 0;
@@ -1332,6 +1334,12 @@ static void o2net_start_connect(void *arg)
 		goto out;
 	}
 
+	mynode = o2nm_get_node_by_num(o2nm_this_node());
+	if (mynode == NULL) {
+		ret = 0;
+		goto out;
+	}
+
 	spin_lock(&nn->nn_lock);
 	/* see if we already have one pending or have given up */
 	if (nn->nn_sc || nn->nn_persistent_error)
@@ -1359,12 +1367,14 @@ static void o2net_start_connect(void *arg)
 	sock->sk->sk_allocation = GFP_ATOMIC;
 
 	myaddr.sin_family = AF_INET;
+	myaddr.sin_addr.s_addr = (__force u32)mynode->nd_ipv4_address;
 	myaddr.sin_port = (__force u16)htons(0); /* any port */
 
 	ret = sock->ops->bind(sock, (struct sockaddr *)&myaddr,
 			      sizeof(myaddr));
 	if (ret) {
-		mlog(0, "bind failed: %d\n", ret);
+		mlog(ML_ERROR, "bind failed with %d at address %u.%u.%u.%u\n",
+		     ret, NIPQUAD(mynode->nd_ipv4_address));
 		goto out;
 	}
 
@@ -1405,6 +1415,8 @@ out:
 		sc_put(sc);
 	if (node)
 		o2nm_node_put(node);
+	if (mynode)
+		o2nm_node_put(mynode);
 
 	return;
 }
diff --git a/fs/ocfs2/cluster/tcp.h b/fs/ocfs2/cluster/tcp.h
index a6f4585501c8..616ff2b8434a 100644
--- a/fs/ocfs2/cluster/tcp.h
+++ b/fs/ocfs2/cluster/tcp.h
@@ -85,13 +85,10 @@ enum {
 	O2NET_DRIVER_READY,
 };
 
-int o2net_init_tcp_sock(struct inode *inode);
 int o2net_send_message(u32 msg_type, u32 key, void *data, u32 len,
 		       u8 target_node, int *status);
 int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *vec,
 			   size_t veclen, u8 target_node, int *status);
-int o2net_broadcast_message(u32 msg_type, u32 key, void *data, u32 len,
-			    struct inode *group);
 
 int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
 			   o2net_msg_handler_func *func, void *data,
@@ -107,7 +104,5 @@ void o2net_disconnect_node(struct o2nm_node *node);
 
 int o2net_init(void);
 void o2net_exit(void);
-int o2net_proc_init(struct proc_dir_entry *parent);
-void o2net_proc_exit(struct proc_dir_entry *parent);
 
 #endif /* O2CLUSTER_TCP_H */
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h
index 3fecba0a6023..9c772583744a 100644
--- a/fs/ocfs2/dlm/dlmcommon.h
+++ b/fs/ocfs2/dlm/dlmcommon.h
@@ -37,9 +37,7 @@
 #define DLM_THREAD_SHUFFLE_INTERVAL    5     // flush everything every 5 passes
 #define DLM_THREAD_MS                  200   // flush at least every 200 ms
 
-#define DLM_HASH_BITS     7
-#define DLM_HASH_SIZE     (1 << DLM_HASH_BITS)
-#define DLM_HASH_MASK     (DLM_HASH_SIZE - 1)
+#define DLM_HASH_BUCKETS     (PAGE_SIZE / sizeof(struct hlist_head))
 
 enum dlm_ast_type {
 	DLM_AST = 0,
@@ -87,7 +85,7 @@ enum dlm_ctxt_state {
 struct dlm_ctxt
 {
 	struct list_head list;
-	struct list_head *resources;
+	struct hlist_head *lockres_hash;
 	struct list_head dirty_list;
 	struct list_head purge_list;
 	struct list_head pending_asts;
@@ -208,13 +206,16 @@ static inline void __dlm_set_joining_node(struct dlm_ctxt *dlm,
 #define DLM_LOCK_RES_IN_PROGRESS          0x00000010
 #define DLM_LOCK_RES_MIGRATING            0x00000020
 
+/* max milliseconds to wait to sync up a network failure with a node death */
+#define DLM_NODE_DEATH_WAIT_MAX (5 * 1000)
+
 #define DLM_PURGE_INTERVAL_MS   (8 * 1000)
 
 struct dlm_lock_resource
 {
 	/* WARNING: Please see the comment in dlm_init_lockres before
 	 * adding fields here. */
-	struct list_head list;
+	struct hlist_node hash_node;
 	struct kref refs;
 
 	/* please keep these next 3 in this order
@@ -657,6 +658,8 @@ void dlm_complete_thread(struct dlm_ctxt *dlm);
 int dlm_launch_recovery_thread(struct dlm_ctxt *dlm);
 void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
 void dlm_wait_for_recovery(struct dlm_ctxt *dlm);
+int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node);
+int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout);
 
 void dlm_put(struct dlm_ctxt *dlm);
 struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm);
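For scale, the new DLM_HASH_BUCKETS definition above sizes the lockres hash to exactly one page of hlist heads. A rough worked example, assuming the common 4 KiB PAGE_SIZE and 64-bit pointers (the actual values are architecture dependent):

	/* illustrative arithmetic only, not ocfs2 code */
	old: DLM_HASH_SIZE    = 1 << DLM_HASH_BITS                = 1 << 7  = 128 buckets
	new: DLM_HASH_BUCKETS = PAGE_SIZE / sizeof(struct hlist_head)
	                      = 4096 / 8                                    = 512 buckets

Because struct hlist_head is a single pointer (half the size of struct list_head), the same one-page allocation that backed the old table now holds four times as many buckets.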
diff --git a/fs/ocfs2/dlm/dlmconvert.c b/fs/ocfs2/dlm/dlmconvert.c
index 6001b22a997d..f66e2d818ccd 100644
--- a/fs/ocfs2/dlm/dlmconvert.c
+++ b/fs/ocfs2/dlm/dlmconvert.c
@@ -392,6 +392,11 @@ static enum dlm_status dlm_send_remote_convert_request(struct dlm_ctxt *dlm,
 	} else {
 		mlog_errno(tmpret);
 		if (dlm_is_host_down(tmpret)) {
+			/* instead of logging the same network error over
+			 * and over, sleep here and wait for the heartbeat
+			 * to notice the node is dead. times out after 5s. */
+			dlm_wait_for_node_death(dlm, res->owner,
+						DLM_NODE_DEATH_WAIT_MAX);
 			ret = DLM_RECOVERING;
 			mlog(0, "node %u died so returning DLM_RECOVERING "
 			     "from convert message!\n", res->owner);
@@ -421,7 +426,7 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
 	struct dlm_lockstatus *lksb;
 	enum dlm_status status = DLM_NORMAL;
 	u32 flags;
-	int call_ast = 0, kick_thread = 0;
+	int call_ast = 0, kick_thread = 0, ast_reserved = 0;
 
 	if (!dlm_grab(dlm)) {
 		dlm_error(DLM_REJECTED);
@@ -490,6 +495,7 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
 	status = __dlm_lockres_state_to_status(res);
 	if (status == DLM_NORMAL) {
 		__dlm_lockres_reserve_ast(res);
+		ast_reserved = 1;
 		res->state |= DLM_LOCK_RES_IN_PROGRESS;
 		status = __dlmconvert_master(dlm, res, lock, flags,
 					     cnv->requested_type,
@@ -512,10 +518,10 @@ leave:
 	else
 		dlm_lock_put(lock);
 
-	/* either queue the ast or release it */
+	/* either queue the ast or release it, if reserved */
 	if (call_ast)
 		dlm_queue_ast(dlm, lock);
-	else
+	else if (ast_reserved)
 		dlm_lockres_release_ast(dlm, res);
 
 	if (kick_thread)
diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c
index f339fe27975a..54f61b76ab51 100644
--- a/fs/ocfs2/dlm/dlmdebug.c
+++ b/fs/ocfs2/dlm/dlmdebug.c
@@ -117,8 +117,8 @@ EXPORT_SYMBOL_GPL(dlm_print_one_lock);
 void dlm_dump_lock_resources(struct dlm_ctxt *dlm)
 {
 	struct dlm_lock_resource *res;
-	struct list_head *iter;
-	struct list_head *bucket;
+	struct hlist_node *iter;
+	struct hlist_head *bucket;
 	int i;
 
 	mlog(ML_NOTICE, "struct dlm_ctxt: %s, node=%u, key=%u\n",
@@ -129,12 +129,10 @@ void dlm_dump_lock_resources(struct dlm_ctxt *dlm)
 	}
 
 	spin_lock(&dlm->spinlock);
-	for (i=0; i<DLM_HASH_SIZE; i++) {
-		bucket = &(dlm->resources[i]);
-		list_for_each(iter, bucket) {
-			res = list_entry(iter, struct dlm_lock_resource, list);
+	for (i=0; i<DLM_HASH_BUCKETS; i++) {
+		bucket = &(dlm->lockres_hash[i]);
+		hlist_for_each_entry(res, iter, bucket, hash_node)
 			dlm_print_one_lock_resource(res);
-		}
 	}
 	spin_unlock(&dlm->spinlock);
 }
diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c
index da3c22045f89..8f3a9e3106fd 100644
--- a/fs/ocfs2/dlm/dlmdomain.c
+++ b/fs/ocfs2/dlm/dlmdomain.c
@@ -77,26 +77,26 @@ static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
 
 void __dlm_unhash_lockres(struct dlm_lock_resource *lockres)
 {
-	list_del_init(&lockres->list);
+	hlist_del_init(&lockres->hash_node);
 	dlm_lockres_put(lockres);
 }
 
 void __dlm_insert_lockres(struct dlm_ctxt *dlm,
 			  struct dlm_lock_resource *res)
 {
-	struct list_head *bucket;
+	struct hlist_head *bucket;
 	struct qstr *q;
 
 	assert_spin_locked(&dlm->spinlock);
 
 	q = &res->lockname;
 	q->hash = full_name_hash(q->name, q->len);
-	bucket = &(dlm->resources[q->hash & DLM_HASH_MASK]);
+	bucket = &(dlm->lockres_hash[q->hash % DLM_HASH_BUCKETS]);
 
 	/* get a reference for our hashtable */
 	dlm_lockres_get(res);
 
-	list_add_tail(&res->list, bucket);
+	hlist_add_head(&res->hash_node, bucket);
 }
 
 struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
@@ -104,9 +104,9 @@ struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
 						unsigned int len)
 {
 	unsigned int hash;
-	struct list_head *iter;
+	struct hlist_node *iter;
 	struct dlm_lock_resource *tmpres=NULL;
-	struct list_head *bucket;
+	struct hlist_head *bucket;
 
 	mlog_entry("%.*s\n", len, name);
 
@@ -114,11 +114,11 @@ struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
 
 	hash = full_name_hash(name, len);
 
-	bucket = &(dlm->resources[hash & DLM_HASH_MASK]);
+	bucket = &(dlm->lockres_hash[hash % DLM_HASH_BUCKETS]);
 
 	/* check for pre-existing lock */
-	list_for_each(iter, bucket) {
-		tmpres = list_entry(iter, struct dlm_lock_resource, list);
+	hlist_for_each(iter, bucket) {
+		tmpres = hlist_entry(iter, struct dlm_lock_resource, hash_node);
 		if (tmpres->lockname.len == len &&
 		    memcmp(tmpres->lockname.name, name, len) == 0) {
 			dlm_lockres_get(tmpres);
@@ -193,8 +193,8 @@ static int dlm_wait_on_domain_helper(const char *domain)
 
 static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
 {
-	if (dlm->resources)
-		free_page((unsigned long) dlm->resources);
+	if (dlm->lockres_hash)
+		free_page((unsigned long) dlm->lockres_hash);
 
 	if (dlm->name)
 		kfree(dlm->name);
@@ -303,10 +303,10 @@ static void dlm_migrate_all_locks(struct dlm_ctxt *dlm)
 	mlog(0, "Migrating locks from domain %s\n", dlm->name);
 restart:
 	spin_lock(&dlm->spinlock);
-	for (i=0; i<DLM_HASH_SIZE; i++) {
-		while (!list_empty(&dlm->resources[i])) {
-			res = list_entry(dlm->resources[i].next,
-					 struct dlm_lock_resource, list);
+	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
+		while (!hlist_empty(&dlm->lockres_hash[i])) {
+			res = hlist_entry(dlm->lockres_hash[i].first,
+					  struct dlm_lock_resource, hash_node);
 			/* need reference when manually grabbing lockres */
 			dlm_lockres_get(res);
 			/* this should unhash the lockres
@@ -573,8 +573,11 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
 	spin_lock(&dlm_domain_lock);
 	dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
 	/* Once the dlm ctxt is marked as leaving then we don't want
-	 * to be put in someone's domain map. */
+	 * to be put in someone's domain map.
+	 * Also, explicitly disallow joining at certain troublesome
+	 * times (ie. during recovery). */
 	if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) {
+		int bit = query->node_idx;
 		spin_lock(&dlm->spinlock);
 
 		if (dlm->dlm_state == DLM_CTXT_NEW &&
@@ -586,6 +589,19 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
 		} else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
 			/* Disallow parallel joins. */
 			response = JOIN_DISALLOW;
+		} else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
+			mlog(ML_NOTICE, "node %u trying to join, but recovery "
+			     "is ongoing.\n", bit);
+			response = JOIN_DISALLOW;
+		} else if (test_bit(bit, dlm->recovery_map)) {
+			mlog(ML_NOTICE, "node %u trying to join, but it "
+			     "still needs recovery.\n", bit);
+			response = JOIN_DISALLOW;
+		} else if (test_bit(bit, dlm->domain_map)) {
+			mlog(ML_NOTICE, "node %u trying to join, but it "
+			     "is still in the domain! needs recovery?\n",
+			     bit);
+			response = JOIN_DISALLOW;
 		} else {
 			/* Alright we're fully a part of this domain
 			 * so we keep some state as to who's joining
@@ -1175,18 +1191,17 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
 		goto leave;
 	}
 
-	dlm->resources = (struct list_head *) __get_free_page(GFP_KERNEL);
-	if (!dlm->resources) {
+	dlm->lockres_hash = (struct hlist_head *) __get_free_page(GFP_KERNEL);
+	if (!dlm->lockres_hash) {
 		mlog_errno(-ENOMEM);
 		kfree(dlm->name);
 		kfree(dlm);
 		dlm = NULL;
 		goto leave;
 	}
-	memset(dlm->resources, 0, PAGE_SIZE);
 
-	for (i=0; i<DLM_HASH_SIZE; i++)
-		INIT_LIST_HEAD(&dlm->resources[i]);
+	for (i=0; i<DLM_HASH_BUCKETS; i++)
+		INIT_HLIST_HEAD(&dlm->lockres_hash[i]);
 
 	strcpy(dlm->name, domain);
 	dlm->key = key;
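The dlmdomain.c hunks above move the lockres table from list_head buckets masked with DLM_HASH_MASK to hlist_head buckets indexed modulo DLM_HASH_BUCKETS. A self-contained sketch of that insert/lookup pattern follows; the demo_* names and the fixed bucket count are illustrative only, and the real code hashes with full_name_hash() and holds dlm->spinlock as the hunks show:

	#include <linux/list.h>
	#include <linux/string.h>

	#define DEMO_HASH_BUCKETS 512		/* stand-in for DLM_HASH_BUCKETS */

	struct demo_lockres {
		struct hlist_node hash_node;	/* was: struct list_head list */
		const char *name;
		unsigned int len;
		unsigned int hash;
	};

	static struct hlist_head demo_hash[DEMO_HASH_BUCKETS];

	static void demo_insert(struct demo_lockres *res)
	{
		/* was: list_add_tail(&res->list, &buckets[hash & DLM_HASH_MASK]) */
		hlist_add_head(&res->hash_node,
			       &demo_hash[res->hash % DEMO_HASH_BUCKETS]);
	}

	static struct demo_lockres *demo_lookup(const char *name, unsigned int len,
						unsigned int hash)
	{
		struct hlist_head *bucket = &demo_hash[hash % DEMO_HASH_BUCKETS];
		struct hlist_node *iter;
		struct demo_lockres *tmp;

		hlist_for_each(iter, bucket) {
			tmp = hlist_entry(iter, struct demo_lockres, hash_node);
			if (tmp->len == len && !memcmp(tmp->name, name, len))
				return tmp;
		}
		return NULL;
	}

A check with hlist_unhashed() on hash_node then replaces the old list_empty() test when a lockres is torn down, which is exactly the switch the dlmmaster.c hunk below makes.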
diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c
index d1a0038557a3..671d4ff222cc 100644
--- a/fs/ocfs2/dlm/dlmlock.c
+++ b/fs/ocfs2/dlm/dlmlock.c
@@ -220,6 +220,17 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
 			dlm_error(status);
 		dlm_revert_pending_lock(res, lock);
 		dlm_lock_put(lock);
+	} else if (dlm_is_recovery_lock(res->lockname.name,
+					res->lockname.len)) {
+		/* special case for the $RECOVERY lock.
+		 * there will never be an AST delivered to put
+		 * this lock on the proper secondary queue
+		 * (granted), so do it manually. */
+		mlog(0, "%s: $RECOVERY lock for this node (%u) is "
+		     "mastered by %u; got lock, manually granting (no ast)\n",
+		     dlm->name, dlm->node_num, res->owner);
+		list_del_init(&lock->list);
+		list_add_tail(&lock->list, &res->granted);
 	}
 	spin_unlock(&res->spinlock);
 
@@ -646,7 +657,19 @@ retry_lock:
 			mlog(0, "retrying lock with migration/"
 			     "recovery/in progress\n");
 			msleep(100);
-			dlm_wait_for_recovery(dlm);
+			/* no waiting for dlm_reco_thread */
+			if (recovery) {
+				if (status == DLM_RECOVERING) {
+					mlog(0, "%s: got RECOVERING "
+					     "for $REOCVERY lock, master "
+					     "was %u\n", dlm->name,
+					     res->owner);
+					dlm_wait_for_node_death(dlm, res->owner,
+							DLM_NODE_DEATH_WAIT_MAX);
+				}
+			} else {
+				dlm_wait_for_recovery(dlm);
+			}
 			goto retry_lock;
 		}
 
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 27e984f7e4cd..847dd3cc4cf5 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -564,7 +564,7 @@ static void dlm_lockres_release(struct kref *kref)
 
 	/* By the time we're ready to blow this guy away, we shouldn't
 	 * be on any lists. */
-	BUG_ON(!list_empty(&res->list));
+	BUG_ON(!hlist_unhashed(&res->hash_node));
 	BUG_ON(!list_empty(&res->granted));
 	BUG_ON(!list_empty(&res->converting));
 	BUG_ON(!list_empty(&res->blocked));
@@ -605,7 +605,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
 
 	init_waitqueue_head(&res->wq);
 	spin_lock_init(&res->spinlock);
-	INIT_LIST_HEAD(&res->list);
+	INIT_HLIST_NODE(&res->hash_node);
 	INIT_LIST_HEAD(&res->granted);
 	INIT_LIST_HEAD(&res->converting);
 	INIT_LIST_HEAD(&res->blocked);
@@ -1050,17 +1050,10 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
 	node = dlm_bitmap_diff_iter_next(&bdi, &sc);
 	while (node >= 0) {
 		if (sc == NODE_UP) {
-			/* a node came up. easy. might not even need
-			 * to talk to it if its node number is higher
-			 * or if we are already blocked. */
-			mlog(0, "node up! %d\n", node);
-			if (blocked)
-				goto next;
-
-			if (node > dlm->node_num) {
-				mlog(0, "node > this node. skipping.\n");
-				goto next;
-			}
+			/* a node came up. clear any old vote from
+			 * the response map and set it in the vote map
+			 * then restart the mastery. */
+			mlog(ML_NOTICE, "node %d up while restarting\n", node);
 
 			/* redo the master request, but only for the new node */
 			mlog(0, "sending request to new node\n");
@@ -2005,6 +1998,15 @@ fail:
 			break;
 
 		mlog(0, "timed out during migration\n");
+		/* avoid hang during shutdown when migrating lockres
+		 * to a node which also goes down */
+		if (dlm_is_node_dead(dlm, target)) {
+			mlog(0, "%s:%.*s: expected migration target %u "
+			     "is no longer up. restarting.\n",
+			     dlm->name, res->lockname.len,
+			     res->lockname.name, target);
+			ret = -ERESTARTSYS;
+		}
 	}
 	if (ret == -ERESTARTSYS) {
 		/* migration failed, detach and clean up mle */
@@ -2480,7 +2482,9 @@ top:
 				atomic_set(&mle->woken, 1);
 				spin_unlock(&mle->spinlock);
 				wake_up(&mle->wq);
-				/* final put will take care of list removal */
+				/* do not need events any longer, so detach
+				 * from heartbeat */
+				__dlm_mle_detach_hb_events(dlm, mle);
 				__dlm_put_mle(mle);
 			}
 			continue;
@@ -2535,6 +2539,9 @@ top:
 		spin_unlock(&res->spinlock);
 		dlm_lockres_put(res);
 
+		/* about to get rid of mle, detach from heartbeat */
+		__dlm_mle_detach_hb_events(dlm, mle);
+
 		/* dump the mle */
 		spin_lock(&dlm->master_lock);
 		__dlm_put_mle(mle);
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 0c8eb1093f00..1e232000f3f7 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -39,6 +39,7 @@
 #include <linux/inet.h>
 #include <linux/timer.h>
 #include <linux/kthread.h>
+#include <linux/delay.h>
 
 
 #include "cluster/heartbeat.h"
@@ -256,6 +257,45 @@ static int dlm_recovery_thread(void *data)
 	return 0;
 }
 
+/* returns true when the recovery master has contacted us */
+static int dlm_reco_master_ready(struct dlm_ctxt *dlm)
+{
+	int ready;
+	spin_lock(&dlm->spinlock);
+	ready = (dlm->reco.new_master != O2NM_INVALID_NODE_NUM);
+	spin_unlock(&dlm->spinlock);
+	return ready;
+}
+
+/* returns true if node is no longer in the domain
+ * could be dead or just not joined */
+int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node)
+{
+	int dead;
+	spin_lock(&dlm->spinlock);
+	dead = test_bit(node, dlm->domain_map);
+	spin_unlock(&dlm->spinlock);
+	return dead;
+}
+
+int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
+{
+	if (timeout) {
+		mlog(ML_NOTICE, "%s: waiting %dms for notification of "
+		     "death of node %u\n", dlm->name, timeout, node);
+		wait_event_timeout(dlm->dlm_reco_thread_wq,
+				   dlm_is_node_dead(dlm, node),
+				   msecs_to_jiffies(timeout));
+	} else {
+		mlog(ML_NOTICE, "%s: waiting indefinitely for notification "
+		     "of death of node %u\n", dlm->name, node);
+		wait_event(dlm->dlm_reco_thread_wq,
+			   dlm_is_node_dead(dlm, node));
+	}
+	/* for now, return 0 */
+	return 0;
+}
+
 /* callers of the top-level api calls (dlmlock/dlmunlock) should
  * block on the dlm->reco.event when recovery is in progress.
  * the dlm recovery thread will set this state when it begins
@@ -297,6 +337,7 @@ static void dlm_end_recovery(struct dlm_ctxt *dlm)
 static int dlm_do_recovery(struct dlm_ctxt *dlm)
 {
 	int status = 0;
+	int ret;
 
 	spin_lock(&dlm->spinlock);
 
@@ -343,10 +384,13 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
 		goto master_here;
 
 	if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) {
-		/* choose a new master */
-		if (!dlm_pick_recovery_master(dlm)) {
+		/* choose a new master, returns 0 if this node
+		 * is the master, -EEXIST if it's another node.
+		 * this does not return until a new master is chosen
+		 * or recovery completes entirely. */
+		ret = dlm_pick_recovery_master(dlm);
+		if (!ret) {
 			/* already notified everyone. go. */
-			dlm->reco.new_master = dlm->node_num;
 			goto master_here;
 		}
 		mlog(0, "another node will master this recovery session.\n");
@@ -371,8 +415,13 @@ master_here:
 	if (status < 0) {
 		mlog(ML_ERROR, "error %d remastering locks for node %u, "
 		     "retrying.\n", status, dlm->reco.dead_node);
+		/* yield a bit to allow any final network messages
+		 * to get handled on remaining nodes */
+		msleep(100);
 	} else {
 		/* success! see if any other nodes need recovery */
+		mlog(0, "DONE mastering recovery of %s:%u here(this=%u)!\n",
+		     dlm->name, dlm->reco.dead_node, dlm->node_num);
 		dlm_reset_recovery(dlm);
 	}
 	dlm_end_recovery(dlm);
@@ -477,7 +526,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
 				BUG();
 				break;
 			case DLM_RECO_NODE_DATA_DEAD:
-				mlog(0, "node %u died after "
+				mlog(ML_NOTICE, "node %u died after "
 				     "requesting recovery info for "
 				     "node %u\n", ndata->node_num,
 				     dead_node);
@@ -485,6 +534,19 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
 				// start all over
 				destroy = 1;
 				status = -EAGAIN;
+				/* instead of spinning like crazy here,
+				 * wait for the domain map to catch up
+				 * with the network state. otherwise this
+				 * can be hit hundreds of times before
+				 * the node is really seen as dead. */
+				wait_event_timeout(dlm->dlm_reco_thread_wq,
+						   dlm_is_node_dead(dlm,
+							ndata->node_num),
+						   msecs_to_jiffies(1000));
+				mlog(0, "waited 1 sec for %u, "
+				     "dead? %s\n", ndata->node_num,
+				     dlm_is_node_dead(dlm, ndata->node_num) ?
+				     "yes" : "no");
 				goto leave;
 			case DLM_RECO_NODE_DATA_RECEIVING:
 			case DLM_RECO_NODE_DATA_REQUESTED:
@@ -678,11 +740,27 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
 	dlm = item->dlm;
 	dead_node = item->u.ral.dead_node;
 	reco_master = item->u.ral.reco_master;
+	mres = (struct dlm_migratable_lockres *)data;
+
+	if (dead_node != dlm->reco.dead_node ||
+	    reco_master != dlm->reco.new_master) {
+		/* show extra debug info if the recovery state is messed */
+		mlog(ML_ERROR, "%s: bad reco state: reco(dead=%u, master=%u), "
+		     "request(dead=%u, master=%u)\n",
+		     dlm->name, dlm->reco.dead_node, dlm->reco.new_master,
+		     dead_node, reco_master);
+		mlog(ML_ERROR, "%s: name=%.*s master=%u locks=%u/%u flags=%u "
+		     "entry[0]={c=%"MLFu64",l=%u,f=%u,t=%d,ct=%d,hb=%d,n=%u}\n",
+		     dlm->name, mres->lockname_len, mres->lockname, mres->master,
+		     mres->num_locks, mres->total_locks, mres->flags,
+		     mres->ml[0].cookie, mres->ml[0].list, mres->ml[0].flags,
+		     mres->ml[0].type, mres->ml[0].convert_type,
+		     mres->ml[0].highest_blocked, mres->ml[0].node);
+		BUG();
+	}
 	BUG_ON(dead_node != dlm->reco.dead_node);
 	BUG_ON(reco_master != dlm->reco.new_master);
 
-	mres = (struct dlm_migratable_lockres *)data;
-
 	/* lock resources should have already been moved to the
 	 * dlm->reco.resources list. now move items from that list
 	 * to a temp list if the dead owner matches. note that the
@@ -757,15 +835,18 @@ int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data)
 			continue;
 
 		switch (ndata->state) {
+			/* should have moved beyond INIT but not to FINALIZE yet */
 			case DLM_RECO_NODE_DATA_INIT:
 			case DLM_RECO_NODE_DATA_DEAD:
-			case DLM_RECO_NODE_DATA_DONE:
 			case DLM_RECO_NODE_DATA_FINALIZE_SENT:
 				mlog(ML_ERROR, "bad ndata state for node %u:"
 				     " state=%d\n", ndata->node_num,
 				     ndata->state);
 				BUG();
 				break;
+			/* these states are possible at this point, anywhere along
+			 * the line of recovery */
+			case DLM_RECO_NODE_DATA_DONE:
 			case DLM_RECO_NODE_DATA_RECEIVING:
 			case DLM_RECO_NODE_DATA_REQUESTED:
 			case DLM_RECO_NODE_DATA_REQUESTING:
@@ -799,13 +880,31 @@ static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
 {
 	struct dlm_lock_resource *res;
 	struct list_head *iter, *iter2;
+	struct dlm_lock *lock;
 
 	spin_lock(&dlm->spinlock);
 	list_for_each_safe(iter, iter2, &dlm->reco.resources) {
 		res = list_entry (iter, struct dlm_lock_resource, recovering);
+		/* always prune any $RECOVERY entries for dead nodes,
+		 * otherwise hangs can occur during later recovery */
 		if (dlm_is_recovery_lock(res->lockname.name,
-					 res->lockname.len))
+					 res->lockname.len)) {
+			spin_lock(&res->spinlock);
+			list_for_each_entry(lock, &res->granted, list) {
+				if (lock->ml.node == dead_node) {
+					mlog(0, "AHA! there was "
+					     "a $RECOVERY lock for dead "
+					     "node %u (%s)!\n",
+					     dead_node, dlm->name);
+					list_del_init(&lock->list);
+					dlm_lock_put(lock);
+					break;
+				}
+			}
+			spin_unlock(&res->spinlock);
 			continue;
+		}
+
 		if (res->owner == dead_node) {
 			mlog(0, "found lockres owned by dead node while "
 			     "doing recovery for node %u. sending it.\n",
@@ -1179,7 +1278,7 @@ static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data)
 again:
 	ret = dlm_lockres_master_requery(dlm, res, &real_master);
 	if (ret < 0) {
-		mlog(0, "dlm_lockres_master_requery failure: %d\n",
+		mlog(0, "dlm_lockres_master_requery ret=%d\n",
 		     ret);
 		goto again;
 	}
@@ -1594,7 +1693,10 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
 					      u8 dead_node, u8 new_master)
 {
 	int i;
-	struct list_head *iter, *iter2, *bucket;
+	struct list_head *iter, *iter2;
+	struct hlist_node *hash_iter;
+	struct hlist_head *bucket;
+
 	struct dlm_lock_resource *res;
 
 	mlog_entry_void();
@@ -1618,10 +1720,9 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
 	 * for now we need to run the whole hash, clear
 	 * the RECOVERING state and set the owner
 	 * if necessary */
-	for (i=0; i<DLM_HASH_SIZE; i++) {
-		bucket = &(dlm->resources[i]);
-		list_for_each(iter, bucket) {
-			res = list_entry (iter, struct dlm_lock_resource, list);
+	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
+		bucket = &(dlm->lockres_hash[i]);
+		hlist_for_each_entry(res, hash_iter, bucket, hash_node) {
 			if (res->state & DLM_LOCK_RES_RECOVERING) {
 				if (res->owner == dead_node) {
 					mlog(0, "(this=%u) res %.*s owner=%u "
@@ -1753,10 +1854,11 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
 
 static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
 {
-	struct list_head *iter;
+	struct hlist_node *iter;
 	struct dlm_lock_resource *res;
 	int i;
-	struct list_head *bucket;
+	struct hlist_head *bucket;
+	struct dlm_lock *lock;
 
 
 	/* purge any stale mles */
@@ -1776,14 +1878,28 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
 	 * can be kicked again to see if any ASTs or BASTs
 	 * need to be fired as a result.
 	 */
-	for (i=0; i<DLM_HASH_SIZE; i++) {
-		bucket = &(dlm->resources[i]);
-		list_for_each(iter, bucket) {
-			res = list_entry (iter, struct dlm_lock_resource, list);
+	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
+		bucket = &(dlm->lockres_hash[i]);
+		hlist_for_each_entry(res, iter, bucket, hash_node) {
+			/* always prune any $RECOVERY entries for dead nodes,
+			 * otherwise hangs can occur during later recovery */
 			if (dlm_is_recovery_lock(res->lockname.name,
-						 res->lockname.len))
+						 res->lockname.len)) {
+				spin_lock(&res->spinlock);
+				list_for_each_entry(lock, &res->granted, list) {
+					if (lock->ml.node == dead_node) {
+						mlog(0, "AHA! there was "
+						     "a $RECOVERY lock for dead "
+						     "node %u (%s)!\n",
+						     dead_node, dlm->name);
+						list_del_init(&lock->list);
+						dlm_lock_put(lock);
+						break;
+					}
+				}
+				spin_unlock(&res->spinlock);
 				continue;
-
+			}
 			spin_lock(&res->spinlock);
 			/* zero the lvb if necessary */
 			dlm_revalidate_lvb(dlm, res, dead_node);
@@ -1869,12 +1985,9 @@ void dlm_hb_node_up_cb(struct o2nm_node *node, int idx, void *data)
 		return;
 
 	spin_lock(&dlm->spinlock);
-
 	set_bit(idx, dlm->live_nodes_map);
-
-	/* notify any mles attached to the heartbeat events */
-	dlm_hb_event_notify_attached(dlm, idx, 1);
-
+	/* do NOT notify mle attached to the heartbeat events.
+	 * new nodes are not interesting in mastery until joined. */
 	spin_unlock(&dlm->spinlock);
 
 	dlm_put(dlm);
@@ -1897,7 +2010,18 @@ static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st)
1897 mlog(0, "unlockast for recovery lock fired!\n"); 2010 mlog(0, "unlockast for recovery lock fired!\n");
1898} 2011}
1899 2012
1900 2013/*
2014 * dlm_pick_recovery_master will continually attempt to use
2015 * dlmlock() on the special "$RECOVERY" lockres with the
2016 * LKM_NOQUEUE flag to get an EX. every thread that enters
2017 * this function on each node racing to become the recovery
2018 * master will not stop attempting this until either:
2019 * a) this node gets the EX (and becomes the recovery master),
2020 * or b) dlm->reco.new_master gets set to some nodenum
2021 * != O2NM_INVALID_NODE_NUM (another node will do the reco).
2022 * so each time a recovery master is needed, the entire cluster
2023 * will sync at this point. if the new master dies, that will
2024 * be detected in dlm_do_recovery */
1901static int dlm_pick_recovery_master(struct dlm_ctxt *dlm) 2025static int dlm_pick_recovery_master(struct dlm_ctxt *dlm)
1902{ 2026{
1903 enum dlm_status ret; 2027 enum dlm_status ret;
@@ -1906,23 +2030,69 @@ static int dlm_pick_recovery_master(struct dlm_ctxt *dlm)
1906 2030
1907 mlog(0, "starting recovery of %s at %lu, dead=%u, this=%u\n", 2031 mlog(0, "starting recovery of %s at %lu, dead=%u, this=%u\n",
1908 dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num); 2032 dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num);
1909retry: 2033again:
1910 memset(&lksb, 0, sizeof(lksb)); 2034 memset(&lksb, 0, sizeof(lksb));
1911 2035
1912 ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY, 2036 ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY,
1913 DLM_RECOVERY_LOCK_NAME, dlm_reco_ast, dlm, dlm_reco_bast); 2037 DLM_RECOVERY_LOCK_NAME, dlm_reco_ast, dlm, dlm_reco_bast);
1914 2038
2039 mlog(0, "%s: dlmlock($RECOVERY) returned %d, lksb=%d\n",
2040 dlm->name, ret, lksb.status);
2041
1915 if (ret == DLM_NORMAL) { 2042 if (ret == DLM_NORMAL) {
1916 mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n", 2043 mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n",
1917 dlm->name, dlm->node_num); 2044 dlm->name, dlm->node_num);
1918 /* I am master, send message to all nodes saying 2045
1919 * that I am beginning a recovery session */ 2046 /* got the EX lock. check to see if another node
1920 status = dlm_send_begin_reco_message(dlm, 2047 * just became the reco master */
1921 dlm->reco.dead_node); 2048 if (dlm_reco_master_ready(dlm)) {
2049 mlog(0, "%s: got reco EX lock, but %u will "
2050 "do the recovery\n", dlm->name,
2051 dlm->reco.new_master);
2052 status = -EEXIST;
2053 } else {
2054 status = 0;
2055
2056 /* see if recovery was already finished elsewhere */
2057 spin_lock(&dlm->spinlock);
2058 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
2059 status = -EINVAL;
2060 mlog(0, "%s: got reco EX lock, but "
2061 "node got recovered already\n", dlm->name);
2062 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) {
2063 mlog(ML_ERROR, "%s: new master is %u "
2064 "but no dead node!\n",
2065 dlm->name, dlm->reco.new_master);
2066 BUG();
2067 }
2068 }
2069 spin_unlock(&dlm->spinlock);
2070 }
2071
2072 /* if this node has actually become the recovery master,
2073 * set the master and send the messages to begin recovery */
2074 if (!status) {
2075 mlog(0, "%s: dead=%u, this=%u, sending "
2076 "begin_reco now\n", dlm->name,
2077 dlm->reco.dead_node, dlm->node_num);
2078 status = dlm_send_begin_reco_message(dlm,
2079 dlm->reco.dead_node);
2080 /* this always succeeds */
2081 BUG_ON(status);
2082
2083 /* set the new_master to this node */
2084 spin_lock(&dlm->spinlock);
2085 dlm->reco.new_master = dlm->node_num;
2086 spin_unlock(&dlm->spinlock);
2087 }
1922 2088
1923 /* recovery lock is a special case. ast will not get fired, 2089 /* recovery lock is a special case. ast will not get fired,
1924 * so just go ahead and unlock it. */ 2090 * so just go ahead and unlock it. */
1925 ret = dlmunlock(dlm, &lksb, 0, dlm_reco_unlock_ast, dlm); 2091 ret = dlmunlock(dlm, &lksb, 0, dlm_reco_unlock_ast, dlm);
2092 if (ret == DLM_DENIED) {
2093 mlog(0, "got DLM_DENIED, trying LKM_CANCEL\n");
2094 ret = dlmunlock(dlm, &lksb, LKM_CANCEL, dlm_reco_unlock_ast, dlm);
2095 }
1926 if (ret != DLM_NORMAL) { 2096 if (ret != DLM_NORMAL) {
1927 /* this would really suck. this could only happen 2097 /* this would really suck. this could only happen
1928 * if there was a network error during the unlock 2098 * if there was a network error during the unlock
@@ -1930,20 +2100,42 @@ retry:
 			 * is actually "done" and the lock structure is
 			 * even freed. we can continue, but only
 			 * because this specific lock name is special. */
-			mlog(0, "dlmunlock returned %d\n", ret);
-		}
-
-		if (status < 0) {
-			mlog(0, "failed to send recovery message. "
-			     "must retry with new node map.\n");
-			goto retry;
+			mlog(ML_ERROR, "dlmunlock returned %d\n", ret);
 		}
 	} else if (ret == DLM_NOTQUEUED) {
 		mlog(0, "dlm=%s dlmlock says another node got it (this=%u)\n",
 		     dlm->name, dlm->node_num);
 		/* another node is master. wait on
-		 * reco.new_master != O2NM_INVALID_NODE_NUM */
+		 * reco.new_master != O2NM_INVALID_NODE_NUM
+		 * for at most one second */
+		wait_event_timeout(dlm->dlm_reco_thread_wq,
+				   dlm_reco_master_ready(dlm),
+				   msecs_to_jiffies(1000));
+		if (!dlm_reco_master_ready(dlm)) {
+			mlog(0, "%s: reco master taking awhile\n",
+			     dlm->name);
+			goto again;
+		}
+		/* another node has informed this one that it is reco master */
+		mlog(0, "%s: reco master %u is ready to recover %u\n",
+		     dlm->name, dlm->reco.new_master, dlm->reco.dead_node);
 		status = -EEXIST;
+	} else {
+		struct dlm_lock_resource *res;
+
+		/* dlmlock returned something other than NOTQUEUED or NORMAL */
+		mlog(ML_ERROR, "%s: got %s from dlmlock($RECOVERY), "
+		     "lksb.status=%s\n", dlm->name, dlm_errname(ret),
+		     dlm_errname(lksb.status));
+		res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME,
+					 DLM_RECOVERY_LOCK_NAME_LEN);
+		if (res) {
+			dlm_print_one_lock_resource(res);
+			dlm_lockres_put(res);
+		} else {
+			mlog(ML_ERROR, "recovery lock not found\n");
+		}
+		BUG();
 	}
 
 	return status;
@@ -1982,7 +2174,7 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node)
 			mlog(0, "not sending begin reco to self\n");
 			continue;
 		}
-
+retry:
 		ret = -EINVAL;
 		mlog(0, "attempting to send begin reco msg to %d\n",
 		     nodenum);
@@ -1991,8 +2183,17 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node)
 		/* negative status is handled ok by caller here */
 		if (ret >= 0)
 			ret = status;
+		if (dlm_is_host_down(ret)) {
+			/* node is down. not involved in recovery
+			 * so just keep going */
+			mlog(0, "%s: node %u was down when sending "
+			     "begin reco msg (%d)\n", dlm->name, nodenum, ret);
+			ret = 0;
+		}
 		if (ret < 0) {
 			struct dlm_lock_resource *res;
+			/* this is now a serious problem, possibly ENOMEM
+			 * in the network stack. must retry */
 			mlog_errno(ret);
 			mlog(ML_ERROR, "begin reco of dlm %s to node %u "
 			     " returned %d\n", dlm->name, nodenum, ret);
@@ -2004,7 +2205,10 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node)
 			} else {
 				mlog(ML_ERROR, "recovery lock not found\n");
 			}
-			break;
+			/* sleep for a bit in hopes that we can avoid
+			 * another ENOMEM */
+			msleep(100);
+			goto retry;
 		}
 	}
 
@@ -2027,19 +2231,34 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
 
 	spin_lock(&dlm->spinlock);
 	if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) {
-		mlog(0, "new_master already set to %u!\n",
-		     dlm->reco.new_master);
+		if (test_bit(dlm->reco.new_master, dlm->recovery_map)) {
+			mlog(0, "%s: new_master %u died, changing "
+			     "to %u\n", dlm->name, dlm->reco.new_master,
+			     br->node_idx);
+		} else {
+			mlog(0, "%s: new_master %u NOT DEAD, changing "
+			     "to %u\n", dlm->name, dlm->reco.new_master,
+			     br->node_idx);
+			/* may not have seen the new master as dead yet */
+		}
 	}
 	if (dlm->reco.dead_node != O2NM_INVALID_NODE_NUM) {
-		mlog(0, "dead_node already set to %u!\n",
-		     dlm->reco.dead_node);
+		mlog(ML_NOTICE, "%s: dead_node previously set to %u, "
+		     "node %u changing it to %u\n", dlm->name,
+		     dlm->reco.dead_node, br->node_idx, br->dead_node);
 	}
 	dlm->reco.new_master = br->node_idx;
 	dlm->reco.dead_node = br->dead_node;
 	if (!test_bit(br->dead_node, dlm->recovery_map)) {
-		mlog(ML_ERROR, "recovery master %u sees %u as dead, but this "
+		mlog(0, "recovery master %u sees %u as dead, but this "
 		     "node has not yet. marking %u as dead\n",
 		     br->node_idx, br->dead_node, br->dead_node);
+		if (!test_bit(br->dead_node, dlm->domain_map) ||
+		    !test_bit(br->dead_node, dlm->live_nodes_map))
+			mlog(0, "%u not in domain/live_nodes map "
+			     "so setting it in reco map manually\n",
+			     br->dead_node);
+		set_bit(br->dead_node, dlm->recovery_map);
 		__dlm_hb_node_down(dlm, br->dead_node);
 	}
 	spin_unlock(&dlm->spinlock);
diff --git a/fs/ocfs2/dlm/dlmunlock.c b/fs/ocfs2/dlm/dlmunlock.c
index cec2ce1cd318..c95f08d2e925 100644
--- a/fs/ocfs2/dlm/dlmunlock.c
+++ b/fs/ocfs2/dlm/dlmunlock.c
@@ -188,6 +188,19 @@ static enum dlm_status dlmunlock_common(struct dlm_ctxt *dlm,
188 actions &= ~(DLM_UNLOCK_REMOVE_LOCK| 188 actions &= ~(DLM_UNLOCK_REMOVE_LOCK|
189 DLM_UNLOCK_REGRANT_LOCK| 189 DLM_UNLOCK_REGRANT_LOCK|
190 DLM_UNLOCK_CLEAR_CONVERT_TYPE); 190 DLM_UNLOCK_CLEAR_CONVERT_TYPE);
191 } else if (status == DLM_RECOVERING ||
192 status == DLM_MIGRATING ||
193 status == DLM_FORWARD) {
194 /* must clear the actions because this unlock
195 * is about to be retried. cannot free or do
196 * any list manipulation. */
197 mlog(0, "%s:%.*s: clearing actions, %s\n",
198 dlm->name, res->lockname.len,
199 res->lockname.name,
200 status==DLM_RECOVERING?"recovering":
201 (status==DLM_MIGRATING?"migrating":
202 "forward"));
203 actions = 0;
191 } 204 }
192 if (flags & LKM_CANCEL) 205 if (flags & LKM_CANCEL)
193 lock->cancel_pending = 0; 206 lock->cancel_pending = 0;
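The dlmunlock change above clears every deferred action when the status says the unlock will be retried (recovering, migrating, or forwarded), so no lock is freed or unlinked before the retry. A small illustrative sketch follows; the enum and flag names are invented rather than the dlm's own.

/* Sketch: drop all deferred actions when the unlock must be retried. */
#include <stdio.h>

enum mock_status { ST_NORMAL, ST_RECOVERING, ST_MIGRATING, ST_FORWARD };

#define ACT_REMOVE_LOCK        0x1
#define ACT_REGRANT_LOCK       0x2
#define ACT_CLEAR_CONVERT_TYPE 0x4

static unsigned int filter_actions(enum mock_status status,
                                   unsigned int actions)
{
        if (status == ST_RECOVERING ||
            status == ST_MIGRATING ||
            status == ST_FORWARD) {
                /* unlock will be retried: no list manipulation, no freeing */
                return 0;
        }
        return actions;
}

int main(void)
{
        unsigned int acts = ACT_REMOVE_LOCK | ACT_REGRANT_LOCK;

        printf("recovering -> actions 0x%x\n",
               filter_actions(ST_RECOVERING, acts));
        printf("normal     -> actions 0x%x\n",
               filter_actions(ST_NORMAL, acts));
        return 0;
}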
diff --git a/fs/ocfs2/dlm/userdlm.c b/fs/ocfs2/dlm/userdlm.c
index e1fdd288796e..c3764f4744ee 100644
--- a/fs/ocfs2/dlm/userdlm.c
+++ b/fs/ocfs2/dlm/userdlm.c
@@ -27,7 +27,7 @@
27 * Boston, MA 021110-1307, USA. 27 * Boston, MA 021110-1307, USA.
28 */ 28 */
29 29
30#include <asm/signal.h> 30#include <linux/signal.h>
31 31
32#include <linux/module.h> 32#include <linux/module.h>
33#include <linux/fs.h> 33#include <linux/fs.h>
diff --git a/fs/ocfs2/extent_map.c b/fs/ocfs2/extent_map.c
index f2fb40cd296a..e6f207eebab4 100644
--- a/fs/ocfs2/extent_map.c
+++ b/fs/ocfs2/extent_map.c
@@ -181,6 +181,12 @@ static int ocfs2_extent_map_find_leaf(struct inode *inode,
181 ret = -EBADR; 181 ret = -EBADR;
182 if (rec_end > OCFS2_I(inode)->ip_clusters) { 182 if (rec_end > OCFS2_I(inode)->ip_clusters) {
183 mlog_errno(ret); 183 mlog_errno(ret);
184 ocfs2_error(inode->i_sb,
185 "Extent %d at e_blkno %"MLFu64" of inode %"MLFu64" goes past ip_clusters of %u\n",
186 i,
187 le64_to_cpu(rec->e_blkno),
188 OCFS2_I(inode)->ip_blkno,
189 OCFS2_I(inode)->ip_clusters);
184 goto out_free; 190 goto out_free;
185 } 191 }
186 192
@@ -226,6 +232,12 @@ static int ocfs2_extent_map_find_leaf(struct inode *inode,
226 ret = -EBADR; 232 ret = -EBADR;
227 if (blkno) { 233 if (blkno) {
228 mlog_errno(ret); 234 mlog_errno(ret);
235 ocfs2_error(inode->i_sb,
236 "Multiple extents for (cpos = %u, clusters = %u) on inode %"MLFu64"; e_blkno %"MLFu64" and rec %d at e_blkno %"MLFu64"\n",
237 cpos, clusters,
238 OCFS2_I(inode)->ip_blkno,
239 blkno, i,
240 le64_to_cpu(rec->e_blkno));
229 goto out_free; 241 goto out_free;
230 } 242 }
231 243
@@ -238,6 +250,10 @@ static int ocfs2_extent_map_find_leaf(struct inode *inode,
238 */ 250 */
239 ret = -EBADR; 251 ret = -EBADR;
240 if (!blkno) { 252 if (!blkno) {
253 ocfs2_error(inode->i_sb,
254 "No record found for (cpos = %u, clusters = %u) on inode %"MLFu64"\n",
255 cpos, clusters,
256 OCFS2_I(inode)->ip_blkno);
241 mlog_errno(ret); 257 mlog_errno(ret);
242 goto out_free; 258 goto out_free;
243 } 259 }
@@ -262,11 +278,24 @@ static int ocfs2_extent_map_find_leaf(struct inode *inode,
262 el = &eb->h_list; 278 el = &eb->h_list;
263 } 279 }
264 280
265 if (el->l_tree_depth) 281 BUG_ON(el->l_tree_depth);
266 BUG();
267 282
268 for (i = 0; i < le16_to_cpu(el->l_next_free_rec); i++) { 283 for (i = 0; i < le16_to_cpu(el->l_next_free_rec); i++) {
269 rec = &el->l_recs[i]; 284 rec = &el->l_recs[i];
285
286 if ((le32_to_cpu(rec->e_cpos) + le32_to_cpu(rec->e_clusters)) >
287 OCFS2_I(inode)->ip_clusters) {
288 ret = -EBADR;
289 mlog_errno(ret);
290 ocfs2_error(inode->i_sb,
291 "Extent %d at e_blkno %"MLFu64" of inode %"MLFu64" goes past ip_clusters of %u\n",
292 i,
293 le64_to_cpu(rec->e_blkno),
294 OCFS2_I(inode)->ip_blkno,
295 OCFS2_I(inode)->ip_clusters);
296 return ret;
297 }
298
270 ret = ocfs2_extent_map_insert(inode, rec, 299 ret = ocfs2_extent_map_insert(inode, rec,
271 le16_to_cpu(el->l_tree_depth)); 300 le16_to_cpu(el->l_tree_depth));
272 if (ret) { 301 if (ret) {
@@ -364,8 +393,8 @@ static int ocfs2_extent_map_lookup_read(struct inode *inode,
364 return ret; 393 return ret;
365 } 394 }
366 395
367 if (ent->e_tree_depth) 396 /* FIXME: Make sure this isn't a corruption */
368 BUG(); /* FIXME: Make sure this isn't a corruption */ 397 BUG_ON(ent->e_tree_depth);
369 398
370 *ret_ent = ent; 399 *ret_ent = ent;
371 400
@@ -423,8 +452,7 @@ static int ocfs2_extent_map_try_insert(struct inode *inode,
423 le32_to_cpu(rec->e_clusters), NULL, 452 le32_to_cpu(rec->e_clusters), NULL,
424 NULL); 453 NULL);
425 454
426 if (!old_ent) 455 BUG_ON(!old_ent);
427 BUG();
428 456
429 ret = -EEXIST; 457 ret = -EEXIST;
430 if (old_ent->e_tree_depth < tree_depth) 458 if (old_ent->e_tree_depth < tree_depth)
@@ -528,6 +556,10 @@ static int ocfs2_extent_map_insert(struct inode *inode,
528 OCFS2_I(inode)->ip_map.em_clusters) { 556 OCFS2_I(inode)->ip_map.em_clusters) {
529 ret = -EBADR; 557 ret = -EBADR;
530 mlog_errno(ret); 558 mlog_errno(ret);
559 ocfs2_error(inode->i_sb,
560 "Zero e_clusters on non-tail extent record at e_blkno %"MLFu64" on inode %"MLFu64"\n",
561 le64_to_cpu(rec->e_blkno),
562 OCFS2_I(inode)->ip_blkno);
531 return ret; 563 return ret;
532 } 564 }
533 565
@@ -590,12 +622,12 @@ static int ocfs2_extent_map_insert(struct inode *inode,
590 * Existing record in the extent map: 622 * Existing record in the extent map:
591 * 623 *
592 * cpos = 10, len = 10 624 * cpos = 10, len = 10
593 * |---------| 625 * |---------|
594 * 626 *
595 * New Record: 627 * New Record:
596 * 628 *
597 * cpos = 10, len = 20 629 * cpos = 10, len = 20
598 * |------------------| 630 * |------------------|
599 * 631 *
600 * The passed record is the new on-disk record. The new_clusters value 632 * The passed record is the new on-disk record. The new_clusters value
601 * is how many clusters were added to the file. If the append is a 633 * is how many clusters were added to the file. If the append is a
@@ -988,7 +1020,7 @@ int __init init_ocfs2_extent_maps(void)
988 return 0; 1020 return 0;
989} 1021}
990 1022
991void __exit exit_ocfs2_extent_maps(void) 1023void exit_ocfs2_extent_maps(void)
992{ 1024{
993 kmem_cache_destroy(ocfs2_em_ent_cachep); 1025 kmem_cache_destroy(ocfs2_em_ent_cachep);
994} 1026}
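The extent_map.c hunks replace silent -EBADR returns and BUG() calls with ocfs2_error() reports that describe the corrupt record. A hedged userspace sketch of that validation style is below; mock_extent_rec and check_extent_rec are simplified stand-ins, not the ocfs2 on-disk format.

/* Sketch: a record running past the inode's cluster count is reported as
 * corruption and returns -EBADR instead of bringing the machine down. */
#include <errno.h>
#include <stdint.h>
#include <stdio.h>

struct mock_extent_rec {
        uint32_t e_cpos;        /* first cluster covered by the record */
        uint32_t e_clusters;    /* number of clusters covered */
};

static int check_extent_rec(const struct mock_extent_rec *rec,
                            uint32_t ip_clusters)
{
        if ((uint64_t)rec->e_cpos + rec->e_clusters > ip_clusters) {
                fprintf(stderr,
                        "extent (cpos %u, clusters %u) goes past ip_clusters %u\n",
                        (unsigned)rec->e_cpos, (unsigned)rec->e_clusters,
                        (unsigned)ip_clusters);
                return -EBADR;
        }
        return 0;
}

int main(void)
{
        struct mock_extent_rec good = { 0, 8 }, bad = { 10, 20 };

        printf("good: %d\n", check_extent_rec(&good, 16));
        printf("bad:  %d\n", check_extent_rec(&bad, 16));
        return 0;
}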
diff --git a/fs/ocfs2/file.c b/fs/ocfs2/file.c
index eaf33caa0a1f..8a4048b55fdc 100644
--- a/fs/ocfs2/file.c
+++ b/fs/ocfs2/file.c
@@ -933,9 +933,6 @@ static ssize_t ocfs2_file_aio_write(struct kiocb *iocb,
933 struct file *filp = iocb->ki_filp; 933 struct file *filp = iocb->ki_filp;
934 struct inode *inode = filp->f_dentry->d_inode; 934 struct inode *inode = filp->f_dentry->d_inode;
935 loff_t newsize, saved_pos; 935 loff_t newsize, saved_pos;
936#ifdef OCFS2_ORACORE_WORKAROUNDS
937 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
938#endif
939 936
940 mlog_entry("(0x%p, 0x%p, %u, '%.*s')\n", filp, buf, 937 mlog_entry("(0x%p, 0x%p, %u, '%.*s')\n", filp, buf,
941 (unsigned int)count, 938 (unsigned int)count,
@@ -951,14 +948,6 @@ static ssize_t ocfs2_file_aio_write(struct kiocb *iocb,
951 return -EIO; 948 return -EIO;
952 } 949 }
953 950
954#ifdef OCFS2_ORACORE_WORKAROUNDS
955 /* ugh, work around some applications which open everything O_DIRECT +
956 * O_APPEND and really don't mean to use O_DIRECT. */
957 if (osb->s_mount_opt & OCFS2_MOUNT_COMPAT_OCFS &&
958 (filp->f_flags & O_APPEND) && (filp->f_flags & O_DIRECT))
959 filp->f_flags &= ~O_DIRECT;
960#endif
961
962 mutex_lock(&inode->i_mutex); 951 mutex_lock(&inode->i_mutex);
963 /* to match setattr's i_mutex -> i_alloc_sem -> rw_lock ordering */ 952 /* to match setattr's i_mutex -> i_alloc_sem -> rw_lock ordering */
964 if (filp->f_flags & O_DIRECT) { 953 if (filp->f_flags & O_DIRECT) {
@@ -1022,8 +1011,9 @@ static ssize_t ocfs2_file_aio_write(struct kiocb *iocb,
1022 } 1011 }
1023 newsize = count + saved_pos; 1012 newsize = count + saved_pos;
1024 1013
1025 mlog(0, "pos=%lld newsize=%"MLFu64" cursize=%lld\n", 1014 mlog(0, "pos=%lld newsize=%lld cursize=%lld\n",
1026 saved_pos, newsize, i_size_read(inode)); 1015 (long long) saved_pos, (long long) newsize,
1016 (long long) i_size_read(inode));
1027 1017
1028 /* No need for a higher level metadata lock if we're 1018 /* No need for a higher level metadata lock if we're
1029 * never going past i_size. */ 1019 * never going past i_size. */
@@ -1042,8 +1032,9 @@ static ssize_t ocfs2_file_aio_write(struct kiocb *iocb,
1042 spin_unlock(&OCFS2_I(inode)->ip_lock); 1032 spin_unlock(&OCFS2_I(inode)->ip_lock);
1043 1033
1044 mlog(0, "Writing at EOF, may need more allocation: " 1034 mlog(0, "Writing at EOF, may need more allocation: "
1045 "i_size = %lld, newsize = %"MLFu64", need %u clusters\n", 1035 "i_size = %lld, newsize = %lld, need %u clusters\n",
1046 i_size_read(inode), newsize, clusters); 1036 (long long) i_size_read(inode), (long long) newsize,
1037 clusters);
1047 1038
1048 /* We only want to continue the rest of this loop if 1039 /* We only want to continue the rest of this loop if
1049 * our extend will actually require more 1040 * our extend will actually require more
@@ -1077,27 +1068,7 @@ static ssize_t ocfs2_file_aio_write(struct kiocb *iocb,
1077 /* communicate with ocfs2_dio_end_io */ 1068 /* communicate with ocfs2_dio_end_io */
1078 ocfs2_iocb_set_rw_locked(iocb); 1069 ocfs2_iocb_set_rw_locked(iocb);
1079 1070
1080#ifdef OCFS2_ORACORE_WORKAROUNDS 1071 ret = generic_file_aio_write_nolock(iocb, &local_iov, 1, &iocb->ki_pos);
1081 if (osb->s_mount_opt & OCFS2_MOUNT_COMPAT_OCFS &&
1082 filp->f_flags & O_DIRECT) {
1083 unsigned int saved_flags = filp->f_flags;
1084 int sector_size = 1 << osb->s_sectsize_bits;
1085
1086 if ((saved_pos & (sector_size - 1)) ||
1087 (count & (sector_size - 1)) ||
1088 ((unsigned long)buf & (sector_size - 1))) {
1089 filp->f_flags |= O_SYNC;
1090 filp->f_flags &= ~O_DIRECT;
1091 }
1092
1093 ret = generic_file_aio_write_nolock(iocb, &local_iov, 1,
1094 &iocb->ki_pos);
1095
1096 filp->f_flags = saved_flags;
1097 } else
1098#endif
1099 ret = generic_file_aio_write_nolock(iocb, &local_iov, 1,
1100 &iocb->ki_pos);
1101 1072
1102 /* buffered aio wouldn't have proper lock coverage today */ 1073 /* buffered aio wouldn't have proper lock coverage today */
1103 BUG_ON(ret == -EIOCBQUEUED && !(filp->f_flags & O_DIRECT)); 1074 BUG_ON(ret == -EIOCBQUEUED && !(filp->f_flags & O_DIRECT));
@@ -1138,9 +1109,6 @@ static ssize_t ocfs2_file_aio_read(struct kiocb *iocb,
1138 int ret = 0, rw_level = -1, have_alloc_sem = 0; 1109 int ret = 0, rw_level = -1, have_alloc_sem = 0;
1139 struct file *filp = iocb->ki_filp; 1110 struct file *filp = iocb->ki_filp;
1140 struct inode *inode = filp->f_dentry->d_inode; 1111 struct inode *inode = filp->f_dentry->d_inode;
1141#ifdef OCFS2_ORACORE_WORKAROUNDS
1142 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1143#endif
1144 1112
1145 mlog_entry("(0x%p, 0x%p, %u, '%.*s')\n", filp, buf, 1113 mlog_entry("(0x%p, 0x%p, %u, '%.*s')\n", filp, buf,
1146 (unsigned int)count, 1114 (unsigned int)count,
@@ -1153,21 +1121,6 @@ static ssize_t ocfs2_file_aio_read(struct kiocb *iocb,
1153 goto bail; 1121 goto bail;
1154 } 1122 }
1155 1123
1156#ifdef OCFS2_ORACORE_WORKAROUNDS
1157 if (osb->s_mount_opt & OCFS2_MOUNT_COMPAT_OCFS) {
1158 if (filp->f_flags & O_DIRECT) {
1159 int sector_size = 1 << osb->s_sectsize_bits;
1160
1161 if ((pos & (sector_size - 1)) ||
1162 (count & (sector_size - 1)) ||
1163 ((unsigned long)buf & (sector_size - 1)) ||
1164 (i_size_read(inode) & (sector_size -1))) {
1165 filp->f_flags &= ~O_DIRECT;
1166 }
1167 }
1168 }
1169#endif
1170
1171 /* 1124 /*
1172 * buffered reads protect themselves in ->readpage(). O_DIRECT reads 1125 * buffered reads protect themselves in ->readpage(). O_DIRECT reads
1173 * need locks to protect pending reads from racing with truncate. 1126 * need locks to protect pending reads from racing with truncate.
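Besides dropping the ORACORE workarounds, the file.c hunks switch the size logging from the MLFu64 macro to plain %lld with explicit (long long) casts. A tiny userspace illustration of why the cast is the portable choice, assuming nothing about mlog() itself:

/* The width of file-offset types varies by ABI; casting to long long gives
 * one format specifier that is correct everywhere. */
#include <stdio.h>
#include <stdint.h>
#include <sys/types.h>

int main(void)
{
        off_t pos = 4096;               /* stand-in for a file position */
        int64_t newsize = 1 << 20;      /* stand-in for the computed new size */

        printf("pos=%lld newsize=%lld\n",
               (long long)pos, (long long)newsize);
        return 0;
}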
diff --git a/fs/ocfs2/heartbeat.c b/fs/ocfs2/heartbeat.c
index 0bbd22f46c80..cbfd45a97a63 100644
--- a/fs/ocfs2/heartbeat.c
+++ b/fs/ocfs2/heartbeat.c
@@ -67,6 +67,7 @@ void ocfs2_init_node_maps(struct ocfs2_super *osb)
67 ocfs2_node_map_init(&osb->mounted_map); 67 ocfs2_node_map_init(&osb->mounted_map);
68 ocfs2_node_map_init(&osb->recovery_map); 68 ocfs2_node_map_init(&osb->recovery_map);
69 ocfs2_node_map_init(&osb->umount_map); 69 ocfs2_node_map_init(&osb->umount_map);
70 ocfs2_node_map_init(&osb->osb_recovering_orphan_dirs);
70} 71}
71 72
72static void ocfs2_do_node_down(int node_num, 73static void ocfs2_do_node_down(int node_num,
diff --git a/fs/ocfs2/inode.c b/fs/ocfs2/inode.c
index d4ecc0627716..315472a5c192 100644
--- a/fs/ocfs2/inode.c
+++ b/fs/ocfs2/inode.c
@@ -41,6 +41,7 @@
41#include "dlmglue.h" 41#include "dlmglue.h"
42#include "extent_map.h" 42#include "extent_map.h"
43#include "file.h" 43#include "file.h"
44#include "heartbeat.h"
44#include "inode.h" 45#include "inode.h"
45#include "journal.h" 46#include "journal.h"
46#include "namei.h" 47#include "namei.h"
@@ -544,6 +545,42 @@ bail:
544 return status; 545 return status;
545} 546}
546 547
548/*
549 * Serialize with orphan dir recovery. If the process doing
550 * recovery on this orphan dir does an iget() with the dir
551 * i_mutex held, we'll deadlock here. Instead we detect this
552 * and exit early - recovery will wipe this inode for us.
553 */
554static int ocfs2_check_orphan_recovery_state(struct ocfs2_super *osb,
555 int slot)
556{
557 int ret = 0;
558
559 spin_lock(&osb->osb_lock);
560 if (ocfs2_node_map_test_bit(osb, &osb->osb_recovering_orphan_dirs, slot)) {
561 mlog(0, "Recovery is happening on orphan dir %d, will skip "
562 "this inode\n", slot);
563 ret = -EDEADLK;
564 goto out;
565 }
566 /* This signals to the orphan recovery process that it should
567 * wait for us to handle the wipe. */
568 osb->osb_orphan_wipes[slot]++;
569out:
570 spin_unlock(&osb->osb_lock);
571 return ret;
572}
573
574static void ocfs2_signal_wipe_completion(struct ocfs2_super *osb,
575 int slot)
576{
577 spin_lock(&osb->osb_lock);
578 osb->osb_orphan_wipes[slot]--;
579 spin_unlock(&osb->osb_lock);
580
581 wake_up(&osb->osb_wipe_event);
582}
583
547static int ocfs2_wipe_inode(struct inode *inode, 584static int ocfs2_wipe_inode(struct inode *inode,
548 struct buffer_head *di_bh) 585 struct buffer_head *di_bh)
549{ 586{
@@ -555,6 +592,11 @@ static int ocfs2_wipe_inode(struct inode *inode,
555 /* We've already voted on this so it should be readonly - no 592 /* We've already voted on this so it should be readonly - no
556 * spinlock needed. */ 593 * spinlock needed. */
557 orphaned_slot = OCFS2_I(inode)->ip_orphaned_slot; 594 orphaned_slot = OCFS2_I(inode)->ip_orphaned_slot;
595
596 status = ocfs2_check_orphan_recovery_state(osb, orphaned_slot);
597 if (status)
598 return status;
599
558 orphan_dir_inode = ocfs2_get_system_file_inode(osb, 600 orphan_dir_inode = ocfs2_get_system_file_inode(osb,
559 ORPHAN_DIR_SYSTEM_INODE, 601 ORPHAN_DIR_SYSTEM_INODE,
560 orphaned_slot); 602 orphaned_slot);
@@ -597,6 +639,7 @@ bail_unlock_dir:
597 brelse(orphan_dir_bh); 639 brelse(orphan_dir_bh);
598bail: 640bail:
599 iput(orphan_dir_inode); 641 iput(orphan_dir_inode);
642 ocfs2_signal_wipe_completion(osb, orphaned_slot);
600 643
601 return status; 644 return status;
602} 645}
@@ -822,7 +865,8 @@ void ocfs2_delete_inode(struct inode *inode)
822 865
823 status = ocfs2_wipe_inode(inode, di_bh); 866 status = ocfs2_wipe_inode(inode, di_bh);
824 if (status < 0) { 867 if (status < 0) {
825 mlog_errno(status); 868 if (status != -EDEADLK)
869 mlog_errno(status);
826 goto bail_unlock_inode; 870 goto bail_unlock_inode;
827 } 871 }
828 872
@@ -903,10 +947,10 @@ void ocfs2_clear_inode(struct inode *inode)
903 "Clear inode of %"MLFu64", inode is locked\n", 947 "Clear inode of %"MLFu64", inode is locked\n",
904 oi->ip_blkno); 948 oi->ip_blkno);
905 949
906 mlog_bug_on_msg(down_trylock(&oi->ip_io_sem), 950 mlog_bug_on_msg(!mutex_trylock(&oi->ip_io_mutex),
907 "Clear inode of %"MLFu64", io_sem is locked\n", 951 "Clear inode of %"MLFu64", io_mutex is locked\n",
908 oi->ip_blkno); 952 oi->ip_blkno);
909 up(&oi->ip_io_sem); 953 mutex_unlock(&oi->ip_io_mutex);
910 954
911 /* 955 /*
912 * down_trylock() returns 0, down_write_trylock() returns 1 956 * down_trylock() returns 0, down_write_trylock() returns 1
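The new inode.c helpers implement the wiper side of the orphan-recovery handshake: a process wiping an orphan either backs off with -EDEADLK when recovery owns that slot, or registers itself in osb_orphan_wipes[] so recovery waits for it. A runnable userspace analogue, with a pthread mutex in place of osb->osb_lock and invented names throughout, is sketched below; the quiet handling of -EDEADLK in ocfs2_delete_inode() above is what lets the wiper back off without logging an error.

/* Userspace analogue of ocfs2_check_orphan_recovery_state() and
 * ocfs2_signal_wipe_completion(); mock_osb and the helpers are invented. */
#include <errno.h>
#include <pthread.h>
#include <stdio.h>

#define MAX_SLOTS 8

struct mock_osb {
        pthread_mutex_t lock;
        unsigned long recovering_slots;         /* bit per slot under recovery */
        unsigned int orphan_wipes[MAX_SLOTS];   /* wipes in flight per slot */
};

static int check_orphan_recovery_state(struct mock_osb *osb, int slot)
{
        int ret = 0;

        pthread_mutex_lock(&osb->lock);
        if (osb->recovering_slots & (1UL << slot)) {
                /* recovery will wipe this inode for us; back off */
                ret = -EDEADLK;
        } else {
                /* tell recovery to wait for this wipe to finish */
                osb->orphan_wipes[slot]++;
        }
        pthread_mutex_unlock(&osb->lock);
        return ret;
}

static void signal_wipe_completion(struct mock_osb *osb, int slot)
{
        pthread_mutex_lock(&osb->lock);
        osb->orphan_wipes[slot]--;
        pthread_mutex_unlock(&osb->lock);
        /* the real code wakes osb_wipe_event here */
}

int main(void)
{
        struct mock_osb osb = { .lock = PTHREAD_MUTEX_INITIALIZER };

        if (check_orphan_recovery_state(&osb, 2) == 0) {
                printf("wiping orphan in slot 2 (in flight: %u)\n",
                       osb.orphan_wipes[2]);
                signal_wipe_completion(&osb, 2);
        }
        return 0;
}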
diff --git a/fs/ocfs2/inode.h b/fs/ocfs2/inode.h
index 9b0177433653..84c507961287 100644
--- a/fs/ocfs2/inode.h
+++ b/fs/ocfs2/inode.h
@@ -46,10 +46,10 @@ struct ocfs2_inode_info
46 struct list_head ip_io_markers; 46 struct list_head ip_io_markers;
47 int ip_orphaned_slot; 47 int ip_orphaned_slot;
48 48
49 struct semaphore ip_io_sem; 49 struct mutex ip_io_mutex;
50 50
51 /* Used by the journalling code to attach an inode to a 51 /* Used by the journalling code to attach an inode to a
52 * handle. These are protected by ip_io_sem in order to lock 52 * handle. These are protected by ip_io_mutex in order to lock
53 * out other I/O to the inode until we either commit or 53 * out other I/O to the inode until we either commit or
54 * abort. */ 54 * abort. */
55 struct list_head ip_handle_list; 55 struct list_head ip_handle_list;
diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index 303c8d96457f..4be801f4559b 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -147,8 +147,7 @@ struct ocfs2_journal_handle *ocfs2_start_trans(struct ocfs2_super *osb,
147 147
148 mlog_entry("(max_buffs = %d)\n", max_buffs); 148 mlog_entry("(max_buffs = %d)\n", max_buffs);
149 149
150 if (!osb || !osb->journal->j_journal) 150 BUG_ON(!osb || !osb->journal->j_journal);
151 BUG();
152 151
153 if (ocfs2_is_hard_readonly(osb)) { 152 if (ocfs2_is_hard_readonly(osb)) {
154 ret = -EROFS; 153 ret = -EROFS;
@@ -401,7 +400,7 @@ int ocfs2_journal_access(struct ocfs2_journal_handle *handle,
401 * j_trans_barrier for us. */ 400 * j_trans_barrier for us. */
402 ocfs2_set_inode_lock_trans(OCFS2_SB(inode->i_sb)->journal, inode); 401 ocfs2_set_inode_lock_trans(OCFS2_SB(inode->i_sb)->journal, inode);
403 402
404 down(&OCFS2_I(inode)->ip_io_sem); 403 mutex_lock(&OCFS2_I(inode)->ip_io_mutex);
405 switch (type) { 404 switch (type) {
406 case OCFS2_JOURNAL_ACCESS_CREATE: 405 case OCFS2_JOURNAL_ACCESS_CREATE:
407 case OCFS2_JOURNAL_ACCESS_WRITE: 406 case OCFS2_JOURNAL_ACCESS_WRITE:
@@ -416,7 +415,7 @@ int ocfs2_journal_access(struct ocfs2_journal_handle *handle,
416 status = -EINVAL; 415 status = -EINVAL;
417 mlog(ML_ERROR, "Uknown access type!\n"); 416 mlog(ML_ERROR, "Uknown access type!\n");
418 } 417 }
419 up(&OCFS2_I(inode)->ip_io_sem); 418 mutex_unlock(&OCFS2_I(inode)->ip_io_mutex);
420 419
421 if (status < 0) 420 if (status < 0)
422 mlog(ML_ERROR, "Error %d getting %d access to buffer!\n", 421 mlog(ML_ERROR, "Error %d getting %d access to buffer!\n",
@@ -561,7 +560,11 @@ int ocfs2_journal_init(struct ocfs2_journal *journal, int *dirty)
561 SET_INODE_JOURNAL(inode); 560 SET_INODE_JOURNAL(inode);
562 OCFS2_I(inode)->ip_open_count++; 561 OCFS2_I(inode)->ip_open_count++;
563 562
564 status = ocfs2_meta_lock(inode, NULL, &bh, 1); 563 /* Skip recovery waits here - journal inode metadata never
564 * changes in a live cluster so it can be considered an
565 * exception to the rule. */
566 status = ocfs2_meta_lock_full(inode, NULL, &bh, 1,
567 OCFS2_META_LOCK_RECOVERY);
565 if (status < 0) { 568 if (status < 0) {
566 if (status != -ERESTARTSYS) 569 if (status != -ERESTARTSYS)
567 mlog(ML_ERROR, "Could not get lock on journal!\n"); 570 mlog(ML_ERROR, "Could not get lock on journal!\n");
@@ -672,8 +675,7 @@ void ocfs2_journal_shutdown(struct ocfs2_super *osb)
672 675
673 mlog_entry_void(); 676 mlog_entry_void();
674 677
675 if (!osb) 678 BUG_ON(!osb);
676 BUG();
677 679
678 journal = osb->journal; 680 journal = osb->journal;
679 if (!journal) 681 if (!journal)
@@ -805,8 +807,7 @@ int ocfs2_journal_wipe(struct ocfs2_journal *journal, int full)
805 807
806 mlog_entry_void(); 808 mlog_entry_void();
807 809
808 if (!journal) 810 BUG_ON(!journal);
809 BUG();
810 811
811 status = journal_wipe(journal->j_journal, full); 812 status = journal_wipe(journal->j_journal, full);
812 if (status < 0) { 813 if (status < 0) {
@@ -1072,10 +1073,10 @@ restart:
1072 NULL); 1073 NULL);
1073 1074
1074bail: 1075bail:
1075 down(&osb->recovery_lock); 1076 mutex_lock(&osb->recovery_lock);
1076 if (!status && 1077 if (!status &&
1077 !ocfs2_node_map_is_empty(osb, &osb->recovery_map)) { 1078 !ocfs2_node_map_is_empty(osb, &osb->recovery_map)) {
1078 up(&osb->recovery_lock); 1079 mutex_unlock(&osb->recovery_lock);
1079 goto restart; 1080 goto restart;
1080 } 1081 }
1081 1082
@@ -1083,7 +1084,7 @@ bail:
1083 mb(); /* sync with ocfs2_recovery_thread_running */ 1084 mb(); /* sync with ocfs2_recovery_thread_running */
1084 wake_up(&osb->recovery_event); 1085 wake_up(&osb->recovery_event);
1085 1086
1086 up(&osb->recovery_lock); 1087 mutex_unlock(&osb->recovery_lock);
1087 1088
1088 mlog_exit(status); 1089 mlog_exit(status);
1089 /* no one is callint kthread_stop() for us so the kthread() api 1090 /* no one is callint kthread_stop() for us so the kthread() api
@@ -1098,7 +1099,7 @@ void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num)
1098 mlog_entry("(node_num=%d, osb->node_num = %d)\n", 1099 mlog_entry("(node_num=%d, osb->node_num = %d)\n",
1099 node_num, osb->node_num); 1100 node_num, osb->node_num);
1100 1101
1101 down(&osb->recovery_lock); 1102 mutex_lock(&osb->recovery_lock);
1102 if (osb->disable_recovery) 1103 if (osb->disable_recovery)
1103 goto out; 1104 goto out;
1104 1105
@@ -1120,7 +1121,7 @@ void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num)
1120 } 1121 }
1121 1122
1122out: 1123out:
1123 up(&osb->recovery_lock); 1124 mutex_unlock(&osb->recovery_lock);
1124 wake_up(&osb->recovery_event); 1125 wake_up(&osb->recovery_event);
1125 1126
1126 mlog_exit_void(); 1127 mlog_exit_void();
@@ -1271,8 +1272,7 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
1271 1272
1272 /* Should not ever be called to recover ourselves -- in that 1273 /* Should not ever be called to recover ourselves -- in that
1273 * case we should've called ocfs2_journal_load instead. */ 1274 * case we should've called ocfs2_journal_load instead. */
1274 if (osb->node_num == node_num) 1275 BUG_ON(osb->node_num == node_num);
1275 BUG();
1276 1276
1277 slot_num = ocfs2_node_num_to_slot(si, node_num); 1277 slot_num = ocfs2_node_num_to_slot(si, node_num);
1278 if (slot_num == OCFS2_INVALID_SLOT) { 1278 if (slot_num == OCFS2_INVALID_SLOT) {
@@ -1408,21 +1408,17 @@ bail:
1408 return status; 1408 return status;
1409} 1409}
1410 1410
1411static int ocfs2_recover_orphans(struct ocfs2_super *osb, 1411static int ocfs2_queue_orphans(struct ocfs2_super *osb,
1412 int slot) 1412 int slot,
1413 struct inode **head)
1413{ 1414{
1414 int status = 0; 1415 int status;
1415 int have_disk_lock = 0;
1416 struct inode *inode = NULL;
1417 struct inode *iter;
1418 struct inode *orphan_dir_inode = NULL; 1416 struct inode *orphan_dir_inode = NULL;
1417 struct inode *iter;
1419 unsigned long offset, blk, local; 1418 unsigned long offset, blk, local;
1420 struct buffer_head *bh = NULL; 1419 struct buffer_head *bh = NULL;
1421 struct ocfs2_dir_entry *de; 1420 struct ocfs2_dir_entry *de;
1422 struct super_block *sb = osb->sb; 1421 struct super_block *sb = osb->sb;
1423 struct ocfs2_inode_info *oi;
1424
1425 mlog(0, "Recover inodes from orphan dir in slot %d\n", slot);
1426 1422
1427 orphan_dir_inode = ocfs2_get_system_file_inode(osb, 1423 orphan_dir_inode = ocfs2_get_system_file_inode(osb,
1428 ORPHAN_DIR_SYSTEM_INODE, 1424 ORPHAN_DIR_SYSTEM_INODE,
@@ -1430,17 +1426,15 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
1430 if (!orphan_dir_inode) { 1426 if (!orphan_dir_inode) {
1431 status = -ENOENT; 1427 status = -ENOENT;
1432 mlog_errno(status); 1428 mlog_errno(status);
1433 goto out; 1429 return status;
1434 } 1430 }
1435 1431
1436 mutex_lock(&orphan_dir_inode->i_mutex); 1432 mutex_lock(&orphan_dir_inode->i_mutex);
1437 status = ocfs2_meta_lock(orphan_dir_inode, NULL, NULL, 0); 1433 status = ocfs2_meta_lock(orphan_dir_inode, NULL, NULL, 0);
1438 if (status < 0) { 1434 if (status < 0) {
1439 mutex_unlock(&orphan_dir_inode->i_mutex);
1440 mlog_errno(status); 1435 mlog_errno(status);
1441 goto out; 1436 goto out;
1442 } 1437 }
1443 have_disk_lock = 1;
1444 1438
1445 offset = 0; 1439 offset = 0;
1446 iter = NULL; 1440 iter = NULL;
@@ -1451,11 +1445,10 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
1451 if (!bh) 1445 if (!bh)
1452 status = -EINVAL; 1446 status = -EINVAL;
1453 if (status < 0) { 1447 if (status < 0) {
1454 mutex_unlock(&orphan_dir_inode->i_mutex);
1455 if (bh) 1448 if (bh)
1456 brelse(bh); 1449 brelse(bh);
1457 mlog_errno(status); 1450 mlog_errno(status);
1458 goto out; 1451 goto out_unlock;
1459 } 1452 }
1460 1453
1461 local = 0; 1454 local = 0;
@@ -1465,11 +1458,10 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
1465 1458
1466 if (!ocfs2_check_dir_entry(orphan_dir_inode, 1459 if (!ocfs2_check_dir_entry(orphan_dir_inode,
1467 de, bh, local)) { 1460 de, bh, local)) {
1468 mutex_unlock(&orphan_dir_inode->i_mutex);
1469 status = -EINVAL; 1461 status = -EINVAL;
1470 mlog_errno(status); 1462 mlog_errno(status);
1471 brelse(bh); 1463 brelse(bh);
1472 goto out; 1464 goto out_unlock;
1473 } 1465 }
1474 1466
1475 local += le16_to_cpu(de->rec_len); 1467 local += le16_to_cpu(de->rec_len);
@@ -1504,18 +1496,95 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
1504 1496
1505 mlog(0, "queue orphan %"MLFu64"\n", 1497 mlog(0, "queue orphan %"MLFu64"\n",
1506 OCFS2_I(iter)->ip_blkno); 1498 OCFS2_I(iter)->ip_blkno);
1507 OCFS2_I(iter)->ip_next_orphan = inode; 1499 /* No locking is required for the next_orphan
1508 inode = iter; 1500 * queue as there is only ever a single
1501 * process doing orphan recovery. */
1502 OCFS2_I(iter)->ip_next_orphan = *head;
1503 *head = iter;
1509 } 1504 }
1510 brelse(bh); 1505 brelse(bh);
1511 } 1506 }
1512 mutex_unlock(&orphan_dir_inode->i_mutex);
1513 1507
1508out_unlock:
1514 ocfs2_meta_unlock(orphan_dir_inode, 0); 1509 ocfs2_meta_unlock(orphan_dir_inode, 0);
1515 have_disk_lock = 0; 1510out:
1516 1511 mutex_unlock(&orphan_dir_inode->i_mutex);
1517 iput(orphan_dir_inode); 1512 iput(orphan_dir_inode);
1518 orphan_dir_inode = NULL; 1513 return status;
1514}
1515
1516static int ocfs2_orphan_recovery_can_continue(struct ocfs2_super *osb,
1517 int slot)
1518{
1519 int ret;
1520
1521 spin_lock(&osb->osb_lock);
1522 ret = !osb->osb_orphan_wipes[slot];
1523 spin_unlock(&osb->osb_lock);
1524 return ret;
1525}
1526
1527static void ocfs2_mark_recovering_orphan_dir(struct ocfs2_super *osb,
1528 int slot)
1529{
1530 spin_lock(&osb->osb_lock);
1531 /* Mark ourselves such that new processes in delete_inode()
1532 * know to quit early. */
1533 ocfs2_node_map_set_bit(osb, &osb->osb_recovering_orphan_dirs, slot);
1534 while (osb->osb_orphan_wipes[slot]) {
1535 /* If any processes are already in the middle of an
1536 * orphan wipe on this dir, then we need to wait for
1537 * them. */
1538 spin_unlock(&osb->osb_lock);
1539 wait_event_interruptible(osb->osb_wipe_event,
1540 ocfs2_orphan_recovery_can_continue(osb, slot));
1541 spin_lock(&osb->osb_lock);
1542 }
1543 spin_unlock(&osb->osb_lock);
1544}
1545
1546static void ocfs2_clear_recovering_orphan_dir(struct ocfs2_super *osb,
1547 int slot)
1548{
1549 ocfs2_node_map_clear_bit(osb, &osb->osb_recovering_orphan_dirs, slot);
1550}
1551
1552/*
1553 * Orphan recovery. Each mounted node has its own orphan dir which we

1554 * must run during recovery. Our strategy here is to build a list of
1555 * the inodes in the orphan dir and iget/iput them. The VFS does
1556 * (most) of the rest of the work.
1557 *
1558 * Orphan recovery can happen at any time, not just mount so we have a
1559 * couple of extra considerations.
1560 *
1561 * - We grab as many inodes as we can under the orphan dir lock -
1562 * doing iget() outside the orphan dir risks getting a reference on
1563 * an invalid inode.
1564 * - We must be sure not to deadlock with other processes on the
1565 * system wanting to run delete_inode(). This can happen when they go
1566 * to lock the orphan dir and the orphan recovery process attempts to
1567 * iget() inside the orphan dir lock. This can be avoided by
1568 * advertising our state to ocfs2_delete_inode().
1569 */
1570static int ocfs2_recover_orphans(struct ocfs2_super *osb,
1571 int slot)
1572{
1573 int ret = 0;
1574 struct inode *inode = NULL;
1575 struct inode *iter;
1576 struct ocfs2_inode_info *oi;
1577
1578 mlog(0, "Recover inodes from orphan dir in slot %d\n", slot);
1579
1580 ocfs2_mark_recovering_orphan_dir(osb, slot);
1581 ret = ocfs2_queue_orphans(osb, slot, &inode);
1582 ocfs2_clear_recovering_orphan_dir(osb, slot);
1583
1584 /* Error here should be noted, but we want to continue with as
1585 * many queued inodes as we've got. */
1586 if (ret)
1587 mlog_errno(ret);
1519 1588
1520 while (inode) { 1589 while (inode) {
1521 oi = OCFS2_I(inode); 1590 oi = OCFS2_I(inode);
@@ -1541,14 +1610,7 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
1541 inode = iter; 1610 inode = iter;
1542 } 1611 }
1543 1612
1544out: 1613 return ret;
1545 if (have_disk_lock)
1546 ocfs2_meta_unlock(orphan_dir_inode, 0);
1547
1548 if (orphan_dir_inode)
1549 iput(orphan_dir_inode);
1550
1551 return status;
1552} 1614}
1553 1615
1554static int ocfs2_wait_on_mount(struct ocfs2_super *osb) 1616static int ocfs2_wait_on_mount(struct ocfs2_super *osb)
@@ -1584,10 +1646,9 @@ static int ocfs2_commit_thread(void *arg)
1584 while (!(kthread_should_stop() && 1646 while (!(kthread_should_stop() &&
1585 atomic_read(&journal->j_num_trans) == 0)) { 1647 atomic_read(&journal->j_num_trans) == 0)) {
1586 1648
1587 wait_event_interruptible_timeout(osb->checkpoint_event, 1649 wait_event_interruptible(osb->checkpoint_event,
1588 atomic_read(&journal->j_num_trans) 1650 atomic_read(&journal->j_num_trans)
1589 || kthread_should_stop(), 1651 || kthread_should_stop());
1590 OCFS2_CHECKPOINT_INTERVAL);
1591 1652
1592 status = ocfs2_commit_cache(osb); 1653 status = ocfs2_commit_cache(osb);
1593 if (status < 0) 1654 if (status < 0)
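journal.c carries the recovery side of the same handshake: mark the slot so new wipers back off, wait on osb_wipe_event until in-flight wipes drain, queue and process the orphans, then clear the mark. A runnable pthreads analogue is sketched below, with a condition variable standing in for the wait queue; none of the names are taken from the kernel sources.

/* Userspace analogue of mark -> wait -> recover -> clear; one wiper thread
 * stands in for a process already inside ocfs2_wipe_inode(). */
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

struct mock_osb {
        pthread_mutex_t lock;
        pthread_cond_t wipe_event;
        int recovering;         /* slot marked for recovery */
        unsigned int wipes;     /* wipes in flight for that slot */
};

static struct mock_osb osb = {
        .lock = PTHREAD_MUTEX_INITIALIZER,
        .wipe_event = PTHREAD_COND_INITIALIZER,
};

static void *wiper(void *arg)
{
        (void)arg;
        usleep(100 * 1000);                     /* pretend to wipe one orphan */
        pthread_mutex_lock(&osb.lock);
        osb.wipes--;                            /* signal_wipe_completion() */
        pthread_cond_broadcast(&osb.wipe_event);
        pthread_mutex_unlock(&osb.lock);
        return NULL;
}

int main(void)
{
        pthread_t t;

        /* one wipe is already in flight when recovery starts */
        osb.wipes = 1;
        pthread_create(&t, NULL, wiper, NULL);

        /* mark_recovering_orphan_dir(): mark, then wait for wipers to drain */
        pthread_mutex_lock(&osb.lock);
        osb.recovering = 1;
        while (osb.wipes)
                pthread_cond_wait(&osb.wipe_event, &osb.lock);
        pthread_mutex_unlock(&osb.lock);

        printf("orphan dir quiesced, running recovery\n");

        /* clear_recovering_orphan_dir() */
        pthread_mutex_lock(&osb.lock);
        osb.recovering = 0;
        pthread_mutex_unlock(&osb.lock);

        pthread_join(t, NULL);
        return 0;
}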
diff --git a/fs/ocfs2/journal.h b/fs/ocfs2/journal.h
index 7d0a816184fa..2f3a6acdac45 100644
--- a/fs/ocfs2/journal.h
+++ b/fs/ocfs2/journal.h
@@ -29,8 +29,6 @@
29#include <linux/fs.h> 29#include <linux/fs.h>
30#include <linux/jbd.h> 30#include <linux/jbd.h>
31 31
32#define OCFS2_CHECKPOINT_INTERVAL (8 * HZ)
33
34enum ocfs2_journal_state { 32enum ocfs2_journal_state {
35 OCFS2_JOURNAL_FREE = 0, 33 OCFS2_JOURNAL_FREE = 0,
36 OCFS2_JOURNAL_LOADED, 34 OCFS2_JOURNAL_LOADED,
diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index f468c600cf92..e89de9b6e491 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -33,6 +33,7 @@
33#include <linux/rbtree.h> 33#include <linux/rbtree.h>
34#include <linux/workqueue.h> 34#include <linux/workqueue.h>
35#include <linux/kref.h> 35#include <linux/kref.h>
36#include <linux/mutex.h>
36 37
37#include "cluster/nodemanager.h" 38#include "cluster/nodemanager.h"
38#include "cluster/heartbeat.h" 39#include "cluster/heartbeat.h"
@@ -173,9 +174,6 @@ enum ocfs2_mount_options
173 OCFS2_MOUNT_NOINTR = 1 << 2, /* Don't catch signals */ 174 OCFS2_MOUNT_NOINTR = 1 << 2, /* Don't catch signals */
174 OCFS2_MOUNT_ERRORS_PANIC = 1 << 3, /* Panic on errors */ 175 OCFS2_MOUNT_ERRORS_PANIC = 1 << 3, /* Panic on errors */
175 OCFS2_MOUNT_DATA_WRITEBACK = 1 << 4, /* No data ordering */ 176 OCFS2_MOUNT_DATA_WRITEBACK = 1 << 4, /* No data ordering */
176#ifdef OCFS2_ORACORE_WORKAROUNDS
177 OCFS2_MOUNT_COMPAT_OCFS = 1 << 30, /* ocfs1 compatibility mode */
178#endif
179}; 177};
180 178
181#define OCFS2_OSB_SOFT_RO 0x0001 179#define OCFS2_OSB_SOFT_RO 0x0001
@@ -233,7 +231,7 @@ struct ocfs2_super
233 struct proc_dir_entry *proc_sub_dir; /* points to /proc/fs/ocfs2/<maj_min> */ 231 struct proc_dir_entry *proc_sub_dir; /* points to /proc/fs/ocfs2/<maj_min> */
234 232
235 atomic_t vol_state; 233 atomic_t vol_state;
236 struct semaphore recovery_lock; 234 struct mutex recovery_lock;
237 struct task_struct *recovery_thread_task; 235 struct task_struct *recovery_thread_task;
238 int disable_recovery; 236 int disable_recovery;
239 wait_queue_head_t checkpoint_event; 237 wait_queue_head_t checkpoint_event;
@@ -289,6 +287,10 @@ struct ocfs2_super
289 struct inode *osb_tl_inode; 287 struct inode *osb_tl_inode;
290 struct buffer_head *osb_tl_bh; 288 struct buffer_head *osb_tl_bh;
291 struct work_struct osb_truncate_log_wq; 289 struct work_struct osb_truncate_log_wq;
290
291 struct ocfs2_node_map osb_recovering_orphan_dirs;
292 unsigned int *osb_orphan_wipes;
293 wait_queue_head_t osb_wipe_event;
292}; 294};
293 295
294#define OCFS2_SB(sb) ((struct ocfs2_super *)(sb)->s_fs_info) 296#define OCFS2_SB(sb) ((struct ocfs2_super *)(sb)->s_fs_info)
diff --git a/fs/ocfs2/ocfs2_fs.h b/fs/ocfs2/ocfs2_fs.h
index dfb8a5bedfc8..c5b1ac547c15 100644
--- a/fs/ocfs2/ocfs2_fs.h
+++ b/fs/ocfs2/ocfs2_fs.h
@@ -138,7 +138,6 @@
138 138
139/* Journal limits (in bytes) */ 139/* Journal limits (in bytes) */
140#define OCFS2_MIN_JOURNAL_SIZE (4 * 1024 * 1024) 140#define OCFS2_MIN_JOURNAL_SIZE (4 * 1024 * 1024)
141#define OCFS2_MAX_JOURNAL_SIZE (500 * 1024 * 1024)
142 141
143struct ocfs2_system_inode_info { 142struct ocfs2_system_inode_info {
144 char *si_name; 143 char *si_name;
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index 364d64bd5f10..8dd3aafec499 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -932,7 +932,7 @@ static void ocfs2_inode_init_once(void *data,
932 oi->ip_dir_start_lookup = 0; 932 oi->ip_dir_start_lookup = 0;
933 933
934 init_rwsem(&oi->ip_alloc_sem); 934 init_rwsem(&oi->ip_alloc_sem);
935 init_MUTEX(&(oi->ip_io_sem)); 935 mutex_init(&oi->ip_io_mutex);
936 936
937 oi->ip_blkno = 0ULL; 937 oi->ip_blkno = 0ULL;
938 oi->ip_clusters = 0; 938 oi->ip_clusters = 0;
@@ -1137,9 +1137,9 @@ static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)
1137 1137
1138 /* disable any new recovery threads and wait for any currently 1138 /* disable any new recovery threads and wait for any currently
1139 * running ones to exit. Do this before setting the vol_state. */ 1139 * running ones to exit. Do this before setting the vol_state. */
1140 down(&osb->recovery_lock); 1140 mutex_lock(&osb->recovery_lock);
1141 osb->disable_recovery = 1; 1141 osb->disable_recovery = 1;
1142 up(&osb->recovery_lock); 1142 mutex_unlock(&osb->recovery_lock);
1143 wait_event(osb->recovery_event, !ocfs2_recovery_thread_running(osb)); 1143 wait_event(osb->recovery_event, !ocfs2_recovery_thread_running(osb));
1144 1144
1145 /* At this point, we know that no more recovery threads can be 1145 /* At this point, we know that no more recovery threads can be
@@ -1254,8 +1254,7 @@ static int ocfs2_initialize_super(struct super_block *sb,
1254 osb->sb = sb; 1254 osb->sb = sb;
1255 /* Save off for ocfs2_rw_direct */ 1255 /* Save off for ocfs2_rw_direct */
1256 osb->s_sectsize_bits = blksize_bits(sector_size); 1256 osb->s_sectsize_bits = blksize_bits(sector_size);
1257 if (!osb->s_sectsize_bits) 1257 BUG_ON(!osb->s_sectsize_bits);
1258 BUG();
1259 1258
1260 osb->net_response_ids = 0; 1259 osb->net_response_ids = 0;
1261 spin_lock_init(&osb->net_response_lock); 1260 spin_lock_init(&osb->net_response_lock);
@@ -1283,7 +1282,7 @@ static int ocfs2_initialize_super(struct super_block *sb,
1283 snprintf(osb->dev_str, sizeof(osb->dev_str), "%u,%u", 1282 snprintf(osb->dev_str, sizeof(osb->dev_str), "%u,%u",
1284 MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev)); 1283 MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
1285 1284
1286 init_MUTEX(&osb->recovery_lock); 1285 mutex_init(&osb->recovery_lock);
1287 1286
1288 osb->disable_recovery = 0; 1287 osb->disable_recovery = 0;
1289 osb->recovery_thread_task = NULL; 1288 osb->recovery_thread_task = NULL;
@@ -1326,6 +1325,16 @@ static int ocfs2_initialize_super(struct super_block *sb,
1326 } 1325 }
1327 mlog(ML_NOTICE, "max_slots for this device: %u\n", osb->max_slots); 1326 mlog(ML_NOTICE, "max_slots for this device: %u\n", osb->max_slots);
1328 1327
1328 init_waitqueue_head(&osb->osb_wipe_event);
1329 osb->osb_orphan_wipes = kcalloc(osb->max_slots,
1330 sizeof(*osb->osb_orphan_wipes),
1331 GFP_KERNEL);
1332 if (!osb->osb_orphan_wipes) {
1333 status = -ENOMEM;
1334 mlog_errno(status);
1335 goto bail;
1336 }
1337
1329 osb->s_feature_compat = 1338 osb->s_feature_compat =
1330 le32_to_cpu(OCFS2_RAW_SB(di)->s_feature_compat); 1339 le32_to_cpu(OCFS2_RAW_SB(di)->s_feature_compat);
1331 osb->s_feature_ro_compat = 1340 osb->s_feature_ro_compat =
@@ -1639,6 +1648,7 @@ static void ocfs2_delete_osb(struct ocfs2_super *osb)
1639 if (osb->slot_info) 1648 if (osb->slot_info)
1640 ocfs2_free_slot_info(osb->slot_info); 1649 ocfs2_free_slot_info(osb->slot_info);
1641 1650
1651 kfree(osb->osb_orphan_wipes);
1642 /* FIXME 1652 /* FIXME
1643 * This belongs in journal shutdown, but because we have to 1653 * This belongs in journal shutdown, but because we have to
1644 * allocate osb->journal at the start of ocfs2_initalize_osb(), 1654 * allocate osb->journal at the start of ocfs2_initalize_osb(),
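super.c also continues the patch-wide conversion from semaphores used as mutexes (init_MUTEX()/down()/up()) to the mutex API, alongside allocating the per-slot osb_orphan_wipes counters. A minimal stand-alone module sketch of that conversion is below; it assumes only the generic mutex API and is not ocfs2 code.

/* Sketch of the semaphore-to-mutex conversion applied throughout this patch. */
#include <linux/module.h>
#include <linux/mutex.h>

struct demo_ctx {
        struct mutex io_mutex;  /* was: struct semaphore io_sem */
        int count;
};

static struct demo_ctx ctx;

static int __init demo_init(void)
{
        mutex_init(&ctx.io_mutex);      /* was: init_MUTEX(&ctx.io_sem) */

        mutex_lock(&ctx.io_mutex);      /* was: down(&ctx.io_sem) */
        ctx.count++;
        mutex_unlock(&ctx.io_mutex);    /* was: up(&ctx.io_sem) */

        printk(KERN_INFO "demo: count=%d\n", ctx.count);
        return 0;
}

static void __exit demo_exit(void)
{
}

module_init(demo_init);
module_exit(demo_exit);
MODULE_LICENSE("GPL");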
diff --git a/fs/ocfs2/sysfile.c b/fs/ocfs2/sysfile.c
index 600a8bc5b541..fc29cb7a437d 100644
--- a/fs/ocfs2/sysfile.c
+++ b/fs/ocfs2/sysfile.c
@@ -77,8 +77,7 @@ struct inode *ocfs2_get_system_file_inode(struct ocfs2_super *osb,
77 if (arr && ((inode = *arr) != NULL)) { 77 if (arr && ((inode = *arr) != NULL)) {
78 /* get a ref in addition to the array ref */ 78 /* get a ref in addition to the array ref */
79 inode = igrab(inode); 79 inode = igrab(inode);
80 if (!inode) 80 BUG_ON(!inode);
81 BUG();
82 81
83 return inode; 82 return inode;
84 } 83 }
@@ -89,8 +88,7 @@ struct inode *ocfs2_get_system_file_inode(struct ocfs2_super *osb,
89 /* add one more if putting into array for first time */ 88 /* add one more if putting into array for first time */
90 if (arr && inode) { 89 if (arr && inode) {
91 *arr = igrab(inode); 90 *arr = igrab(inode);
92 if (!*arr) 91 BUG_ON(!*arr);
93 BUG();
94 } 92 }
95 return inode; 93 return inode;
96} 94}
diff --git a/fs/ocfs2/uptodate.c b/fs/ocfs2/uptodate.c
index 3a0458fd3e1b..300b5bedfb21 100644
--- a/fs/ocfs2/uptodate.c
+++ b/fs/ocfs2/uptodate.c
@@ -388,7 +388,7 @@ out_free:
388 } 388 }
389} 389}
390 390
391/* Item insertion is guarded by ip_io_sem, so the insertion path takes 391/* Item insertion is guarded by ip_io_mutex, so the insertion path takes
392 * advantage of this by not rechecking for a duplicate insert during 392 * advantage of this by not rechecking for a duplicate insert during
393 * the slow case. Additionally, if the cache needs to be bumped up to 393 * the slow case. Additionally, if the cache needs to be bumped up to
394 * a tree, the code will not recheck after acquiring the lock -- 394 * a tree, the code will not recheck after acquiring the lock --
@@ -418,7 +418,7 @@ void ocfs2_set_buffer_uptodate(struct inode *inode,
418 (unsigned long long) bh->b_blocknr); 418 (unsigned long long) bh->b_blocknr);
419 419
420 /* No need to recheck under spinlock - insertion is guarded by 420 /* No need to recheck under spinlock - insertion is guarded by
421 * ip_io_sem */ 421 * ip_io_mutex */
422 spin_lock(&oi->ip_lock); 422 spin_lock(&oi->ip_lock);
423 if (ocfs2_insert_can_use_array(oi, ci)) { 423 if (ocfs2_insert_can_use_array(oi, ci)) {
424 /* Fast case - it's an array and there's a free 424 /* Fast case - it's an array and there's a free
@@ -440,7 +440,7 @@ void ocfs2_set_buffer_uptodate(struct inode *inode,
440 440
441/* Called against a newly allocated buffer. Most likely nobody should 441/* Called against a newly allocated buffer. Most likely nobody should
442 * be able to read this sort of metadata while it's still being 442 * be able to read this sort of metadata while it's still being
443 * allocated, but this is careful to take ip_io_sem anyway. */ 443 * allocated, but this is careful to take ip_io_mutex anyway. */
444void ocfs2_set_new_buffer_uptodate(struct inode *inode, 444void ocfs2_set_new_buffer_uptodate(struct inode *inode,
445 struct buffer_head *bh) 445 struct buffer_head *bh)
446{ 446{
@@ -451,9 +451,9 @@ void ocfs2_set_new_buffer_uptodate(struct inode *inode,
451 451
452 set_buffer_uptodate(bh); 452 set_buffer_uptodate(bh);
453 453
454 down(&oi->ip_io_sem); 454 mutex_lock(&oi->ip_io_mutex);
455 ocfs2_set_buffer_uptodate(inode, bh); 455 ocfs2_set_buffer_uptodate(inode, bh);
456 up(&oi->ip_io_sem); 456 mutex_unlock(&oi->ip_io_mutex);
457} 457}
458 458
459/* Requires ip_lock. */ 459/* Requires ip_lock. */
@@ -537,7 +537,7 @@ int __init init_ocfs2_uptodate_cache(void)
537 return 0; 537 return 0;
538} 538}
539 539
540void __exit exit_ocfs2_uptodate_cache(void) 540void exit_ocfs2_uptodate_cache(void)
541{ 541{
542 if (ocfs2_uptodate_cachep) 542 if (ocfs2_uptodate_cachep)
543 kmem_cache_destroy(ocfs2_uptodate_cachep); 543 kmem_cache_destroy(ocfs2_uptodate_cachep);
diff --git a/fs/ocfs2/uptodate.h b/fs/ocfs2/uptodate.h
index e5aacdf4eabf..01cd32d26b06 100644
--- a/fs/ocfs2/uptodate.h
+++ b/fs/ocfs2/uptodate.h
@@ -27,7 +27,7 @@
27#define OCFS2_UPTODATE_H 27#define OCFS2_UPTODATE_H
28 28
29int __init init_ocfs2_uptodate_cache(void); 29int __init init_ocfs2_uptodate_cache(void);
30void __exit exit_ocfs2_uptodate_cache(void); 30void exit_ocfs2_uptodate_cache(void);
31 31
32void ocfs2_metadata_cache_init(struct inode *inode); 32void ocfs2_metadata_cache_init(struct inode *inode);
33void ocfs2_metadata_cache_purge(struct inode *inode); 33void ocfs2_metadata_cache_purge(struct inode *inode);
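Finally, exit_ocfs2_extent_maps() and exit_ocfs2_uptodate_cache() lose their __exit annotation. Presumably the usual reason applies: a teardown helper that is also reachable from the module init error path cannot live in discardable .exit text. A hypothetical module sketch of that situation is below; the names and the simulated failure are invented, not taken from ocfs2.

/* Sketch: a cleanup helper shared by module exit and the init error path
 * must not be __exit, because __exit code may be discarded when built in. */
#include <linux/errno.h>
#include <linux/module.h>
#include <linux/slab.h>

static void *demo_buf;
static int simulate_late_failure = 1;

/* no __exit: also used on the init error path below */
static void demo_cache_teardown(void)
{
        kfree(demo_buf);
        demo_buf = NULL;
}

static int __init demo_init(void)
{
        demo_buf = kmalloc(128, GFP_KERNEL);
        if (!demo_buf)
                return -ENOMEM;

        if (simulate_late_failure) {
                /* a later init step failed; undo the earlier setup */
                demo_cache_teardown();
                return -EIO;
        }
        return 0;
}

static void __exit demo_exit(void)
{
        demo_cache_teardown();
}

module_init(demo_init);
module_exit(demo_exit);
MODULE_LICENSE("GPL");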