aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm/dlmthread.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm/dlmthread.c')
-rw-r--r--fs/ocfs2/dlm/dlmthread.c200
1 files changed, 98 insertions, 102 deletions
diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c
index 0c822f3ffb05..8ffa0916eb86 100644
--- a/fs/ocfs2/dlm/dlmthread.c
+++ b/fs/ocfs2/dlm/dlmthread.c
@@ -54,9 +54,6 @@
54#include "cluster/masklog.h" 54#include "cluster/masklog.h"
55 55
56static int dlm_thread(void *data); 56static int dlm_thread(void *data);
57static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
58 struct dlm_lock_resource *lockres);
59
60static void dlm_flush_asts(struct dlm_ctxt *dlm); 57static void dlm_flush_asts(struct dlm_ctxt *dlm);
61 58
62#define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->node_num) 59#define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->node_num)
@@ -82,14 +79,33 @@ repeat:
82 current->state = TASK_RUNNING; 79 current->state = TASK_RUNNING;
83} 80}
84 81
85 82int __dlm_lockres_has_locks(struct dlm_lock_resource *res)
86int __dlm_lockres_unused(struct dlm_lock_resource *res)
87{ 83{
88 if (list_empty(&res->granted) && 84 if (list_empty(&res->granted) &&
89 list_empty(&res->converting) && 85 list_empty(&res->converting) &&
90 list_empty(&res->blocked) && 86 list_empty(&res->blocked))
91 list_empty(&res->dirty)) 87 return 0;
92 return 1; 88 return 1;
89}
90
91/* "unused": the lockres has no locks, is not on the dirty list,
92 * has no inflight locks (in the gap between mastery and acquiring
93 * the first lock), and has no bits in its refmap.
94 * truly ready to be freed. */
95int __dlm_lockres_unused(struct dlm_lock_resource *res)
96{
97 if (!__dlm_lockres_has_locks(res) &&
98 (list_empty(&res->dirty) && !(res->state & DLM_LOCK_RES_DIRTY))) {
99 /* try not to scan the bitmap unless the first two
100 * conditions are already true */
101 int bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
102 if (bit >= O2NM_MAX_NODES) {
103 /* since the bit for dlm->node_num is not
104 * set, inflight_locks better be zero */
105 BUG_ON(res->inflight_locks != 0);
106 return 1;
107 }
108 }
93 return 0; 109 return 0;
94} 110}
95 111
@@ -106,46 +122,21 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
106 assert_spin_locked(&res->spinlock); 122 assert_spin_locked(&res->spinlock);
107 123
108 if (__dlm_lockres_unused(res)){ 124 if (__dlm_lockres_unused(res)){
109 /* For now, just keep any resource we master */
110 if (res->owner == dlm->node_num)
111 {
112 if (!list_empty(&res->purge)) {
113 mlog(0, "we master %s:%.*s, but it is on "
114 "the purge list. Removing\n",
115 dlm->name, res->lockname.len,
116 res->lockname.name);
117 list_del_init(&res->purge);
118 dlm->purge_count--;
119 }
120 return;
121 }
122
123 if (list_empty(&res->purge)) { 125 if (list_empty(&res->purge)) {
124 mlog(0, "putting lockres %.*s from purge list\n", 126 mlog(0, "putting lockres %.*s:%p onto purge list\n",
125 res->lockname.len, res->lockname.name); 127 res->lockname.len, res->lockname.name, res);
126 128
127 res->last_used = jiffies; 129 res->last_used = jiffies;
130 dlm_lockres_get(res);
128 list_add_tail(&res->purge, &dlm->purge_list); 131 list_add_tail(&res->purge, &dlm->purge_list);
129 dlm->purge_count++; 132 dlm->purge_count++;
130
131 /* if this node is not the owner, there is
132 * no way to keep track of who the owner could be.
133 * unhash it to avoid serious problems. */
134 if (res->owner != dlm->node_num) {
135 mlog(0, "%s:%.*s: doing immediate "
136 "purge of lockres owned by %u\n",
137 dlm->name, res->lockname.len,
138 res->lockname.name, res->owner);
139
140 dlm_purge_lockres_now(dlm, res);
141 }
142 } 133 }
143 } else if (!list_empty(&res->purge)) { 134 } else if (!list_empty(&res->purge)) {
144 mlog(0, "removing lockres %.*s from purge list, " 135 mlog(0, "removing lockres %.*s:%p from purge list, owner=%u\n",
145 "owner=%u\n", res->lockname.len, res->lockname.name, 136 res->lockname.len, res->lockname.name, res, res->owner);
146 res->owner);
147 137
148 list_del_init(&res->purge); 138 list_del_init(&res->purge);
139 dlm_lockres_put(res);
149 dlm->purge_count--; 140 dlm->purge_count--;
150 } 141 }
151} 142}
@@ -163,68 +154,65 @@ void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
163 spin_unlock(&dlm->spinlock); 154 spin_unlock(&dlm->spinlock);
164} 155}
165 156
166/* TODO: Eventual API: Called with the dlm spinlock held, may drop it 157static int dlm_purge_lockres(struct dlm_ctxt *dlm,
167 * to do migration, but will re-acquire before exit. */ 158 struct dlm_lock_resource *res)
168void dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *lockres)
169{ 159{
170 int master; 160 int master;
171 int ret; 161 int ret = 0;
172
173 spin_lock(&lockres->spinlock);
174 master = lockres->owner == dlm->node_num;
175 spin_unlock(&lockres->spinlock);
176 162
177 mlog(0, "purging lockres %.*s, master = %d\n", lockres->lockname.len, 163 spin_lock(&res->spinlock);
178 lockres->lockname.name, master); 164 if (!__dlm_lockres_unused(res)) {
179 165 spin_unlock(&res->spinlock);
180 /* Non master is the easy case -- no migration required, just 166 mlog(0, "%s:%.*s: tried to purge but not unused\n",
181 * quit. */ 167 dlm->name, res->lockname.len, res->lockname.name);
168 return -ENOTEMPTY;
169 }
170 master = (res->owner == dlm->node_num);
182 if (!master) 171 if (!master)
183 goto finish; 172 res->state |= DLM_LOCK_RES_DROPPING_REF;
184 173 spin_unlock(&res->spinlock);
185 /* Wheee! Migrate lockres here! */
186 spin_unlock(&dlm->spinlock);
187again:
188 174
189 ret = dlm_migrate_lockres(dlm, lockres, O2NM_MAX_NODES); 175 mlog(0, "purging lockres %.*s, master = %d\n", res->lockname.len,
190 if (ret == -ENOTEMPTY) { 176 res->lockname.name, master);
191 mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
192 lockres->lockname.len, lockres->lockname.name);
193 177
194 BUG(); 178 if (!master) {
195 } else if (ret < 0) { 179 spin_lock(&res->spinlock);
196 mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n", 180 /* This ensures that clear refmap is sent after the set */
197 lockres->lockname.len, lockres->lockname.name); 181 __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
198 msleep(100); 182 spin_unlock(&res->spinlock);
199 goto again; 183 /* drop spinlock to do messaging, retake below */
184 spin_unlock(&dlm->spinlock);
185 /* clear our bit from the master's refmap, ignore errors */
186 ret = dlm_drop_lockres_ref(dlm, res);
187 if (ret < 0) {
188 mlog_errno(ret);
189 if (!dlm_is_host_down(ret))
190 BUG();
191 }
192 mlog(0, "%s:%.*s: dlm_deref_lockres returned %d\n",
193 dlm->name, res->lockname.len, res->lockname.name, ret);
194 spin_lock(&dlm->spinlock);
200 } 195 }
201 196
202 spin_lock(&dlm->spinlock); 197 if (!list_empty(&res->purge)) {
203 198 mlog(0, "removing lockres %.*s:%p from purgelist, "
204finish: 199 "master = %d\n", res->lockname.len, res->lockname.name,
205 if (!list_empty(&lockres->purge)) { 200 res, master);
206 list_del_init(&lockres->purge); 201 list_del_init(&res->purge);
202 dlm_lockres_put(res);
207 dlm->purge_count--; 203 dlm->purge_count--;
208 } 204 }
209 __dlm_unhash_lockres(lockres); 205 __dlm_unhash_lockres(res);
210}
211
212/* make an unused lockres go away immediately.
213 * as soon as the dlm spinlock is dropped, this lockres
214 * will not be found. kfree still happens on last put. */
215static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
216 struct dlm_lock_resource *lockres)
217{
218 assert_spin_locked(&dlm->spinlock);
219 assert_spin_locked(&lockres->spinlock);
220 206
221 BUG_ON(!__dlm_lockres_unused(lockres)); 207 /* lockres is not in the hash now. drop the flag and wake up
222 208 * any processes waiting in dlm_get_lock_resource. */
223 if (!list_empty(&lockres->purge)) { 209 if (!master) {
224 list_del_init(&lockres->purge); 210 spin_lock(&res->spinlock);
225 dlm->purge_count--; 211 res->state &= ~DLM_LOCK_RES_DROPPING_REF;
212 spin_unlock(&res->spinlock);
213 wake_up(&res->wq);
226 } 214 }
227 __dlm_unhash_lockres(lockres); 215 return 0;
228} 216}
229 217
230static void dlm_run_purge_list(struct dlm_ctxt *dlm, 218static void dlm_run_purge_list(struct dlm_ctxt *dlm,
@@ -268,13 +256,17 @@ static void dlm_run_purge_list(struct dlm_ctxt *dlm,
268 break; 256 break;
269 } 257 }
270 258
259 mlog(0, "removing lockres %.*s:%p from purgelist\n",
260 lockres->lockname.len, lockres->lockname.name, lockres);
271 list_del_init(&lockres->purge); 261 list_del_init(&lockres->purge);
262 dlm_lockres_put(lockres);
272 dlm->purge_count--; 263 dlm->purge_count--;
273 264
274 /* This may drop and reacquire the dlm spinlock if it 265 /* This may drop and reacquire the dlm spinlock if it
275 * has to do migration. */ 266 * has to do migration. */
276 mlog(0, "calling dlm_purge_lockres!\n"); 267 mlog(0, "calling dlm_purge_lockres!\n");
277 dlm_purge_lockres(dlm, lockres); 268 if (dlm_purge_lockres(dlm, lockres))
269 BUG();
278 mlog(0, "DONE calling dlm_purge_lockres!\n"); 270 mlog(0, "DONE calling dlm_purge_lockres!\n");
279 271
280 /* Avoid adding any scheduling latencies */ 272 /* Avoid adding any scheduling latencies */
@@ -467,12 +459,17 @@ void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
467 assert_spin_locked(&res->spinlock); 459 assert_spin_locked(&res->spinlock);
468 460
469 /* don't shuffle secondary queues */ 461 /* don't shuffle secondary queues */
470 if ((res->owner == dlm->node_num) && 462 if ((res->owner == dlm->node_num)) {
471 !(res->state & DLM_LOCK_RES_DIRTY)) { 463 if (res->state & (DLM_LOCK_RES_MIGRATING |
472 /* ref for dirty_list */ 464 DLM_LOCK_RES_BLOCK_DIRTY))
473 dlm_lockres_get(res); 465 return;
474 list_add_tail(&res->dirty, &dlm->dirty_list); 466
475 res->state |= DLM_LOCK_RES_DIRTY; 467 if (list_empty(&res->dirty)) {
468 /* ref for dirty_list */
469 dlm_lockres_get(res);
470 list_add_tail(&res->dirty, &dlm->dirty_list);
471 res->state |= DLM_LOCK_RES_DIRTY;
472 }
476 } 473 }
477} 474}
478 475
@@ -651,7 +648,7 @@ static int dlm_thread(void *data)
651 dlm_lockres_get(res); 648 dlm_lockres_get(res);
652 649
653 spin_lock(&res->spinlock); 650 spin_lock(&res->spinlock);
654 res->state &= ~DLM_LOCK_RES_DIRTY; 651 /* We clear the DLM_LOCK_RES_DIRTY state once we shuffle lists below */
655 list_del_init(&res->dirty); 652 list_del_init(&res->dirty);
656 spin_unlock(&res->spinlock); 653 spin_unlock(&res->spinlock);
657 spin_unlock(&dlm->spinlock); 654 spin_unlock(&dlm->spinlock);
@@ -675,10 +672,11 @@ static int dlm_thread(void *data)
675 /* it is now ok to move lockreses in these states 672 /* it is now ok to move lockreses in these states
676 * to the dirty list, assuming that they will only be 673 * to the dirty list, assuming that they will only be
677 * dirty for a short while. */ 674 * dirty for a short while. */
675 BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
678 if (res->state & (DLM_LOCK_RES_IN_PROGRESS | 676 if (res->state & (DLM_LOCK_RES_IN_PROGRESS |
679 DLM_LOCK_RES_MIGRATING |
680 DLM_LOCK_RES_RECOVERING)) { 677 DLM_LOCK_RES_RECOVERING)) {
681 /* move it to the tail and keep going */ 678 /* move it to the tail and keep going */
679 res->state &= ~DLM_LOCK_RES_DIRTY;
682 spin_unlock(&res->spinlock); 680 spin_unlock(&res->spinlock);
683 mlog(0, "delaying list shuffling for in-" 681 mlog(0, "delaying list shuffling for in-"
684 "progress lockres %.*s, state=%d\n", 682 "progress lockres %.*s, state=%d\n",
@@ -699,6 +697,7 @@ static int dlm_thread(void *data)
699 697
700 /* called while holding lockres lock */ 698 /* called while holding lockres lock */
701 dlm_shuffle_lists(dlm, res); 699 dlm_shuffle_lists(dlm, res);
700 res->state &= ~DLM_LOCK_RES_DIRTY;
702 spin_unlock(&res->spinlock); 701 spin_unlock(&res->spinlock);
703 702
704 dlm_lockres_calc_usage(dlm, res); 703 dlm_lockres_calc_usage(dlm, res);
@@ -709,11 +708,8 @@ in_progress:
709 /* if the lock was in-progress, stick 708 /* if the lock was in-progress, stick
710 * it on the back of the list */ 709 * it on the back of the list */
711 if (delay) { 710 if (delay) {
712 /* ref for dirty_list */
713 dlm_lockres_get(res);
714 spin_lock(&res->spinlock); 711 spin_lock(&res->spinlock);
715 list_add_tail(&res->dirty, &dlm->dirty_list); 712 __dlm_dirty_lockres(dlm, res);
716 res->state |= DLM_LOCK_RES_DIRTY;
717 spin_unlock(&res->spinlock); 713 spin_unlock(&res->spinlock);
718 } 714 }
719 dlm_lockres_put(res); 715 dlm_lockres_put(res);