aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlmglue.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlmglue.c')
-rw-r--r--fs/ocfs2/dlmglue.c85
1 files changed, 76 insertions, 9 deletions
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index c5e4a49e3a12..e044019cb3b1 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -875,6 +875,14 @@ static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lo
875 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 875 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
876 876
877 lockres->l_level = lockres->l_requested; 877 lockres->l_level = lockres->l_requested;
878
879 /*
880 * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing
881 * the OCFS2_LOCK_BUSY flag to prevent the dc thread from
882 * downconverting the lock before the upconvert has fully completed.
883 */
884 lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
885
878 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 886 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
879 887
880 mlog_exit_void(); 888 mlog_exit_void();
@@ -907,8 +915,6 @@ static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
907 915
908 assert_spin_locked(&lockres->l_lock); 916 assert_spin_locked(&lockres->l_lock);
909 917
910 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
911
912 if (level > lockres->l_blocking) { 918 if (level > lockres->l_blocking) {
913 /* only schedule a downconvert if we haven't already scheduled 919 /* only schedule a downconvert if we haven't already scheduled
914 * one that goes low enough to satisfy the level we're 920 * one that goes low enough to satisfy the level we're
@@ -921,6 +927,9 @@ static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
921 lockres->l_blocking = level; 927 lockres->l_blocking = level;
922 } 928 }
923 929
930 if (needs_downconvert)
931 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
932
924 mlog_exit(needs_downconvert); 933 mlog_exit(needs_downconvert);
925 return needs_downconvert; 934 return needs_downconvert;
926} 935}
@@ -1133,6 +1142,7 @@ static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
1133 mlog_entry_void(); 1142 mlog_entry_void();
1134 spin_lock_irqsave(&lockres->l_lock, flags); 1143 spin_lock_irqsave(&lockres->l_lock, flags);
1135 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1144 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
1145 lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
1136 if (convert) 1146 if (convert)
1137 lockres->l_action = OCFS2_AST_INVALID; 1147 lockres->l_action = OCFS2_AST_INVALID;
1138 else 1148 else
@@ -1323,13 +1333,13 @@ static int __ocfs2_cluster_lock(struct ocfs2_super *osb,
1323again: 1333again:
1324 wait = 0; 1334 wait = 0;
1325 1335
1336 spin_lock_irqsave(&lockres->l_lock, flags);
1337
1326 if (catch_signals && signal_pending(current)) { 1338 if (catch_signals && signal_pending(current)) {
1327 ret = -ERESTARTSYS; 1339 ret = -ERESTARTSYS;
1328 goto out; 1340 goto unlock;
1329 } 1341 }
1330 1342
1331 spin_lock_irqsave(&lockres->l_lock, flags);
1332
1333 mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING, 1343 mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
1334 "Cluster lock called on freeing lockres %s! flags " 1344 "Cluster lock called on freeing lockres %s! flags "
1335 "0x%lx\n", lockres->l_name, lockres->l_flags); 1345 "0x%lx\n", lockres->l_name, lockres->l_flags);
@@ -1346,6 +1356,25 @@ again:
1346 goto unlock; 1356 goto unlock;
1347 } 1357 }
1348 1358
1359 if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) {
1360 /*
1361 * We've upconverted. If the lock now has a level we can
1362 * work with, we take it. If, however, the lock is not at the
1363 * required level, we go thru the full cycle. One way this could
1364 * happen is if a process requesting an upconvert to PR is
1365 * closely followed by another requesting upconvert to an EX.
1366 * If the process requesting EX lands here, we want it to
1367 * continue attempting to upconvert and let the process
1368 * requesting PR take the lock.
1369 * If multiple processes request upconvert to PR, the first one
1370 * here will take the lock. The others will have to go thru the
1371 * OCFS2_LOCK_BLOCKED check to ensure that there is no pending
1372 * downconvert request.
1373 */
1374 if (level <= lockres->l_level)
1375 goto update_holders;
1376 }
1377
1349 if (lockres->l_flags & OCFS2_LOCK_BLOCKED && 1378 if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
1350 !ocfs2_may_continue_on_blocked_lock(lockres, level)) { 1379 !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
1351 /* is the lock is currently blocked on behalf of 1380 /* is the lock is currently blocked on behalf of
@@ -1416,11 +1445,14 @@ again:
1416 goto again; 1445 goto again;
1417 } 1446 }
1418 1447
1448update_holders:
1419 /* Ok, if we get here then we're good to go. */ 1449 /* Ok, if we get here then we're good to go. */
1420 ocfs2_inc_holders(lockres, level); 1450 ocfs2_inc_holders(lockres, level);
1421 1451
1422 ret = 0; 1452 ret = 0;
1423unlock: 1453unlock:
1454 lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
1455
1424 spin_unlock_irqrestore(&lockres->l_lock, flags); 1456 spin_unlock_irqrestore(&lockres->l_lock, flags);
1425out: 1457out:
1426 /* 1458 /*
@@ -3155,7 +3187,7 @@ out:
3155/* Mark the lockres as being dropped. It will no longer be 3187/* Mark the lockres as being dropped. It will no longer be
3156 * queued if blocking, but we still may have to wait on it 3188 * queued if blocking, but we still may have to wait on it
3157 * being dequeued from the downconvert thread before we can consider 3189 * being dequeued from the downconvert thread before we can consider
3158 * it safe to drop. 3190 * it safe to drop.
3159 * 3191 *
3160 * You can *not* attempt to call cluster_lock on this lockres anymore. */ 3192 * You can *not* attempt to call cluster_lock on this lockres anymore. */
3161void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres) 3193void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
@@ -3352,6 +3384,7 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb,
3352 unsigned long flags; 3384 unsigned long flags;
3353 int blocking; 3385 int blocking;
3354 int new_level; 3386 int new_level;
3387 int level;
3355 int ret = 0; 3388 int ret = 0;
3356 int set_lvb = 0; 3389 int set_lvb = 0;
3357 unsigned int gen; 3390 unsigned int gen;
@@ -3360,9 +3393,17 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb,
3360 3393
3361 spin_lock_irqsave(&lockres->l_lock, flags); 3394 spin_lock_irqsave(&lockres->l_lock, flags);
3362 3395
3363 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
3364
3365recheck: 3396recheck:
3397 /*
3398 * Is it still blocking? If not, we have no more work to do.
3399 */
3400 if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) {
3401 BUG_ON(lockres->l_blocking != DLM_LOCK_NL);
3402 spin_unlock_irqrestore(&lockres->l_lock, flags);
3403 ret = 0;
3404 goto leave;
3405 }
3406
3366 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 3407 if (lockres->l_flags & OCFS2_LOCK_BUSY) {
3367 /* XXX 3408 /* XXX
3368 * This is a *big* race. The OCFS2_LOCK_PENDING flag 3409 * This is a *big* race. The OCFS2_LOCK_PENDING flag
@@ -3401,6 +3442,31 @@ recheck:
3401 goto leave; 3442 goto leave;
3402 } 3443 }
3403 3444
3445 /*
3446 * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is
3447 * set when the ast is received for an upconvert just before the
3448 * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast
3449 * on the heels of the ast, we want to delay the downconvert just
3450 * enough to allow the up requestor to do its task. Because this
3451 * lock is in the blocked queue, the lock will be downconverted
3452 * as soon as the requestor is done with the lock.
3453 */
3454 if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING)
3455 goto leave_requeue;
3456
3457 /*
3458 * How can we block and yet be at NL? We were trying to upconvert
3459 * from NL and got canceled. The code comes back here, and now
3460 * we notice and clear BLOCKING.
3461 */
3462 if (lockres->l_level == DLM_LOCK_NL) {
3463 BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders);
3464 lockres->l_blocking = DLM_LOCK_NL;
3465 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
3466 spin_unlock_irqrestore(&lockres->l_lock, flags);
3467 goto leave;
3468 }
3469
3404 /* if we're blocking an exclusive and we have *any* holders, 3470 /* if we're blocking an exclusive and we have *any* holders,
3405 * then requeue. */ 3471 * then requeue. */
3406 if ((lockres->l_blocking == DLM_LOCK_EX) 3472 if ((lockres->l_blocking == DLM_LOCK_EX)
@@ -3438,6 +3504,7 @@ recheck:
3438 * may sleep, so we save off a copy of what we're blocking as 3504 * may sleep, so we save off a copy of what we're blocking as
3439 * it may change while we're not holding the spin lock. */ 3505 * it may change while we're not holding the spin lock. */
3440 blocking = lockres->l_blocking; 3506 blocking = lockres->l_blocking;
3507 level = lockres->l_level;
3441 spin_unlock_irqrestore(&lockres->l_lock, flags); 3508 spin_unlock_irqrestore(&lockres->l_lock, flags);
3442 3509
3443 ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking); 3510 ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
@@ -3446,7 +3513,7 @@ recheck:
3446 goto leave; 3513 goto leave;
3447 3514
3448 spin_lock_irqsave(&lockres->l_lock, flags); 3515 spin_lock_irqsave(&lockres->l_lock, flags);
3449 if (blocking != lockres->l_blocking) { 3516 if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) {
3450 /* If this changed underneath us, then we can't drop 3517 /* If this changed underneath us, then we can't drop
3451 * it just yet. */ 3518 * it just yet. */
3452 goto recheck; 3519 goto recheck;