aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlmglue.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlmglue.c')
-rw-r--r--fs/ocfs2/dlmglue.c164
1 files changed, 128 insertions, 36 deletions
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 4e97dcceaf8f..b3068ade3f7b 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -55,7 +55,6 @@
55#include "slot_map.h" 55#include "slot_map.h"
56#include "super.h" 56#include "super.h"
57#include "uptodate.h" 57#include "uptodate.h"
58#include "vote.h"
59 58
60#include "buffer_head_io.h" 59#include "buffer_head_io.h"
61 60
@@ -153,10 +152,10 @@ struct ocfs2_lock_res_ops {
153 struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *); 152 struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);
154 153
155 /* 154 /*
156 * Optionally called in the downconvert (or "vote") thread 155 * Optionally called in the downconvert thread after a
157 * after a successful downconvert. The lockres will not be 156 * successful downconvert. The lockres will not be referenced
158 * referenced after this callback is called, so it is safe to 157 * after this callback is called, so it is safe to free
159 * free memory, etc. 158 * memory, etc.
160 * 159 *
161 * The exact semantics of when this is called are controlled 160 * The exact semantics of when this is called are controlled
162 * by ->downconvert_worker() 161 * by ->downconvert_worker()
@@ -310,8 +309,9 @@ static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
310 "resource %s: %s\n", dlm_errname(_stat), _func, \ 309 "resource %s: %s\n", dlm_errname(_stat), _func, \
311 _lockres->l_name, dlm_errmsg(_stat)); \ 310 _lockres->l_name, dlm_errmsg(_stat)); \
312} while (0) 311} while (0)
313static void ocfs2_vote_on_unlock(struct ocfs2_super *osb, 312static int ocfs2_downconvert_thread(void *arg);
314 struct ocfs2_lock_res *lockres); 313static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
314 struct ocfs2_lock_res *lockres);
315static int ocfs2_meta_lock_update(struct inode *inode, 315static int ocfs2_meta_lock_update(struct inode *inode,
316 struct buffer_head **bh); 316 struct buffer_head **bh);
317static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); 317static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
@@ -732,7 +732,7 @@ static void ocfs2_blocking_ast(void *opaque, int level)
732 732
733 wake_up(&lockres->l_event); 733 wake_up(&lockres->l_event);
734 734
735 ocfs2_kick_vote_thread(osb); 735 ocfs2_wake_downconvert_thread(osb);
736} 736}
737 737
738static void ocfs2_locking_ast(void *opaque) 738static void ocfs2_locking_ast(void *opaque)
@@ -1089,7 +1089,7 @@ static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
1089 mlog_entry_void(); 1089 mlog_entry_void();
1090 spin_lock_irqsave(&lockres->l_lock, flags); 1090 spin_lock_irqsave(&lockres->l_lock, flags);
1091 ocfs2_dec_holders(lockres, level); 1091 ocfs2_dec_holders(lockres, level);
1092 ocfs2_vote_on_unlock(osb, lockres); 1092 ocfs2_downconvert_on_unlock(osb, lockres);
1093 spin_unlock_irqrestore(&lockres->l_lock, flags); 1093 spin_unlock_irqrestore(&lockres->l_lock, flags);
1094 mlog_exit_void(); 1094 mlog_exit_void();
1095} 1095}
@@ -1372,15 +1372,15 @@ int ocfs2_data_lock_with_page(struct inode *inode,
1372 return ret; 1372 return ret;
1373} 1373}
1374 1374
1375static void ocfs2_vote_on_unlock(struct ocfs2_super *osb, 1375static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
1376 struct ocfs2_lock_res *lockres) 1376 struct ocfs2_lock_res *lockres)
1377{ 1377{
1378 int kick = 0; 1378 int kick = 0;
1379 1379
1380 mlog_entry_void(); 1380 mlog_entry_void();
1381 1381
1382 /* If we know that another node is waiting on our lock, kick 1382 /* If we know that another node is waiting on our lock, kick
1383 * the vote thread * pre-emptively when we reach a release 1383 * the downconvert thread pre-emptively when we reach a release
1384 * condition. */ 1384 * condition. */
1385 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) { 1385 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
1386 switch(lockres->l_blocking) { 1386 switch(lockres->l_blocking) {
@@ -1398,7 +1398,7 @@ static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
1398 } 1398 }
1399 1399
1400 if (kick) 1400 if (kick)
1401 ocfs2_kick_vote_thread(osb); 1401 ocfs2_wake_downconvert_thread(osb);
1402 1402
1403 mlog_exit_void(); 1403 mlog_exit_void();
1404} 1404}
@@ -1832,19 +1832,20 @@ bail:
1832} 1832}
1833 1833
1834/* 1834/*
1835 * This is working around a lock inversion between tasks acquiring DLM locks 1835 * This is working around a lock inversion between tasks acquiring DLM
1836 * while holding a page lock and the vote thread which blocks dlm lock acquiry 1836 * locks while holding a page lock and the downconvert thread which
1837 * while acquiring page locks. 1837 * blocks dlm lock acquisition while acquiring page locks.
1838 * 1838 *
1839 * ** These _with_page variants are only intended to be called from aop 1839 * ** These _with_page variants are only intended to be called from aop
1840 * methods that hold page locks and return a very specific *positive* error 1840 * methods that hold page locks and return a very specific *positive* error
1841 * code that aop methods pass up to the VFS -- test for errors with != 0. ** 1841 * code that aop methods pass up to the VFS -- test for errors with != 0. **
1842 * 1842 *
1843 * The DLM is called such that it returns -EAGAIN if it would have blocked 1843 * The DLM is called such that it returns -EAGAIN if it would have
1844 * waiting for the vote thread. In that case we unlock our page so the vote 1844 * blocked waiting for the downconvert thread. In that case we unlock
1845 * thread can make progress. Once we've done this we have to return 1845 * our page so the downconvert thread can make progress. Once we've
1846 * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up 1846 * done this we have to return AOP_TRUNCATED_PAGE so the aop method
1847 * into the VFS who will then immediately retry the aop call. 1847 * that called us can bubble that back up into the VFS who will then
1848 * immediately retry the aop call.
1848 * 1849 *
1849 * We do a blocking lock and immediate unlock before returning, though, so that 1850 * We do a blocking lock and immediate unlock before returning, though, so that
1850 * the lock has a great chance of being cached on this node by the time the VFS 1851 * the lock has a great chance of being cached on this node by the time the VFS
@@ -2320,11 +2321,11 @@ int ocfs2_dlm_init(struct ocfs2_super *osb)
2320 goto bail; 2321 goto bail;
2321 } 2322 }
2322 2323
2323 /* launch vote thread */ 2324 /* launch downconvert thread */
2324 osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote"); 2325 osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc");
2325 if (IS_ERR(osb->vote_task)) { 2326 if (IS_ERR(osb->dc_task)) {
2326 status = PTR_ERR(osb->vote_task); 2327 status = PTR_ERR(osb->dc_task);
2327 osb->vote_task = NULL; 2328 osb->dc_task = NULL;
2328 mlog_errno(status); 2329 mlog_errno(status);
2329 goto bail; 2330 goto bail;
2330 } 2331 }
@@ -2353,8 +2354,8 @@ local:
2353bail: 2354bail:
2354 if (status < 0) { 2355 if (status < 0) {
2355 ocfs2_dlm_shutdown_debug(osb); 2356 ocfs2_dlm_shutdown_debug(osb);
2356 if (osb->vote_task) 2357 if (osb->dc_task)
2357 kthread_stop(osb->vote_task); 2358 kthread_stop(osb->dc_task);
2358 } 2359 }
2359 2360
2360 mlog_exit(status); 2361 mlog_exit(status);
@@ -2369,9 +2370,9 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
2369 2370
2370 ocfs2_drop_osb_locks(osb); 2371 ocfs2_drop_osb_locks(osb);
2371 2372
2372 if (osb->vote_task) { 2373 if (osb->dc_task) {
2373 kthread_stop(osb->vote_task); 2374 kthread_stop(osb->dc_task);
2374 osb->vote_task = NULL; 2375 osb->dc_task = NULL;
2375 } 2376 }
2376 2377
2377 ocfs2_lock_res_free(&osb->osb_super_lockres); 2378 ocfs2_lock_res_free(&osb->osb_super_lockres);
@@ -2527,7 +2528,7 @@ out:
2527 2528
2528/* Mark the lockres as being dropped. It will no longer be 2529/* Mark the lockres as being dropped. It will no longer be
2529 * queued if blocking, but we still may have to wait on it 2530 * queued if blocking, but we still may have to wait on it
2530 * being dequeued from the vote thread before we can consider 2531 * being dequeued from the downconvert thread before we can consider
2531 * it safe to drop. 2532 * it safe to drop.
2532 * 2533 *
2533 * You can *not* attempt to call cluster_lock on this lockres anymore. */ 2534 * You can *not* attempt to call cluster_lock on this lockres anymore. */
@@ -2903,7 +2904,7 @@ static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
2903 2904
2904/* 2905/*
2905 * Does the final reference drop on our dentry lock. Right now this 2906 * Does the final reference drop on our dentry lock. Right now this
2906 * happens in the vote thread, but we could choose to simplify the 2907 * happens in the downconvert thread, but we could choose to simplify the
2907 * dlmglue API and push these off to the ocfs2_wq in the future. 2908 * dlmglue API and push these off to the ocfs2_wq in the future.
2908 */ 2909 */
2909static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 2910static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
@@ -3042,7 +3043,7 @@ void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3042 mlog(0, "lockres %s blocked.\n", lockres->l_name); 3043 mlog(0, "lockres %s blocked.\n", lockres->l_name);
3043 3044
3044 /* Detect whether a lock has been marked as going away while 3045 /* Detect whether a lock has been marked as going away while
3045 * the vote thread was processing other things. A lock can 3046 * the downconvert thread was processing other things. A lock can
3046 * still be marked with OCFS2_LOCK_FREEING after this check, 3047 * still be marked with OCFS2_LOCK_FREEING after this check,
3047 * but short circuiting here will still save us some 3048 * but short circuiting here will still save us some
3048 * performance. */ 3049 * performance. */
@@ -3091,13 +3092,104 @@ static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
3091 3092
3092 lockres_or_flags(lockres, OCFS2_LOCK_QUEUED); 3093 lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
3093 3094
3094 spin_lock(&osb->vote_task_lock); 3095 spin_lock(&osb->dc_task_lock);
3095 if (list_empty(&lockres->l_blocked_list)) { 3096 if (list_empty(&lockres->l_blocked_list)) {
3096 list_add_tail(&lockres->l_blocked_list, 3097 list_add_tail(&lockres->l_blocked_list,
3097 &osb->blocked_lock_list); 3098 &osb->blocked_lock_list);
3098 osb->blocked_lock_count++; 3099 osb->blocked_lock_count++;
3099 } 3100 }
3100 spin_unlock(&osb->vote_task_lock); 3101 spin_unlock(&osb->dc_task_lock);
3101 3102
3102 mlog_exit_void(); 3103 mlog_exit_void();
3103} 3104}
3105
/*
 * Run one pass over the blocked lock list on behalf of the downconvert
 * thread.
 *
 * Under osb->dc_task_lock this records the current wake sequence in
 * osb->dc_work_sequence and snapshots osb->blocked_lock_count.  It then
 * removes that many entries from the head of osb->blocked_lock_list,
 * handing each one to ocfs2_process_blocked_lock().  dc_task_lock is
 * dropped around every ocfs2_process_blocked_lock() call and re-taken
 * afterwards.
 */
