aboutsummaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
authorDavid Teigland <teigland@redhat.com>2012-06-14 13:17:32 -0400
committerDavid Teigland <teigland@redhat.com>2012-07-16 15:18:01 -0400
commit05c32f47bfae74dabff05208957768078b53cc49 (patch)
tree71034eba054f49723a0dac41f6bcd9d4f37eb2bc /fs
parent1d7c484eeb167fc374294e38ae402de4097c8611 (diff)
dlm: fix race between remove and lookup
It was possible for a remove message on an old rsb to be sent after a lookup message on a new rsb, where the rsbs were for the same resource name. This could lead to a missing directory entry for the new rsb. It is fixed by keeping a copy of the resource name being removed until after the remove has been sent. A lookup checks if this in-progress remove matches the name it is looking up. Signed-off-by: David Teigland <teigland@redhat.com>
Diffstat (limited to 'fs')
-rw-r--r--fs/dlm/dlm_internal.h13
-rw-r--r--fs/dlm/lock.c181
-rw-r--r--fs/dlm/lockspace.c21
3 files changed, 176 insertions, 39 deletions
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index a5f82d5b3946..9d3e485f88c8 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -498,6 +498,13 @@ struct rcom_lock {
498 char rl_lvb[0]; 498 char rl_lvb[0];
499}; 499};
500 500
501/*
502 * The max number of resources per rsbtbl bucket that shrink will attempt
503 * to remove in each iteration.
504 */
505
506#define DLM_REMOVE_NAMES_MAX 8
507
501struct dlm_ls { 508struct dlm_ls {
502 struct list_head ls_list; /* list of lockspaces */ 509 struct list_head ls_list; /* list of lockspaces */
503 dlm_lockspace_t *ls_local_handle; 510 dlm_lockspace_t *ls_local_handle;
@@ -531,6 +538,12 @@ struct dlm_ls {
531 int ls_new_rsb_count; 538 int ls_new_rsb_count;
532 struct list_head ls_new_rsb; /* new rsb structs */ 539 struct list_head ls_new_rsb; /* new rsb structs */
533 540
541 spinlock_t ls_remove_spin;
542 char ls_remove_name[DLM_RESNAME_MAXLEN+1];
543 char *ls_remove_names[DLM_REMOVE_NAMES_MAX];
544 int ls_remove_len;
545 int ls_remove_lens[DLM_REMOVE_NAMES_MAX];
546
534 struct list_head ls_nodes; /* current nodes in ls */ 547 struct list_head ls_nodes; /* current nodes in ls */
535 struct list_head ls_nodes_gone; /* dead node list, recovery */ 548 struct list_head ls_nodes_gone; /* dead node list, recovery */
536 int ls_num_nodes; /* number of nodes in ls */ 549 int ls_num_nodes; /* number of nodes in ls */
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index d9ee1b96549a..c7c6cf9e8685 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1624,65 +1624,170 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1624 return error; 1624 return error;
1625} 1625}
1626 1626
1627/* FIXME: make this more efficient */ 1627/* If there's an rsb for the same resource being removed, ensure
1628 that the remove message is sent before the new lookup message.
1629 It should be rare to need a delay here, but if not, then it may
1630 be worthwhile to add a proper wait mechanism rather than a delay. */
1628 1631
1629static int shrink_bucket(struct dlm_ls *ls, int b) 1632static void wait_pending_remove(struct dlm_rsb *r)
1630{ 1633{
1631 struct rb_node *n; 1634 struct dlm_ls *ls = r->res_ls;
1635 restart:
1636 spin_lock(&ls->ls_remove_spin);
1637 if (ls->ls_remove_len &&
1638 !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
1639 log_debug(ls, "delay lookup for remove dir %d %s",
1640 r->res_dir_nodeid, r->res_name);
1641 spin_unlock(&ls->ls_remove_spin);
1642 msleep(1);
1643 goto restart;
1644 }
1645 spin_unlock(&ls->ls_remove_spin);
1646}
1647
1648/*
1649 * ls_remove_spin protects ls_remove_name and ls_remove_len which are
1650 * read by other threads in wait_pending_remove. ls_remove_names
1651 * and ls_remove_lens are only used by the scan thread, so they do
1652 * not need protection.
1653 */
1654
1655static void shrink_bucket(struct dlm_ls *ls, int b)
1656{
1657 struct rb_node *n, *next;
1632 struct dlm_rsb *r; 1658 struct dlm_rsb *r;
1659 char *name;
1633 int our_nodeid = dlm_our_nodeid(); 1660 int our_nodeid = dlm_our_nodeid();
1634 int count = 0, found; 1661 int remote_count = 0;
1662 int i, len, rv;
1635 1663
1636 for (;;) { 1664 memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
1637 found = 0;
1638 spin_lock(&ls->ls_rsbtbl[b].lock);
1639 for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
1640 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1641 1665
1642 /* If we're the directory record for this rsb, and 1666 spin_lock(&ls->ls_rsbtbl[b].lock);
1643 we're not the master of it, then we need to wait 1667 for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1644 for the master node to send us a dir remove for 1668 next = rb_next(n);
1645 before removing the dir record. */ 1669 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1646 1670
1647 if (!dlm_no_directory(ls) && !is_master(r) && 1671 /* If we're the directory record for this rsb, and
1648 (dlm_dir_nodeid(r) == our_nodeid)) { 1672 we're not the master of it, then we need to wait
1649 continue; 1673 for the master node to send us a dir remove for
1650 } 1674 before removing the dir record. */
1651 1675
1652 if (!time_after_eq(jiffies, r->res_toss_time + 1676 if (!dlm_no_directory(ls) &&
1653 dlm_config.ci_toss_secs * HZ)) 1677 (r->res_master_nodeid != our_nodeid) &&
1654 continue; 1678 (dlm_dir_nodeid(r) == our_nodeid)) {
1655 found = 1; 1679 continue;
1656 break;
1657 } 1680 }
1658 1681
1659 if (!found) { 1682 if (!time_after_eq(jiffies, r->res_toss_time +
1660 spin_unlock(&ls->ls_rsbtbl[b].lock); 1683 dlm_config.ci_toss_secs * HZ)) {
1661 break; 1684 continue;
1662 } 1685 }
1663 1686
1664 if (kref_put(&r->res_ref, kill_rsb)) { 1687 if (!dlm_no_directory(ls) &&
1665 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 1688 (r->res_master_nodeid == our_nodeid) &&
1666 spin_unlock(&ls->ls_rsbtbl[b].lock); 1689 (dlm_dir_nodeid(r) != our_nodeid)) {
1667 1690
1668 /* We're the master of this rsb but we're not 1691 /* We're the master of this rsb but we're not
1669 the directory record, so we need to tell the 1692 the directory record, so we need to tell the
1670 dir node to remove the dir record. */ 1693 dir node to remove the dir record. */
1671 1694
1672 if (!dlm_no_directory(ls) && is_master(r) && 1695 ls->ls_remove_lens[remote_count] = r->res_length;
1673 (dlm_dir_nodeid(r) != our_nodeid)) { 1696 memcpy(ls->ls_remove_names[remote_count], r->res_name,
1674 send_remove(r); 1697 DLM_RESNAME_MAXLEN);
1675 } 1698 remote_count++;
1676 1699
1677 dlm_free_rsb(r); 1700 if (remote_count >= DLM_REMOVE_NAMES_MAX)
1678 count++; 1701 break;
1679 } else { 1702 continue;
1680 spin_unlock(&ls->ls_rsbtbl[b].lock); 1703 }
1704
1705 if (!kref_put(&r->res_ref, kill_rsb)) {
1681 log_error(ls, "tossed rsb in use %s", r->res_name); 1706 log_error(ls, "tossed rsb in use %s", r->res_name);
1707 continue;
1682 } 1708 }
1709
1710 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1711 dlm_free_rsb(r);
1683 } 1712 }
1713 spin_unlock(&ls->ls_rsbtbl[b].lock);
1684 1714
1685 return count; 1715 /*
1716 * While searching for rsb's to free, we found some that require
1717 * remote removal. We leave them in place and find them again here
1718 * so there is a very small gap between removing them from the toss
1719 * list and sending the removal. Keeping this gap small is
1720 * important to keep us (the master node) from being out of sync
1721 * with the remote dir node for very long.
1722 *
1723 * From the time the rsb is removed from toss until just after
1724 * send_remove, the rsb name is saved in ls_remove_name. A new
1725 * lookup checks this to ensure that a new lookup message for the
1726 * same resource name is not sent just before the remove message.
1727 */
1728
1729 for (i = 0; i < remote_count; i++) {
1730 name = ls->ls_remove_names[i];
1731 len = ls->ls_remove_lens[i];
1732
1733 spin_lock(&ls->ls_rsbtbl[b].lock);
1734 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1735 if (rv) {
1736 spin_unlock(&ls->ls_rsbtbl[b].lock);
1737 log_debug(ls, "remove_name not toss %s", name);
1738 continue;
1739 }
1740
1741 if (r->res_master_nodeid != our_nodeid) {
1742 spin_unlock(&ls->ls_rsbtbl[b].lock);
1743 log_debug(ls, "remove_name master %d dir %d our %d %s",
1744 r->res_master_nodeid, r->res_dir_nodeid,
1745 our_nodeid, name);
1746 continue;
1747 }
1748
1749 if (r->res_dir_nodeid == our_nodeid) {
1750 /* should never happen */
1751 spin_unlock(&ls->ls_rsbtbl[b].lock);
1752 log_error(ls, "remove_name dir %d master %d our %d %s",
1753 r->res_dir_nodeid, r->res_master_nodeid,
1754 our_nodeid, name);
1755 continue;
1756 }
1757
1758 if (!time_after_eq(jiffies, r->res_toss_time +
1759 dlm_config.ci_toss_secs * HZ)) {
1760 spin_unlock(&ls->ls_rsbtbl[b].lock);
1761 log_debug(ls, "remove_name toss_time %lu now %lu %s",
1762 r->res_toss_time, jiffies, name);
1763 continue;
1764 }
1765
1766 if (!kref_put(&r->res_ref, kill_rsb)) {
1767 spin_unlock(&ls->ls_rsbtbl[b].lock);
1768 log_error(ls, "remove_name in use %s", name);
1769 continue;
1770 }
1771
1772 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1773
1774 /* block lookup of same name until we've sent remove */
1775 spin_lock(&ls->ls_remove_spin);
1776 ls->ls_remove_len = len;
1777 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
1778 spin_unlock(&ls->ls_remove_spin);
1779 spin_unlock(&ls->ls_rsbtbl[b].lock);
1780
1781 send_remove(r);
1782
1783 /* allow lookup of name again */
1784 spin_lock(&ls->ls_remove_spin);
1785 ls->ls_remove_len = 0;
1786 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
1787 spin_unlock(&ls->ls_remove_spin);
1788
1789 dlm_free_rsb(r);
1790 }
1686} 1791}
1687 1792
1688void dlm_scan_rsbs(struct dlm_ls *ls) 1793void dlm_scan_rsbs(struct dlm_ls *ls)
@@ -2608,6 +2713,8 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2608 return 0; 2713 return 0;
2609 } 2714 }
2610 2715
2716 wait_pending_remove(r);
2717
2611 r->res_first_lkid = lkb->lkb_id; 2718 r->res_first_lkid = lkb->lkb_id;
2612 send_lookup(r, lkb); 2719 send_lookup(r, lkb);
2613 return 1; 2720 return 1;
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index d4d3b3165c6c..952557d00ccd 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -506,6 +506,15 @@ static int new_lockspace(const char *name, const char *cluster,
506 spin_lock_init(&ls->ls_rsbtbl[i].lock); 506 spin_lock_init(&ls->ls_rsbtbl[i].lock);
507 } 507 }
508 508
509 spin_lock_init(&ls->ls_remove_spin);
510
511 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
512 ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
513 GFP_KERNEL);
514 if (!ls->ls_remove_names[i])
515 goto out_rsbtbl;
516 }
517
509 idr_init(&ls->ls_lkbidr); 518 idr_init(&ls->ls_lkbidr);
510 spin_lock_init(&ls->ls_lkbidr_spin); 519 spin_lock_init(&ls->ls_lkbidr_spin);
511 520
@@ -556,7 +565,7 @@ static int new_lockspace(const char *name, const char *cluster,
556 565
557 ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS); 566 ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
558 if (!ls->ls_recover_buf) 567 if (!ls->ls_recover_buf)
559 goto out_lkbfree; 568 goto out_lkbidr;
560 569
561 ls->ls_slot = 0; 570 ls->ls_slot = 0;
562 ls->ls_num_slots = 0; 571 ls->ls_num_slots = 0;
@@ -640,8 +649,13 @@ static int new_lockspace(const char *name, const char *cluster,
640 spin_unlock(&lslist_lock); 649 spin_unlock(&lslist_lock);
641 idr_destroy(&ls->ls_recover_idr); 650 idr_destroy(&ls->ls_recover_idr);
642 kfree(ls->ls_recover_buf); 651 kfree(ls->ls_recover_buf);
643 out_lkbfree: 652 out_lkbidr:
644 idr_destroy(&ls->ls_lkbidr); 653 idr_destroy(&ls->ls_lkbidr);
654 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
655 if (ls->ls_remove_names[i])
656 kfree(ls->ls_remove_names[i]);
657 }
658 out_rsbtbl:
645 vfree(ls->ls_rsbtbl); 659 vfree(ls->ls_rsbtbl);
646 out_lsfree: 660 out_lsfree:
647 if (do_unreg) 661 if (do_unreg)
@@ -796,6 +810,9 @@ static int release_lockspace(struct dlm_ls *ls, int force)
796 810
797 vfree(ls->ls_rsbtbl); 811 vfree(ls->ls_rsbtbl);
798 812
813 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
814 kfree(ls->ls_remove_names[i]);
815
799 while (!list_empty(&ls->ls_new_rsb)) { 816 while (!list_empty(&ls->ls_new_rsb)) {
800 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, 817 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
801 res_hashchain); 818 res_hashchain);