aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fs/ocfs2/dlmglue.c97
-rw-r--r--fs/ocfs2/dlmglue.h1
-rw-r--r--fs/ocfs2/heartbeat.c40
-rw-r--r--fs/ocfs2/heartbeat.h2
-rw-r--r--fs/ocfs2/ocfs2.h4
-rw-r--r--fs/ocfs2/stackglue.c131
-rw-r--r--fs/ocfs2/stackglue.h35
-rw-r--r--fs/ocfs2/super.c13
8 files changed, 221 insertions, 102 deletions
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 459037653e5a..6652a48cbe0f 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -27,7 +27,6 @@
27#include <linux/slab.h> 27#include <linux/slab.h>
28#include <linux/highmem.h> 28#include <linux/highmem.h>
29#include <linux/mm.h> 29#include <linux/mm.h>
30#include <linux/crc32.h>
31#include <linux/kthread.h> 30#include <linux/kthread.h>
32#include <linux/pagemap.h> 31#include <linux/pagemap.h>
33#include <linux/debugfs.h> 32#include <linux/debugfs.h>
@@ -259,31 +258,6 @@ static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
259 .flags = 0, 258 .flags = 0,
260}; 259};
261 260
262/*
263 * This is the filesystem locking protocol version.
264 *
265 * Whenever the filesystem does new things with locks (adds or removes a
266 * lock, orders them differently, does different things underneath a lock),
267 * the version must be changed. The protocol is negotiated when joining
268 * the dlm domain. A node may join the domain if its major version is
269 * identical to all other nodes and its minor version is greater than
270 * or equal to all other nodes. When its minor version is greater than
271 * the other nodes, it will run at the minor version specified by the
272 * other nodes.
273 *
274 * If a locking change is made that will not be compatible with older
275 * versions, the major number must be increased and the minor version set
276 * to zero. If a change merely adds a behavior that can be disabled when
277 * speaking to older versions, the minor version must be increased. If a
278 * change adds a fully backwards compatible change (eg, LVB changes that
279 * are just ignored by older versions), the version does not need to be
280 * updated.
281 */
282const struct dlm_protocol_version ocfs2_locking_protocol = {
283 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
284 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
285};
286
287static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) 261static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
288{ 262{
289 return lockres->l_type == OCFS2_LOCK_TYPE_META || 263 return lockres->l_type == OCFS2_LOCK_TYPE_META ||
@@ -886,7 +860,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
886 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 860 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
887 spin_unlock_irqrestore(&lockres->l_lock, flags); 861 spin_unlock_irqrestore(&lockres->l_lock, flags);
888 862
889 ret = ocfs2_dlm_lock(osb->dlm, 863 ret = ocfs2_dlm_lock(osb->cconn,
890 level, 864 level,
891 &lockres->l_lksb, 865 &lockres->l_lksb,
892 dlm_flags, 866 dlm_flags,
@@ -1085,7 +1059,7 @@ again:
1085 lockres->l_name, lockres->l_level, level); 1059 lockres->l_name, lockres->l_level, level);
1086 1060
1087 /* call dlm_lock to upgrade lock now */ 1061 /* call dlm_lock to upgrade lock now */
1088 ret = ocfs2_dlm_lock(osb->dlm, 1062 ret = ocfs2_dlm_lock(osb->cconn,
1089 level, 1063 level,
1090 &lockres->l_lksb, 1064 &lockres->l_lksb,
1091 lkm_flags, 1065 lkm_flags,
@@ -1492,7 +1466,7 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock)
1492 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1466 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1493 spin_unlock_irqrestore(&lockres->l_lock, flags); 1467 spin_unlock_irqrestore(&lockres->l_lock, flags);
1494 1468
1495 ret = ocfs2_dlm_lock(osb->dlm, level, &lockres->l_lksb, lkm_flags, 1469 ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
1496 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1, 1470 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
1497 lockres); 1471 lockres);
1498 if (ret) { 1472 if (ret) {
@@ -2485,8 +2459,7 @@ static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2485int ocfs2_dlm_init(struct ocfs2_super *osb) 2459int ocfs2_dlm_init(struct ocfs2_super *osb)
2486{ 2460{
2487 int status = 0; 2461 int status = 0;
2488 u32 dlm_key; 2462 struct ocfs2_cluster_connection *conn = NULL;
2489 struct dlm_ctxt *dlm = NULL;
2490 2463
2491 mlog_entry_void(); 2464 mlog_entry_void();
2492 2465
@@ -2508,26 +2481,21 @@ int ocfs2_dlm_init(struct ocfs2_super *osb)
2508 goto bail; 2481 goto bail;
2509 } 2482 }
2510 2483
2511 /* used by the dlm code to make message headers unique, each
2512 * node in this domain must agree on this. */
2513 dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));
2514
2515 /* for now, uuid == domain */ 2484 /* for now, uuid == domain */
2516 dlm = dlm_register_domain(osb->uuid_str, dlm_key, 2485 status = ocfs2_cluster_connect(osb->uuid_str,
2517 &osb->osb_locking_proto); 2486 strlen(osb->uuid_str),
2518 if (IS_ERR(dlm)) { 2487 ocfs2_do_node_down, osb,
2519 status = PTR_ERR(dlm); 2488 &conn);
2489 if (status) {
2520 mlog_errno(status); 2490 mlog_errno(status);
2521 goto bail; 2491 goto bail;
2522 } 2492 }
2523 2493
2524 dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);
2525
2526local: 2494local:
2527 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 2495 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
2528 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 2496 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
2529 2497
2530 osb->dlm = dlm; 2498 osb->cconn = conn;
2531 2499
2532 status = 0; 2500 status = 0;
2533bail: 2501bail:
@@ -2545,10 +2513,14 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
2545{ 2513{
2546 mlog_entry_void(); 2514 mlog_entry_void();
2547 2515
2548 dlm_unregister_eviction_cb(&osb->osb_eviction_cb);
2549
2550 ocfs2_drop_osb_locks(osb); 2516 ocfs2_drop_osb_locks(osb);
2551 2517
2518 /*
2519 * Now that we have dropped all locks and ocfs2_dismount_volume()
2520 * has disabled recovery, the DLM won't be talking to us. It's
2521 * safe to tear things down before disconnecting the cluster.
2522 */
2523
2552 if (osb->dc_task) { 2524 if (osb->dc_task) {
2553 kthread_stop(osb->dc_task); 2525 kthread_stop(osb->dc_task);
2554 osb->dc_task = NULL; 2526 osb->dc_task = NULL;
@@ -2557,8 +2529,8 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
2557 ocfs2_lock_res_free(&osb->osb_super_lockres); 2529 ocfs2_lock_res_free(&osb->osb_super_lockres);
2558 ocfs2_lock_res_free(&osb->osb_rename_lockres); 2530 ocfs2_lock_res_free(&osb->osb_rename_lockres);
2559 2531
2560 dlm_unregister_domain(osb->dlm); 2532 ocfs2_cluster_disconnect(osb->cconn);
2561 osb->dlm = NULL; 2533 osb->cconn = NULL;
2562 2534
2563 ocfs2_dlm_shutdown_debug(osb); 2535 ocfs2_dlm_shutdown_debug(osb);
2564 2536
@@ -2689,7 +2661,7 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb,
2689 2661
2690 mlog(0, "lock %s\n", lockres->l_name); 2662 mlog(0, "lock %s\n", lockres->l_name);
2691 2663
2692 ret = ocfs2_dlm_unlock(osb->dlm, &lockres->l_lksb, lkm_flags, 2664 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags,
2693 lockres); 2665 lockres);
2694 if (ret) { 2666 if (ret) {
2695 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 2667 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
@@ -2823,7 +2795,7 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
2823 if (lvb) 2795 if (lvb)
2824 dlm_flags |= DLM_LKF_VALBLK; 2796 dlm_flags |= DLM_LKF_VALBLK;
2825 2797
2826 ret = ocfs2_dlm_lock(osb->dlm, 2798 ret = ocfs2_dlm_lock(osb->cconn,
2827 new_level, 2799 new_level,
2828 &lockres->l_lksb, 2800 &lockres->l_lksb,
2829 dlm_flags, 2801 dlm_flags,
@@ -2882,7 +2854,7 @@ static int ocfs2_cancel_convert(struct ocfs2_super *osb,
2882 mlog_entry_void(); 2854 mlog_entry_void();
2883 mlog(0, "lock %s\n", lockres->l_name); 2855 mlog(0, "lock %s\n", lockres->l_name);
2884 2856
2885 ret = ocfs2_dlm_unlock(osb->dlm, &lockres->l_lksb, 2857 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
2886 DLM_LKF_CANCEL, lockres); 2858 DLM_LKF_CANCEL, lockres);
2887 if (ret) { 2859 if (ret) {
2888 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 2860 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
@@ -3193,7 +3165,34 @@ static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
3193 return UNBLOCK_CONTINUE_POST; 3165 return UNBLOCK_CONTINUE_POST;
3194} 3166}
3195 3167
3168/*
3169 * This is the filesystem locking protocol. It provides the lock handling
3170 * hooks for the underlying DLM. It has a maximum version number.
3171 * The version number allows interoperability with systems running at
3172 * the same major number and an equal or smaller minor number.
3173 *
3174 * Whenever the filesystem does new things with locks (adds or removes a
3175 * lock, orders them differently, does different things underneath a lock),
3176 * the version must be changed. The protocol is negotiated when joining
3177 * the dlm domain. A node may join the domain if its major version is
3178 * identical to all other nodes and its minor version is greater than
3179 * or equal to all other nodes. When its minor version is greater than
3180 * the other nodes, it will run at the minor version specified by the
3181 * other nodes.
3182 *
3183 * If a locking change is made that will not be compatible with older
3184 * versions, the major number must be increased and the minor version set
3185 * to zero. If a change merely adds a behavior that can be disabled when
3186 * speaking to older versions, the minor version must be increased. If a
3187 * change adds a fully backwards compatible change (eg, LVB changes that
3188 * are just ignored by older versions), the version does not need to be
3189 * updated.
3190 */
3196static struct ocfs2_locking_protocol lproto = { 3191static struct ocfs2_locking_protocol lproto = {
3192 .lp_max_version = {
3193 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
3194 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
3195 },
3197 .lp_lock_ast = ocfs2_locking_ast, 3196 .lp_lock_ast = ocfs2_locking_ast,
3198 .lp_blocking_ast = ocfs2_blocking_ast, 3197 .lp_blocking_ast = ocfs2_blocking_ast,
3199 .lp_unlock_ast = ocfs2_unlock_ast, 3198 .lp_unlock_ast = ocfs2_unlock_ast,
diff --git a/fs/ocfs2/dlmglue.h b/fs/ocfs2/dlmglue.h
index 32380439401f..2d0a8a03c431 100644
--- a/fs/ocfs2/dlmglue.h
+++ b/fs/ocfs2/dlmglue.h
@@ -117,5 +117,4 @@ void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug);
117void dlmglue_init_stack(void); 117void dlmglue_init_stack(void);
118void dlmglue_exit_stack(void); 118void dlmglue_exit_stack(void);
119 119
120extern const struct dlm_protocol_version ocfs2_locking_protocol;
121#endif /* DLMGLUE_H */ 120#endif /* DLMGLUE_H */
diff --git a/fs/ocfs2/heartbeat.c b/fs/ocfs2/heartbeat.c
index 80de2397c161..dcac1a487288 100644
--- a/fs/ocfs2/heartbeat.c
+++ b/fs/ocfs2/heartbeat.c
@@ -30,8 +30,6 @@
30#include <linux/highmem.h> 30#include <linux/highmem.h>
31#include <linux/kmod.h> 31#include <linux/kmod.h>
32 32
33#include <dlm/dlmapi.h>
34
35#define MLOG_MASK_PREFIX ML_SUPER 33#define MLOG_MASK_PREFIX ML_SUPER
36#include <cluster/masklog.h> 34#include <cluster/masklog.h>
37 35
@@ -64,19 +62,20 @@ void ocfs2_init_node_maps(struct ocfs2_super *osb)
64 ocfs2_node_map_init(&osb->osb_recovering_orphan_dirs); 62 ocfs2_node_map_init(&osb->osb_recovering_orphan_dirs);
65} 63}
66 64
67static void ocfs2_do_node_down(int node_num, 65void ocfs2_do_node_down(int node_num, void *data)
68 struct ocfs2_super *osb)
69{ 66{
67 struct ocfs2_super *osb = data;
68
70 BUG_ON(osb->node_num == node_num); 69 BUG_ON(osb->node_num == node_num);
71 70
72 mlog(0, "ocfs2: node down event for %d\n", node_num); 71 mlog(0, "ocfs2: node down event for %d\n", node_num);
73 72
74 if (!osb->dlm) { 73 if (!osb->cconn) {
75 /* 74 /*
76 * No DLM means we're not even ready to participate yet. 75 * No cluster connection means we're not even ready to
77 * We check the slots after the DLM comes up, so we will 76 * participate yet. We check the slots after the cluster
78 * notice the node death then. We can safely ignore it 77 * comes up, so we will notice the node death then. We
79 * here. 78 * can safely ignore it here.
80 */ 79 */
81 return; 80 return;
82 } 81 }
@@ -84,29 +83,6 @@ static void ocfs2_do_node_down(int node_num,
84 ocfs2_recovery_thread(osb, node_num); 83 ocfs2_recovery_thread(osb, node_num);
85} 84}
86 85
87/* Called from the dlm when it's about to evict a node. We may also
88 * get a heartbeat callback later. */
89static void ocfs2_dlm_eviction_cb(int node_num,
90 void *data)
91{
92 struct ocfs2_super *osb = (struct ocfs2_super *) data;
93 struct super_block *sb = osb->sb;
94
95 mlog(ML_NOTICE, "device (%u,%u): dlm has evicted node %d\n",
96 MAJOR(sb->s_dev), MINOR(sb->s_dev), node_num);
97
98 ocfs2_do_node_down(node_num, osb);
99}
100
101void ocfs2_setup_hb_callbacks(struct ocfs2_super *osb)
102{
103 /* Not exactly a heartbeat callback, but leads to essentially
104 * the same path so we set it up here. */
105 dlm_setup_eviction_cb(&osb->osb_eviction_cb,
106 ocfs2_dlm_eviction_cb,
107 osb);
108}
109
110void ocfs2_stop_heartbeat(struct ocfs2_super *osb) 86void ocfs2_stop_heartbeat(struct ocfs2_super *osb)
111{ 87{
112 int ret; 88 int ret;
diff --git a/fs/ocfs2/heartbeat.h b/fs/ocfs2/heartbeat.h
index 98d8ffc995b1..38e2450bf14c 100644
--- a/fs/ocfs2/heartbeat.h
+++ b/fs/ocfs2/heartbeat.h
@@ -28,7 +28,7 @@
28 28
29void ocfs2_init_node_maps(struct ocfs2_super *osb); 29void ocfs2_init_node_maps(struct ocfs2_super *osb);
30 30
31void ocfs2_setup_hb_callbacks(struct ocfs2_super *osb); 31void ocfs2_do_node_down(int node_num, void *data);
32void ocfs2_stop_heartbeat(struct ocfs2_super *osb); 32void ocfs2_stop_heartbeat(struct ocfs2_super *osb);
33 33
34/* node map functions - used to keep track of mounted and in-recovery 34/* node map functions - used to keep track of mounted and in-recovery
diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index 6d7c6d2d0c23..664e4fe3eab5 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -248,12 +248,10 @@ struct ocfs2_super
248 struct ocfs2_alloc_stats alloc_stats; 248 struct ocfs2_alloc_stats alloc_stats;
249 char dev_str[20]; /* "major,minor" of the device */ 249 char dev_str[20]; /* "major,minor" of the device */
250 250
251 struct dlm_ctxt *dlm; 251 struct ocfs2_cluster_connection *cconn;
252 struct ocfs2_lock_res osb_super_lockres; 252 struct ocfs2_lock_res osb_super_lockres;
253 struct ocfs2_lock_res osb_rename_lockres; 253 struct ocfs2_lock_res osb_rename_lockres;
254 struct dlm_eviction_cb osb_eviction_cb;
255 struct ocfs2_dlm_debug *osb_dlm_debug; 254 struct ocfs2_dlm_debug *osb_dlm_debug;
256 struct dlm_protocol_version osb_locking_proto;
257 255
258 struct dentry *osb_debug_root; 256 struct dentry *osb_debug_root;
259 257
diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index eb88854cb976..f6f309a08344 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -18,11 +18,21 @@
18 * General Public License for more details. 18 * General Public License for more details.
19 */ 19 */
20 20
21#include <linux/slab.h>
22#include <linux/crc32.h>
23
24/* Needed for AOP_TRUNCATED_PAGE in mlog_errno() */
25#include <linux/fs.h>
26
21#include "cluster/masklog.h" 27#include "cluster/masklog.h"
22#include "stackglue.h" 28#include "stackglue.h"
23 29
24static struct ocfs2_locking_protocol *lproto; 30static struct ocfs2_locking_protocol *lproto;
25 31
32struct o2dlm_private {
33 struct dlm_eviction_cb op_eviction_cb;
34};
35
26/* These should be identical */ 36/* These should be identical */
27#if (DLM_LOCK_IV != LKM_IVMODE) 37#if (DLM_LOCK_IV != LKM_IVMODE)
28# error Lock modes do not match 38# error Lock modes do not match
@@ -197,7 +207,7 @@ static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status)
197 lproto->lp_unlock_ast(astarg, error); 207 lproto->lp_unlock_ast(astarg, error);
198} 208}
199 209
200int ocfs2_dlm_lock(struct dlm_ctxt *dlm, 210int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
201 int mode, 211 int mode,
202 union ocfs2_dlm_lksb *lksb, 212 union ocfs2_dlm_lksb *lksb,
203 u32 flags, 213 u32 flags,
@@ -212,15 +222,15 @@ int ocfs2_dlm_lock(struct dlm_ctxt *dlm,
212 222
213 BUG_ON(lproto == NULL); 223 BUG_ON(lproto == NULL);
214 224
215 status = dlmlock(dlm, o2dlm_mode, &lksb->lksb_o2dlm, o2dlm_flags, 225 status = dlmlock(conn->cc_lockspace, o2dlm_mode, &lksb->lksb_o2dlm,
216 name, namelen, 226 o2dlm_flags, name, namelen,
217 o2dlm_lock_ast_wrapper, astarg, 227 o2dlm_lock_ast_wrapper, astarg,
218 o2dlm_blocking_ast_wrapper); 228 o2dlm_blocking_ast_wrapper);
219 ret = dlm_status_to_errno(status); 229 ret = dlm_status_to_errno(status);
220 return ret; 230 return ret;
221} 231}
222 232
223int ocfs2_dlm_unlock(struct dlm_ctxt *dlm, 233int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
224 union ocfs2_dlm_lksb *lksb, 234 union ocfs2_dlm_lksb *lksb,
225 u32 flags, 235 u32 flags,
226 void *astarg) 236 void *astarg)
@@ -231,8 +241,8 @@ int ocfs2_dlm_unlock(struct dlm_ctxt *dlm,
231 241
232 BUG_ON(lproto == NULL); 242 BUG_ON(lproto == NULL);
233 243
234 status = dlmunlock(dlm, &lksb->lksb_o2dlm, o2dlm_flags, 244 status = dlmunlock(conn->cc_lockspace, &lksb->lksb_o2dlm,
235 o2dlm_unlock_ast_wrapper, astarg); 245 o2dlm_flags, o2dlm_unlock_ast_wrapper, astarg);
236 ret = dlm_status_to_errno(status); 246 ret = dlm_status_to_errno(status);
237 return ret; 247 return ret;
238} 248}
@@ -252,6 +262,115 @@ void *ocfs2_dlm_lvb(union ocfs2_dlm_lksb *lksb)
252 return (void *)(lksb->lksb_o2dlm.lvb); 262 return (void *)(lksb->lksb_o2dlm.lvb);
253} 263}
254 264
265/*
266 * Called from the dlm when it's about to evict a node. This is how the
267 * classic stack signals node death.
268 */
269static void o2dlm_eviction_cb(int node_num, void *data)
270{
271 struct ocfs2_cluster_connection *conn = data;
272
273 mlog(ML_NOTICE, "o2dlm has evicted node %d from group %.*s\n",
274 node_num, conn->cc_namelen, conn->cc_name);
275
276 conn->cc_recovery_handler(node_num, conn->cc_recovery_data);
277}
278
279int ocfs2_cluster_connect(const char *group,
280 int grouplen,
281 void (*recovery_handler)(int node_num,
282 void *recovery_data),
283 void *recovery_data,
284 struct ocfs2_cluster_connection **conn)
285{
286 int rc = 0;
287 struct ocfs2_cluster_connection *new_conn;
288 u32 dlm_key;
289 struct dlm_ctxt *dlm;
290 struct o2dlm_private *priv;
291 struct dlm_protocol_version dlm_version;
292
293 BUG_ON(group == NULL);
294 BUG_ON(conn == NULL);
295 BUG_ON(recovery_handler == NULL);
296
297 if (grouplen > GROUP_NAME_MAX) {
298 rc = -EINVAL;
299 goto out;
300 }
301
302 new_conn = kzalloc(sizeof(struct ocfs2_cluster_connection),
303 GFP_KERNEL);
304 if (!new_conn) {
305 rc = -ENOMEM;
306 goto out;
307 }
308
309 memcpy(new_conn->cc_name, group, grouplen);
310 new_conn->cc_namelen = grouplen;
311 new_conn->cc_recovery_handler = recovery_handler;
312 new_conn->cc_recovery_data = recovery_data;
313
314 /* Start the new connection at our maximum compatibility level */
315 new_conn->cc_version = lproto->lp_max_version;
316
317 priv = kzalloc(sizeof(struct o2dlm_private), GFP_KERNEL);
318 if (!priv) {
319 rc = -ENOMEM;
320 goto out_free;
321 }
322
323 /* This just fills the structure in. It is safe to use new_conn. */
324 dlm_setup_eviction_cb(&priv->op_eviction_cb, o2dlm_eviction_cb,
325 new_conn);
326
327 new_conn->cc_private = priv;
328
329 /* used by the dlm code to make message headers unique, each
330 * node in this domain must agree on this. */
331 dlm_key = crc32_le(0, group, grouplen);
332 dlm_version.pv_major = new_conn->cc_version.pv_major;
333 dlm_version.pv_minor = new_conn->cc_version.pv_minor;
334
335 dlm = dlm_register_domain(group, dlm_key, &dlm_version);
336 if (IS_ERR(dlm)) {
337 rc = PTR_ERR(dlm);
338 mlog_errno(rc);
339 goto out_free;
340 }
341
342 new_conn->cc_version.pv_major = dlm_version.pv_major;
343 new_conn->cc_version.pv_minor = dlm_version.pv_minor;
344 new_conn->cc_lockspace = dlm;
345
346 dlm_register_eviction_cb(dlm, &priv->op_eviction_cb);
347
348 *conn = new_conn;
349
350out_free:
351 if (rc) {
352 kfree(new_conn->cc_private);
353 kfree(new_conn);
354 }
355
356out:
357 return rc;
358}
359
360int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn)
361{
362 struct dlm_ctxt *dlm = conn->cc_lockspace;
363 struct o2dlm_private *priv = conn->cc_private;
364
365 dlm_unregister_eviction_cb(&priv->op_eviction_cb);
366 dlm_unregister_domain(dlm);
367
368 kfree(priv);
369 kfree(conn);
370
371 return 0;
372}
373
255void o2cb_get_stack(struct ocfs2_locking_protocol *proto) 374void o2cb_get_stack(struct ocfs2_locking_protocol *proto)
256{ 375{
257 BUG_ON(proto == NULL); 376 BUG_ON(proto == NULL);
diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h
index 3c91e241892b..3900b5c3933c 100644
--- a/fs/ocfs2/stackglue.h
+++ b/fs/ocfs2/stackglue.h
@@ -32,9 +32,22 @@
32 */ 32 */
33#define DLM_LKF_LOCAL 0x00100000 33#define DLM_LKF_LOCAL 0x00100000
34 34
35/*
36 * This shadows DLM_LOCKSPACE_LEN in fs/dlm/dlm_internal.h. That probably
37 * wants to be in a public header.
38 */
39#define GROUP_NAME_MAX 64
40
41
35#include "dlm/dlmapi.h" 42#include "dlm/dlmapi.h"
36 43
44struct ocfs2_protocol_version {
45 u8 pv_major;
46 u8 pv_minor;
47};
48
37struct ocfs2_locking_protocol { 49struct ocfs2_locking_protocol {
50 struct ocfs2_protocol_version lp_max_version;
38 void (*lp_lock_ast)(void *astarg); 51 void (*lp_lock_ast)(void *astarg);
39 void (*lp_blocking_ast)(void *astarg, int level); 52 void (*lp_blocking_ast)(void *astarg, int level);
40 void (*lp_unlock_ast)(void *astarg, int error); 53 void (*lp_unlock_ast)(void *astarg, int error);
@@ -44,14 +57,32 @@ union ocfs2_dlm_lksb {
44 struct dlm_lockstatus lksb_o2dlm; 57 struct dlm_lockstatus lksb_o2dlm;
45}; 58};
46 59
47int ocfs2_dlm_lock(struct dlm_ctxt *dlm, 60struct ocfs2_cluster_connection {
61 char cc_name[GROUP_NAME_MAX];
62 int cc_namelen;
63 struct ocfs2_protocol_version cc_version;
64 void (*cc_recovery_handler)(int node_num, void *recovery_data);
65 void *cc_recovery_data;
66 void *cc_lockspace;
67 void *cc_private;
68};
69
70int ocfs2_cluster_connect(const char *group,
71 int grouplen,
72 void (*recovery_handler)(int node_num,
73 void *recovery_data),
74 void *recovery_data,
75 struct ocfs2_cluster_connection **conn);
76int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn);
77
78int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
48 int mode, 79 int mode,
49 union ocfs2_dlm_lksb *lksb, 80 union ocfs2_dlm_lksb *lksb,
50 u32 flags, 81 u32 flags,
51 void *name, 82 void *name,
52 unsigned int namelen, 83 unsigned int namelen,
53 void *astarg); 84 void *astarg);
54int ocfs2_dlm_unlock(struct dlm_ctxt *dlm, 85int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
55 union ocfs2_dlm_lksb *lksb, 86 union ocfs2_dlm_lksb *lksb,
56 u32 flags, 87 u32 flags,
57 void *astarg); 88 void *astarg);
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index c8675464e299..0ee49757467d 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -1251,9 +1251,9 @@ static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)
1251 1251
1252 ocfs2_sync_blockdev(sb); 1252 ocfs2_sync_blockdev(sb);
1253 1253
1254 /* No dlm means we've failed during mount, so skip all the 1254 /* No cluster connection means we've failed during mount, so skip
1255 * steps which depended on that to complete. */ 1255 * all the steps which depended on that to complete. */
1256 if (osb->dlm) { 1256 if (osb->cconn) {
1257 tmp = ocfs2_super_lock(osb, 1); 1257 tmp = ocfs2_super_lock(osb, 1);
1258 if (tmp < 0) { 1258 if (tmp < 0) {
1259 mlog_errno(tmp); 1259 mlog_errno(tmp);
@@ -1264,12 +1264,12 @@ static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)
1264 if (osb->slot_num != OCFS2_INVALID_SLOT) 1264 if (osb->slot_num != OCFS2_INVALID_SLOT)
1265 ocfs2_put_slot(osb); 1265 ocfs2_put_slot(osb);
1266 1266
1267 if (osb->dlm) 1267 if (osb->cconn)
1268 ocfs2_super_unlock(osb, 1); 1268 ocfs2_super_unlock(osb, 1);
1269 1269
1270 ocfs2_release_system_inodes(osb); 1270 ocfs2_release_system_inodes(osb);
1271 1271
1272 if (osb->dlm) 1272 if (osb->cconn)
1273 ocfs2_dlm_shutdown(osb); 1273 ocfs2_dlm_shutdown(osb);
1274 1274
1275 debugfs_remove(osb->osb_debug_root); 1275 debugfs_remove(osb->osb_debug_root);
@@ -1341,7 +1341,6 @@ static int ocfs2_initialize_super(struct super_block *sb,
1341 sb->s_fs_info = osb; 1341 sb->s_fs_info = osb;
1342 sb->s_op = &ocfs2_sops; 1342 sb->s_op = &ocfs2_sops;
1343 sb->s_export_op = &ocfs2_export_ops; 1343 sb->s_export_op = &ocfs2_export_ops;
1344 osb->osb_locking_proto = ocfs2_locking_protocol;
1345 sb->s_time_gran = 1; 1344 sb->s_time_gran = 1;
1346 sb->s_flags |= MS_NOATIME; 1345 sb->s_flags |= MS_NOATIME;
1347 /* this is needed to support O_LARGEFILE */ 1346 /* this is needed to support O_LARGEFILE */
@@ -1391,8 +1390,6 @@ static int ocfs2_initialize_super(struct super_block *sb,
1391 osb->local_alloc_state = OCFS2_LA_UNUSED; 1390 osb->local_alloc_state = OCFS2_LA_UNUSED;
1392 osb->local_alloc_bh = NULL; 1391 osb->local_alloc_bh = NULL;
1393 1392
1394 ocfs2_setup_hb_callbacks(osb);
1395
1396 init_waitqueue_head(&osb->osb_mount_event); 1393 init_waitqueue_head(&osb->osb_mount_event);
1397 1394
1398 osb->vol_label = kmalloc(OCFS2_MAX_VOL_LABEL_LEN, GFP_KERNEL); 1395 osb->vol_label = kmalloc(OCFS2_MAX_VOL_LABEL_LEN, GFP_KERNEL);