aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fs/ocfs2/dlm/dlmmaster.c109
1 files changed, 90 insertions, 19 deletions
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index d1c85f10c634..19399446aa8e 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -74,6 +74,7 @@ struct dlm_master_list_entry
74 wait_queue_head_t wq; 74 wait_queue_head_t wq;
75 atomic_t woken; 75 atomic_t woken;
76 struct kref mle_refs; 76 struct kref mle_refs;
77 int inuse;
77 unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 78 unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
78 unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 79 unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
79 unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 80 unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
@@ -337,6 +338,31 @@ static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
337 spin_unlock(&dlm->spinlock); 338 spin_unlock(&dlm->spinlock);
338} 339}
339 340
341static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
342{
343 struct dlm_ctxt *dlm;
344 dlm = mle->dlm;
345
346 assert_spin_locked(&dlm->spinlock);
347 assert_spin_locked(&dlm->master_lock);
348 mle->inuse++;
349 kref_get(&mle->mle_refs);
350}
351
352static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
353{
354 struct dlm_ctxt *dlm;
355 dlm = mle->dlm;
356
357 spin_lock(&dlm->spinlock);
358 spin_lock(&dlm->master_lock);
359 mle->inuse--;
360 __dlm_put_mle(mle);
361 spin_unlock(&dlm->master_lock);
362 spin_unlock(&dlm->spinlock);
363
364}
365
340/* remove from list and free */ 366/* remove from list and free */
341static void __dlm_put_mle(struct dlm_master_list_entry *mle) 367static void __dlm_put_mle(struct dlm_master_list_entry *mle)
342{ 368{
@@ -390,6 +416,7 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle,
390 memset(mle->response_map, 0, sizeof(mle->response_map)); 416 memset(mle->response_map, 0, sizeof(mle->response_map));
391 mle->master = O2NM_MAX_NODES; 417 mle->master = O2NM_MAX_NODES;
392 mle->new_master = O2NM_MAX_NODES; 418 mle->new_master = O2NM_MAX_NODES;
419 mle->inuse = 0;
393 420
394 if (mle->type == DLM_MLE_MASTER) { 421 if (mle->type == DLM_MLE_MASTER) {
395 BUG_ON(!res); 422 BUG_ON(!res);
@@ -809,7 +836,7 @@ lookup:
809 * if so, the creator of the BLOCK may try to put the last 836 * if so, the creator of the BLOCK may try to put the last
810 * ref at this time in the assert master handler, so we 837 * ref at this time in the assert master handler, so we
811 * need an extra one to keep from a bad ptr deref. */ 838 * need an extra one to keep from a bad ptr deref. */
812 dlm_get_mle(mle); 839 dlm_get_mle_inuse(mle);
813 spin_unlock(&dlm->master_lock); 840 spin_unlock(&dlm->master_lock);
814 spin_unlock(&dlm->spinlock); 841 spin_unlock(&dlm->spinlock);
815 842
@@ -899,7 +926,7 @@ wait:
899 dlm_mle_detach_hb_events(dlm, mle); 926 dlm_mle_detach_hb_events(dlm, mle);
900 dlm_put_mle(mle); 927 dlm_put_mle(mle);
901 /* put the extra ref */ 928 /* put the extra ref */
902 dlm_put_mle(mle); 929 dlm_put_mle_inuse(mle);
903 930
904wake_waiters: 931wake_waiters:
905 spin_lock(&res->spinlock); 932 spin_lock(&res->spinlock);
@@ -1753,6 +1780,7 @@ ok:
1753 if (mle) { 1780 if (mle) {
1754 int extra_ref = 0; 1781 int extra_ref = 0;
1755 int nn = -1; 1782 int nn = -1;
1783 int rr, err = 0;
1756 1784
1757 spin_lock(&mle->spinlock); 1785 spin_lock(&mle->spinlock);
1758 if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION) 1786 if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
@@ -1772,27 +1800,64 @@ ok:
1772 wake_up(&mle->wq); 1800 wake_up(&mle->wq);
1773 spin_unlock(&mle->spinlock); 1801 spin_unlock(&mle->spinlock);
1774 1802
1775 if (mle->type == DLM_MLE_MIGRATION && res) { 1803 if (res) {
1776 mlog(0, "finishing off migration of lockres %.*s, "
1777 "from %u to %u\n",
1778 res->lockname.len, res->lockname.name,
1779 dlm->node_num, mle->new_master);
1780 spin_lock(&res->spinlock); 1804 spin_lock(&res->spinlock);
1781 res->state &= ~DLM_LOCK_RES_MIGRATING; 1805 if (mle->type == DLM_MLE_MIGRATION) {
1782 dlm_change_lockres_owner(dlm, res, mle->new_master); 1806 mlog(0, "finishing off migration of lockres %.*s, "
1783 BUG_ON(res->state & DLM_LOCK_RES_DIRTY); 1807 "from %u to %u\n",
1808 res->lockname.len, res->lockname.name,
1809 dlm->node_num, mle->new_master);
1810 res->state &= ~DLM_LOCK_RES_MIGRATING;
1811 dlm_change_lockres_owner(dlm, res, mle->new_master);
1812 BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1813 } else {
1814 dlm_change_lockres_owner(dlm, res, mle->master);
1815 }
1784 spin_unlock(&res->spinlock); 1816 spin_unlock(&res->spinlock);
1785 } 1817 }
1786 /* master is known, detach if not already detached */ 1818
1787 dlm_mle_detach_hb_events(dlm, mle); 1819 /* master is known, detach if not already detached.
1788 dlm_put_mle(mle); 1820 * ensures that only one assert_master call will happen
1789 1821 * on this mle. */
1822 spin_lock(&dlm->spinlock);
1823 spin_lock(&dlm->master_lock);
1824
1825 rr = atomic_read(&mle->mle_refs.refcount);
1826 if (mle->inuse > 0) {
1827 if (extra_ref && rr < 3)
1828 err = 1;
1829 else if (!extra_ref && rr < 2)
1830 err = 1;
1831 } else {
1832 if (extra_ref && rr < 2)
1833 err = 1;
1834 else if (!extra_ref && rr < 1)
1835 err = 1;
1836 }
1837 if (err) {
1838 mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
1839 "that will mess up this node, refs=%d, extra=%d, "
1840 "inuse=%d\n", dlm->name, namelen, name,
1841 assert->node_idx, rr, extra_ref, mle->inuse);
1842 dlm_print_one_mle(mle);
1843 }
1844 list_del_init(&mle->list);
1845 __dlm_mle_detach_hb_events(dlm, mle);
1846 __dlm_put_mle(mle);
1790 if (extra_ref) { 1847 if (extra_ref) {
1791 /* the assert master message now balances the extra 1848 /* the assert master message now balances the extra
1792 * ref given by the master / migration request message. 1849 * ref given by the master / migration request message.
1793 * if this is the last put, it will be removed 1850 * if this is the last put, it will be removed
1794 * from the list. */ 1851 * from the list. */
1795 dlm_put_mle(mle); 1852 __dlm_put_mle(mle);
1853 }
1854 spin_unlock(&dlm->master_lock);
1855 spin_unlock(&dlm->spinlock);
1856 } else if (res) {
1857 if (res->owner != assert->node_idx) {
1858 mlog(0, "assert_master from %u, but current "
1859 "owner is %u (%.*s), no mle\n", assert->node_idx,
1860 res->owner, namelen, name);
1796 } 1861 }
1797 } 1862 }
1798 1863
@@ -2138,7 +2203,7 @@ fail:
2138 * take both dlm->spinlock and dlm->master_lock */ 2203 * take both dlm->spinlock and dlm->master_lock */
2139 spin_lock(&dlm->spinlock); 2204 spin_lock(&dlm->spinlock);
2140 spin_lock(&dlm->master_lock); 2205 spin_lock(&dlm->master_lock);
2141 dlm_get_mle(mle); 2206 dlm_get_mle_inuse(mle);
2142 spin_unlock(&dlm->master_lock); 2207 spin_unlock(&dlm->master_lock);
2143 spin_unlock(&dlm->spinlock); 2208 spin_unlock(&dlm->spinlock);
2144 2209
@@ -2155,7 +2220,10 @@ fail:
2155 /* migration failed, detach and clean up mle */ 2220 /* migration failed, detach and clean up mle */
2156 dlm_mle_detach_hb_events(dlm, mle); 2221 dlm_mle_detach_hb_events(dlm, mle);
2157 dlm_put_mle(mle); 2222 dlm_put_mle(mle);
2158 dlm_put_mle(mle); 2223 dlm_put_mle_inuse(mle);
2224 spin_lock(&res->spinlock);
2225 res->state &= ~DLM_LOCK_RES_MIGRATING;
2226 spin_unlock(&res->spinlock);
2159 goto leave; 2227 goto leave;
2160 } 2228 }
2161 2229
@@ -2196,7 +2264,10 @@ fail:
2196 /* migration failed, detach and clean up mle */ 2264 /* migration failed, detach and clean up mle */
2197 dlm_mle_detach_hb_events(dlm, mle); 2265 dlm_mle_detach_hb_events(dlm, mle);
2198 dlm_put_mle(mle); 2266 dlm_put_mle(mle);
2199 dlm_put_mle(mle); 2267 dlm_put_mle_inuse(mle);
2268 spin_lock(&res->spinlock);
2269 res->state &= ~DLM_LOCK_RES_MIGRATING;
2270 spin_unlock(&res->spinlock);
2200 goto leave; 2271 goto leave;
2201 } 2272 }
2202 /* TODO: if node died: stop, clean up, return error */ 2273 /* TODO: if node died: stop, clean up, return error */
@@ -2212,7 +2283,7 @@ fail:
2212 2283
2213 /* master is known, detach if not already detached */ 2284 /* master is known, detach if not already detached */
2214 dlm_mle_detach_hb_events(dlm, mle); 2285 dlm_mle_detach_hb_events(dlm, mle);
2215 dlm_put_mle(mle); 2286 dlm_put_mle_inuse(mle);
2216 ret = 0; 2287 ret = 0;
2217 2288
2218 dlm_lockres_calc_usage(dlm, res); 2289 dlm_lockres_calc_usage(dlm, res);