3106static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
3107{
3108 unsigned long processed;
3109 struct ocfs2_lock_res *lockres;
3110
3111 mlog_entry_void();
3112
3113 spin_lock(&osb->dc_task_lock);
3114 /* grab this early so we know to try again if a state change and
3115 * wake happens part-way through our work */
3116 osb->dc_work_sequence = osb->dc_wake_sequence;
3117
/* The count is sampled once, so this pass handles at most the number
 * of entries that were queued at this point. */
3118 processed = osb->blocked_lock_count;
3119 while (processed) {
3120 BUG_ON(list_empty(&osb->blocked_lock_list));
3121
/* Detach the head entry and fix up the count while still holding
 * dc_task_lock. */
3122 lockres = list_entry(osb->blocked_lock_list.next,
3123 struct ocfs2_lock_res, l_blocked_list);
3124 list_del_init(&lockres->l_blocked_list);
3125 osb->blocked_lock_count--;
3126 spin_unlock(&osb->dc_task_lock);
3127
3128 BUG_ON(!processed);
3129 processed--;
3130
/* Called without dc_task_lock held. */
3131 ocfs2_process_blocked_lock(osb, lockres);
3132
3133 spin_lock(&osb->dc_task_lock);
3134 }
3135 spin_unlock(&osb->dc_task_lock);
3136
3137 mlog_exit_void();
3138}
3139
/*
 * Report whether osb->blocked_lock_list is currently empty.
 *
 * The list is checked under osb->dc_task_lock.  Returns 1 if it is
 * empty and 0 otherwise.
 */
