aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlmglue.c
diff options
context:
space:
mode:
authorMark Fasheh <mark.fasheh@oracle.com>2006-09-08 17:14:34 -0400
committerMark Fasheh <mark.fasheh@oracle.com>2006-09-24 16:50:42 -0400
commitd680efe9d8fe0eb99d9dd063a4def6b362cdb40d (patch)
tree51e8c081c673240434dce4b44bf66fbfd4dddf30 /fs/ocfs2/dlmglue.c
parentf0681062b8e369d9fb6f3ce10f4e3fc8cea5f910 (diff)
ocfs2: Add new cluster lock type
Replace the dentry vote mechanism with a cluster lock which covers a set of dentries. This allows us to force d_delete() only on nodes which actually care about an unlink. Every node that does a ->lookup() gets a read-only lock on the dentry and holds it until an unlink, during which the unlinking node will request an exclusive lock, forcing the other nodes that care about that dentry to d_delete() it. The effect is that we retain a very lightweight ->d_revalidate(), and at the same time get to make large improvements to the average case performance of the ocfs2 unlink and rename operations. This patch adds the cluster lock type which OCFS2 can attach to dentries. A small number of fs/ocfs2/dcache.c functions are stubbed out so that this change can compile. Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
Diffstat (limited to 'fs/ocfs2/dlmglue.c')
-rw-r--r--fs/ocfs2/dlmglue.c475
1 files changed, 371 insertions, 104 deletions
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 20c6ca8ac7fd..764d15defd88 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -46,6 +46,7 @@
46#include "ocfs2.h" 46#include "ocfs2.h"
47 47
48#include "alloc.h" 48#include "alloc.h"
49#include "dcache.h"
49#include "dlmglue.h" 50#include "dlmglue.h"
50#include "extent_map.h" 51#include "extent_map.h"
51#include "heartbeat.h" 52#include "heartbeat.h"
@@ -69,6 +70,9 @@ struct ocfs2_mask_waiter {
69static void ocfs2_inode_ast_func(void *opaque); 70static void ocfs2_inode_ast_func(void *opaque);
70static void ocfs2_inode_bast_func(void *opaque, 71static void ocfs2_inode_bast_func(void *opaque,
71 int level); 72 int level);
73static void ocfs2_dentry_ast_func(void *opaque);
74static void ocfs2_dentry_bast_func(void *opaque,
75 int level);
72static void ocfs2_super_ast_func(void *opaque); 76static void ocfs2_super_ast_func(void *opaque);
73static void ocfs2_super_bast_func(void *opaque, 77static void ocfs2_super_bast_func(void *opaque,
74 int level); 78 int level);
@@ -76,32 +80,57 @@ static void ocfs2_rename_ast_func(void *opaque);
76static void ocfs2_rename_bast_func(void *opaque, 80static void ocfs2_rename_bast_func(void *opaque,
77 int level); 81 int level);
78 82
83/*
84 * Return value from ocfs2_convert_worker_t functions.
85 *
86 * These control the precise actions of ocfs2_generic_unblock_lock()
87 * and ocfs2_process_blocked_lock()
88 *
89 */
90enum ocfs2_unblock_action {
91 UNBLOCK_CONTINUE = 0, /* Continue downconvert */
92 UNBLOCK_CONTINUE_POST = 1, /* Continue downconvert, fire
93 * ->post_unlock callback */
94 UNBLOCK_STOP_POST = 2, /* Do not downconvert, fire
95 * ->post_unlock() callback. */
96};
97
98struct ocfs2_unblock_ctl {
99 int requeue;
100 enum ocfs2_unblock_action unblock_action;
101};
102
79/* so far, all locks have gotten along with the same unlock ast */ 103/* so far, all locks have gotten along with the same unlock ast */
80static void ocfs2_unlock_ast_func(void *opaque, 104static void ocfs2_unlock_ast_func(void *opaque,
81 enum dlm_status status); 105 enum dlm_status status);
82static int ocfs2_do_unblock_meta(struct inode *inode,
83 int *requeue);
84static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres, 106static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
85 int *requeue); 107 struct ocfs2_unblock_ctl *ctl);
86static int ocfs2_unblock_data(struct ocfs2_lock_res *lockres, 108static int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
87 int *requeue); 109 struct ocfs2_unblock_ctl *ctl);
88static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres, 110static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
89 int *requeue); 111 struct ocfs2_unblock_ctl *ctl);
112static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
113 struct ocfs2_unblock_ctl *ctl);
90static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres, 114static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
91 int *requeue); 115 struct ocfs2_unblock_ctl *ctl);
92typedef void (ocfs2_convert_worker_t)(struct ocfs2_lock_res *, int); 116
93static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb, 117static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
94 struct ocfs2_lock_res *lockres, 118 struct ocfs2_lock_res *lockres);
95 int *requeue,
96 ocfs2_convert_worker_t *worker);
97 119
98struct ocfs2_lock_res_ops { 120struct ocfs2_lock_res_ops {
99 void (*ast)(void *); 121 void (*ast)(void *);
100 void (*bast)(void *, int); 122 void (*bast)(void *, int);
101 void (*unlock_ast)(void *, enum dlm_status); 123 void (*unlock_ast)(void *, enum dlm_status);
102 int (*unblock)(struct ocfs2_lock_res *, int *); 124 int (*unblock)(struct ocfs2_lock_res *, struct ocfs2_unblock_ctl *);
125 void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);
103}; 126};
104 127
128typedef int (ocfs2_convert_worker_t)(struct ocfs2_lock_res *, int);
129static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
130 struct ocfs2_lock_res *lockres,
131 struct ocfs2_unblock_ctl *ctl,
132 ocfs2_convert_worker_t *worker);
133
105static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = { 134static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
106 .ast = ocfs2_inode_ast_func, 135 .ast = ocfs2_inode_ast_func,
107 .bast = ocfs2_inode_bast_func, 136 .bast = ocfs2_inode_bast_func,
@@ -116,9 +145,6 @@ static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
116 .unblock = ocfs2_unblock_meta, 145 .unblock = ocfs2_unblock_meta,
117}; 146};
118 147
119static void ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
120 int blocking);
121
122static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = { 148static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
123 .ast = ocfs2_inode_ast_func, 149 .ast = ocfs2_inode_ast_func,
124 .bast = ocfs2_inode_bast_func, 150 .bast = ocfs2_inode_bast_func,
@@ -140,6 +166,14 @@ static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
140 .unblock = ocfs2_unblock_osb_lock, 166 .unblock = ocfs2_unblock_osb_lock,
141}; 167};
142 168
169static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
170 .ast = ocfs2_dentry_ast_func,
171 .bast = ocfs2_dentry_bast_func,
172 .unlock_ast = ocfs2_unlock_ast_func,
173 .unblock = ocfs2_unblock_dentry_lock,
174 .post_unlock = ocfs2_dentry_post_unlock,
175};
176
143static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) 177static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
144{ 178{
145 return lockres->l_type == OCFS2_LOCK_TYPE_META || 179 return lockres->l_type == OCFS2_LOCK_TYPE_META ||
@@ -172,6 +206,13 @@ static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
172 return (struct inode *) lockres->l_priv; 206 return (struct inode *) lockres->l_priv;
173} 207}
174 208
209static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
210{
211 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
212
213 return (struct ocfs2_dentry_lock *)lockres->l_priv;
214}
215
175static int ocfs2_lock_create(struct ocfs2_super *osb, 216static int ocfs2_lock_create(struct ocfs2_super *osb,
176 struct ocfs2_lock_res *lockres, 217 struct ocfs2_lock_res *lockres,
177 int level, 218 int level,
@@ -204,22 +245,6 @@ static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
204 struct ocfs2_lock_res *lockres, 245 struct ocfs2_lock_res *lockres,
205 int new_level); 246 int new_level);
206 247
207static char *ocfs2_lock_type_strings[] = {
208 [OCFS2_LOCK_TYPE_META] = "Meta",
209 [OCFS2_LOCK_TYPE_DATA] = "Data",
210 [OCFS2_LOCK_TYPE_SUPER] = "Super",
211 [OCFS2_LOCK_TYPE_RENAME] = "Rename",
212 /* Need to differentiate from [R]ename.. serializing writes is the
213 * important job it does, anyway. */
214 [OCFS2_LOCK_TYPE_RW] = "Write/Read",
215};
216
217static char *ocfs2_lock_type_string(enum ocfs2_lock_type type)
218{
219 mlog_bug_on_msg(type >= OCFS2_NUM_LOCK_TYPES, "%d\n", type);
220 return ocfs2_lock_type_strings[type];
221}
222
223static void ocfs2_build_lock_name(enum ocfs2_lock_type type, 248static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
224 u64 blkno, 249 u64 blkno,
225 u32 generation, 250 u32 generation,
@@ -265,13 +290,9 @@ static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
265static void ocfs2_lock_res_init_common(struct ocfs2_super *osb, 290static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
266 struct ocfs2_lock_res *res, 291 struct ocfs2_lock_res *res,
267 enum ocfs2_lock_type type, 292 enum ocfs2_lock_type type,
268 u64 blkno,
269 u32 generation,
270 struct ocfs2_lock_res_ops *ops, 293 struct ocfs2_lock_res_ops *ops,
271 void *priv) 294 void *priv)
272{ 295{
273 ocfs2_build_lock_name(type, blkno, generation, res->l_name);
274
275 res->l_type = type; 296 res->l_type = type;
276 res->l_ops = ops; 297 res->l_ops = ops;
277 res->l_priv = priv; 298 res->l_priv = priv;
@@ -319,9 +340,59 @@ void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
319 break; 340 break;
320 }; 341 };
321 342
322 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, 343 ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
323 OCFS2_I(inode)->ip_blkno, 344 inode->i_generation, res->l_name);
324 inode->i_generation, ops, inode); 345 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
346}
347
348static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
349{
350 __be64 inode_blkno_be;
351
352 memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
353 sizeof(__be64));
354
355 return be64_to_cpu(inode_blkno_be);
356}
357
358void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
359 u64 parent, struct inode *inode)
360{
361 int len;
362 u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
363 __be64 inode_blkno_be = cpu_to_be64(inode_blkno);
364 struct ocfs2_lock_res *lockres = &dl->dl_lockres;
365
366 ocfs2_lock_res_init_once(lockres);
367
368 /*
369 * Unfortunately, the standard lock naming scheme won't work
370 * here because we have two 16 byte values to use. Instead,
371 * we'll stuff the inode number as a binary value. We still
372 * want error prints to show something without garbling the
373 * display, so drop a null byte in there before the inode
374 * number. A future version of OCFS2 will likely use all
375 * binary lock names. The stringified names have been a
376 * tremendous aid in debugging, but now that the debugfs
377 * interface exists, we can mangle things there if need be.
378 *
379 * NOTE: We also drop the standard "pad" value (the total lock
380 * name size stays the same though - the last part is all
381 * zeros due to the memset in ocfs2_lock_res_init_once()).
382 */
383 len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
384 "%c%016llx",
385 ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
386 (long long)parent);
387
388 BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
389
390 memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
391 sizeof(__be64));
392
393 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
394 OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
395 dl);
325} 396}
326 397
327static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res, 398static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
@@ -330,8 +401,9 @@ static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
330 /* Superblock lockres doesn't come from a slab so we call init 401 /* Superblock lockres doesn't come from a slab so we call init
331 * once on it manually. */ 402 * once on it manually. */
332 ocfs2_lock_res_init_once(res); 403 ocfs2_lock_res_init_once(res);
404 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
405 0, res->l_name);
333 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER, 406 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
334 OCFS2_SUPER_BLOCK_BLKNO, 0,
335 &ocfs2_super_lops, osb); 407 &ocfs2_super_lops, osb);
336} 408}
337 409
@@ -341,7 +413,8 @@ static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
341 /* Rename lockres doesn't come from a slab so we call init 413 /* Rename lockres doesn't come from a slab so we call init
342 * once on it manually. */ 414 * once on it manually. */
343 ocfs2_lock_res_init_once(res); 415 ocfs2_lock_res_init_once(res);
344 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 0, 0, 416 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
417 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
345 &ocfs2_rename_lops, osb); 418 &ocfs2_rename_lops, osb);
346} 419}
347 420
@@ -627,9 +700,10 @@ static void ocfs2_generic_bast_func(struct ocfs2_super *osb,
627 ocfs2_schedule_blocked_lock(osb, lockres); 700 ocfs2_schedule_blocked_lock(osb, lockres);
628 spin_unlock_irqrestore(&lockres->l_lock, flags); 701 spin_unlock_irqrestore(&lockres->l_lock, flags);
629 702
703 wake_up(&lockres->l_event);
704
630 ocfs2_kick_vote_thread(osb); 705 ocfs2_kick_vote_thread(osb);
631 706
632 wake_up(&lockres->l_event);
633 mlog_exit_void(); 707 mlog_exit_void();
634} 708}
635 709
@@ -690,9 +764,9 @@ static void ocfs2_generic_ast_func(struct ocfs2_lock_res *lockres,
690 /* set it to something invalid so if we get called again we 764 /* set it to something invalid so if we get called again we
691 * can catch it. */ 765 * can catch it. */
692 lockres->l_action = OCFS2_AST_INVALID; 766 lockres->l_action = OCFS2_AST_INVALID;
693 spin_unlock_irqrestore(&lockres->l_lock, flags);
694 767
695 wake_up(&lockres->l_event); 768 wake_up(&lockres->l_event);
769 spin_unlock_irqrestore(&lockres->l_lock, flags);
696} 770}
697 771
698static void ocfs2_super_ast_func(void *opaque) 772static void ocfs2_super_ast_func(void *opaque)
@@ -757,6 +831,27 @@ static void ocfs2_rename_bast_func(void *opaque,
757 mlog_exit_void(); 831 mlog_exit_void();
758} 832}
759 833
834static void ocfs2_dentry_ast_func(void *opaque)
835{
836 struct ocfs2_lock_res *lockres = opaque;
837
838 BUG_ON(!lockres);
839
840 ocfs2_generic_ast_func(lockres, 1);
841}
842
843static void ocfs2_dentry_bast_func(void *opaque, int level)
844{
845 struct ocfs2_lock_res *lockres = opaque;
846 struct ocfs2_dentry_lock *dl = lockres->l_priv;
847 struct ocfs2_super *osb = OCFS2_SB(dl->dl_inode->i_sb);
848
849 mlog(0, "Dentry bast: level: %d, name: %s\n", level,
850 lockres->l_name);
851
852 ocfs2_generic_bast_func(osb, lockres, level);
853}
854
760static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 855static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
761 int convert) 856 int convert)
762{ 857{
@@ -1076,10 +1171,11 @@ static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
1076 mlog_exit_void(); 1171 mlog_exit_void();
1077} 1172}
1078 1173
1079static int ocfs2_create_new_inode_lock(struct inode *inode, 1174int ocfs2_create_new_lock(struct ocfs2_super *osb,
1080 struct ocfs2_lock_res *lockres) 1175 struct ocfs2_lock_res *lockres,
1176 int ex)
1081{ 1177{
1082 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1178 int level = ex ? LKM_EXMODE : LKM_PRMODE;
1083 unsigned long flags; 1179 unsigned long flags;
1084 1180
1085 spin_lock_irqsave(&lockres->l_lock, flags); 1181 spin_lock_irqsave(&lockres->l_lock, flags);
@@ -1087,7 +1183,7 @@ static int ocfs2_create_new_inode_lock(struct inode *inode,
1087 lockres_or_flags(lockres, OCFS2_LOCK_LOCAL); 1183 lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1088 spin_unlock_irqrestore(&lockres->l_lock, flags); 1184 spin_unlock_irqrestore(&lockres->l_lock, flags);
1089 1185
1090 return ocfs2_lock_create(osb, lockres, LKM_EXMODE, LKM_LOCAL); 1186 return ocfs2_lock_create(osb, lockres, level, LKM_LOCAL);
1091} 1187}
1092 1188
1093/* Grants us an EX lock on the data and metadata resources, skipping 1189/* Grants us an EX lock on the data and metadata resources, skipping
@@ -1099,6 +1195,7 @@ static int ocfs2_create_new_inode_lock(struct inode *inode,
1099int ocfs2_create_new_inode_locks(struct inode *inode) 1195int ocfs2_create_new_inode_locks(struct inode *inode)
1100{ 1196{
1101 int ret; 1197 int ret;
1198 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1102 1199
1103 BUG_ON(!inode); 1200 BUG_ON(!inode);
1104 BUG_ON(!ocfs2_inode_is_new(inode)); 1201 BUG_ON(!ocfs2_inode_is_new(inode));
@@ -1115,22 +1212,19 @@ int ocfs2_create_new_inode_locks(struct inode *inode)
1115 * on a resource which has an invalid one -- we'll set it 1212 * on a resource which has an invalid one -- we'll set it
1116 * valid when we release the EX. */ 1213 * valid when we release the EX. */
1117 1214
1118 ret = ocfs2_create_new_inode_lock(inode, 1215 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1);
1119 &OCFS2_I(inode)->ip_rw_lockres);
1120 if (ret) { 1216 if (ret) {
1121 mlog_errno(ret); 1217 mlog_errno(ret);
1122 goto bail; 1218 goto bail;
1123 } 1219 }
1124 1220
1125 ret = ocfs2_create_new_inode_lock(inode, 1221 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1);
1126 &OCFS2_I(inode)->ip_meta_lockres);
1127 if (ret) { 1222 if (ret) {
1128 mlog_errno(ret); 1223 mlog_errno(ret);
1129 goto bail; 1224 goto bail;
1130 } 1225 }
1131 1226
1132 ret = ocfs2_create_new_inode_lock(inode, 1227 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1);
1133 &OCFS2_I(inode)->ip_data_lockres);
1134 if (ret) { 1228 if (ret) {
1135 mlog_errno(ret); 1229 mlog_errno(ret);
1136 goto bail; 1230 goto bail;
@@ -1809,6 +1903,34 @@ void ocfs2_rename_unlock(struct ocfs2_super *osb)
1809 ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE); 1903 ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
1810} 1904}
1811 1905
1906int ocfs2_dentry_lock(struct dentry *dentry, int ex)
1907{
1908 int ret;
1909 int level = ex ? LKM_EXMODE : LKM_PRMODE;
1910 struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1911 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1912
1913 BUG_ON(!dl);
1914
1915 if (ocfs2_is_hard_readonly(osb))
1916 return -EROFS;
1917
1918 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
1919 if (ret < 0)
1920 mlog_errno(ret);
1921
1922 return ret;
1923}
1924
1925void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
1926{
1927 int level = ex ? LKM_EXMODE : LKM_PRMODE;
1928 struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1929 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1930
1931 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
1932}
1933
1812/* Reference counting of the dlm debug structure. We want this because 1934/* Reference counting of the dlm debug structure. We want this because
1813 * open references on the debug inodes can live on after a mount, so 1935 * open references on the debug inodes can live on after a mount, so
1814 * we can't rely on the ocfs2_super to always exist. */ 1936 * we can't rely on the ocfs2_super to always exist. */
@@ -1939,9 +2061,16 @@ static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
1939 if (!lockres) 2061 if (!lockres)
1940 return -EINVAL; 2062 return -EINVAL;
1941 2063
1942 seq_printf(m, "0x%x\t" 2064 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
1943 "%.*s\t" 2065
1944 "%d\t" 2066 if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
2067 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
2068 lockres->l_name,
2069 (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
2070 else
2071 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
2072
2073 seq_printf(m, "%d\t"
1945 "0x%lx\t" 2074 "0x%lx\t"
1946 "0x%x\t" 2075 "0x%x\t"
1947 "0x%x\t" 2076 "0x%x\t"
@@ -1949,8 +2078,6 @@ static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
1949 "%u\t" 2078 "%u\t"
1950 "%d\t" 2079 "%d\t"
1951 "%d\t", 2080 "%d\t",
1952 OCFS2_DLM_DEBUG_STR_VERSION,
1953 OCFS2_LOCK_ID_MAX_LEN, lockres->l_name,
1954 lockres->l_level, 2081 lockres->l_level,
1955 lockres->l_flags, 2082 lockres->l_flags,
1956 lockres->l_action, 2083 lockres->l_action,
@@ -2311,25 +2438,21 @@ void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
2311 spin_unlock_irqrestore(&lockres->l_lock, flags); 2438 spin_unlock_irqrestore(&lockres->l_lock, flags);
2312} 2439}
2313 2440
2314static void ocfs2_drop_osb_locks(struct ocfs2_super *osb) 2441void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
2442 struct ocfs2_lock_res *lockres)
2315{ 2443{
2316 int status; 2444 int ret;
2317
2318 mlog_entry_void();
2319
2320 ocfs2_mark_lockres_freeing(&osb->osb_super_lockres);
2321
2322 status = ocfs2_drop_lock(osb, &osb->osb_super_lockres, NULL);
2323 if (status < 0)
2324 mlog_errno(status);
2325
2326 ocfs2_mark_lockres_freeing(&osb->osb_rename_lockres);
2327 2445
2328 status = ocfs2_drop_lock(osb, &osb->osb_rename_lockres, NULL); 2446 ocfs2_mark_lockres_freeing(lockres);
2329 if (status < 0) 2447 ret = ocfs2_drop_lock(osb, lockres, NULL);
2330 mlog_errno(status); 2448 if (ret)
2449 mlog_errno(ret);
2450}
2331 2451
2332 mlog_exit(status); 2452static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
2453{
2454 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
2455 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
2333} 2456}
2334 2457
2335static void ocfs2_meta_pre_drop(struct ocfs2_lock_res *lockres, void *data) 2458static void ocfs2_meta_pre_drop(struct ocfs2_lock_res *lockres, void *data)
@@ -2599,7 +2722,7 @@ leave:
2599 2722
2600static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb, 2723static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
2601 struct ocfs2_lock_res *lockres, 2724 struct ocfs2_lock_res *lockres,
2602 int *requeue, 2725 struct ocfs2_unblock_ctl *ctl,
2603 ocfs2_convert_worker_t *worker) 2726 ocfs2_convert_worker_t *worker)
2604{ 2727{
2605 unsigned long flags; 2728 unsigned long flags;
@@ -2615,7 +2738,7 @@ static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
2615 2738
2616recheck: 2739recheck:
2617 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 2740 if (lockres->l_flags & OCFS2_LOCK_BUSY) {
2618 *requeue = 1; 2741 ctl->requeue = 1;
2619 ret = ocfs2_prepare_cancel_convert(osb, lockres); 2742 ret = ocfs2_prepare_cancel_convert(osb, lockres);
2620 spin_unlock_irqrestore(&lockres->l_lock, flags); 2743 spin_unlock_irqrestore(&lockres->l_lock, flags);
2621 if (ret) { 2744 if (ret) {
@@ -2631,7 +2754,7 @@ recheck:
2631 if ((lockres->l_blocking == LKM_EXMODE) 2754 if ((lockres->l_blocking == LKM_EXMODE)
2632 && (lockres->l_ex_holders || lockres->l_ro_holders)) { 2755 && (lockres->l_ex_holders || lockres->l_ro_holders)) {
2633 spin_unlock_irqrestore(&lockres->l_lock, flags); 2756 spin_unlock_irqrestore(&lockres->l_lock, flags);
2634 *requeue = 1; 2757 ctl->requeue = 1;
2635 ret = 0; 2758 ret = 0;
2636 goto leave; 2759 goto leave;
2637 } 2760 }
@@ -2641,7 +2764,7 @@ recheck:
2641 if (lockres->l_blocking == LKM_PRMODE && 2764 if (lockres->l_blocking == LKM_PRMODE &&
2642 lockres->l_ex_holders) { 2765 lockres->l_ex_holders) {
2643 spin_unlock_irqrestore(&lockres->l_lock, flags); 2766 spin_unlock_irqrestore(&lockres->l_lock, flags);
2644 *requeue = 1; 2767 ctl->requeue = 1;
2645 ret = 0; 2768 ret = 0;
2646 goto leave; 2769 goto leave;
2647 } 2770 }
@@ -2659,7 +2782,10 @@ recheck:
2659 blocking = lockres->l_blocking; 2782 blocking = lockres->l_blocking;
2660 spin_unlock_irqrestore(&lockres->l_lock, flags); 2783 spin_unlock_irqrestore(&lockres->l_lock, flags);
2661 2784
2662 worker(lockres, blocking); 2785 ctl->unblock_action = worker(lockres, blocking);
2786
2787 if (ctl->unblock_action == UNBLOCK_STOP_POST)
2788 goto leave;
2663 2789
2664 spin_lock_irqsave(&lockres->l_lock, flags); 2790 spin_lock_irqsave(&lockres->l_lock, flags);
2665 if (blocking != lockres->l_blocking) { 2791 if (blocking != lockres->l_blocking) {
@@ -2669,7 +2795,7 @@ recheck:
2669 } 2795 }
2670 2796
2671downconvert: 2797downconvert:
2672 *requeue = 0; 2798 ctl->requeue = 0;
2673 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); 2799 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
2674 2800
2675 ocfs2_prepare_downconvert(lockres, new_level); 2801 ocfs2_prepare_downconvert(lockres, new_level);
@@ -2680,14 +2806,12 @@ leave:
2680 return ret; 2806 return ret;
2681} 2807}
2682 2808
2683static void ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 2809static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
2684 int blocking) 2810 int blocking)
2685{ 2811{
2686 struct inode *inode; 2812 struct inode *inode;
2687 struct address_space *mapping; 2813 struct address_space *mapping;
2688 2814
2689 mlog_entry_void();
2690
2691 inode = ocfs2_lock_res_inode(lockres); 2815 inode = ocfs2_lock_res_inode(lockres);
2692 mapping = inode->i_mapping; 2816 mapping = inode->i_mapping;
2693 2817
@@ -2708,11 +2832,11 @@ static void ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
2708 filemap_fdatawait(mapping); 2832 filemap_fdatawait(mapping);
2709 } 2833 }
2710 2834
2711 mlog_exit_void(); 2835 return UNBLOCK_CONTINUE;
2712} 2836}
2713 2837
2714int ocfs2_unblock_data(struct ocfs2_lock_res *lockres, 2838int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
2715 int *requeue) 2839 struct ocfs2_unblock_ctl *ctl)
2716{ 2840{
2717 int status; 2841 int status;
2718 struct inode *inode; 2842 struct inode *inode;
@@ -2726,22 +2850,20 @@ int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
2726 mlog(0, "unblock inode %llu\n", 2850 mlog(0, "unblock inode %llu\n",
2727 (unsigned long long)OCFS2_I(inode)->ip_blkno); 2851 (unsigned long long)OCFS2_I(inode)->ip_blkno);
2728 2852
2729 status = ocfs2_generic_unblock_lock(osb, 2853 status = ocfs2_generic_unblock_lock(osb, lockres, ctl,
2730 lockres,
2731 requeue,
2732 ocfs2_data_convert_worker); 2854 ocfs2_data_convert_worker);
2733 if (status < 0) 2855 if (status < 0)
2734 mlog_errno(status); 2856 mlog_errno(status);
2735 2857
2736 mlog(0, "inode %llu, requeue = %d\n", 2858 mlog(0, "inode %llu, requeue = %d\n",
2737 (unsigned long long)OCFS2_I(inode)->ip_blkno, *requeue); 2859 (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
2738 2860
2739 mlog_exit(status); 2861 mlog_exit(status);
2740 return status; 2862 return status;
2741} 2863}
2742 2864
2743static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres, 2865static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
2744 int *requeue) 2866 struct ocfs2_unblock_ctl *ctl)
2745{ 2867{
2746 int status; 2868 int status;
2747 struct inode *inode; 2869 struct inode *inode;
@@ -2753,9 +2875,7 @@ static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
2753 inode = ocfs2_lock_res_inode(lockres); 2875 inode = ocfs2_lock_res_inode(lockres);
2754 2876
2755 status = ocfs2_generic_unblock_lock(OCFS2_SB(inode->i_sb), 2877 status = ocfs2_generic_unblock_lock(OCFS2_SB(inode->i_sb),
2756 lockres, 2878 lockres, ctl, NULL);
2757 requeue,
2758 NULL);
2759 if (status < 0) 2879 if (status < 0)
2760 mlog_errno(status); 2880 mlog_errno(status);
2761 2881
@@ -2763,9 +2883,8 @@ static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
2763 return status; 2883 return status;
2764} 2884}
2765 2885
2766 2886static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
2767int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres, 2887 struct ocfs2_unblock_ctl *ctl)
2768 int *requeue)
2769{ 2888{
2770 int status; 2889 int status;
2771 struct inode *inode; 2890 struct inode *inode;
@@ -2777,21 +2896,165 @@ int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
2777 mlog(0, "unblock inode %llu\n", 2896 mlog(0, "unblock inode %llu\n",
2778 (unsigned long long)OCFS2_I(inode)->ip_blkno); 2897 (unsigned long long)OCFS2_I(inode)->ip_blkno);
2779 2898
2780 status = ocfs2_do_unblock_meta(inode, requeue); 2899 status = ocfs2_do_unblock_meta(inode, &ctl->requeue);
2781 if (status < 0) 2900 if (status < 0)
2782 mlog_errno(status); 2901 mlog_errno(status);
2783 2902
2784 mlog(0, "inode %llu, requeue = %d\n", 2903 mlog(0, "inode %llu, requeue = %d\n",
2785 (unsigned long long)OCFS2_I(inode)->ip_blkno, *requeue); 2904 (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
2786 2905
2787 mlog_exit(status); 2906 mlog_exit(status);
2788 return status; 2907 return status;
2789} 2908}
2790 2909
2910/*
2911 * Does the final reference drop on our dentry lock. Right now this
2912 * happens in the vote thread, but we could choose to simplify the
2913 * dlmglue API and push these off to the ocfs2_wq in the future.
2914 */
2915static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
2916 struct ocfs2_lock_res *lockres)
2917{
2918 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2919 ocfs2_dentry_lock_put(osb, dl);
2920}
2921
2922/*
2923 * d_delete() matching dentries before the lock downconvert.
2924 *
2925 * At this point, any process waiting to destroy the
2926 * dentry_lock due to last ref count is stopped by the
2927 * OCFS2_LOCK_QUEUED flag.
2928 *
2929 * We have two potential problems
2930 *
2931 * 1) If we do the last reference drop on our dentry_lock (via dput)
2932 * we'll wind up in ocfs2_release_dentry_lock(), waiting on
2933 * the downconvert to finish. Instead we take an elevated
2934 * reference and push the drop until after we've completed our
2935 * unblock processing.
2936 *
2937 * 2) There might be another process with a final reference,
2938 * waiting on us to finish processing. If this is the case, we
2939 * detect it and exit out - there's no more dentries anyway.
2940 */
2941static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
2942 int blocking)
2943{
2944 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2945 struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
2946 struct dentry *dentry;
2947 unsigned long flags;
2948 int extra_ref = 0;
2949
2950 /*
2951 * This node is blocking another node from getting a read
2952 * lock. This happens when we've renamed within a
2953 * directory. We've forced the other nodes to d_delete(), but
2954 * we never actually dropped our lock because it's still
2955 * valid. The downconvert code will retain a PR for this node,
2956 * so there's no further work to do.
2957 */
2958 if (blocking == LKM_PRMODE)
2959 return UNBLOCK_CONTINUE;
2960
2961 /*
2962 * Mark this inode as potentially orphaned. The code in
2963 * ocfs2_delete_inode() will figure out whether it actually
2964 * needs to be freed or not.
2965 */
2966 spin_lock(&oi->ip_lock);
2967 oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
2968 spin_unlock(&oi->ip_lock);
2969
2970 /*
2971 * Yuck. We need to make sure however that the check of
2972 * OCFS2_LOCK_FREEING and the extra reference are atomic with
2973 * respect to a reference decrement or the setting of that
2974 * flag.
2975 */
2976 spin_lock_irqsave(&lockres->l_lock, flags);
2977 spin_lock(&dentry_attach_lock);
2978 if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
2979 && dl->dl_count) {
2980 dl->dl_count++;
2981 extra_ref = 1;
2982 }
2983 spin_unlock(&dentry_attach_lock);
2984 spin_unlock_irqrestore(&lockres->l_lock, flags);
2985
2986 mlog(0, "extra_ref = %d\n", extra_ref);
2987
2988 /*
2989 * We have a process waiting on us in ocfs2_dentry_iput(),
2990 * which means we can't have any more outstanding
2991 * aliases. There's no need to do any more work.
2992 */
2993 if (!extra_ref)
2994 return UNBLOCK_CONTINUE;
2995
2996 spin_lock(&dentry_attach_lock);
2997 while (1) {
2998 dentry = ocfs2_find_local_alias(dl->dl_inode,
2999 dl->dl_parent_blkno, 1);
3000 if (!dentry)
3001 break;
3002 spin_unlock(&dentry_attach_lock);
3003
3004 mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
3005 dentry->d_name.name);
3006
3007 /*
3008 * The following dcache calls may do an
3009 * iput(). Normally we don't want that from the
3010 * downconverting thread, but in this case it's ok
3011 * because the requesting node already has an
3012 * exclusive lock on the inode, so it can't be queued
3013 * for a downconvert.
3014 */
3015 d_delete(dentry);
3016 dput(dentry);
3017
3018 spin_lock(&dentry_attach_lock);
3019 }
3020 spin_unlock(&dentry_attach_lock);
3021
3022 /*
3023 * If we are the last holder of this dentry lock, there is no
3024 * reason to downconvert so skip straight to the unlock.
3025 */
3026 if (dl->dl_count == 1)
3027 return UNBLOCK_STOP_POST;
3028
3029 return UNBLOCK_CONTINUE_POST;
3030}
3031
3032static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
3033 struct ocfs2_unblock_ctl *ctl)
3034{
3035 int ret;
3036 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3037 struct ocfs2_super *osb = OCFS2_SB(dl->dl_inode->i_sb);
3038
3039 mlog(0, "unblock dentry lock: %llu\n",
3040 (unsigned long long)OCFS2_I(dl->dl_inode)->ip_blkno);
3041
3042 ret = ocfs2_generic_unblock_lock(osb,
3043 lockres,
3044 ctl,
3045 ocfs2_dentry_convert_worker);
3046 if (ret < 0)
3047 mlog_errno(ret);
3048
3049 mlog(0, "requeue = %d, post = %d\n", ctl->requeue, ctl->unblock_action);
3050
3051 return ret;
3052}
3053
2791/* Generic unblock function for any lockres whose private data is an 3054/* Generic unblock function for any lockres whose private data is an
2792 * ocfs2_super pointer. */ 3055 * ocfs2_super pointer. */
2793static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres, 3056static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
2794 int *requeue) 3057 struct ocfs2_unblock_ctl *ctl)
2795{ 3058{
2796 int status; 3059 int status;
2797 struct ocfs2_super *osb; 3060 struct ocfs2_super *osb;
@@ -2804,7 +3067,7 @@ static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
2804 3067
2805 status = ocfs2_generic_unblock_lock(osb, 3068 status = ocfs2_generic_unblock_lock(osb,
2806 lockres, 3069 lockres,
2807 requeue, 3070 ctl,
2808 NULL); 3071 NULL);
2809 if (status < 0) 3072 if (status < 0)
2810 mlog_errno(status); 3073 mlog_errno(status);
@@ -2817,7 +3080,7 @@ void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
2817 struct ocfs2_lock_res *lockres) 3080 struct ocfs2_lock_res *lockres)
2818{ 3081{
2819 int status; 3082 int status;
2820 int requeue = 0; 3083 struct ocfs2_unblock_ctl ctl = {0, 0,};
2821 unsigned long flags; 3084 unsigned long flags;
2822 3085
2823 /* Our reference to the lockres in this function can be 3086 /* Our reference to the lockres in this function can be
@@ -2842,21 +3105,25 @@ void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
2842 goto unqueue; 3105 goto unqueue;
2843 spin_unlock_irqrestore(&lockres->l_lock, flags); 3106 spin_unlock_irqrestore(&lockres->l_lock, flags);
2844 3107
2845 status = lockres->l_ops->unblock(lockres, &requeue); 3108 status = lockres->l_ops->unblock(lockres, &ctl);
2846 if (status < 0) 3109 if (status < 0)
2847 mlog_errno(status); 3110 mlog_errno(status);
2848 3111
2849 spin_lock_irqsave(&lockres->l_lock, flags); 3112 spin_lock_irqsave(&lockres->l_lock, flags);
2850unqueue: 3113unqueue:
2851 if (lockres->l_flags & OCFS2_LOCK_FREEING || !requeue) { 3114 if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
2852 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED); 3115 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
2853 } else 3116 } else
2854 ocfs2_schedule_blocked_lock(osb, lockres); 3117 ocfs2_schedule_blocked_lock(osb, lockres);
2855 3118
2856 mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name, 3119 mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
2857 requeue ? "yes" : "no"); 3120 ctl.requeue ? "yes" : "no");
2858 spin_unlock_irqrestore(&lockres->l_lock, flags); 3121 spin_unlock_irqrestore(&lockres->l_lock, flags);
2859 3122
3123 if (ctl.unblock_action != UNBLOCK_CONTINUE
3124 && lockres->l_ops->post_unlock)
3125 lockres->l_ops->post_unlock(osb, lockres);
3126
2860 mlog_exit_void(); 3127 mlog_exit_void();
2861} 3128}
2862 3129