aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm/dlmmaster.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm/dlmmaster.c')
-rw-r--r--fs/ocfs2/dlm/dlmmaster.c224
1 files changed, 80 insertions, 144 deletions
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index a54d33d95ada..efc015c6128a 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -48,47 +48,11 @@
48#include "dlmapi.h" 48#include "dlmapi.h"
49#include "dlmcommon.h" 49#include "dlmcommon.h"
50#include "dlmdomain.h" 50#include "dlmdomain.h"
51#include "dlmdebug.h"
51 52
52#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER) 53#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
53#include "cluster/masklog.h" 54#include "cluster/masklog.h"
54 55
55enum dlm_mle_type {
56 DLM_MLE_BLOCK,
57 DLM_MLE_MASTER,
58 DLM_MLE_MIGRATION
59};
60
61struct dlm_lock_name
62{
63 u8 len;
64 u8 name[DLM_LOCKID_NAME_MAX];
65};
66
67struct dlm_master_list_entry
68{
69 struct list_head list;
70 struct list_head hb_events;
71 struct dlm_ctxt *dlm;
72 spinlock_t spinlock;
73 wait_queue_head_t wq;
74 atomic_t woken;
75 struct kref mle_refs;
76 int inuse;
77 unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
78 unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
79 unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
80 unsigned long node_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
81 u8 master;
82 u8 new_master;
83 enum dlm_mle_type type;
84 struct o2hb_callback_func mle_hb_up;
85 struct o2hb_callback_func mle_hb_down;
86 union {
87 struct dlm_lock_resource *res;
88 struct dlm_lock_name name;
89 } u;
90};
91
92static void dlm_mle_node_down(struct dlm_ctxt *dlm, 56static void dlm_mle_node_down(struct dlm_ctxt *dlm,
93 struct dlm_master_list_entry *mle, 57 struct dlm_master_list_entry *mle,
94 struct o2nm_node *node, 58 struct o2nm_node *node,
@@ -128,98 +92,10 @@ static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
128 return 1; 92 return 1;
129} 93}
130 94
131#define dlm_print_nodemap(m) _dlm_print_nodemap(m,#m) 95static struct kmem_cache *dlm_lockres_cache = NULL;
132static void _dlm_print_nodemap(unsigned long *map, const char *mapname) 96static struct kmem_cache *dlm_lockname_cache = NULL;
133{
134 int i;
135 printk("%s=[ ", mapname);
136 for (i=0; i<O2NM_MAX_NODES; i++)
137 if (test_bit(i, map))
138 printk("%d ", i);
139 printk("]");
140}
141
142static void dlm_print_one_mle(struct dlm_master_list_entry *mle)
143{
144 int refs;
145 char *type;
146 char attached;
147 u8 master;
148 unsigned int namelen;
149 const char *name;
150 struct kref *k;
151 unsigned long *maybe = mle->maybe_map,
152 *vote = mle->vote_map,
153 *resp = mle->response_map,
154 *node = mle->node_map;
155
156 k = &mle->mle_refs;
157 if (mle->type == DLM_MLE_BLOCK)
158 type = "BLK";
159 else if (mle->type == DLM_MLE_MASTER)
160 type = "MAS";
161 else
162 type = "MIG";
163 refs = atomic_read(&k->refcount);
164 master = mle->master;
165 attached = (list_empty(&mle->hb_events) ? 'N' : 'Y');
166
167 if (mle->type != DLM_MLE_MASTER) {
168 namelen = mle->u.name.len;
169 name = mle->u.name.name;
170 } else {
171 namelen = mle->u.res->lockname.len;
172 name = mle->u.res->lockname.name;
173 }
174
175 mlog(ML_NOTICE, "%.*s: %3s refs=%3d mas=%3u new=%3u evt=%c inuse=%d ",
176 namelen, name, type, refs, master, mle->new_master, attached,
177 mle->inuse);
178 dlm_print_nodemap(maybe);
179 printk(", ");
180 dlm_print_nodemap(vote);
181 printk(", ");
182 dlm_print_nodemap(resp);
183 printk(", ");
184 dlm_print_nodemap(node);
185 printk(", ");
186 printk("\n");
187}
188
189#if 0
190/* Code here is included but defined out as it aids debugging */
191
192static void dlm_dump_mles(struct dlm_ctxt *dlm)
193{
194 struct dlm_master_list_entry *mle;
195
196 mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
197 spin_lock(&dlm->master_lock);
198 list_for_each_entry(mle, &dlm->master_list, list)
199 dlm_print_one_mle(mle);
200 spin_unlock(&dlm->master_lock);
201}
202
203int dlm_dump_all_mles(const char __user *data, unsigned int len)
204{
205 struct dlm_ctxt *dlm;
206
207 spin_lock(&dlm_domain_lock);
208 list_for_each_entry(dlm, &dlm_domains, list) {
209 mlog(ML_NOTICE, "found dlm: %p, name=%s\n", dlm, dlm->name);
210 dlm_dump_mles(dlm);
211 }
212 spin_unlock(&dlm_domain_lock);
213 return len;
214}
215EXPORT_SYMBOL_GPL(dlm_dump_all_mles);
216
217#endif /* 0 */
218
219
220static struct kmem_cache *dlm_mle_cache = NULL; 97static struct kmem_cache *dlm_mle_cache = NULL;
221 98
222
223static void dlm_mle_release(struct kref *kref); 99static void dlm_mle_release(struct kref *kref);
224static void dlm_init_mle(struct dlm_master_list_entry *mle, 100static void dlm_init_mle(struct dlm_master_list_entry *mle,
225 enum dlm_mle_type type, 101 enum dlm_mle_type type,
@@ -507,7 +383,7 @@ static void dlm_mle_node_up(struct dlm_ctxt *dlm,
507 383
508int dlm_init_mle_cache(void) 384int dlm_init_mle_cache(void)
509{ 385{
510 dlm_mle_cache = kmem_cache_create("dlm_mle_cache", 386 dlm_mle_cache = kmem_cache_create("o2dlm_mle",
511 sizeof(struct dlm_master_list_entry), 387 sizeof(struct dlm_master_list_entry),
512 0, SLAB_HWCACHE_ALIGN, 388 0, SLAB_HWCACHE_ALIGN,
513 NULL); 389 NULL);
@@ -560,6 +436,35 @@ static void dlm_mle_release(struct kref *kref)
560 * LOCK RESOURCE FUNCTIONS 436 * LOCK RESOURCE FUNCTIONS
561 */ 437 */
562 438
439int dlm_init_master_caches(void)
440{
441 dlm_lockres_cache = kmem_cache_create("o2dlm_lockres",
442 sizeof(struct dlm_lock_resource),
443 0, SLAB_HWCACHE_ALIGN, NULL);
444 if (!dlm_lockres_cache)
445 goto bail;
446
447 dlm_lockname_cache = kmem_cache_create("o2dlm_lockname",
448 DLM_LOCKID_NAME_MAX, 0,
449 SLAB_HWCACHE_ALIGN, NULL);
450 if (!dlm_lockname_cache)
451 goto bail;
452
453 return 0;
454bail:
455 dlm_destroy_master_caches();
456 return -ENOMEM;
457}
458
459void dlm_destroy_master_caches(void)
460{
461 if (dlm_lockname_cache)
462 kmem_cache_destroy(dlm_lockname_cache);
463
464 if (dlm_lockres_cache)
465 kmem_cache_destroy(dlm_lockres_cache);
466}
467
563static void dlm_set_lockres_owner(struct dlm_ctxt *dlm, 468static void dlm_set_lockres_owner(struct dlm_ctxt *dlm,
564 struct dlm_lock_resource *res, 469 struct dlm_lock_resource *res,
565 u8 owner) 470 u8 owner)
@@ -610,6 +515,14 @@ static void dlm_lockres_release(struct kref *kref)
610 mlog(0, "destroying lockres %.*s\n", res->lockname.len, 515 mlog(0, "destroying lockres %.*s\n", res->lockname.len,
611 res->lockname.name); 516 res->lockname.name);
612 517
518 if (!list_empty(&res->tracking))
519 list_del_init(&res->tracking);
520 else {
521 mlog(ML_ERROR, "Resource %.*s not on the Tracking list\n",
522 res->lockname.len, res->lockname.name);
523 dlm_print_one_lock_resource(res);
524 }
525
613 if (!hlist_unhashed(&res->hash_node) || 526 if (!hlist_unhashed(&res->hash_node) ||
614 !list_empty(&res->granted) || 527 !list_empty(&res->granted) ||
615 !list_empty(&res->converting) || 528 !list_empty(&res->converting) ||
@@ -642,9 +555,9 @@ static void dlm_lockres_release(struct kref *kref)
642 BUG_ON(!list_empty(&res->recovering)); 555 BUG_ON(!list_empty(&res->recovering));
643 BUG_ON(!list_empty(&res->purge)); 556 BUG_ON(!list_empty(&res->purge));
644 557
645 kfree(res->lockname.name); 558 kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
646 559
647 kfree(res); 560 kmem_cache_free(dlm_lockres_cache, res);
648} 561}
649 562
650void dlm_lockres_put(struct dlm_lock_resource *res) 563void dlm_lockres_put(struct dlm_lock_resource *res)
@@ -677,6 +590,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
677 INIT_LIST_HEAD(&res->dirty); 590 INIT_LIST_HEAD(&res->dirty);
678 INIT_LIST_HEAD(&res->recovering); 591 INIT_LIST_HEAD(&res->recovering);
679 INIT_LIST_HEAD(&res->purge); 592 INIT_LIST_HEAD(&res->purge);
593 INIT_LIST_HEAD(&res->tracking);
680 atomic_set(&res->asts_reserved, 0); 594 atomic_set(&res->asts_reserved, 0);
681 res->migration_pending = 0; 595 res->migration_pending = 0;
682 res->inflight_locks = 0; 596 res->inflight_locks = 0;
@@ -692,6 +606,8 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
692 606
693 res->last_used = 0; 607 res->last_used = 0;
694 608
609 list_add_tail(&res->tracking, &dlm->tracking_list);
610
695 memset(res->lvb, 0, DLM_LVB_LEN); 611 memset(res->lvb, 0, DLM_LVB_LEN);
696 memset(res->refmap, 0, sizeof(res->refmap)); 612 memset(res->refmap, 0, sizeof(res->refmap));
697} 613}
@@ -700,20 +616,28 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
700 const char *name, 616 const char *name,
701 unsigned int namelen) 617 unsigned int namelen)
702{ 618{
703 struct dlm_lock_resource *res; 619 struct dlm_lock_resource *res = NULL;
704 620
705 res = kmalloc(sizeof(struct dlm_lock_resource), GFP_NOFS); 621 res = (struct dlm_lock_resource *)
622 kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS);
706 if (!res) 623 if (!res)
707 return NULL; 624 goto error;
708 625
709 res->lockname.name = kmalloc(namelen, GFP_NOFS); 626 res->lockname.name = (char *)
710 if (!res->lockname.name) { 627 kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS);
711 kfree(res); 628 if (!res->lockname.name)
712 return NULL; 629 goto error;
713 }
714 630
715 dlm_init_lockres(dlm, res, name, namelen); 631 dlm_init_lockres(dlm, res, name, namelen);
716 return res; 632 return res;
633
634error:
635 if (res && res->lockname.name)
636 kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
637
638 if (res)
639 kmem_cache_free(dlm_lockres_cache, res);
640 return NULL;
717} 641}
718 642
719void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm, 643void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
@@ -1663,7 +1587,12 @@ way_up_top:
1663 dlm_put_mle(tmpmle); 1587 dlm_put_mle(tmpmle);
1664 } 1588 }
1665send_response: 1589send_response:
1666 1590 /*
1591 * __dlm_lookup_lockres() grabbed a reference to this lockres.
1592 * The reference is released by dlm_assert_master_worker() under
1593 * the call to dlm_dispatch_assert_master(). If
1594 * dlm_assert_master_worker() isn't called, we drop it here.
1595 */
1667 if (dispatch_assert) { 1596 if (dispatch_assert) {
1668 if (response != DLM_MASTER_RESP_YES) 1597 if (response != DLM_MASTER_RESP_YES)
1669 mlog(ML_ERROR, "invalid response %d\n", response); 1598 mlog(ML_ERROR, "invalid response %d\n", response);
@@ -1678,7 +1607,11 @@ send_response:
1678 if (ret < 0) { 1607 if (ret < 0) {
1679 mlog(ML_ERROR, "failed to dispatch assert master work\n"); 1608 mlog(ML_ERROR, "failed to dispatch assert master work\n");
1680 response = DLM_MASTER_RESP_ERROR; 1609 response = DLM_MASTER_RESP_ERROR;
1610 dlm_lockres_put(res);
1681 } 1611 }
1612 } else {
1613 if (res)
1614 dlm_lockres_put(res);
1682 } 1615 }
1683 1616
1684 dlm_put(dlm); 1617 dlm_put(dlm);
@@ -1695,9 +1628,9 @@ send_response:
1695 * can periodically run all locks owned by this node 1628 * can periodically run all locks owned by this node
1696 * and re-assert across the cluster... 1629 * and re-assert across the cluster...
1697 */ 1630 */
1698int dlm_do_assert_master(struct dlm_ctxt *dlm, 1631static int dlm_do_assert_master(struct dlm_ctxt *dlm,
1699 struct dlm_lock_resource *res, 1632 struct dlm_lock_resource *res,
1700 void *nodemap, u32 flags) 1633 void *nodemap, u32 flags)
1701{ 1634{
1702 struct dlm_assert_master assert; 1635 struct dlm_assert_master assert;
1703 int to, tmpret; 1636 int to, tmpret;
@@ -2348,7 +2281,7 @@ int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
2348 mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref " 2281 mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2349 "but it is already dropped!\n", dlm->name, 2282 "but it is already dropped!\n", dlm->name,
2350 res->lockname.len, res->lockname.name, node); 2283 res->lockname.len, res->lockname.name, node);
2351 __dlm_print_one_lock_resource(res); 2284 dlm_print_one_lock_resource(res);
2352 } 2285 }
2353 ret = 0; 2286 ret = 0;
2354 goto done; 2287 goto done;
@@ -2408,7 +2341,7 @@ static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
2408 mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref " 2341 mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2409 "but it is already dropped!\n", dlm->name, 2342 "but it is already dropped!\n", dlm->name,
2410 res->lockname.len, res->lockname.name, node); 2343 res->lockname.len, res->lockname.name, node);
2411 __dlm_print_one_lock_resource(res); 2344 dlm_print_one_lock_resource(res);
2412 } 2345 }
2413 2346
2414 dlm_lockres_put(res); 2347 dlm_lockres_put(res);
@@ -2933,6 +2866,9 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2933 dlm_lockres_clear_refmap_bit(lock->ml.node, res); 2866 dlm_lockres_clear_refmap_bit(lock->ml.node, res);
2934 list_del_init(&lock->list); 2867 list_del_init(&lock->list);
2935 dlm_lock_put(lock); 2868 dlm_lock_put(lock);
2869 /* In a normal unlock, we would have added a
2870 * DLM_UNLOCK_FREE_LOCK action. Force it. */
2871 dlm_lock_put(lock);
2936 } 2872 }
2937 } 2873 }
2938 queue++; 2874 queue++;