3140static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
3141{
3142 int empty = 0;
3143
3144 spin_lock(&osb->dc_task_lock);
3145 if (list_empty(&osb->blocked_lock_list))
3146 empty = 1;
3147
3148 spin_unlock(&osb->dc_task_lock);
3149 return empty;
3150}
3151
/*
 * Wait condition for the downconvert thread.
 *
 * Compares osb->dc_work_sequence with osb->dc_wake_sequence under
 * osb->dc_task_lock.  Returns 1 when they differ, i.e. when
 * dc_wake_sequence has changed since dc_work_sequence was last set
 * from it, and 0 otherwise.
 */
3152static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
3153{
3154 int should_wake = 0;
3155
3156 spin_lock(&osb->dc_task_lock);
3157 if (osb->dc_work_sequence != osb->dc_wake_sequence)
3158 should_wake = 1;
3159 spin_unlock(&osb->dc_task_lock);
3160
3161 return should_wake;
3162}
3163
3164int ocfs2_downconvert_thread(void *arg)
3165{
3166 int status = 0;
3167 struct ocfs2_super *osb = arg;
3168
3169 /* only quit once we've been asked to stop and there is no more
3170 * work available */
3171 while (!(kthread_should_stop() &&
3172 ocfs2_downconvert_thread_lists_empty(osb))) {
3173
3174 wait_event_interruptible(osb->dc_event,
3175 ocfs2_downconvert_thread_should_wake(osb) ||
3176 kthread_should_stop());
3177
3178 mlog(0, "downconvert_thread: awoken\n");
3179
3180 ocfs2_downconvert_thread_do_work(osb);
3181 }
3182
3183 osb->dc_task = NULL;
3184 return status;
3185}
3186
/*
 * Ask the downconvert thread to run another pass.
 *
 * Increments osb->dc_wake_sequence under osb->dc_task_lock and then
 * wakes any waiter on osb->dc_event.
 */
3187void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
3188{
3189 spin_lock(&osb->dc_task_lock);
3190 /* make sure the downconvert thread gets a swipe at whatever
3191 * state changes the caller may have made */
3192 osb->dc_wake_sequence++;
3193 spin_unlock(&osb->dc_task_lock);
3194 wake_up(&osb->dc_event);
3195}