diff options
author | David Teigland <teigland@redhat.com> | 2012-06-14 13:17:32 -0400 |
---|---|---|
committer | David Teigland <teigland@redhat.com> | 2012-07-16 15:18:01 -0400 |
commit | 05c32f47bfae74dabff05208957768078b53cc49 (patch) | |
tree | 71034eba054f49723a0dac41f6bcd9d4f37eb2bc /fs/dlm | |
parent | 1d7c484eeb167fc374294e38ae402de4097c8611 (diff) |
dlm: fix race between remove and lookup
It was possible for a remove message on an old
rsb to be sent after a lookup message on a new
rsb, where the rsbs were for the same resource
name. This could lead to a missing directory
entry for the new rsb.
It is fixed by keeping a copy of the resource
name being removed until after the remove has
been sent. A lookup checks if this in-progress
remove matches the name it is looking up.
Signed-off-by: David Teigland <teigland@redhat.com>
Diffstat (limited to 'fs/dlm')
-rw-r--r-- | fs/dlm/dlm_internal.h | 13 | ||||
-rw-r--r-- | fs/dlm/lock.c | 181 | ||||
-rw-r--r-- | fs/dlm/lockspace.c | 21 |
3 files changed, 176 insertions, 39 deletions
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index a5f82d5b3946..9d3e485f88c8 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h | |||
@@ -498,6 +498,13 @@ struct rcom_lock { | |||
498 | char rl_lvb[0]; | 498 | char rl_lvb[0]; |
499 | }; | 499 | }; |
500 | 500 | ||
501 | /* | ||
502 | * The max number of resources per rsbtbl bucket that shrink will attempt | ||
503 | * to remove in each iteration. | ||
504 | */ | ||
505 | |||
506 | #define DLM_REMOVE_NAMES_MAX 8 | ||
507 | |||
501 | struct dlm_ls { | 508 | struct dlm_ls { |
502 | struct list_head ls_list; /* list of lockspaces */ | 509 | struct list_head ls_list; /* list of lockspaces */ |
503 | dlm_lockspace_t *ls_local_handle; | 510 | dlm_lockspace_t *ls_local_handle; |
@@ -531,6 +538,12 @@ struct dlm_ls { | |||
531 | int ls_new_rsb_count; | 538 | int ls_new_rsb_count; |
532 | struct list_head ls_new_rsb; /* new rsb structs */ | 539 | struct list_head ls_new_rsb; /* new rsb structs */ |
533 | 540 | ||
541 | spinlock_t ls_remove_spin; | ||
542 | char ls_remove_name[DLM_RESNAME_MAXLEN+1]; | ||
543 | char *ls_remove_names[DLM_REMOVE_NAMES_MAX]; | ||
544 | int ls_remove_len; | ||
545 | int ls_remove_lens[DLM_REMOVE_NAMES_MAX]; | ||
546 | |||
534 | struct list_head ls_nodes; /* current nodes in ls */ | 547 | struct list_head ls_nodes; /* current nodes in ls */ |
535 | struct list_head ls_nodes_gone; /* dead node list, recovery */ | 548 | struct list_head ls_nodes_gone; /* dead node list, recovery */ |
536 | int ls_num_nodes; /* number of nodes in ls */ | 549 | int ls_num_nodes; /* number of nodes in ls */ |
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index d9ee1b96549a..c7c6cf9e8685 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c | |||
@@ -1624,65 +1624,170 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms) | |||
1624 | return error; | 1624 | return error; |
1625 | } | 1625 | } |
1626 | 1626 | ||
1627 | /* FIXME: make this more efficient */ | 1627 | /* If there's an rsb for the same resource being removed, ensure |
1628 | that the remove message is sent before the new lookup message. | ||
1629 | It should be rare to need a delay here, but if not, then it may | ||
1630 | be worthwhile to add a proper wait mechanism rather than a delay. */ | ||
1628 | 1631 | ||
1629 | static int shrink_bucket(struct dlm_ls *ls, int b) | 1632 | static void wait_pending_remove(struct dlm_rsb *r) |
1630 | { | 1633 | { |
1631 | struct rb_node *n; | 1634 | struct dlm_ls *ls = r->res_ls; |
1635 | restart: | ||
1636 | spin_lock(&ls->ls_remove_spin); | ||
1637 | if (ls->ls_remove_len && | ||
1638 | !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) { | ||
1639 | log_debug(ls, "delay lookup for remove dir %d %s", | ||
1640 | r->res_dir_nodeid, r->res_name); | ||
1641 | spin_unlock(&ls->ls_remove_spin); | ||
1642 | msleep(1); | ||
1643 | goto restart; | ||
1644 | } | ||
1645 | spin_unlock(&ls->ls_remove_spin); | ||
1646 | } | ||
1647 | |||
1648 | /* | ||
1649 | * ls_remove_spin protects ls_remove_name and ls_remove_len which are | ||
1650 | * read by other threads in wait_pending_remove. ls_remove_names | ||
1651 | * and ls_remove_lens are only used by the scan thread, so they do | ||
1652 | * not need protection. | ||
1653 | */ | ||
1654 | |||
1655 | static void shrink_bucket(struct dlm_ls *ls, int b) | ||
1656 | { | ||
1657 | struct rb_node *n, *next; | ||
1632 | struct dlm_rsb *r; | 1658 | struct dlm_rsb *r; |
1659 | char *name; | ||
1633 | int our_nodeid = dlm_our_nodeid(); | 1660 | int our_nodeid = dlm_our_nodeid(); |
1634 | int count = 0, found; | 1661 | int remote_count = 0; |
1662 | int i, len, rv; | ||
1635 | 1663 | ||
1636 | for (;;) { | 1664 | memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX); |
1637 | found = 0; | ||
1638 | spin_lock(&ls->ls_rsbtbl[b].lock); | ||
1639 | for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) { | ||
1640 | r = rb_entry(n, struct dlm_rsb, res_hashnode); | ||
1641 | 1665 | ||
1642 | /* If we're the directory record for this rsb, and | 1666 | spin_lock(&ls->ls_rsbtbl[b].lock); |
1643 | we're not the master of it, then we need to wait | 1667 | for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) { |
1644 | for the master node to send us a dir remove for | 1668 | next = rb_next(n); |
1645 | before removing the dir record. */ | 1669 | r = rb_entry(n, struct dlm_rsb, res_hashnode); |
1646 | 1670 | ||
1647 | if (!dlm_no_directory(ls) && !is_master(r) && | 1671 | /* If we're the directory record for this rsb, and |
1648 | (dlm_dir_nodeid(r) == our_nodeid)) { | 1672 | we're not the master of it, then we need to wait |
1649 | continue; | 1673 | for the master node to send us a dir remove for |
1650 | } | 1674 | before removing the dir record. */ |
1651 | 1675 | ||
1652 | if (!time_after_eq(jiffies, r->res_toss_time + | 1676 | if (!dlm_no_directory(ls) && |
1653 | dlm_config.ci_toss_secs * HZ)) | 1677 | (r->res_master_nodeid != our_nodeid) && |
1654 | continue; | 1678 | (dlm_dir_nodeid(r) == our_nodeid)) { |
1655 | found = 1; | 1679 | continue; |
1656 | break; | ||
1657 | } | 1680 | } |
1658 | 1681 | ||
1659 | if (!found) { | 1682 | if (!time_after_eq(jiffies, r->res_toss_time + |
1660 | spin_unlock(&ls->ls_rsbtbl[b].lock); | 1683 | dlm_config.ci_toss_secs * HZ)) { |
1661 | break; | 1684 | continue; |
1662 | } | 1685 | } |
1663 | 1686 | ||
1664 | if (kref_put(&r->res_ref, kill_rsb)) { | 1687 | if (!dlm_no_directory(ls) && |
1665 | rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); | 1688 | (r->res_master_nodeid == our_nodeid) && |
1666 | spin_unlock(&ls->ls_rsbtbl[b].lock); | 1689 | (dlm_dir_nodeid(r) != our_nodeid)) { |
1667 | 1690 | ||
1668 | /* We're the master of this rsb but we're not | 1691 | /* We're the master of this rsb but we're not |
1669 | the directory record, so we need to tell the | 1692 | the directory record, so we need to tell the |
1670 | dir node to remove the dir record. */ | 1693 | dir node to remove the dir record. */ |
1671 | 1694 | ||
1672 | if (!dlm_no_directory(ls) && is_master(r) && | 1695 | ls->ls_remove_lens[remote_count] = r->res_length; |
1673 | (dlm_dir_nodeid(r) != our_nodeid)) { | 1696 | memcpy(ls->ls_remove_names[remote_count], r->res_name, |
1674 | send_remove(r); | 1697 | DLM_RESNAME_MAXLEN); |
1675 | } | 1698 | remote_count++; |
1676 | 1699 | ||
1677 | dlm_free_rsb(r); | 1700 | if (remote_count >= DLM_REMOVE_NAMES_MAX) |
1678 | count++; | 1701 | break; |
1679 | } else { | 1702 | continue; |
1680 | spin_unlock(&ls->ls_rsbtbl[b].lock); | 1703 | } |
1704 | |||
1705 | if (!kref_put(&r->res_ref, kill_rsb)) { | ||
1681 | log_error(ls, "tossed rsb in use %s", r->res_name); | 1706 | log_error(ls, "tossed rsb in use %s", r->res_name); |
1707 | continue; | ||
1682 | } | 1708 | } |
1709 | |||
1710 | rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); | ||
1711 | dlm_free_rsb(r); | ||
1683 | } | 1712 | } |
1713 | spin_unlock(&ls->ls_rsbtbl[b].lock); | ||
1684 | 1714 | ||
1685 | return count; | 1715 | /* |
1716 | * While searching for rsb's to free, we found some that require | ||
1717 | * remote removal. We leave them in place and find them again here | ||
1718 | * so there is a very small gap between removing them from the toss | ||
1719 | * list and sending the removal. Keeping this gap small is | ||
1720 | * important to keep us (the master node) from being out of sync | ||
1721 | * with the remote dir node for very long. | ||
1722 | * | ||
1723 | * From the time the rsb is removed from toss until just after | ||
1724 | * send_remove, the rsb name is saved in ls_remove_name. A new | ||
1725 | * lookup checks this to ensure that a new lookup message for the | ||
1726 | * same resource name is not sent just before the remove message. | ||
1727 | */ | ||
1728 | |||
1729 | for (i = 0; i < remote_count; i++) { | ||
1730 | name = ls->ls_remove_names[i]; | ||
1731 | len = ls->ls_remove_lens[i]; | ||
1732 | |||
1733 | spin_lock(&ls->ls_rsbtbl[b].lock); | ||
1734 | rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); | ||
1735 | if (rv) { | ||
1736 | spin_unlock(&ls->ls_rsbtbl[b].lock); | ||
1737 | log_debug(ls, "remove_name not toss %s", name); | ||
1738 | continue; | ||
1739 | } | ||
1740 | |||
1741 | if (r->res_master_nodeid != our_nodeid) { | ||
1742 | spin_unlock(&ls->ls_rsbtbl[b].lock); | ||
1743 | log_debug(ls, "remove_name master %d dir %d our %d %s", | ||
1744 | r->res_master_nodeid, r->res_dir_nodeid, | ||
1745 | our_nodeid, name); | ||
1746 | continue; | ||
1747 | } | ||
1748 | |||
1749 | if (r->res_dir_nodeid == our_nodeid) { | ||
1750 | /* should never happen */ | ||
1751 | spin_unlock(&ls->ls_rsbtbl[b].lock); | ||
1752 | log_error(ls, "remove_name dir %d master %d our %d %s", | ||
1753 | r->res_dir_nodeid, r->res_master_nodeid, | ||
1754 | our_nodeid, name); | ||
1755 | continue; | ||
1756 | } | ||
1757 | |||
1758 | if (!time_after_eq(jiffies, r->res_toss_time + | ||
1759 | dlm_config.ci_toss_secs * HZ)) { | ||
1760 | spin_unlock(&ls->ls_rsbtbl[b].lock); | ||
1761 | log_debug(ls, "remove_name toss_time %lu now %lu %s", | ||
1762 | r->res_toss_time, jiffies, name); | ||
1763 | continue; | ||
1764 | } | ||
1765 | |||
1766 | if (!kref_put(&r->res_ref, kill_rsb)) { | ||
1767 | spin_unlock(&ls->ls_rsbtbl[b].lock); | ||
1768 | log_error(ls, "remove_name in use %s", name); | ||
1769 | continue; | ||
1770 | } | ||
1771 | |||
1772 | rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); | ||
1773 | |||
1774 | /* block lookup of same name until we've sent remove */ | ||
1775 | spin_lock(&ls->ls_remove_spin); | ||
1776 | ls->ls_remove_len = len; | ||
1777 | memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN); | ||
1778 | spin_unlock(&ls->ls_remove_spin); | ||
1779 | spin_unlock(&ls->ls_rsbtbl[b].lock); | ||
1780 | |||
1781 | send_remove(r); | ||
1782 | |||
1783 | /* allow lookup of name again */ | ||
1784 | spin_lock(&ls->ls_remove_spin); | ||
1785 | ls->ls_remove_len = 0; | ||
1786 | memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN); | ||
1787 | spin_unlock(&ls->ls_remove_spin); | ||
1788 | |||
1789 | dlm_free_rsb(r); | ||
1790 | } | ||
1686 | } | 1791 | } |
1687 | 1792 | ||
1688 | void dlm_scan_rsbs(struct dlm_ls *ls) | 1793 | void dlm_scan_rsbs(struct dlm_ls *ls) |
@@ -2608,6 +2713,8 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) | |||
2608 | return 0; | 2713 | return 0; |
2609 | } | 2714 | } |
2610 | 2715 | ||
2716 | wait_pending_remove(r); | ||
2717 | |||
2611 | r->res_first_lkid = lkb->lkb_id; | 2718 | r->res_first_lkid = lkb->lkb_id; |
2612 | send_lookup(r, lkb); | 2719 | send_lookup(r, lkb); |
2613 | return 1; | 2720 | return 1; |
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index d4d3b3165c6c..952557d00ccd 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c | |||
@@ -506,6 +506,15 @@ static int new_lockspace(const char *name, const char *cluster, | |||
506 | spin_lock_init(&ls->ls_rsbtbl[i].lock); | 506 | spin_lock_init(&ls->ls_rsbtbl[i].lock); |
507 | } | 507 | } |
508 | 508 | ||
509 | spin_lock_init(&ls->ls_remove_spin); | ||
510 | |||
511 | for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) { | ||
512 | ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1, | ||
513 | GFP_KERNEL); | ||
514 | if (!ls->ls_remove_names[i]) | ||
515 | goto out_rsbtbl; | ||
516 | } | ||
517 | |||
509 | idr_init(&ls->ls_lkbidr); | 518 | idr_init(&ls->ls_lkbidr); |
510 | spin_lock_init(&ls->ls_lkbidr_spin); | 519 | spin_lock_init(&ls->ls_lkbidr_spin); |
511 | 520 | ||
@@ -556,7 +565,7 @@ static int new_lockspace(const char *name, const char *cluster, | |||
556 | 565 | ||
557 | ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS); | 566 | ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS); |
558 | if (!ls->ls_recover_buf) | 567 | if (!ls->ls_recover_buf) |
559 | goto out_lkbfree; | 568 | goto out_lkbidr; |
560 | 569 | ||
561 | ls->ls_slot = 0; | 570 | ls->ls_slot = 0; |
562 | ls->ls_num_slots = 0; | 571 | ls->ls_num_slots = 0; |
@@ -640,8 +649,13 @@ static int new_lockspace(const char *name, const char *cluster, | |||
640 | spin_unlock(&lslist_lock); | 649 | spin_unlock(&lslist_lock); |
641 | idr_destroy(&ls->ls_recover_idr); | 650 | idr_destroy(&ls->ls_recover_idr); |
642 | kfree(ls->ls_recover_buf); | 651 | kfree(ls->ls_recover_buf); |
643 | out_lkbfree: | 652 | out_lkbidr: |
644 | idr_destroy(&ls->ls_lkbidr); | 653 | idr_destroy(&ls->ls_lkbidr); |
654 | for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) { | ||
655 | if (ls->ls_remove_names[i]) | ||
656 | kfree(ls->ls_remove_names[i]); | ||
657 | } | ||
658 | out_rsbtbl: | ||
645 | vfree(ls->ls_rsbtbl); | 659 | vfree(ls->ls_rsbtbl); |
646 | out_lsfree: | 660 | out_lsfree: |
647 | if (do_unreg) | 661 | if (do_unreg) |
@@ -796,6 +810,9 @@ static int release_lockspace(struct dlm_ls *ls, int force) | |||
796 | 810 | ||
797 | vfree(ls->ls_rsbtbl); | 811 | vfree(ls->ls_rsbtbl); |
798 | 812 | ||
813 | for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) | ||
814 | kfree(ls->ls_remove_names[i]); | ||
815 | |||
799 | while (!list_empty(&ls->ls_new_rsb)) { | 816 | while (!list_empty(&ls->ls_new_rsb)) { |
800 | rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, | 817 | rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, |
801 | res_hashchain); | 818 | res_hashchain); |