author     Paolo Abeni <pabeni@redhat.com>         2017-05-16 05:20:14 -0400
committer  David S. Miller <davem@davemloft.net>   2017-05-16 15:41:29 -0400
commit     2276f58ac5890e58d2b6a48b95493faff7347e3a (patch)
tree       c77ce6fd0b942dfddb145e1ad6dbb2fb1942d849
parent     65101aeca52241a05e66f23c96eb896c9412718d (diff)
udp: use a separate rx queue for packet reception
Under UDP flood the sk_receive_queue spinlock is heavily contended. This patch tries to reduce the contention on that lock by adding a second receive queue to UDP sockets; recvmsg() looks first in this queue and, only if it is empty, tries to fetch the data from sk_receive_queue. The latter is spliced into the newly added queue every time the receive path has to acquire the sk_receive_queue lock.

The accounting of forward allocated memory is still protected by the sk_receive_queue lock, so udp_rmem_release() needs to acquire both locks when the forward deficit is flushed.

In specific scenarios we can end up acquiring and releasing the sk_receive_queue lock multiple times; that will be addressed by the next patch.

Suggested-by: Eric Dumazet <edumazet@google.com>
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
Acked-by: Eric Dumazet <edumazet@google.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r--   include/linux/udp.h    |   3
-rw-r--r--   include/net/udp.h      |   9
-rw-r--r--   include/net/udplite.h  |   2
-rw-r--r--   net/ipv4/udp.c         | 138
-rw-r--r--   net/ipv6/udp.c         |   3
5 files changed, 131 insertions, 24 deletions
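For illustration only, here is a minimal userspace sketch of the two-queue scheme described in the commit message above. It uses pthread mutexes and made-up names (rx_queue, reader_dequeue); it is not the kernel API, and the real implementation is in the patch below.

/* Illustrative userspace analogue only -- not the kernel code in the patch
 * below.  Two locked FIFOs: producers append to rx_queue; the consumer
 * drains its private reader_queue and takes the contended rx_queue lock
 * only when the reader queue runs dry, splicing everything across at once.
 */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct pkt {
	struct pkt *next;
	int len;
};

struct queue {
	pthread_mutex_t lock;
	struct pkt *head;
	struct pkt *tail;
};

static struct queue rx_queue     = { PTHREAD_MUTEX_INITIALIZER, NULL, NULL };
static struct queue reader_queue = { PTHREAD_MUTEX_INITIALIZER, NULL, NULL };

/* producer path: every packet takes the rx_queue lock, much as the
 * kernel receive path takes sk_receive_queue.lock per packet */
static void rx_enqueue(struct pkt *p)
{
	pthread_mutex_lock(&rx_queue.lock);
	p->next = NULL;
	if (rx_queue.tail)
		rx_queue.tail->next = p;
	else
		rx_queue.head = p;
	rx_queue.tail = p;
	pthread_mutex_unlock(&rx_queue.lock);
}

/* move the whole source list to the tail of the destination; caller holds
 * both locks, mirroring how the patch uses skb_queue_splice_tail_init() */
static void splice_tail_init(struct queue *from, struct queue *to)
{
	if (!from->head)
		return;
	if (to->tail)
		to->tail->next = from->head;
	else
		to->head = from->head;
	to->tail = from->tail;
	from->head = from->tail = NULL;
}

/* consumer path: the analogue of recvmsg() looking at reader_queue first */
static struct pkt *reader_dequeue(void)
{
	struct pkt *p;

	pthread_mutex_lock(&reader_queue.lock);
	if (!reader_queue.head) {
		/* reader queue empty: take the contended lock once, refill */
		pthread_mutex_lock(&rx_queue.lock);
		splice_tail_init(&rx_queue, &reader_queue);
		pthread_mutex_unlock(&rx_queue.lock);
	}
	p = reader_queue.head;
	if (p) {
		reader_queue.head = p->next;
		if (!reader_queue.head)
			reader_queue.tail = NULL;
	}
	pthread_mutex_unlock(&reader_queue.lock);
	return p;
}

int main(void)
{
	struct pkt *p;
	int i;

	for (i = 0; i < 3; i++) {
		p = malloc(sizeof(*p));
		p->len = 100 + i;
		rx_enqueue(p);
	}
	while ((p = reader_dequeue()) != NULL) {
		printf("dequeued packet, len %d\n", p->len);
		free(p);
	}
	return 0;
}

The point of the split is that the consumer only touches the producer-contended lock when its private queue is empty, so a burst of packets costs one lock/unlock on the consumer side instead of one per packet.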
diff --git a/include/linux/udp.h b/include/linux/udp.h
index 6cb4061a720d..eaea63bc79bb 100644
--- a/include/linux/udp.h
+++ b/include/linux/udp.h
@@ -80,6 +80,9 @@ struct udp_sock {
 					   struct sk_buff *skb,
 					   int nhoff);
 
+	/* udp_recvmsg try to use this before splicing sk_receive_queue */
+	struct sk_buff_head	reader_queue ____cacheline_aligned_in_smp;
+
 	/* This field is dirtied by udp_recvmsg() */
 	int		forward_deficit;
 };
diff --git a/include/net/udp.h b/include/net/udp.h
index 3391dbd73959..1468dbd0f09a 100644
--- a/include/net/udp.h
+++ b/include/net/udp.h
@@ -249,13 +249,8 @@ void udp_destruct_sock(struct sock *sk);
 void skb_consume_udp(struct sock *sk, struct sk_buff *skb, int len);
 int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb);
 void udp_skb_destructor(struct sock *sk, struct sk_buff *skb);
-static inline struct sk_buff *
-__skb_recv_udp(struct sock *sk, unsigned int flags, int noblock, int *peeked,
-	       int *off, int *err)
-{
-	return __skb_recv_datagram(sk, flags | (noblock ? MSG_DONTWAIT : 0),
-				   udp_skb_destructor, peeked, off, err);
-}
+struct sk_buff *__skb_recv_udp(struct sock *sk, unsigned int flags,
+			       int noblock, int *peeked, int *off, int *err);
 static inline struct sk_buff *skb_recv_udp(struct sock *sk, unsigned int flags,
 					   int noblock, int *err)
 {
diff --git a/include/net/udplite.h b/include/net/udplite.h
index ea340524f99b..b7a18f63d86d 100644
--- a/include/net/udplite.h
+++ b/include/net/udplite.h
@@ -26,8 +26,8 @@ static __inline__ int udplite_getfrag(void *from, char *to, int offset,
 /* Designate sk as UDP-Lite socket */
 static inline int udplite_sk_init(struct sock *sk)
 {
+	udp_init_sock(sk);
 	udp_sk(sk)->pcflag = UDPLITE_BIT;
-	sk->sk_destruct = udp_destruct_sock;
 	return 0;
 }
 
diff --git a/net/ipv4/udp.c b/net/ipv4/udp.c
index ea6e4cff9faf..492c76be9230 100644
--- a/net/ipv4/udp.c
+++ b/net/ipv4/udp.c
@@ -1167,19 +1167,24 @@ out:
 static void udp_rmem_release(struct sock *sk, int size, int partial)
 {
 	struct udp_sock *up = udp_sk(sk);
+	struct sk_buff_head *sk_queue;
 	int amt;
 
 	if (likely(partial)) {
 		up->forward_deficit += size;
 		size = up->forward_deficit;
 		if (size < (sk->sk_rcvbuf >> 2) &&
-		    !skb_queue_empty(&sk->sk_receive_queue))
+		    !skb_queue_empty(&up->reader_queue))
 			return;
 	} else {
 		size += up->forward_deficit;
 	}
 	up->forward_deficit = 0;
 
+	/* acquire the sk_receive_queue for fwd allocated memory scheduling */
+	sk_queue = &sk->sk_receive_queue;
+	spin_lock(&sk_queue->lock);
+
 	sk->sk_forward_alloc += size;
 	amt = (sk->sk_forward_alloc - partial) & ~(SK_MEM_QUANTUM - 1);
 	sk->sk_forward_alloc -= amt;
@@ -1188,9 +1193,14 @@ static void udp_rmem_release(struct sock *sk, int size, int partial)
 		__sk_mem_reduce_allocated(sk, amt >> SK_MEM_QUANTUM_SHIFT);
 
 	atomic_sub(size, &sk->sk_rmem_alloc);
+
+	/* this can save us from acquiring the rx queue lock on next receive */
+	skb_queue_splice_tail_init(sk_queue, &up->reader_queue);
+
+	spin_unlock(&sk_queue->lock);
 }
 
-/* Note: called with sk_receive_queue.lock held.
+/* Note: called with reader_queue.lock held.
  * Instead of using skb->truesize here, find a copy of it in skb->dev_scratch
  * This avoids a cache line miss while receive_queue lock is held.
  * Look at __udp_enqueue_schedule_skb() to find where this copy is done.
@@ -1306,10 +1316,12 @@ EXPORT_SYMBOL_GPL(__udp_enqueue_schedule_skb);
 void udp_destruct_sock(struct sock *sk)
 {
 	/* reclaim completely the forward allocated memory */
+	struct udp_sock *up = udp_sk(sk);
 	unsigned int total = 0;
 	struct sk_buff *skb;
 
-	while ((skb = __skb_dequeue(&sk->sk_receive_queue)) != NULL) {
+	skb_queue_splice_tail_init(&sk->sk_receive_queue, &up->reader_queue);
+	while ((skb = __skb_dequeue(&up->reader_queue)) != NULL) {
 		total += skb->truesize;
 		kfree_skb(skb);
 	}
@@ -1321,6 +1333,7 @@ EXPORT_SYMBOL_GPL(udp_destruct_sock);
 
 int udp_init_sock(struct sock *sk)
 {
+	skb_queue_head_init(&udp_sk(sk)->reader_queue);
 	sk->sk_destruct = udp_destruct_sock;
 	return 0;
 }
@@ -1338,6 +1351,26 @@ void skb_consume_udp(struct sock *sk, struct sk_buff *skb, int len)
 }
 EXPORT_SYMBOL_GPL(skb_consume_udp);
 
+static struct sk_buff *__first_packet_length(struct sock *sk,
+					     struct sk_buff_head *rcvq,
+					     int *total)
+{
+	struct sk_buff *skb;
+
+	while ((skb = skb_peek(rcvq)) != NULL &&
+	       udp_lib_checksum_complete(skb)) {
+		__UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS,
+				IS_UDPLITE(sk));
+		__UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS,
+				IS_UDPLITE(sk));
+		atomic_inc(&sk->sk_drops);
+		__skb_unlink(skb, rcvq);
+		*total += skb->truesize;
+		kfree_skb(skb);
+	}
+	return skb;
+}
+
 /**
  *	first_packet_length	- return length of first packet in receive queue
  *	@sk: socket
@@ -1347,22 +1380,20 @@ EXPORT_SYMBOL_GPL(skb_consume_udp);
  */
 static int first_packet_length(struct sock *sk)
 {
-	struct sk_buff_head *rcvq = &sk->sk_receive_queue;
+	struct sk_buff_head *rcvq = &udp_sk(sk)->reader_queue;
+	struct sk_buff_head *sk_queue = &sk->sk_receive_queue;
 	struct sk_buff *skb;
 	int total = 0;
 	int res;
 
 	spin_lock_bh(&rcvq->lock);
-	while ((skb = skb_peek(rcvq)) != NULL &&
-	       udp_lib_checksum_complete(skb)) {
-		__UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS,
-				IS_UDPLITE(sk));
-		__UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS,
-				IS_UDPLITE(sk));
-		atomic_inc(&sk->sk_drops);
-		__skb_unlink(skb, rcvq);
-		total += skb->truesize;
-		kfree_skb(skb);
+	skb = __first_packet_length(sk, rcvq, &total);
+	if (!skb && !skb_queue_empty(sk_queue)) {
+		spin_lock(&sk_queue->lock);
+		skb_queue_splice_tail_init(sk_queue, rcvq);
+		spin_unlock(&sk_queue->lock);
+
+		skb = __first_packet_length(sk, rcvq, &total);
 	}
 	res = skb ? skb->len : -1;
 	if (total)
@@ -1400,6 +1431,79 @@ int udp_ioctl(struct sock *sk, int cmd, unsigned long arg)
 }
 EXPORT_SYMBOL(udp_ioctl);
 
+struct sk_buff *__skb_recv_udp(struct sock *sk, unsigned int flags,
+			       int noblock, int *peeked, int *off, int *err)
+{
+	struct sk_buff_head *sk_queue = &sk->sk_receive_queue;
+	struct sk_buff_head *queue;
+	struct sk_buff *last;
+	long timeo;
+	int error;
+
+	queue = &udp_sk(sk)->reader_queue;
+	flags |= noblock ? MSG_DONTWAIT : 0;
+	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
+	do {
+		struct sk_buff *skb;
+
+		error = sock_error(sk);
+		if (error)
+			break;
+
+		error = -EAGAIN;
+		*peeked = 0;
+		do {
+			int _off = *off;
+
+			spin_lock_bh(&queue->lock);
+			skb = __skb_try_recv_from_queue(sk, queue, flags,
+							udp_skb_destructor,
+							peeked, &_off, err,
+							&last);
+			if (skb) {
+				spin_unlock_bh(&queue->lock);
+				*off = _off;
+				return skb;
+			}
+
+			if (skb_queue_empty(sk_queue)) {
+				spin_unlock_bh(&queue->lock);
+				goto busy_check;
+			}
+
+			/* refill the reader queue and walk it again */
+			_off = *off;
+			spin_lock(&sk_queue->lock);
+			skb_queue_splice_tail_init(sk_queue, queue);
+			spin_unlock(&sk_queue->lock);
+
+			skb = __skb_try_recv_from_queue(sk, queue, flags,
+							udp_skb_destructor,
+							peeked, &_off, err,
+							&last);
+			spin_unlock_bh(&queue->lock);
+			if (skb) {
+				*off = _off;
+				return skb;
+			}
+
+busy_check:
+			if (!sk_can_busy_loop(sk))
+				break;
+
+			sk_busy_loop(sk, flags & MSG_DONTWAIT);
+		} while (!skb_queue_empty(sk_queue));
+
+		/* sk_queue is empty, reader_queue may contain peeked packets */
+	} while (timeo &&
+		 !__skb_wait_for_more_packets(sk, &error, &timeo,
+					      (struct sk_buff *)sk_queue));
+
+	*err = error;
+	return NULL;
+}
+EXPORT_SYMBOL_GPL(__skb_recv_udp);
+
 /*
  *	This should be easy, if there is something there we
  *	return it, otherwise we block.
@@ -1490,7 +1594,8 @@ try_again:
 	return err;
 
 csum_copy_err:
-	if (!__sk_queue_drop_skb(sk, skb, flags, udp_skb_destructor)) {
+	if (!__sk_queue_drop_skb(sk, &udp_sk(sk)->reader_queue, skb, flags,
+				 udp_skb_destructor)) {
 		UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS, is_udplite);
 		UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS, is_udplite);
 	}
@@ -2325,6 +2430,9 @@ unsigned int udp_poll(struct file *file, struct socket *sock, poll_table *wait)
 	unsigned int mask = datagram_poll(file, sock, wait);
 	struct sock *sk = sock->sk;
 
+	if (!skb_queue_empty(&udp_sk(sk)->reader_queue))
+		mask |= POLLIN | POLLRDNORM;
+
 	sock_rps_record_flow(sk);
 
 	/* Check for false positives due to checksum errors */
diff --git a/net/ipv6/udp.c b/net/ipv6/udp.c
index 04862abfe4ec..f78fdf8c9f0f 100644
--- a/net/ipv6/udp.c
+++ b/net/ipv6/udp.c
@@ -455,7 +455,8 @@ try_again:
 	return err;
 
 csum_copy_err:
-	if (!__sk_queue_drop_skb(sk, skb, flags, udp_skb_destructor)) {
+	if (!__sk_queue_drop_skb(sk, &udp_sk(sk)->reader_queue, skb, flags,
+				 udp_skb_destructor)) {
 		if (is_udp4) {
 			UDP_INC_STATS(sock_net(sk),
 				      UDP_MIB_CSUMERRORS, is_udplite);