aboutsummaryrefslogtreecommitdiffstats
path: root/drivers/block/drbd
diff options
context:
space:
mode:
authorLars Ellenberg <lars.ellenberg@linbit.com>2011-10-19 05:50:57 -0400
committerPhilipp Reisner <philipp.reisner@linbit.com>2012-11-08 10:58:34 -0500
commit8c0785a5c9a0f2472aff68dc32247be01728c416 (patch)
treeadb036acb283550aab1a1860bff454a86eb446d5 /drivers/block/drbd
parentb379c41ed78e83c4443fca4dbfbc358c19e4f24c (diff)
drbd: allow to dequeue batches of work at a time
cherry-picked and adapted from drbd 9 devel branch In 8.4, we still use drbd_queue_work_front(), so in normal operation, we can not dequeue batches, but only single items. Still, followup commits will wake the worker without explicitly queueing a work item, so up() is replaced by a simple wake_up(). Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com> Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
Diffstat (limited to 'drivers/block/drbd')
-rw-r--r--drivers/block/drbd/drbd_int.h8
-rw-r--r--drivers/block/drbd/drbd_main.c2
-rw-r--r--drivers/block/drbd/drbd_worker.c88
3 files changed, 43 insertions, 55 deletions
diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h
index d7ca76ce00cb..e84c7b6a6bac 100644
--- a/drivers/block/drbd/drbd_int.h
+++ b/drivers/block/drbd/drbd_int.h
@@ -735,8 +735,8 @@ enum bm_flag {
735 735
736struct drbd_work_queue { 736struct drbd_work_queue {
737 struct list_head q; 737 struct list_head q;
738 struct semaphore s; /* producers up it, worker down()s it */
739 spinlock_t q_lock; /* to protect the list. */ 738 spinlock_t q_lock; /* to protect the list. */
739 wait_queue_head_t q_wait;
740}; 740};
741 741
742struct drbd_socket { 742struct drbd_socket {
@@ -1832,9 +1832,8 @@ drbd_queue_work_front(struct drbd_work_queue *q, struct drbd_work *w)
1832 unsigned long flags; 1832 unsigned long flags;
1833 spin_lock_irqsave(&q->q_lock, flags); 1833 spin_lock_irqsave(&q->q_lock, flags);
1834 list_add(&w->list, &q->q); 1834 list_add(&w->list, &q->q);
1835 up(&q->s); /* within the spinlock,
1836 see comment near end of drbd_worker() */
1837 spin_unlock_irqrestore(&q->q_lock, flags); 1835 spin_unlock_irqrestore(&q->q_lock, flags);
1836 wake_up(&q->q_wait);
1838} 1837}
1839 1838
1840static inline void 1839static inline void
@@ -1843,9 +1842,8 @@ drbd_queue_work(struct drbd_work_queue *q, struct drbd_work *w)
1843 unsigned long flags; 1842 unsigned long flags;
1844 spin_lock_irqsave(&q->q_lock, flags); 1843 spin_lock_irqsave(&q->q_lock, flags);
1845 list_add_tail(&w->list, &q->q); 1844 list_add_tail(&w->list, &q->q);
1846 up(&q->s); /* within the spinlock,
1847 see comment near end of drbd_worker() */
1848 spin_unlock_irqrestore(&q->q_lock, flags); 1845 spin_unlock_irqrestore(&q->q_lock, flags);
1846 wake_up(&q->q_wait);
1849} 1847}
1850 1848
1851static inline void wake_asender(struct drbd_tconn *tconn) 1849static inline void wake_asender(struct drbd_tconn *tconn)
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index bfe6975ef94c..f379d33b10a4 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -2535,9 +2535,9 @@ out:
2535 2535
2536static void drbd_init_workqueue(struct drbd_work_queue* wq) 2536static void drbd_init_workqueue(struct drbd_work_queue* wq)
2537{ 2537{
2538 sema_init(&wq->s, 0);
2539 spin_lock_init(&wq->q_lock); 2538 spin_lock_init(&wq->q_lock);
2540 INIT_LIST_HEAD(&wq->q); 2539 INIT_LIST_HEAD(&wq->q);
2540 init_waitqueue_head(&wq->q_wait);
2541} 2541}
2542 2542
2543struct drbd_tconn *conn_get_by_name(const char *name) 2543struct drbd_tconn *conn_get_by_name(const char *name)
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index d7573f4b7421..fb2e6c8d45c9 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -1673,6 +1673,23 @@ void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1673 mutex_unlock(mdev->state_mutex); 1673 mutex_unlock(mdev->state_mutex);
1674} 1674}
1675 1675
1676bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1677{
1678 spin_lock_irq(&queue->q_lock);
1679 list_splice_init(&queue->q, work_list);
1680 spin_unlock_irq(&queue->q_lock);
1681 return !list_empty(work_list);
1682}
1683
1684bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
1685{
1686 spin_lock_irq(&queue->q_lock);
1687 if (!list_empty(&queue->q))
1688 list_move(queue->q.next, work_list);
1689 spin_unlock_irq(&queue->q_lock);
1690 return !list_empty(work_list);
1691}
1692
1676int drbd_worker(struct drbd_thread *thi) 1693int drbd_worker(struct drbd_thread *thi)
1677{ 1694{
1678 struct drbd_tconn *tconn = thi->tconn; 1695 struct drbd_tconn *tconn = thi->tconn;
@@ -1680,15 +1697,21 @@ int drbd_worker(struct drbd_thread *thi)
1680 struct drbd_conf *mdev; 1697 struct drbd_conf *mdev;
1681 struct net_conf *nc; 1698 struct net_conf *nc;
1682 LIST_HEAD(work_list); 1699 LIST_HEAD(work_list);
1683 int vnr, intr = 0; 1700 int vnr;
1684 int cork; 1701 int cork;
1685 1702
1686 while (get_t_state(thi) == RUNNING) { 1703 while (get_t_state(thi) == RUNNING) {
1687 drbd_thread_current_set_cpu(thi); 1704 drbd_thread_current_set_cpu(thi);
1688 1705
1689 if (down_trylock(&tconn->data.work.s)) { 1706 /* as long as we use drbd_queue_work_front(),
1690 mutex_lock(&tconn->data.mutex); 1707 * we may only dequeue single work items here, not batches. */
1708 if (list_empty(&work_list))
1709 dequeue_work_item(&tconn->data.work, &work_list);
1691 1710
1711 /* Still nothing to do? Poke TCP, just in case,
1712 * then wait for new work (or signal). */
1713 if (list_empty(&work_list)) {
1714 mutex_lock(&tconn->data.mutex);
1692 rcu_read_lock(); 1715 rcu_read_lock();
1693 nc = rcu_dereference(tconn->net_conf); 1716 nc = rcu_dereference(tconn->net_conf);
1694 cork = nc ? nc->tcp_cork : 0; 1717 cork = nc ? nc->tcp_cork : 0;
@@ -1698,15 +1721,16 @@ int drbd_worker(struct drbd_thread *thi)
1698 drbd_tcp_uncork(tconn->data.socket); 1721 drbd_tcp_uncork(tconn->data.socket);
1699 mutex_unlock(&tconn->data.mutex); 1722 mutex_unlock(&tconn->data.mutex);
1700 1723
1701 intr = down_interruptible(&tconn->data.work.s); 1724 wait_event_interruptible(tconn->data.work.q_wait,
1725 dequeue_work_item(&tconn->data.work, &work_list));
1702 1726
1703 mutex_lock(&tconn->data.mutex); 1727 mutex_lock(&tconn->data.mutex);
1704 if (tconn->data.socket && cork) 1728 if (tconn->data.socket && cork)
1705 drbd_tcp_cork(tconn->data.socket); 1729 drbd_tcp_cork(tconn->data.socket);
1706 mutex_unlock(&tconn->data.mutex); 1730 mutex_unlock(&tconn->data.mutex);
1707 } 1731 }
1708 1732
1709 if (intr) { 1733 if (signal_pending(current)) {
1710 flush_signals(current); 1734 flush_signals(current);
1711 if (get_t_state(thi) == RUNNING) { 1735 if (get_t_state(thi) == RUNNING) {
1712 conn_warn(tconn, "Worker got an unexpected signal\n"); 1736 conn_warn(tconn, "Worker got an unexpected signal\n");
@@ -1717,59 +1741,25 @@ int drbd_worker(struct drbd_thread *thi)
1717 1741
1718 if (get_t_state(thi) != RUNNING) 1742 if (get_t_state(thi) != RUNNING)
1719 break; 1743 break;
1720 /* With this break, we have done a down() but not consumed
1721 the entry from the list. The cleanup code takes care of
1722 this... */
1723
1724 w = NULL;
1725 spin_lock_irq(&tconn->data.work.q_lock);
1726 if (list_empty(&tconn->data.work.q)) {
1727 /* something terribly wrong in our logic.
1728 * we were able to down() the semaphore,
1729 * but the list is empty... doh.
1730 *
1731 * what is the best thing to do now?
1732 * try again from scratch, restarting the receiver,
1733 * asender, whatnot? could break even more ugly,
1734 * e.g. when we are primary, but no good local data.
1735 *
1736 * I'll try to get away just starting over this loop.
1737 */
1738 conn_warn(tconn, "Work list unexpectedly empty\n");
1739 spin_unlock_irq(&tconn->data.work.q_lock);
1740 continue;
1741 }
1742 w = list_entry(tconn->data.work.q.next, struct drbd_work, list);
1743 list_del_init(&w->list);
1744 spin_unlock_irq(&tconn->data.work.q_lock);
1745 1744
1746 if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) { 1745 while (!list_empty(&work_list)) {
1747 /* dev_warn(DEV, "worker: a callback failed! \n"); */ 1746 w = list_first_entry(&work_list, struct drbd_work, list);
1747 list_del_init(&w->list);
1748 if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS) == 0)
1749 continue;
1748 if (tconn->cstate >= C_WF_REPORT_PARAMS) 1750 if (tconn->cstate >= C_WF_REPORT_PARAMS)
1749 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD); 1751 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1750 } 1752 }
1751 } 1753 }
1752 1754
1753 spin_lock_irq(&tconn->data.work.q_lock); 1755 do {
1754 while (!list_empty(&tconn->data.work.q)) {
1755 list_splice_init(&tconn->data.work.q, &work_list);
1756 spin_unlock_irq(&tconn->data.work.q_lock);
1757
1758 while (!list_empty(&work_list)) { 1756 while (!list_empty(&work_list)) {
1759 w = list_entry(work_list.next, struct drbd_work, list); 1757 w = list_first_entry(&work_list, struct drbd_work, list);
1760 list_del_init(&w->list); 1758 list_del_init(&w->list);
1761 w->cb(w, 1); 1759 w->cb(w, 1);
1762 } 1760 }
1763 1761 dequeue_work_batch(&tconn->data.work, &work_list);
1764 spin_lock_irq(&tconn->data.work.q_lock); 1762 } while (!list_empty(&work_list));
1765 }
1766 sema_init(&tconn->data.work.s, 0);
1767 /* DANGEROUS race: if someone did queue his work within the spinlock,
1768 * but up() ed outside the spinlock, we could get an up() on the
1769 * semaphore without corresponding list entry.
1770 * So don't do that.
1771 */
1772 spin_unlock_irq(&tconn->data.work.q_lock);
1773 1763
1774 rcu_read_lock(); 1764 rcu_read_lock();
1775 idr_for_each_entry(&tconn->volumes, mdev, vnr) { 1765 idr_for_each_entry(&tconn->volumes, mdev, vnr) {