diff options
author | Jon Maloy <jon.maloy@ericsson.com> | 2017-10-13 05:04:26 -0400 |
---|---|---|
committer | David S. Miller <davem@davemloft.net> | 2017-10-13 11:46:00 -0400 |
commit | b7d42635517fde2b095deddd0fba37be2302a285 (patch) | |
tree | 531940dc481a0e08ca7a2bbd0a328452344b82e1 /net/tipc/socket.c | |
parent | ae236fb208a6fbbd2e7a6913385e8fb78ac807f8 (diff) |
tipc: introduce flow control for group broadcast messages
We introduce an end-to-end flow control mechanism for group broadcast
messages. This ensures that no messages are ever lost because of
destination receive buffer overflow, with minimal impact on performance.
For now, the algorithm is based on the assumption that there is only one
active transmitter at any moment in time.
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r-- | net/tipc/socket.c | 48 |
1 files changed, 36 insertions, 12 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 0a2eac309177..50145c95ac96 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c | |||
@@ -201,6 +201,11 @@ static bool tsk_conn_cong(struct tipc_sock *tsk) | |||
201 | return tsk->snt_unacked > tsk->snd_win; | 201 | return tsk->snt_unacked > tsk->snd_win; |
202 | } | 202 | } |
203 | 203 | ||
204 | static u16 tsk_blocks(int len) | ||
205 | { | ||
206 | return ((len / FLOWCTL_BLK_SZ) + 1); | ||
207 | } | ||
208 | |||
204 | /* tsk_blocks(): translate a buffer size in bytes to number of | 209 | /* tsk_blocks(): translate a buffer size in bytes to number of |
205 | * advertisable blocks, taking into account the ratio truesize(len)/len | 210 | * advertisable blocks, taking into account the ratio truesize(len)/len |
206 | * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ | 211 | * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ |
@@ -831,6 +836,7 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, | |||
831 | struct tipc_group *grp = tsk->group; | 836 | struct tipc_group *grp = tsk->group; |
832 | struct tipc_nlist *dsts = tipc_group_dests(grp); | 837 | struct tipc_nlist *dsts = tipc_group_dests(grp); |
833 | struct tipc_mc_method *method = &tsk->mc_method; | 838 | struct tipc_mc_method *method = &tsk->mc_method; |
839 | int blks = tsk_blocks(MCAST_H_SIZE + dlen); | ||
834 | struct tipc_msg *hdr = &tsk->phdr; | 840 | struct tipc_msg *hdr = &tsk->phdr; |
835 | int mtu = tipc_bcast_get_mtu(net); | 841 | int mtu = tipc_bcast_get_mtu(net); |
836 | struct sk_buff_head pkts; | 842 | struct sk_buff_head pkts; |
@@ -839,14 +845,15 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, | |||
839 | if (!dsts->local && !dsts->remote) | 845 | if (!dsts->local && !dsts->remote) |
840 | return -EHOSTUNREACH; | 846 | return -EHOSTUNREACH; |
841 | 847 | ||
842 | /* Block or return if any destination link is congested */ | 848 | /* Block or return if any destination link or member is congested */ |
843 | rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt); | 849 | rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt && |
850 | !tipc_group_bc_cong(grp, blks)); | ||
844 | if (unlikely(rc)) | 851 | if (unlikely(rc)) |
845 | return rc; | 852 | return rc; |
846 | 853 | ||
847 | /* Complete message header */ | 854 | /* Complete message header */ |
848 | msg_set_type(hdr, TIPC_GRP_BCAST_MSG); | 855 | msg_set_type(hdr, TIPC_GRP_BCAST_MSG); |
849 | msg_set_hdr_sz(hdr, MCAST_H_SIZE); | 856 | msg_set_hdr_sz(hdr, GROUP_H_SIZE); |
850 | msg_set_destport(hdr, 0); | 857 | msg_set_destport(hdr, 0); |
851 | msg_set_destnode(hdr, 0); | 858 | msg_set_destnode(hdr, 0); |
852 | msg_set_nameinst(hdr, 0); | 859 | msg_set_nameinst(hdr, 0); |
@@ -864,9 +871,8 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, | |||
864 | if (unlikely(rc)) | 871 | if (unlikely(rc)) |
865 | return rc; | 872 | return rc; |
866 | 873 | ||
867 | /* Update broadcast sequence number */ | 874 | /* Update broadcast sequence number and send windows */ |
868 | tipc_group_update_bc_members(tsk->group); | 875 | tipc_group_update_bc_members(tsk->group, blks); |
869 | |||
870 | return dlen; | 876 | return dlen; |
871 | } | 877 | } |
872 | 878 | ||
@@ -1024,7 +1030,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) | |||
1024 | if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE)) | 1030 | if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE)) |
1025 | return -EMSGSIZE; | 1031 | return -EMSGSIZE; |
1026 | 1032 | ||
1027 | if (unlikely(grp)) | 1033 | if (unlikely(grp && !dest)) |
1028 | return tipc_send_group_bcast(sock, m, dlen, timeout); | 1034 | return tipc_send_group_bcast(sock, m, dlen, timeout); |
1029 | 1035 | ||
1030 | if (unlikely(!dest)) { | 1036 | if (unlikely(!dest)) { |
@@ -1420,6 +1426,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, | |||
1420 | bool connected = !tipc_sk_type_connectionless(sk); | 1426 | bool connected = !tipc_sk_type_connectionless(sk); |
1421 | struct tipc_sock *tsk = tipc_sk(sk); | 1427 | struct tipc_sock *tsk = tipc_sk(sk); |
1422 | int rc, err, hlen, dlen, copy; | 1428 | int rc, err, hlen, dlen, copy; |
1429 | struct sk_buff_head xmitq; | ||
1423 | struct tipc_msg *hdr; | 1430 | struct tipc_msg *hdr; |
1424 | struct sk_buff *skb; | 1431 | struct sk_buff *skb; |
1425 | bool grp_evt; | 1432 | bool grp_evt; |
@@ -1436,8 +1443,8 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, | |||
1436 | } | 1443 | } |
1437 | timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); | 1444 | timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); |
1438 | 1445 | ||
1446 | /* Step rcv queue to first msg with data or error; wait if necessary */ | ||
1439 | do { | 1447 | do { |
1440 | /* Look at first msg in receive queue; wait if necessary */ | ||
1441 | rc = tipc_wait_for_rcvmsg(sock, &timeout); | 1448 | rc = tipc_wait_for_rcvmsg(sock, &timeout); |
1442 | if (unlikely(rc)) | 1449 | if (unlikely(rc)) |
1443 | goto exit; | 1450 | goto exit; |
@@ -1485,12 +1492,21 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, | |||
1485 | if (unlikely(flags & MSG_PEEK)) | 1492 | if (unlikely(flags & MSG_PEEK)) |
1486 | goto exit; | 1493 | goto exit; |
1487 | 1494 | ||
1495 | /* Send group flow control advertisement when applicable */ | ||
1496 | if (tsk->group && msg_in_group(hdr) && !grp_evt) { | ||
1497 | skb_queue_head_init(&xmitq); | ||
1498 | tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen), | ||
1499 | msg_orignode(hdr), msg_origport(hdr), | ||
1500 | &xmitq); | ||
1501 | tipc_node_distr_xmit(sock_net(sk), &xmitq); | ||
1502 | } | ||
1503 | |||
1488 | tsk_advance_rx_queue(sk); | 1504 | tsk_advance_rx_queue(sk); |
1489 | 1505 | ||
1490 | if (likely(!connected)) | 1506 | if (likely(!connected)) |
1491 | goto exit; | 1507 | goto exit; |
1492 | 1508 | ||
1493 | /* Send connection flow control ack when applicable */ | 1509 | /* Send connection flow control advertisement when applicable */ |
1494 | tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen); | 1510 | tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen); |
1495 | if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE) | 1511 | if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE) |
1496 | tipc_sk_send_ack(tsk); | 1512 | tipc_sk_send_ack(tsk); |
@@ -1650,6 +1666,7 @@ static void tipc_sk_proto_rcv(struct sock *sk, | |||
1650 | struct tipc_sock *tsk = tipc_sk(sk); | 1666 | struct tipc_sock *tsk = tipc_sk(sk); |
1651 | struct tipc_msg *hdr = buf_msg(skb); | 1667 | struct tipc_msg *hdr = buf_msg(skb); |
1652 | struct tipc_group *grp = tsk->group; | 1668 | struct tipc_group *grp = tsk->group; |
1669 | bool wakeup = false; | ||
1653 | 1670 | ||
1654 | switch (msg_user(hdr)) { | 1671 | switch (msg_user(hdr)) { |
1655 | case CONN_MANAGER: | 1672 | case CONN_MANAGER: |
@@ -1658,19 +1675,23 @@ static void tipc_sk_proto_rcv(struct sock *sk, | |||
1658 | case SOCK_WAKEUP: | 1675 | case SOCK_WAKEUP: |
1659 | tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0); | 1676 | tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0); |
1660 | tsk->cong_link_cnt--; | 1677 | tsk->cong_link_cnt--; |
1661 | sk->sk_write_space(sk); | 1678 | wakeup = true; |
1662 | break; | 1679 | break; |
1663 | case GROUP_PROTOCOL: | 1680 | case GROUP_PROTOCOL: |
1664 | tipc_group_proto_rcv(grp, hdr, inputq, xmitq); | 1681 | tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq); |
1665 | break; | 1682 | break; |
1666 | case TOP_SRV: | 1683 | case TOP_SRV: |
1667 | tipc_group_member_evt(tsk->group, skb, inputq, xmitq); | 1684 | tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf, |
1685 | skb, inputq, xmitq); | ||
1668 | skb = NULL; | 1686 | skb = NULL; |
1669 | break; | 1687 | break; |
1670 | default: | 1688 | default: |
1671 | break; | 1689 | break; |
1672 | } | 1690 | } |
1673 | 1691 | ||
1692 | if (wakeup) | ||
1693 | sk->sk_write_space(sk); | ||
1694 | |||
1674 | kfree_skb(skb); | 1695 | kfree_skb(skb); |
1675 | } | 1696 | } |
1676 | 1697 | ||
@@ -1785,6 +1806,9 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb) | |||
1785 | struct tipc_sock *tsk = tipc_sk(sk); | 1806 | struct tipc_sock *tsk = tipc_sk(sk); |
1786 | struct tipc_msg *hdr = buf_msg(skb); | 1807 | struct tipc_msg *hdr = buf_msg(skb); |
1787 | 1808 | ||
1809 | if (unlikely(msg_in_group(hdr))) | ||
1810 | return sk->sk_rcvbuf; | ||
1811 | |||
1788 | if (unlikely(!msg_connected(hdr))) | 1812 | if (unlikely(!msg_connected(hdr))) |
1789 | return sk->sk_rcvbuf << msg_importance(hdr); | 1813 | return sk->sk_rcvbuf << msg_importance(hdr); |
1790 | 1814 | ||