author     Lars Ellenberg <lars.ellenberg@linbit.com>     2011-10-19 05:50:57 -0400
committer  Philipp Reisner <philipp.reisner@linbit.com>   2012-11-08 10:58:34 -0500
commit     8c0785a5c9a0f2472aff68dc32247be01728c416 (patch)
tree       adb036acb283550aab1a1860bff454a86eb446d5 /drivers/block/drbd
parent     b379c41ed78e83c4443fca4dbfbc358c19e4f24c (diff)
drbd: allow dequeuing batches of work at a time
Cherry-picked and adapted from the drbd 9 devel branch.
In 8.4, we still use drbd_queue_work_front(),
so in normal operation we cannot dequeue batches,
only single items.
Still, follow-up commits will wake the worker
without explicitly queueing a work item,
so up() is replaced by a simple wake_up().
Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
Diffstat (limited to 'drivers/block/drbd')
-rw-r--r--  drivers/block/drbd/drbd_int.h    |  8
-rw-r--r--  drivers/block/drbd/drbd_main.c   |  2
-rw-r--r--  drivers/block/drbd/drbd_worker.c | 88
3 files changed, 43 insertions, 55 deletions
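
The patch replaces the counting semaphore in struct drbd_work_queue with a wait queue: producers still add to the list under q_lock, but instead of up() they issue wake_up(), and the worker decides for itself whether to take a single item or the whole list. As a minimal, self-contained sketch of that pattern (hypothetical example_* names, not the DRBD code itself; the real hunks follow below):

/* Reduced, kernel-style sketch of the pattern this patch moves to.
 * The list is protected by a spinlock; instead of up()/down() on a
 * counting semaphore, producers wake a wait queue and the consumer
 * re-checks the list itself, one item or a whole batch at a time.
 */
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/wait.h>
#include <linux/sched.h>

struct example_work_queue {
	struct list_head q;
	spinlock_t q_lock;		/* protects q */
	wait_queue_head_t q_wait;	/* consumer sleeps here */
};

static void example_init_work_queue(struct example_work_queue *wq)
{
	INIT_LIST_HEAD(&wq->q);
	spin_lock_init(&wq->q_lock);
	init_waitqueue_head(&wq->q_wait);
}

static void example_queue_work(struct example_work_queue *wq, struct list_head *item)
{
	unsigned long flags;

	spin_lock_irqsave(&wq->q_lock, flags);
	list_add_tail(item, &wq->q);
	spin_unlock_irqrestore(&wq->q_lock, flags);
	wake_up(&wq->q_wait);	/* no counter to keep in sync with the list */
}

/* Grab everything queued right now; returns true if we got work. */
static bool example_dequeue_batch(struct example_work_queue *wq, struct list_head *work_list)
{
	spin_lock_irq(&wq->q_lock);
	list_splice_init(&wq->q, work_list);
	spin_unlock_irq(&wq->q_lock);
	return !list_empty(work_list);
}

With no counter mirroring the list, the "semaphore says there is work but the list is empty" case that the old worker had to defend against simply cannot happen.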
diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h
index d7ca76ce00cb..e84c7b6a6bac 100644
--- a/drivers/block/drbd/drbd_int.h
+++ b/drivers/block/drbd/drbd_int.h
@@ -735,8 +735,8 @@ enum bm_flag {
 
 struct drbd_work_queue {
 	struct list_head q;
-	struct semaphore s; /* producers up it, worker down()s it */
 	spinlock_t q_lock;  /* to protect the list. */
+	wait_queue_head_t q_wait;
 };
 
 struct drbd_socket {
@@ -1832,9 +1832,8 @@ drbd_queue_work_front(struct drbd_work_queue *q, struct drbd_work *w)
 	unsigned long flags;
 	spin_lock_irqsave(&q->q_lock, flags);
 	list_add(&w->list, &q->q);
-	up(&q->s); /* within the spinlock,
-		      see comment near end of drbd_worker() */
 	spin_unlock_irqrestore(&q->q_lock, flags);
+	wake_up(&q->q_wait);
 }
 
 static inline void
@@ -1843,9 +1842,8 @@ drbd_queue_work(struct drbd_work_queue *q, struct drbd_work *w)
 	unsigned long flags;
 	spin_lock_irqsave(&q->q_lock, flags);
 	list_add_tail(&w->list, &q->q);
-	up(&q->s); /* within the spinlock,
-		      see comment near end of drbd_worker() */
 	spin_unlock_irqrestore(&q->q_lock, flags);
+	wake_up(&q->q_wait);
 }
 
 static inline void wake_asender(struct drbd_tconn *tconn)
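
Callers of the queueing helpers are unaffected: a work item is still a struct drbd_work with an embedded list head and a callback, and it is still handed to drbd_queue_work(); only the wakeup behind it changed. A hedged caller-side sketch, with the callback and submit names made up and the cb signature inferred from the w->cb(w, cancel) call sites in drbd_worker.c below:

/* w_example_done() and example_submit() are hypothetical; the cb
 * signature (work item plus a "cancel" flag, nonzero return treated
 * as failure) is inferred from the w->cb() calls in drbd_worker.c.
 */
static int w_example_done(struct drbd_work *w, int cancel)
{
	if (cancel)
		return 0;	/* connection is going away; nothing to send */
	/* ... perform the deferred work here ... */
	return 0;
}

static void example_submit(struct drbd_tconn *tconn, struct drbd_work *w)
{
	w->cb = w_example_done;
	drbd_queue_work(&tconn->data.work, w);	/* list_add_tail under q_lock, then wake_up */
}

Note that wake_up() is now issued after spin_unlock_irqrestore(); that is safe because the sleeper re-checks the list under q_lock before going back to sleep (see dequeue_work_item() below).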
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index bfe6975ef94c..f379d33b10a4 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -2535,9 +2535,9 @@ out:
 
 static void drbd_init_workqueue(struct drbd_work_queue* wq)
 {
-	sema_init(&wq->s, 0);
 	spin_lock_init(&wq->q_lock);
 	INIT_LIST_HEAD(&wq->q);
+	init_waitqueue_head(&wq->q_wait);
 }
 
 struct drbd_tconn *conn_get_by_name(const char *name)
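
In the worker itself (next file), the condition handed to wait_event_interruptible() is not a plain expression but dequeue_work_item(), which takes q_lock and moves an entry onto the local work_list as a side effect, so waking up and claiming work are a single step. A rough sketch of that shape, reusing the hypothetical example_* helpers from the first sketch above:

#include <linux/kthread.h>

/* The condition is re-evaluated by wait_event_interruptible() every
 * time the task wakes, and the dequeue helper takes q_lock itself, so
 * a wake_up() issued right after the producer drops q_lock is not
 * lost: either the item is already visible to the condition, or the
 * wake_up() makes the condition run again.
 */
static int example_worker_thread(void *data)
{
	struct example_work_queue *wq = data;
	LIST_HEAD(work_list);

	while (!kthread_should_stop()) {
		wait_event_interruptible(wq->q_wait,
			example_dequeue_batch(wq, &work_list) ||
			kthread_should_stop());

		if (signal_pending(current))
			flush_signals(current);

		/* ... process and empty work_list here ... */
	}
	return 0;
}

DRBD's worker checks its own thread state (get_t_state(thi) == RUNNING) rather than kthread_should_stop(), as the hunks below show.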
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index d7573f4b7421..fb2e6c8d45c9 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -1673,6 +1673,23 @@ void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
 	mutex_unlock(mdev->state_mutex);
 }
 
+bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
+{
+	spin_lock_irq(&queue->q_lock);
+	list_splice_init(&queue->q, work_list);
+	spin_unlock_irq(&queue->q_lock);
+	return !list_empty(work_list);
+}
+
+bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
+{
+	spin_lock_irq(&queue->q_lock);
+	if (!list_empty(&queue->q))
+		list_move(queue->q.next, work_list);
+	spin_unlock_irq(&queue->q_lock);
+	return !list_empty(work_list);
+}
+
 int drbd_worker(struct drbd_thread *thi)
 {
 	struct drbd_tconn *tconn = thi->tconn;
@@ -1680,15 +1697,21 @@ int drbd_worker(struct drbd_thread *thi)
 	struct drbd_conf *mdev;
 	struct net_conf *nc;
 	LIST_HEAD(work_list);
-	int vnr, intr = 0;
+	int vnr;
 	int cork;
 
 	while (get_t_state(thi) == RUNNING) {
 		drbd_thread_current_set_cpu(thi);
 
-		if (down_trylock(&tconn->data.work.s)) {
-			mutex_lock(&tconn->data.mutex);
+		/* as long as we use drbd_queue_work_front(),
+		 * we may only dequeue single work items here, not batches. */
+		if (list_empty(&work_list))
+			dequeue_work_item(&tconn->data.work, &work_list);
 
+		/* Still nothing to do? Poke TCP, just in case,
+		 * then wait for new work (or signal). */
+		if (list_empty(&work_list)) {
+			mutex_lock(&tconn->data.mutex);
 			rcu_read_lock();
 			nc = rcu_dereference(tconn->net_conf);
 			cork = nc ? nc->tcp_cork : 0;
@@ -1698,15 +1721,16 @@ int drbd_worker(struct drbd_thread *thi)
 				drbd_tcp_uncork(tconn->data.socket);
 			mutex_unlock(&tconn->data.mutex);
 
-			intr = down_interruptible(&tconn->data.work.s);
+			wait_event_interruptible(tconn->data.work.q_wait,
+				dequeue_work_item(&tconn->data.work, &work_list));
 
 			mutex_lock(&tconn->data.mutex);
 			if (tconn->data.socket && cork)
 				drbd_tcp_cork(tconn->data.socket);
 			mutex_unlock(&tconn->data.mutex);
 		}
 
-		if (intr) {
+		if (signal_pending(current)) {
 			flush_signals(current);
 			if (get_t_state(thi) == RUNNING) {
 				conn_warn(tconn, "Worker got an unexpected signal\n");
@@ -1717,59 +1741,25 @@ int drbd_worker(struct drbd_thread *thi)
 
 		if (get_t_state(thi) != RUNNING)
 			break;
-		/* With this break, we have done a down() but not consumed
-		   the entry from the list. The cleanup code takes care of
-		   this...   */
-
-		w = NULL;
-		spin_lock_irq(&tconn->data.work.q_lock);
-		if (list_empty(&tconn->data.work.q)) {
-			/* something terribly wrong in our logic.
-			 * we were able to down() the semaphore,
-			 * but the list is empty... doh.
-			 *
-			 * what is the best thing to do now?
-			 * try again from scratch, restarting the receiver,
-			 * asender, whatnot? could break even more ugly,
-			 * e.g. when we are primary, but no good local data.
-			 *
-			 * I'll try to get away just starting over this loop.
-			 */
-			conn_warn(tconn, "Work list unexpectedly empty\n");
-			spin_unlock_irq(&tconn->data.work.q_lock);
-			continue;
-		}
-		w = list_entry(tconn->data.work.q.next, struct drbd_work, list);
-		list_del_init(&w->list);
-		spin_unlock_irq(&tconn->data.work.q_lock);
 
-		if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) {
-			/* dev_warn(DEV, "worker: a callback failed! \n"); */
+		while (!list_empty(&work_list)) {
+			w = list_first_entry(&work_list, struct drbd_work, list);
+			list_del_init(&w->list);
+			if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS) == 0)
+				continue;
 			if (tconn->cstate >= C_WF_REPORT_PARAMS)
 				conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
 		}
 	}
 
-	spin_lock_irq(&tconn->data.work.q_lock);
-	while (!list_empty(&tconn->data.work.q)) {
-		list_splice_init(&tconn->data.work.q, &work_list);
-		spin_unlock_irq(&tconn->data.work.q_lock);
-
+	do {
 		while (!list_empty(&work_list)) {
-			w = list_entry(work_list.next, struct drbd_work, list);
+			w = list_first_entry(&work_list, struct drbd_work, list);
 			list_del_init(&w->list);
 			w->cb(w, 1);
 		}
-
-		spin_lock_irq(&tconn->data.work.q_lock);
-	}
-	sema_init(&tconn->data.work.s, 0);
-	/* DANGEROUS race: if someone did queue his work within the spinlock,
-	 * but up() ed outside the spinlock, we could get an up() on the
-	 * semaphore without corresponding list entry.
-	 * So don't do that.
-	 */
-	spin_unlock_irq(&tconn->data.work.q_lock);
+		dequeue_work_batch(&tconn->data.work, &work_list);
+	} while (!list_empty(&work_list));
 
 	rcu_read_lock();
 	idr_for_each_entry(&tconn->volumes, mdev, vnr) {
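
Finally, the shutdown path no longer has to fix up a semaphore count. The worker drains whatever is left with a do/while around dequeue_work_batch(): the current work_list is completed with the cancel flag set, one more batch is spliced off the queue, and the loop only ends once that splice comes back empty, so work queued concurrently during the drain is not left behind. A sketch of that shape, again with the hypothetical example_* helpers from above and a minimal work struct standing in for struct drbd_work:

struct example_work {
	struct list_head list;
	int (*cb)(struct example_work *w, int cancel);
};

/* Drain the queue on the way out, including anything queued while we
 * are already draining; callbacks run with cancel == 1, mirroring the
 * w->cb(w, 1) calls in the cleanup hunk above.
 */
static void example_drain(struct example_work_queue *wq)
{
	LIST_HEAD(work_list);
	struct example_work *w;

	do {
		while (!list_empty(&work_list)) {
			w = list_first_entry(&work_list, struct example_work, list);
			list_del_init(&w->list);
			w->cb(w, 1);
		}
		/* splice off whatever was queued in the meantime */
	} while (example_dequeue_batch(wq, &work_list));
}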