aboutsummaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
authorJoel Becker <joel.becker@oracle.com>2010-01-30 07:33:50 -0500
committerJoel Becker <joel.becker@oracle.com>2010-02-26 18:41:18 -0500
commit0016eedc4185a3cd7e578b027a6e69001b85d6c4 (patch)
tree4233550efcca539035d8de19e9b5314c3036124b /fs
parente8fce482f3702c1ad27c97b26db5022aa1fa64c7 (diff)
ocfs2_dlmfs: Use the stackglue.
Rather than directly using o2dlm, dlmfs can now use the stackglue. This allows it to use userspace cluster stacks and fs/dlm. This commit forces o2cb for now. A latter commit will bump the protocol version and allow non-o2cb stacks. This is one big sed, really. LKM_xxMODE becomes DLM_LOCK_xx. LKM_flag becomes DLM_LKF_flag. We also learn to check that the LVB is valid before reading it. Any DLM can lose the contents of the LVB during a complicated recovery. userdlm should be checking this. Now it does. dlmfs will return 0 from read(2) if the LVB was invalid. Signed-off-by: Joel Becker <joel.becker@oracle.com>
Diffstat (limited to 'fs')
-rw-r--r--fs/ocfs2/dlmfs/dlmfs.c57
-rw-r--r--fs/ocfs2/dlmfs/userdlm.c266
-rw-r--r--fs/ocfs2/dlmfs/userdlm.h16
3 files changed, 166 insertions, 173 deletions
diff --git a/fs/ocfs2/dlmfs/dlmfs.c b/fs/ocfs2/dlmfs/dlmfs.c
index 13ac2bffb05d..8697366b63ad 100644
--- a/fs/ocfs2/dlmfs/dlmfs.c
+++ b/fs/ocfs2/dlmfs/dlmfs.c
@@ -47,21 +47,13 @@
47 47
48#include <asm/uaccess.h> 48#include <asm/uaccess.h>
49 49
50 50#include "stackglue.h"
51#include "cluster/nodemanager.h"
52#include "cluster/heartbeat.h"
53#include "cluster/tcp.h"
54
55#include "dlm/dlmapi.h"
56
57#include "userdlm.h" 51#include "userdlm.h"
58
59#include "dlmfsver.h" 52#include "dlmfsver.h"
60 53
61#define MLOG_MASK_PREFIX ML_DLMFS 54#define MLOG_MASK_PREFIX ML_DLMFS
62#include "cluster/masklog.h" 55#include "cluster/masklog.h"
63 56
64#include "ocfs2_lockingver.h"
65 57
66static const struct super_operations dlmfs_ops; 58static const struct super_operations dlmfs_ops;
67static const struct file_operations dlmfs_file_operations; 59static const struct file_operations dlmfs_file_operations;
@@ -72,15 +64,6 @@ static struct kmem_cache *dlmfs_inode_cache;
72 64
73struct workqueue_struct *user_dlm_worker; 65struct workqueue_struct *user_dlm_worker;
74 66
75/*
76 * This is the userdlmfs locking protocol version.
77 *
78 * See fs/ocfs2/dlmglue.c for more details on locking versions.
79 */
80static const struct dlm_protocol_version user_locking_protocol = {
81 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
82 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
83};
84 67
85 68
86/* 69/*
@@ -259,7 +242,7 @@ static ssize_t dlmfs_file_read(struct file *filp,
259 loff_t *ppos) 242 loff_t *ppos)
260{ 243{
261 int bytes_left; 244 int bytes_left;
262 ssize_t readlen; 245 ssize_t readlen, got;
263 char *lvb_buf; 246 char *lvb_buf;
264 struct inode *inode = filp->f_path.dentry->d_inode; 247 struct inode *inode = filp->f_path.dentry->d_inode;
265 248
@@ -285,9 +268,13 @@ static ssize_t dlmfs_file_read(struct file *filp,
285 if (!lvb_buf) 268 if (!lvb_buf)
286 return -ENOMEM; 269 return -ENOMEM;
287 270
288 user_dlm_read_lvb(inode, lvb_buf, readlen); 271 got = user_dlm_read_lvb(inode, lvb_buf, readlen);
289 bytes_left = __copy_to_user(buf, lvb_buf, readlen); 272 if (got) {
290 readlen -= bytes_left; 273 BUG_ON(got != readlen);
274 bytes_left = __copy_to_user(buf, lvb_buf, readlen);
275 readlen -= bytes_left;
276 } else
277 readlen = 0;
291 278
292 kfree(lvb_buf); 279 kfree(lvb_buf);
293 280
@@ -346,7 +333,7 @@ static void dlmfs_init_once(void *foo)
346 struct dlmfs_inode_private *ip = 333 struct dlmfs_inode_private *ip =
347 (struct dlmfs_inode_private *) foo; 334 (struct dlmfs_inode_private *) foo;
348 335
349 ip->ip_dlm = NULL; 336 ip->ip_conn = NULL;
350 ip->ip_parent = NULL; 337 ip->ip_parent = NULL;
351 338
352 inode_init_once(&ip->ip_vfs_inode); 339 inode_init_once(&ip->ip_vfs_inode);
@@ -388,14 +375,14 @@ static void dlmfs_clear_inode(struct inode *inode)
388 goto clear_fields; 375 goto clear_fields;
389 } 376 }
390 377
391 mlog(0, "we're a directory, ip->ip_dlm = 0x%p\n", ip->ip_dlm); 378 mlog(0, "we're a directory, ip->ip_conn = 0x%p\n", ip->ip_conn);
392 /* we must be a directory. If required, lets unregister the 379 /* we must be a directory. If required, lets unregister the
393 * dlm context now. */ 380 * dlm context now. */
394 if (ip->ip_dlm) 381 if (ip->ip_conn)
395 user_dlm_unregister_context(ip->ip_dlm); 382 user_dlm_unregister(ip->ip_conn);
396clear_fields: 383clear_fields:
397 ip->ip_parent = NULL; 384 ip->ip_parent = NULL;
398 ip->ip_dlm = NULL; 385 ip->ip_conn = NULL;
399} 386}
400 387
401static struct backing_dev_info dlmfs_backing_dev_info = { 388static struct backing_dev_info dlmfs_backing_dev_info = {
@@ -445,7 +432,7 @@ static struct inode *dlmfs_get_inode(struct inode *parent,
445 inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME; 432 inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
446 433
447 ip = DLMFS_I(inode); 434 ip = DLMFS_I(inode);
448 ip->ip_dlm = DLMFS_I(parent)->ip_dlm; 435 ip->ip_conn = DLMFS_I(parent)->ip_conn;
449 436
450 switch (mode & S_IFMT) { 437 switch (mode & S_IFMT) {
451 default: 438 default:
@@ -499,13 +486,12 @@ static int dlmfs_mkdir(struct inode * dir,
499 struct inode *inode = NULL; 486 struct inode *inode = NULL;
500 struct qstr *domain = &dentry->d_name; 487 struct qstr *domain = &dentry->d_name;
501 struct dlmfs_inode_private *ip; 488 struct dlmfs_inode_private *ip;
502 struct dlm_ctxt *dlm; 489 struct ocfs2_cluster_connection *conn;
503 struct dlm_protocol_version proto = user_locking_protocol;
504 490
505 mlog(0, "mkdir %.*s\n", domain->len, domain->name); 491 mlog(0, "mkdir %.*s\n", domain->len, domain->name);
506 492
507 /* verify that we have a proper domain */ 493 /* verify that we have a proper domain */
508 if (domain->len >= O2NM_MAX_NAME_LEN) { 494 if (domain->len >= GROUP_NAME_MAX) {
509 status = -EINVAL; 495 status = -EINVAL;
510 mlog(ML_ERROR, "invalid domain name for directory.\n"); 496 mlog(ML_ERROR, "invalid domain name for directory.\n");
511 goto bail; 497 goto bail;
@@ -520,14 +506,14 @@ static int dlmfs_mkdir(struct inode * dir,
520 506
521 ip = DLMFS_I(inode); 507 ip = DLMFS_I(inode);
522 508
523 dlm = user_dlm_register_context(domain, &proto); 509 conn = user_dlm_register(domain);
524 if (IS_ERR(dlm)) { 510 if (IS_ERR(conn)) {
525 status = PTR_ERR(dlm); 511 status = PTR_ERR(conn);
526 mlog(ML_ERROR, "Error %d could not register domain \"%.*s\"\n", 512 mlog(ML_ERROR, "Error %d could not register domain \"%.*s\"\n",
527 status, domain->len, domain->name); 513 status, domain->len, domain->name);
528 goto bail; 514 goto bail;
529 } 515 }
530 ip->ip_dlm = dlm; 516 ip->ip_conn = conn;
531 517
532 inc_nlink(dir); 518 inc_nlink(dir);
533 d_instantiate(dentry, inode); 519 d_instantiate(dentry, inode);
@@ -696,6 +682,7 @@ static int __init init_dlmfs_fs(void)
696 } 682 }
697 cleanup_worker = 1; 683 cleanup_worker = 1;
698 684
685 user_dlm_set_locking_protocol();
699 status = register_filesystem(&dlmfs_fs_type); 686 status = register_filesystem(&dlmfs_fs_type);
700bail: 687bail:
701 if (status) { 688 if (status) {
diff --git a/fs/ocfs2/dlmfs/userdlm.c b/fs/ocfs2/dlmfs/userdlm.c
index 6adae70cee8e..c1b6a56a268f 100644
--- a/fs/ocfs2/dlmfs/userdlm.c
+++ b/fs/ocfs2/dlmfs/userdlm.c
@@ -34,18 +34,19 @@
34#include <linux/types.h> 34#include <linux/types.h>
35#include <linux/crc32.h> 35#include <linux/crc32.h>
36 36
37 37#include "ocfs2_lockingver.h"
38#include "cluster/nodemanager.h" 38#include "stackglue.h"
39#include "cluster/heartbeat.h"
40#include "cluster/tcp.h"
41
42#include "dlm/dlmapi.h"
43
44#include "userdlm.h" 39#include "userdlm.h"
45 40
46#define MLOG_MASK_PREFIX ML_DLMFS 41#define MLOG_MASK_PREFIX ML_DLMFS
47#include "cluster/masklog.h" 42#include "cluster/masklog.h"
48 43
44
45static inline struct user_lock_res *user_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
46{
47 return container_of(lksb, struct user_lock_res, l_lksb);
48}
49
49static inline int user_check_wait_flag(struct user_lock_res *lockres, 50static inline int user_check_wait_flag(struct user_lock_res *lockres,
50 int flag) 51 int flag)
51{ 52{
@@ -73,15 +74,15 @@ static inline void user_wait_on_blocked_lock(struct user_lock_res *lockres)
73} 74}
74 75
75/* I heart container_of... */ 76/* I heart container_of... */
76static inline struct dlm_ctxt * 77static inline struct ocfs2_cluster_connection *
77dlm_ctxt_from_user_lockres(struct user_lock_res *lockres) 78cluster_connection_from_user_lockres(struct user_lock_res *lockres)
78{ 79{
79 struct dlmfs_inode_private *ip; 80 struct dlmfs_inode_private *ip;
80 81
81 ip = container_of(lockres, 82 ip = container_of(lockres,
82 struct dlmfs_inode_private, 83 struct dlmfs_inode_private,
83 ip_lockres); 84 ip_lockres);
84 return ip->ip_dlm; 85 return ip->ip_conn;
85} 86}
86 87
87static struct inode * 88static struct inode *
@@ -103,9 +104,9 @@ static inline void user_recover_from_dlm_error(struct user_lock_res *lockres)
103} 104}
104 105
105#define user_log_dlm_error(_func, _stat, _lockres) do { \ 106#define user_log_dlm_error(_func, _stat, _lockres) do { \
106 mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on " \ 107 mlog(ML_ERROR, "Dlm error %d while calling %s on " \
107 "resource %.*s: %s\n", dlm_errname(_stat), _func, \ 108 "resource %.*s\n", _stat, _func, \
108 _lockres->l_namelen, _lockres->l_name, dlm_errmsg(_stat)); \ 109 _lockres->l_namelen, _lockres->l_name); \
109} while (0) 110} while (0)
110 111
111/* WARNING: This function lives in a world where the only three lock 112/* WARNING: This function lives in a world where the only three lock
@@ -113,34 +114,34 @@ static inline void user_recover_from_dlm_error(struct user_lock_res *lockres)
113 * lock types are added. */ 114 * lock types are added. */
114static inline int user_highest_compat_lock_level(int level) 115static inline int user_highest_compat_lock_level(int level)
115{ 116{
116 int new_level = LKM_EXMODE; 117 int new_level = DLM_LOCK_EX;
117 118
118 if (level == LKM_EXMODE) 119 if (level == DLM_LOCK_EX)
119 new_level = LKM_NLMODE; 120 new_level = DLM_LOCK_NL;
120 else if (level == LKM_PRMODE) 121 else if (level == DLM_LOCK_PR)
121 new_level = LKM_PRMODE; 122 new_level = DLM_LOCK_PR;
122 return new_level; 123 return new_level;
123} 124}
124 125
125static void user_ast(void *opaque) 126static void user_ast(struct ocfs2_dlm_lksb *lksb)
126{ 127{
127 struct user_lock_res *lockres = opaque; 128 struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
128 struct dlm_lockstatus *lksb; 129 int status;
129 130
130 mlog(0, "AST fired for lockres %.*s\n", lockres->l_namelen, 131 mlog(0, "AST fired for lockres %.*s\n", lockres->l_namelen,
131 lockres->l_name); 132 lockres->l_name);
132 133
133 spin_lock(&lockres->l_lock); 134 spin_lock(&lockres->l_lock);
134 135
135 lksb = &(lockres->l_lksb); 136 status = ocfs2_dlm_lock_status(&lockres->l_lksb);
136 if (lksb->status != DLM_NORMAL) { 137 if (status) {
137 mlog(ML_ERROR, "lksb status value of %u on lockres %.*s\n", 138 mlog(ML_ERROR, "lksb status value of %u on lockres %.*s\n",
138 lksb->status, lockres->l_namelen, lockres->l_name); 139 status, lockres->l_namelen, lockres->l_name);
139 spin_unlock(&lockres->l_lock); 140 spin_unlock(&lockres->l_lock);
140 return; 141 return;
141 } 142 }
142 143
143 mlog_bug_on_msg(lockres->l_requested == LKM_IVMODE, 144 mlog_bug_on_msg(lockres->l_requested == DLM_LOCK_IV,
144 "Lockres %.*s, requested ivmode. flags 0x%x\n", 145 "Lockres %.*s, requested ivmode. flags 0x%x\n",
145 lockres->l_namelen, lockres->l_name, lockres->l_flags); 146 lockres->l_namelen, lockres->l_name, lockres->l_flags);
146 147
@@ -148,13 +149,13 @@ static void user_ast(void *opaque)
148 if (lockres->l_requested < lockres->l_level) { 149 if (lockres->l_requested < lockres->l_level) {
149 if (lockres->l_requested <= 150 if (lockres->l_requested <=
150 user_highest_compat_lock_level(lockres->l_blocking)) { 151 user_highest_compat_lock_level(lockres->l_blocking)) {
151 lockres->l_blocking = LKM_NLMODE; 152 lockres->l_blocking = DLM_LOCK_NL;
152 lockres->l_flags &= ~USER_LOCK_BLOCKED; 153 lockres->l_flags &= ~USER_LOCK_BLOCKED;
153 } 154 }
154 } 155 }
155 156
156 lockres->l_level = lockres->l_requested; 157 lockres->l_level = lockres->l_requested;
157 lockres->l_requested = LKM_IVMODE; 158 lockres->l_requested = DLM_LOCK_IV;
158 lockres->l_flags |= USER_LOCK_ATTACHED; 159 lockres->l_flags |= USER_LOCK_ATTACHED;
159 lockres->l_flags &= ~USER_LOCK_BUSY; 160 lockres->l_flags &= ~USER_LOCK_BUSY;
160 161
@@ -193,11 +194,11 @@ static void __user_dlm_cond_queue_lockres(struct user_lock_res *lockres)
193 return; 194 return;
194 195
195 switch (lockres->l_blocking) { 196 switch (lockres->l_blocking) {
196 case LKM_EXMODE: 197 case DLM_LOCK_EX:
197 if (!lockres->l_ex_holders && !lockres->l_ro_holders) 198 if (!lockres->l_ex_holders && !lockres->l_ro_holders)
198 queue = 1; 199 queue = 1;
199 break; 200 break;
200 case LKM_PRMODE: 201 case DLM_LOCK_PR:
201 if (!lockres->l_ex_holders) 202 if (!lockres->l_ex_holders)
202 queue = 1; 203 queue = 1;
203 break; 204 break;
@@ -209,9 +210,9 @@ static void __user_dlm_cond_queue_lockres(struct user_lock_res *lockres)
209 __user_dlm_queue_lockres(lockres); 210 __user_dlm_queue_lockres(lockres);
210} 211}
211 212
212static void user_bast(void *opaque, int level) 213static void user_bast(struct ocfs2_dlm_lksb *lksb, int level)
213{ 214{
214 struct user_lock_res *lockres = opaque; 215 struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
215 216
216 mlog(0, "Blocking AST fired for lockres %.*s. Blocking level %d\n", 217 mlog(0, "Blocking AST fired for lockres %.*s. Blocking level %d\n",
217 lockres->l_namelen, lockres->l_name, level); 218 lockres->l_namelen, lockres->l_name, level);
@@ -227,15 +228,15 @@ static void user_bast(void *opaque, int level)
227 wake_up(&lockres->l_event); 228 wake_up(&lockres->l_event);
228} 229}
229 230
230static void user_unlock_ast(void *opaque, enum dlm_status status) 231static void user_unlock_ast(struct ocfs2_dlm_lksb *lksb, int status)
231{ 232{
232 struct user_lock_res *lockres = opaque; 233 struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
233 234
234 mlog(0, "UNLOCK AST called on lock %.*s\n", lockres->l_namelen, 235 mlog(0, "UNLOCK AST called on lock %.*s\n", lockres->l_namelen,
235 lockres->l_name); 236 lockres->l_name);
236 237
237 if (status != DLM_NORMAL && status != DLM_CANCELGRANT) 238 if (status)
238 mlog(ML_ERROR, "Dlm returns status %d\n", status); 239 mlog(ML_ERROR, "dlm returns status %d\n", status);
239 240
240 spin_lock(&lockres->l_lock); 241 spin_lock(&lockres->l_lock);
241 /* The teardown flag gets set early during the unlock process, 242 /* The teardown flag gets set early during the unlock process,
@@ -243,7 +244,7 @@ static void user_unlock_ast(void *opaque, enum dlm_status status)
243 * for a concurrent cancel. */ 244 * for a concurrent cancel. */
244 if (lockres->l_flags & USER_LOCK_IN_TEARDOWN 245 if (lockres->l_flags & USER_LOCK_IN_TEARDOWN
245 && !(lockres->l_flags & USER_LOCK_IN_CANCEL)) { 246 && !(lockres->l_flags & USER_LOCK_IN_CANCEL)) {
246 lockres->l_level = LKM_IVMODE; 247 lockres->l_level = DLM_LOCK_IV;
247 } else if (status == DLM_CANCELGRANT) { 248 } else if (status == DLM_CANCELGRANT) {
248 /* We tried to cancel a convert request, but it was 249 /* We tried to cancel a convert request, but it was
249 * already granted. Don't clear the busy flag - the 250 * already granted. Don't clear the busy flag - the
@@ -254,7 +255,7 @@ static void user_unlock_ast(void *opaque, enum dlm_status status)
254 } else { 255 } else {
255 BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL)); 256 BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
256 /* Cancel succeeded, we want to re-queue */ 257 /* Cancel succeeded, we want to re-queue */
257 lockres->l_requested = LKM_IVMODE; /* cancel an 258 lockres->l_requested = DLM_LOCK_IV; /* cancel an
258 * upconvert 259 * upconvert
259 * request. */ 260 * request. */
260 lockres->l_flags &= ~USER_LOCK_IN_CANCEL; 261 lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
@@ -271,6 +272,21 @@ out_noclear:
271 wake_up(&lockres->l_event); 272 wake_up(&lockres->l_event);
272} 273}
273 274
275/*
276 * This is the userdlmfs locking protocol version.
277 *
278 * See fs/ocfs2/dlmglue.c for more details on locking versions.
279 */
280static struct ocfs2_locking_protocol user_dlm_lproto = {
281 .lp_max_version = {
282 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
283 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
284 },
285 .lp_lock_ast = user_ast,
286 .lp_blocking_ast = user_bast,
287 .lp_unlock_ast = user_unlock_ast,
288};
289
274static inline void user_dlm_drop_inode_ref(struct user_lock_res *lockres) 290static inline void user_dlm_drop_inode_ref(struct user_lock_res *lockres)
275{ 291{
276 struct inode *inode; 292 struct inode *inode;
@@ -283,7 +299,8 @@ static void user_dlm_unblock_lock(struct work_struct *work)
283 int new_level, status; 299 int new_level, status;
284 struct user_lock_res *lockres = 300 struct user_lock_res *lockres =
285 container_of(work, struct user_lock_res, l_work); 301 container_of(work, struct user_lock_res, l_work);
286 struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres); 302 struct ocfs2_cluster_connection *conn =
303 cluster_connection_from_user_lockres(lockres);
287 304
288 mlog(0, "processing lockres %.*s\n", lockres->l_namelen, 305 mlog(0, "processing lockres %.*s\n", lockres->l_namelen,
289 lockres->l_name); 306 lockres->l_name);
@@ -322,20 +339,17 @@ static void user_dlm_unblock_lock(struct work_struct *work)
322 lockres->l_flags |= USER_LOCK_IN_CANCEL; 339 lockres->l_flags |= USER_LOCK_IN_CANCEL;
323 spin_unlock(&lockres->l_lock); 340 spin_unlock(&lockres->l_lock);
324 341
325 status = dlmunlock(dlm, 342 status = ocfs2_dlm_unlock(conn, &lockres->l_lksb,
326 &lockres->l_lksb, 343 DLM_LKF_CANCEL);
327 LKM_CANCEL, 344 if (status)
328 user_unlock_ast, 345 user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
329 lockres);
330 if (status != DLM_NORMAL)
331 user_log_dlm_error("dlmunlock", status, lockres);
332 goto drop_ref; 346 goto drop_ref;
333 } 347 }
334 348
335 /* If there are still incompat holders, we can exit safely 349 /* If there are still incompat holders, we can exit safely
336 * without worrying about re-queueing this lock as that will 350 * without worrying about re-queueing this lock as that will
337 * happen on the last call to user_cluster_unlock. */ 351 * happen on the last call to user_cluster_unlock. */
338 if ((lockres->l_blocking == LKM_EXMODE) 352 if ((lockres->l_blocking == DLM_LOCK_EX)
339 && (lockres->l_ex_holders || lockres->l_ro_holders)) { 353 && (lockres->l_ex_holders || lockres->l_ro_holders)) {
340 spin_unlock(&lockres->l_lock); 354 spin_unlock(&lockres->l_lock);
341 mlog(0, "can't downconvert for ex: ro = %u, ex = %u\n", 355 mlog(0, "can't downconvert for ex: ro = %u, ex = %u\n",
@@ -343,7 +357,7 @@ static void user_dlm_unblock_lock(struct work_struct *work)
343 goto drop_ref; 357 goto drop_ref;
344 } 358 }
345 359
346 if ((lockres->l_blocking == LKM_PRMODE) 360 if ((lockres->l_blocking == DLM_LOCK_PR)
347 && lockres->l_ex_holders) { 361 && lockres->l_ex_holders) {
348 spin_unlock(&lockres->l_lock); 362 spin_unlock(&lockres->l_lock);
349 mlog(0, "can't downconvert for pr: ex = %u\n", 363 mlog(0, "can't downconvert for pr: ex = %u\n",
@@ -360,17 +374,12 @@ static void user_dlm_unblock_lock(struct work_struct *work)
360 spin_unlock(&lockres->l_lock); 374 spin_unlock(&lockres->l_lock);
361 375
362 /* need lock downconvert request now... */ 376 /* need lock downconvert request now... */
363 status = dlmlock(dlm, 377 status = ocfs2_dlm_lock(conn, new_level, &lockres->l_lksb,
364 new_level, 378 DLM_LKF_CONVERT|DLM_LKF_VALBLK,
365 &lockres->l_lksb, 379 lockres->l_name,
366 LKM_CONVERT|LKM_VALBLK, 380 lockres->l_namelen);
367 lockres->l_name, 381 if (status) {
368 lockres->l_namelen, 382 user_log_dlm_error("ocfs2_dlm_lock", status, lockres);
369 user_ast,
370 lockres,
371 user_bast);
372 if (status != DLM_NORMAL) {
373 user_log_dlm_error("dlmlock", status, lockres);
374 user_recover_from_dlm_error(lockres); 383 user_recover_from_dlm_error(lockres);
375 } 384 }
376 385
@@ -382,10 +391,10 @@ static inline void user_dlm_inc_holders(struct user_lock_res *lockres,
382 int level) 391 int level)
383{ 392{
384 switch(level) { 393 switch(level) {
385 case LKM_EXMODE: 394 case DLM_LOCK_EX:
386 lockres->l_ex_holders++; 395 lockres->l_ex_holders++;
387 break; 396 break;
388 case LKM_PRMODE: 397 case DLM_LOCK_PR:
389 lockres->l_ro_holders++; 398 lockres->l_ro_holders++;
390 break; 399 break;
391 default: 400 default:
@@ -410,10 +419,11 @@ int user_dlm_cluster_lock(struct user_lock_res *lockres,
410 int lkm_flags) 419 int lkm_flags)
411{ 420{
412 int status, local_flags; 421 int status, local_flags;
413 struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres); 422 struct ocfs2_cluster_connection *conn =
423 cluster_connection_from_user_lockres(lockres);
414 424
415 if (level != LKM_EXMODE && 425 if (level != DLM_LOCK_EX &&
416 level != LKM_PRMODE) { 426 level != DLM_LOCK_PR) {
417 mlog(ML_ERROR, "lockres %.*s: invalid request!\n", 427 mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
418 lockres->l_namelen, lockres->l_name); 428 lockres->l_namelen, lockres->l_name);
419 status = -EINVAL; 429 status = -EINVAL;
@@ -422,7 +432,7 @@ int user_dlm_cluster_lock(struct user_lock_res *lockres,
422 432
423 mlog(0, "lockres %.*s: asking for %s lock, passed flags = 0x%x\n", 433 mlog(0, "lockres %.*s: asking for %s lock, passed flags = 0x%x\n",
424 lockres->l_namelen, lockres->l_name, 434 lockres->l_namelen, lockres->l_name,
425 (level == LKM_EXMODE) ? "LKM_EXMODE" : "LKM_PRMODE", 435 (level == DLM_LOCK_EX) ? "DLM_LOCK_EX" : "DLM_LOCK_PR",
426 lkm_flags); 436 lkm_flags);
427 437
428again: 438again:
@@ -457,35 +467,26 @@ again:
457 } 467 }
458 468
459 if (level > lockres->l_level) { 469 if (level > lockres->l_level) {
460 local_flags = lkm_flags | LKM_VALBLK; 470 local_flags = lkm_flags | DLM_LKF_VALBLK;
461 if (lockres->l_level != LKM_IVMODE) 471 if (lockres->l_level != DLM_LOCK_IV)
462 local_flags |= LKM_CONVERT; 472 local_flags |= DLM_LKF_CONVERT;
463 473
464 lockres->l_requested = level; 474 lockres->l_requested = level;
465 lockres->l_flags |= USER_LOCK_BUSY; 475 lockres->l_flags |= USER_LOCK_BUSY;
466 spin_unlock(&lockres->l_lock); 476 spin_unlock(&lockres->l_lock);
467 477
468 BUG_ON(level == LKM_IVMODE); 478 BUG_ON(level == DLM_LOCK_IV);
469 BUG_ON(level == LKM_NLMODE); 479 BUG_ON(level == DLM_LOCK_NL);
470 480
471 /* call dlm_lock to upgrade lock now */ 481 /* call dlm_lock to upgrade lock now */
472 status = dlmlock(dlm, 482 status = ocfs2_dlm_lock(conn, level, &lockres->l_lksb,
473 level, 483 local_flags, lockres->l_name,
474 &lockres->l_lksb, 484 lockres->l_namelen);
475 local_flags, 485 if (status) {
476 lockres->l_name, 486 if ((lkm_flags & DLM_LKF_NOQUEUE) &&
477 lockres->l_namelen, 487 (status != -EAGAIN))
478 user_ast, 488 user_log_dlm_error("ocfs2_dlm_lock",
479 lockres, 489 status, lockres);
480 user_bast);
481 if (status != DLM_NORMAL) {
482 if ((lkm_flags & LKM_NOQUEUE) &&
483 (status == DLM_NOTQUEUED))
484 status = -EAGAIN;
485 else {
486 user_log_dlm_error("dlmlock", status, lockres);
487 status = -EINVAL;
488 }
489 user_recover_from_dlm_error(lockres); 490 user_recover_from_dlm_error(lockres);
490 goto bail; 491 goto bail;
491 } 492 }
@@ -506,11 +507,11 @@ static inline void user_dlm_dec_holders(struct user_lock_res *lockres,
506 int level) 507 int level)
507{ 508{
508 switch(level) { 509 switch(level) {
509 case LKM_EXMODE: 510 case DLM_LOCK_EX:
510 BUG_ON(!lockres->l_ex_holders); 511 BUG_ON(!lockres->l_ex_holders);
511 lockres->l_ex_holders--; 512 lockres->l_ex_holders--;
512 break; 513 break;
513 case LKM_PRMODE: 514 case DLM_LOCK_PR:
514 BUG_ON(!lockres->l_ro_holders); 515 BUG_ON(!lockres->l_ro_holders);
515 lockres->l_ro_holders--; 516 lockres->l_ro_holders--;
516 break; 517 break;
@@ -522,8 +523,8 @@ static inline void user_dlm_dec_holders(struct user_lock_res *lockres,
522void user_dlm_cluster_unlock(struct user_lock_res *lockres, 523void user_dlm_cluster_unlock(struct user_lock_res *lockres,
523 int level) 524 int level)
524{ 525{
525 if (level != LKM_EXMODE && 526 if (level != DLM_LOCK_EX &&
526 level != LKM_PRMODE) { 527 level != DLM_LOCK_PR) {
527 mlog(ML_ERROR, "lockres %.*s: invalid request!\n", 528 mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
528 lockres->l_namelen, lockres->l_name); 529 lockres->l_namelen, lockres->l_name);
529 return; 530 return;
@@ -540,33 +541,40 @@ void user_dlm_write_lvb(struct inode *inode,
540 unsigned int len) 541 unsigned int len)
541{ 542{
542 struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres; 543 struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
543 char *lvb = lockres->l_lksb.lvb; 544 char *lvb;
544 545
545 BUG_ON(len > DLM_LVB_LEN); 546 BUG_ON(len > DLM_LVB_LEN);
546 547
547 spin_lock(&lockres->l_lock); 548 spin_lock(&lockres->l_lock);
548 549
549 BUG_ON(lockres->l_level < LKM_EXMODE); 550 BUG_ON(lockres->l_level < DLM_LOCK_EX);
551 lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
550 memcpy(lvb, val, len); 552 memcpy(lvb, val, len);
551 553
552 spin_unlock(&lockres->l_lock); 554 spin_unlock(&lockres->l_lock);
553} 555}
554 556
555void user_dlm_read_lvb(struct inode *inode, 557ssize_t user_dlm_read_lvb(struct inode *inode,
556 char *val, 558 char *val,
557 unsigned int len) 559 unsigned int len)
558{ 560{
559 struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres; 561 struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
560 char *lvb = lockres->l_lksb.lvb; 562 char *lvb;
563 ssize_t ret = len;
561 564
562 BUG_ON(len > DLM_LVB_LEN); 565 BUG_ON(len > DLM_LVB_LEN);
563 566
564 spin_lock(&lockres->l_lock); 567 spin_lock(&lockres->l_lock);
565 568
566 BUG_ON(lockres->l_level < LKM_PRMODE); 569 BUG_ON(lockres->l_level < DLM_LOCK_PR);
567 memcpy(val, lvb, len); 570 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)) {
571 lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
572 memcpy(val, lvb, len);
573 } else
574 ret = 0;
568 575
569 spin_unlock(&lockres->l_lock); 576 spin_unlock(&lockres->l_lock);
577 return ret;
570} 578}
571 579
572void user_dlm_lock_res_init(struct user_lock_res *lockres, 580void user_dlm_lock_res_init(struct user_lock_res *lockres,
@@ -576,9 +584,9 @@ void user_dlm_lock_res_init(struct user_lock_res *lockres,
576 584
577 spin_lock_init(&lockres->l_lock); 585 spin_lock_init(&lockres->l_lock);
578 init_waitqueue_head(&lockres->l_event); 586 init_waitqueue_head(&lockres->l_event);
579 lockres->l_level = LKM_IVMODE; 587 lockres->l_level = DLM_LOCK_IV;
580 lockres->l_requested = LKM_IVMODE; 588 lockres->l_requested = DLM_LOCK_IV;
581 lockres->l_blocking = LKM_IVMODE; 589 lockres->l_blocking = DLM_LOCK_IV;
582 590
583 /* should have been checked before getting here. */ 591 /* should have been checked before getting here. */
584 BUG_ON(dentry->d_name.len >= USER_DLM_LOCK_ID_MAX_LEN); 592 BUG_ON(dentry->d_name.len >= USER_DLM_LOCK_ID_MAX_LEN);
@@ -592,7 +600,8 @@ void user_dlm_lock_res_init(struct user_lock_res *lockres,
592int user_dlm_destroy_lock(struct user_lock_res *lockres) 600int user_dlm_destroy_lock(struct user_lock_res *lockres)
593{ 601{
594 int status = -EBUSY; 602 int status = -EBUSY;
595 struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres); 603 struct ocfs2_cluster_connection *conn =
604 cluster_connection_from_user_lockres(lockres);
596 605
597 mlog(0, "asked to destroy %.*s\n", lockres->l_namelen, lockres->l_name); 606 mlog(0, "asked to destroy %.*s\n", lockres->l_namelen, lockres->l_name);
598 607
@@ -627,14 +636,9 @@ int user_dlm_destroy_lock(struct user_lock_res *lockres)
627 lockres->l_flags |= USER_LOCK_BUSY; 636 lockres->l_flags |= USER_LOCK_BUSY;
628 spin_unlock(&lockres->l_lock); 637 spin_unlock(&lockres->l_lock);
629 638
630 status = dlmunlock(dlm, 639 status = ocfs2_dlm_unlock(conn, &lockres->l_lksb, DLM_LKF_VALBLK);
631 &lockres->l_lksb, 640 if (status) {
632 LKM_VALBLK, 641 user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
633 user_unlock_ast,
634 lockres);
635 if (status != DLM_NORMAL) {
636 user_log_dlm_error("dlmunlock", status, lockres);
637 status = -EINVAL;
638 goto bail; 642 goto bail;
639 } 643 }
640 644
@@ -645,32 +649,34 @@ bail:
645 return status; 649 return status;
646} 650}
647 651
648struct dlm_ctxt *user_dlm_register_context(struct qstr *name, 652static void user_dlm_recovery_handler_noop(int node_num,
649 struct dlm_protocol_version *proto) 653 void *recovery_data)
650{ 654{
651 struct dlm_ctxt *dlm; 655 /* We ignore recovery events */
652 u32 dlm_key; 656 return;
653 char *domain; 657}
654
655 domain = kmalloc(name->len + 1, GFP_NOFS);
656 if (!domain) {
657 mlog_errno(-ENOMEM);
658 return ERR_PTR(-ENOMEM);
659 }
660 658
661 dlm_key = crc32_le(0, name->name, name->len); 659void user_dlm_set_locking_protocol(void)
660{
661 ocfs2_stack_glue_set_max_proto_version(&user_dlm_lproto.lp_max_version);
662}
662 663
663 snprintf(domain, name->len + 1, "%.*s", name->len, name->name); 664struct ocfs2_cluster_connection *user_dlm_register(struct qstr *name)
665{
666 int rc;
667 struct ocfs2_cluster_connection *conn;
664 668
665 dlm = dlm_register_domain(domain, dlm_key, proto); 669 rc = ocfs2_cluster_connect("o2cb", name->name, name->len,
666 if (IS_ERR(dlm)) 670 &user_dlm_lproto,
667 mlog_errno(PTR_ERR(dlm)); 671 user_dlm_recovery_handler_noop,
672 NULL, &conn);
673 if (rc)
674 mlog_errno(rc);
668 675
669 kfree(domain); 676 return rc ? ERR_PTR(rc) : conn;
670 return dlm;
671} 677}
672 678
673void user_dlm_unregister_context(struct dlm_ctxt *dlm) 679void user_dlm_unregister(struct ocfs2_cluster_connection *conn)
674{ 680{
675 dlm_unregister_domain(dlm); 681 ocfs2_cluster_disconnect(conn, 0);
676} 682}
diff --git a/fs/ocfs2/dlmfs/userdlm.h b/fs/ocfs2/dlmfs/userdlm.h
index 0c3cc03c61fa..3b42d79531d7 100644
--- a/fs/ocfs2/dlmfs/userdlm.h
+++ b/fs/ocfs2/dlmfs/userdlm.h
@@ -57,7 +57,7 @@ struct user_lock_res {
57 int l_level; 57 int l_level;
58 unsigned int l_ro_holders; 58 unsigned int l_ro_holders;
59 unsigned int l_ex_holders; 59 unsigned int l_ex_holders;
60 struct dlm_lockstatus l_lksb; 60 struct ocfs2_dlm_lksb l_lksb;
61 61
62 int l_requested; 62 int l_requested;
63 int l_blocking; 63 int l_blocking;
@@ -80,15 +80,15 @@ void user_dlm_cluster_unlock(struct user_lock_res *lockres,
80void user_dlm_write_lvb(struct inode *inode, 80void user_dlm_write_lvb(struct inode *inode,
81 const char *val, 81 const char *val,
82 unsigned int len); 82 unsigned int len);
83void user_dlm_read_lvb(struct inode *inode, 83ssize_t user_dlm_read_lvb(struct inode *inode,
84 char *val, 84 char *val,
85 unsigned int len); 85 unsigned int len);
86struct dlm_ctxt *user_dlm_register_context(struct qstr *name, 86struct ocfs2_cluster_connection *user_dlm_register(struct qstr *name);
87 struct dlm_protocol_version *proto); 87void user_dlm_unregister(struct ocfs2_cluster_connection *conn);
88void user_dlm_unregister_context(struct dlm_ctxt *dlm); 88void user_dlm_set_locking_protocol(void);
89 89
90struct dlmfs_inode_private { 90struct dlmfs_inode_private {
91 struct dlm_ctxt *ip_dlm; 91 struct ocfs2_cluster_connection *ip_conn;
92 92
93 struct user_lock_res ip_lockres; /* unused for directories. */ 93 struct user_lock_res ip_lockres; /* unused for directories. */
94 struct inode *ip_parent; 94 struct inode *ip_parent;