path: root/drivers/net/xen-netback/netback.c
author		Paul Durrant <Paul.Durrant@citrix.com>	2013-12-06 11:36:07 -0500
committer	David S. Miller <davem@davemloft.net>	2013-12-09 20:33:12 -0500
commit		ca2f09f2b2c6c25047cfc545d057c4edfcfe561c (patch)
tree		08e9cc996597b15106ca9cc21128ca51ba51b3aa /drivers/net/xen-netback/netback.c
parent		512137eeff00f73a8a62e481a6575f1556cf962c (diff)
xen-netback: improve guest-receive-side flow control
The way that flow control works without this patch is that, in start_xmit() the code uses xenvif_count_skb_slots() to predict how many slots xenvif_gop_skb() will consume and then adds this to a 'req_cons_peek' counter which it then uses to determine if the shared ring has that amount of space available by checking whether 'req_prod' has passed that value. If the ring doesn't have space the tx queue is stopped. xenvif_gop_skb() will then consume slots and update 'req_cons' and issue responses, updating 'rsp_prod' as it goes. The frontend will consume those responses and post new requests, by updating req_prod. So, req_prod chases req_cons which chases rsp_prod, and can never exceed that value.

Thus if xenvif_count_skb_slots() ever returns a number of slots greater than xenvif_gop_skb() uses, req_cons_peek will get to a value that req_prod cannot possibly achieve (since it's limited by the 'real' req_cons) and, if this happens enough times, req_cons_peek gets more than a ring size ahead of req_cons and the tx queue then remains stopped forever, waiting for an unachievable amount of space to become available in the ring.

Having two routines trying to calculate the same value is always going to be fragile, so this patch does away with that. All we essentially need to do is make sure that we have 'enough stuff' on our internal queue without letting it build up uncontrollably. So start_xmit() makes a cheap optimistic check of how much space is needed for an skb and only turns the queue off if that is unachievable. net_rx_action() is the place where we could do with an accurate prediction but, since that has proven tricky to calculate, a cheap worst-case (but not too bad) estimate is all we really need since the only thing we *must* prevent is xenvif_gop_skb() consuming more slots than are available.

Without this patch I can trivially stall netback permanently by just doing a large guest-to-guest file copy between two Windows Server 2008R2 VMs on a single host.

Patch tested with frontends in:
- Windows Server 2008R2
- CentOS 6.0
- Debian Squeeze
- Debian Wheezy
- SLES11

Signed-off-by: Paul Durrant <paul.durrant@citrix.com>
Cc: Wei Liu <wei.liu2@citrix.com>
Cc: Ian Campbell <ian.campbell@citrix.com>
Cc: David Vrabel <david.vrabel@citrix.com>
Cc: Annie Li <annie.li@oracle.com>
Cc: Konrad Rzeszutek Wilk <konrad.wilk@oracle.com>
Acked-by: Wei Liu <wei.liu2@citrix.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
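
For illustration, the cheap worst-case slot estimate that the patch introduces in xenvif_rx_action() can be condensed into a small standalone sketch. This is not the kernel code itself: struct fake_skb, its fields and the example values are hypothetical stand-ins for the relevant sk_buff accessors, and PAGE_SIZE / DIV_ROUND_UP merely mirror the kernel definitions.

/* Illustrative userspace sketch of the worst-case RX slot estimate
 * used in xenvif_rx_action() in the patch below.  The skb layout here
 * is hypothetical.
 */
#include <stdio.h>
#include <stdbool.h>

#define PAGE_SIZE	4096UL
#define DIV_ROUND_UP(n, d)	(((n) + (d) - 1) / (d))

struct fake_skb {			/* hypothetical stand-in for struct sk_buff */
	unsigned long head_offset;	/* offset_in_page(skb->data) */
	unsigned long headlen;		/* skb_headlen(skb) */
	unsigned long frag_size[4];	/* skb_frag_size() of each frag */
	int nr_frags;
	bool gso;			/* SKB_GSO_TCPV4/TCPV6 set? */
};

static int max_slots_needed(const struct fake_skb *skb)
{
	int i, slots;

	/* Linear area: every page it touches may need its own slot. */
	slots = DIV_ROUND_UP(skb->head_offset + skb->headlen, PAGE_SIZE);

	/* Each frag is likewise rounded up to whole pages. */
	for (i = 0; i < skb->nr_frags; i++)
		slots += DIV_ROUND_UP(skb->frag_size[i], PAGE_SIZE);

	/* A GSO skb needs one extra slot for the extra_info segment. */
	if (skb->gso)
		slots++;

	return slots;
}

int main(void)
{
	struct fake_skb skb = {
		.head_offset = 100, .headlen = 1400,
		.frag_size = { 4096, 2000 }, .nr_frags = 2,
		.gso = true,
	};

	/* 1 (linear) + 1 + 1 (frags) + 1 (GSO) = 4 */
	printf("worst-case RX slots: %d\n", max_slots_needed(&skb));
	return 0;
}

The estimate deliberately never under-counts: xenvif_gop_skb() may coalesce data into fewer slots, but it can never need more than this, which is the only property the flow control relies on.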
Diffstat (limited to 'drivers/net/xen-netback/netback.c')
-rw-r--r--	drivers/net/xen-netback/netback.c	217
1 file changed, 71 insertions(+), 146 deletions(-)
diff --git a/drivers/net/xen-netback/netback.c b/drivers/net/xen-netback/netback.c
index acf13920e6d1..43341b82649c 100644
--- a/drivers/net/xen-netback/netback.c
+++ b/drivers/net/xen-netback/netback.c
@@ -138,36 +138,26 @@ static inline pending_ring_idx_t nr_pending_reqs(struct xenvif *vif)
 		vif->pending_prod + vif->pending_cons;
 }
 
-static int max_required_rx_slots(struct xenvif *vif)
+bool xenvif_rx_ring_slots_available(struct xenvif *vif, int needed)
 {
-	int max = DIV_ROUND_UP(vif->dev->mtu, PAGE_SIZE);
+	RING_IDX prod, cons;
 
-	/* XXX FIXME: RX path dependent on MAX_SKB_FRAGS */
-	if (vif->can_sg || vif->gso_mask || vif->gso_prefix_mask)
-		max += MAX_SKB_FRAGS + 1; /* extra_info + frags */
-
-	return max;
-}
-
-int xenvif_rx_ring_full(struct xenvif *vif)
-{
-	RING_IDX peek = vif->rx_req_cons_peek;
-	RING_IDX needed = max_required_rx_slots(vif);
+	do {
+		prod = vif->rx.sring->req_prod;
+		cons = vif->rx.req_cons;
 
-	return ((vif->rx.sring->req_prod - peek) < needed) ||
-	       ((vif->rx.rsp_prod_pvt + XEN_NETIF_RX_RING_SIZE - peek) < needed);
-}
+		if (prod - cons >= needed)
+			return true;
 
-int xenvif_must_stop_queue(struct xenvif *vif)
-{
-	if (!xenvif_rx_ring_full(vif))
-		return 0;
+		vif->rx.sring->req_event = prod + 1;
 
-	vif->rx.sring->req_event = vif->rx_req_cons_peek +
-		max_required_rx_slots(vif);
-	mb(); /* request notification /then/ check the queue */
+		/* Make sure event is visible before we check prod
+		 * again.
+		 */
+		mb();
+	} while (vif->rx.sring->req_prod != prod);
 
-	return xenvif_rx_ring_full(vif);
+	return false;
 }
 
 /*
@@ -210,93 +200,6 @@ static bool start_new_rx_buffer(int offset, unsigned long size, int head)
 	return false;
 }
 
-struct xenvif_count_slot_state {
-	unsigned long copy_off;
-	bool head;
-};
-
-unsigned int xenvif_count_frag_slots(struct xenvif *vif,
-				     unsigned long offset, unsigned long size,
-				     struct xenvif_count_slot_state *state)
-{
-	unsigned count = 0;
-
-	offset &= ~PAGE_MASK;
-
-	while (size > 0) {
-		unsigned long bytes;
-
-		bytes = PAGE_SIZE - offset;
-
-		if (bytes > size)
-			bytes = size;
-
-		if (start_new_rx_buffer(state->copy_off, bytes, state->head)) {
-			count++;
-			state->copy_off = 0;
-		}
-
-		if (state->copy_off + bytes > MAX_BUFFER_OFFSET)
-			bytes = MAX_BUFFER_OFFSET - state->copy_off;
-
-		state->copy_off += bytes;
-
-		offset += bytes;
-		size -= bytes;
-
-		if (offset == PAGE_SIZE)
-			offset = 0;
-
-		state->head = false;
-	}
-
-	return count;
-}
-
-/*
- * Figure out how many ring slots we're going to need to send @skb to
- * the guest. This function is essentially a dry run of
- * xenvif_gop_frag_copy.
- */
-unsigned int xenvif_count_skb_slots(struct xenvif *vif, struct sk_buff *skb)
-{
-	struct xenvif_count_slot_state state;
-	unsigned int count;
-	unsigned char *data;
-	unsigned i;
-
-	state.head = true;
-	state.copy_off = 0;
-
-	/* Slot for the first (partial) page of data. */
-	count = 1;
-
-	/* Need a slot for the GSO prefix for GSO extra data? */
-	if (skb_shinfo(skb)->gso_size)
-		count++;
-
-	data = skb->data;
-	while (data < skb_tail_pointer(skb)) {
-		unsigned long offset = offset_in_page(data);
-		unsigned long size = PAGE_SIZE - offset;
-
-		if (data + size > skb_tail_pointer(skb))
-			size = skb_tail_pointer(skb) - data;
-
-		count += xenvif_count_frag_slots(vif, offset, size, &state);
-
-		data += size;
-	}
-
-	for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) {
-		unsigned long size = skb_frag_size(&skb_shinfo(skb)->frags[i]);
-		unsigned long offset = skb_shinfo(skb)->frags[i].page_offset;
-
-		count += xenvif_count_frag_slots(vif, offset, size, &state);
-	}
-	return count;
-}
-
 struct netrx_pending_operations {
 	unsigned copy_prod, copy_cons;
 	unsigned meta_prod, meta_cons;
@@ -557,12 +460,12 @@ struct skb_cb_overlay {
 	int meta_slots_used;
 };
 
-static void xenvif_kick_thread(struct xenvif *vif)
+void xenvif_kick_thread(struct xenvif *vif)
 {
 	wake_up(&vif->wq);
 }
 
-void xenvif_rx_action(struct xenvif *vif)
+static void xenvif_rx_action(struct xenvif *vif)
 {
 	s8 status;
 	u16 flags;
@@ -571,8 +474,6 @@ void xenvif_rx_action(struct xenvif *vif)
 	struct sk_buff *skb;
 	LIST_HEAD(notify);
 	int ret;
-	int nr_frags;
-	int count;
 	unsigned long offset;
 	struct skb_cb_overlay *sco;
 	int need_to_notify = 0;
@@ -584,29 +485,44 @@ void xenvif_rx_action(struct xenvif *vif)
 
 	skb_queue_head_init(&rxq);
 
-	count = 0;
-
 	while ((skb = skb_dequeue(&vif->rx_queue)) != NULL) {
-		vif = netdev_priv(skb->dev);
-		nr_frags = skb_shinfo(skb)->nr_frags;
+		int max_slots_needed;
+		int i;
+
+		/* We need a cheap worse case estimate for the number of
+		 * slots we'll use.
+		 */
+
+		max_slots_needed = DIV_ROUND_UP(offset_in_page(skb->data) +
+						skb_headlen(skb),
+						PAGE_SIZE);
+		for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) {
+			unsigned int size;
+			size = skb_frag_size(&skb_shinfo(skb)->frags[i]);
+			max_slots_needed += DIV_ROUND_UP(size, PAGE_SIZE);
+		}
+		if (skb_shinfo(skb)->gso_type & SKB_GSO_TCPV4 ||
+		    skb_shinfo(skb)->gso_type & SKB_GSO_TCPV6)
+			max_slots_needed++;
+
+		/* If the skb may not fit then bail out now */
+		if (!xenvif_rx_ring_slots_available(vif, max_slots_needed)) {
+			skb_queue_head(&vif->rx_queue, skb);
+			need_to_notify = 1;
+			break;
+		}
 
 		sco = (struct skb_cb_overlay *)skb->cb;
 		sco->meta_slots_used = xenvif_gop_skb(skb, &npo);
-
-		count += nr_frags + 1;
+		BUG_ON(sco->meta_slots_used > max_slots_needed);
 
 		__skb_queue_tail(&rxq, skb);
-
-		/* Filled the batch queue? */
-		/* XXX FIXME: RX path dependent on MAX_SKB_FRAGS */
-		if (count + MAX_SKB_FRAGS >= XEN_NETIF_RX_RING_SIZE)
-			break;
 	}
 
 	BUG_ON(npo.meta_prod > ARRAY_SIZE(vif->meta));
 
 	if (!npo.copy_prod)
-		return;
+		goto done;
 
 	BUG_ON(npo.copy_prod > ARRAY_SIZE(vif->grant_copy_op));
 	gnttab_batch_copy(vif->grant_copy_op, npo.copy_prod);
@@ -614,8 +530,6 @@ void xenvif_rx_action(struct xenvif *vif)
 	while ((skb = __skb_dequeue(&rxq)) != NULL) {
 		sco = (struct skb_cb_overlay *)skb->cb;
 
-		vif = netdev_priv(skb->dev);
-
 		if ((1 << vif->meta[npo.meta_cons].gso_type) &
 		    vif->gso_prefix_mask) {
 			resp = RING_GET_RESPONSE(&vif->rx,
@@ -681,25 +595,13 @@ void xenvif_rx_action(struct xenvif *vif)
 		if (ret)
 			need_to_notify = 1;
 
-		xenvif_notify_tx_completion(vif);
-
 		npo.meta_cons += sco->meta_slots_used;
 		dev_kfree_skb(skb);
 	}
 
+done:
 	if (need_to_notify)
 		notify_remote_via_irq(vif->rx_irq);
-
-	/* More work to do? */
-	if (!skb_queue_empty(&vif->rx_queue))
-		xenvif_kick_thread(vif);
-}
-
-void xenvif_queue_tx_skb(struct xenvif *vif, struct sk_buff *skb)
-{
-	skb_queue_tail(&vif->rx_queue, skb);
-
-	xenvif_kick_thread(vif);
 }
 
 void xenvif_check_rx_xenvif(struct xenvif *vif)
@@ -1804,7 +1706,7 @@ static struct xen_netif_rx_response *make_rx_response(struct xenvif *vif,
 
 static inline int rx_work_todo(struct xenvif *vif)
 {
-	return !skb_queue_empty(&vif->rx_queue);
+	return !skb_queue_empty(&vif->rx_queue) || vif->rx_event;
 }
 
 static inline int tx_work_todo(struct xenvif *vif)
@@ -1854,8 +1756,6 @@ int xenvif_map_frontend_rings(struct xenvif *vif,
 	rxs = (struct xen_netif_rx_sring *)addr;
 	BACK_RING_INIT(&vif->rx, rxs, PAGE_SIZE);
 
-	vif->rx_req_cons_peek = 0;
-
 	return 0;
 
 err:
@@ -1863,9 +1763,24 @@ err:
 	return err;
 }
 
+void xenvif_stop_queue(struct xenvif *vif)
+{
+	if (!vif->can_queue)
+		return;
+
+	netif_stop_queue(vif->dev);
+}
+
+static void xenvif_start_queue(struct xenvif *vif)
+{
+	if (xenvif_schedulable(vif))
+		netif_wake_queue(vif->dev);
+}
+
 int xenvif_kthread(void *data)
 {
 	struct xenvif *vif = data;
+	struct sk_buff *skb;
 
 	while (!kthread_should_stop()) {
 		wait_event_interruptible(vif->wq,
@@ -1874,12 +1789,22 @@ int xenvif_kthread(void *data)
 		if (kthread_should_stop())
 			break;
 
-		if (rx_work_todo(vif))
+		if (!skb_queue_empty(&vif->rx_queue))
 			xenvif_rx_action(vif);
 
+		vif->rx_event = false;
+
+		if (skb_queue_empty(&vif->rx_queue) &&
+		    netif_queue_stopped(vif->dev))
+			xenvif_start_queue(vif);
+
 		cond_resched();
 	}
 
+	/* Bin any remaining skbs */
+	while ((skb = skb_dequeue(&vif->rx_queue)) != NULL)
+		dev_kfree_skb(skb);
+
 	return 0;
 }
 
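
The other half of the change is the re-check loop in xenvif_rx_ring_slots_available() above: if the space check fails, the backend publishes an event threshold just past the producer value it read, issues a barrier, and re-reads the producer so that requests posted in the meantime are not missed. A minimal userspace analogue of that pattern follows; it is an illustrative sketch, not Xen code, and the names (req_prod, req_event, frontend(), slots_available()) are hypothetical stand-ins with a thread playing the frontend.

/* Userspace analogue (sketch only) of the "publish event threshold,
 * barrier, re-check producer" loop used by xenvif_rx_ring_slots_available().
 * Build with: gcc -pthread
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

static atomic_uint req_prod;	/* written by the "frontend" thread */
static atomic_uint req_event;	/* threshold at which the frontend would notify us */
static unsigned int req_cons;	/* our private consumer index (never advanced in this toy) */

static bool slots_available(unsigned int needed)
{
	unsigned int prod;

	do {
		prod = atomic_load(&req_prod);
		if (prod - req_cons >= needed)
			return true;

		/* Ask to be notified once the producer passes 'prod'... */
		atomic_store(&req_event, prod + 1);

		/* ...then make the store visible before re-reading req_prod
		 * (the kernel code uses mb() for the same purpose).
		 */
		atomic_thread_fence(memory_order_seq_cst);
	} while (atomic_load(&req_prod) != prod);

	return false;
}

static void *frontend(void *arg)
{
	(void)arg;
	usleep(1000);
	atomic_fetch_add(&req_prod, 8);	/* post eight new requests */
	return NULL;
}

int main(void)
{
	pthread_t t;

	pthread_create(&t, NULL, frontend, NULL);
	while (!slots_available(4))
		;	/* a real backend would block on the event channel here */
	pthread_join(t, NULL);
	printf("4 slots became available\n");
	return 0;
}

The final re-read of the producer closes the race window between "not enough space" and "event threshold published": if the frontend posted requests in that window, the loop goes round again instead of returning false and sleeping through the update.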