aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/socket.c
diff options
context:
space:
mode:
authorJon Maloy <jon.maloy@ericsson.com>2017-10-13 05:04:26 -0400
committerDavid S. Miller <davem@davemloft.net>2017-10-13 11:46:00 -0400
commitb7d42635517fde2b095deddd0fba37be2302a285 (patch)
tree531940dc481a0e08ca7a2bbd0a328452344b82e1 /net/tipc/socket.c
parentae236fb208a6fbbd2e7a6913385e8fb78ac807f8 (diff)
tipc: introduce flow control for group broadcast messages
We introduce an end-to-end flow control mechanism for group broadcast messages. This ensures that no messages are ever lost because of destination receive buffer overflow, with minimal impact on performance. For now, the algorithm is based on the assumption that there is only one active transmitter at any moment in time. Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Acked-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r--net/tipc/socket.c48
1 files changed, 36 insertions, 12 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 0a2eac309177..50145c95ac96 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -201,6 +201,11 @@ static bool tsk_conn_cong(struct tipc_sock *tsk)
201 return tsk->snt_unacked > tsk->snd_win; 201 return tsk->snt_unacked > tsk->snd_win;
202} 202}
203 203
204static u16 tsk_blocks(int len)
205{
206 return ((len / FLOWCTL_BLK_SZ) + 1);
207}
208
204/* tsk_blocks(): translate a buffer size in bytes to number of 209/* tsk_blocks(): translate a buffer size in bytes to number of
205 * advertisable blocks, taking into account the ratio truesize(len)/len 210 * advertisable blocks, taking into account the ratio truesize(len)/len
206 * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ 211 * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
@@ -831,6 +836,7 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
831 struct tipc_group *grp = tsk->group; 836 struct tipc_group *grp = tsk->group;
832 struct tipc_nlist *dsts = tipc_group_dests(grp); 837 struct tipc_nlist *dsts = tipc_group_dests(grp);
833 struct tipc_mc_method *method = &tsk->mc_method; 838 struct tipc_mc_method *method = &tsk->mc_method;
839 int blks = tsk_blocks(MCAST_H_SIZE + dlen);
834 struct tipc_msg *hdr = &tsk->phdr; 840 struct tipc_msg *hdr = &tsk->phdr;
835 int mtu = tipc_bcast_get_mtu(net); 841 int mtu = tipc_bcast_get_mtu(net);
836 struct sk_buff_head pkts; 842 struct sk_buff_head pkts;
@@ -839,14 +845,15 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
839 if (!dsts->local && !dsts->remote) 845 if (!dsts->local && !dsts->remote)
840 return -EHOSTUNREACH; 846 return -EHOSTUNREACH;
841 847
842 /* Block or return if any destination link is congested */ 848 /* Block or return if any destination link or member is congested */
843 rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt); 849 rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt &&
850 !tipc_group_bc_cong(grp, blks));
844 if (unlikely(rc)) 851 if (unlikely(rc))
845 return rc; 852 return rc;
846 853
847 /* Complete message header */ 854 /* Complete message header */
848 msg_set_type(hdr, TIPC_GRP_BCAST_MSG); 855 msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
849 msg_set_hdr_sz(hdr, MCAST_H_SIZE); 856 msg_set_hdr_sz(hdr, GROUP_H_SIZE);
850 msg_set_destport(hdr, 0); 857 msg_set_destport(hdr, 0);
851 msg_set_destnode(hdr, 0); 858 msg_set_destnode(hdr, 0);
852 msg_set_nameinst(hdr, 0); 859 msg_set_nameinst(hdr, 0);
@@ -864,9 +871,8 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
864 if (unlikely(rc)) 871 if (unlikely(rc))
865 return rc; 872 return rc;
866 873
867 /* Update broadcast sequence number */ 874 /* Update broadcast sequence number and send windows */
868 tipc_group_update_bc_members(tsk->group); 875 tipc_group_update_bc_members(tsk->group, blks);
869
870 return dlen; 876 return dlen;
871} 877}
872 878
@@ -1024,7 +1030,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
1024 if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE)) 1030 if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
1025 return -EMSGSIZE; 1031 return -EMSGSIZE;
1026 1032
1027 if (unlikely(grp)) 1033 if (unlikely(grp && !dest))
1028 return tipc_send_group_bcast(sock, m, dlen, timeout); 1034 return tipc_send_group_bcast(sock, m, dlen, timeout);
1029 1035
1030 if (unlikely(!dest)) { 1036 if (unlikely(!dest)) {
@@ -1420,6 +1426,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
1420 bool connected = !tipc_sk_type_connectionless(sk); 1426 bool connected = !tipc_sk_type_connectionless(sk);
1421 struct tipc_sock *tsk = tipc_sk(sk); 1427 struct tipc_sock *tsk = tipc_sk(sk);
1422 int rc, err, hlen, dlen, copy; 1428 int rc, err, hlen, dlen, copy;
1429 struct sk_buff_head xmitq;
1423 struct tipc_msg *hdr; 1430 struct tipc_msg *hdr;
1424 struct sk_buff *skb; 1431 struct sk_buff *skb;
1425 bool grp_evt; 1432 bool grp_evt;
@@ -1436,8 +1443,8 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
1436 } 1443 }
1437 timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); 1444 timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1438 1445
1446 /* Step rcv queue to first msg with data or error; wait if necessary */
1439 do { 1447 do {
1440 /* Look at first msg in receive queue; wait if necessary */
1441 rc = tipc_wait_for_rcvmsg(sock, &timeout); 1448 rc = tipc_wait_for_rcvmsg(sock, &timeout);
1442 if (unlikely(rc)) 1449 if (unlikely(rc))
1443 goto exit; 1450 goto exit;
@@ -1485,12 +1492,21 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
1485 if (unlikely(flags & MSG_PEEK)) 1492 if (unlikely(flags & MSG_PEEK))
1486 goto exit; 1493 goto exit;
1487 1494
1495 /* Send group flow control advertisement when applicable */
1496 if (tsk->group && msg_in_group(hdr) && !grp_evt) {
1497 skb_queue_head_init(&xmitq);
1498 tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen),
1499 msg_orignode(hdr), msg_origport(hdr),
1500 &xmitq);
1501 tipc_node_distr_xmit(sock_net(sk), &xmitq);
1502 }
1503
1488 tsk_advance_rx_queue(sk); 1504 tsk_advance_rx_queue(sk);
1489 1505
1490 if (likely(!connected)) 1506 if (likely(!connected))
1491 goto exit; 1507 goto exit;
1492 1508
1493 /* Send connection flow control ack when applicable */ 1509 /* Send connection flow control advertisement when applicable */
1494 tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen); 1510 tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
1495 if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE) 1511 if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
1496 tipc_sk_send_ack(tsk); 1512 tipc_sk_send_ack(tsk);
@@ -1650,6 +1666,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,
1650 struct tipc_sock *tsk = tipc_sk(sk); 1666 struct tipc_sock *tsk = tipc_sk(sk);
1651 struct tipc_msg *hdr = buf_msg(skb); 1667 struct tipc_msg *hdr = buf_msg(skb);
1652 struct tipc_group *grp = tsk->group; 1668 struct tipc_group *grp = tsk->group;
1669 bool wakeup = false;
1653 1670
1654 switch (msg_user(hdr)) { 1671 switch (msg_user(hdr)) {
1655 case CONN_MANAGER: 1672 case CONN_MANAGER:
@@ -1658,19 +1675,23 @@ static void tipc_sk_proto_rcv(struct sock *sk,
1658 case SOCK_WAKEUP: 1675 case SOCK_WAKEUP:
1659 tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0); 1676 tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0);
1660 tsk->cong_link_cnt--; 1677 tsk->cong_link_cnt--;
1661 sk->sk_write_space(sk); 1678 wakeup = true;
1662 break; 1679 break;
1663 case GROUP_PROTOCOL: 1680 case GROUP_PROTOCOL:
1664 tipc_group_proto_rcv(grp, hdr, inputq, xmitq); 1681 tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
1665 break; 1682 break;
1666 case TOP_SRV: 1683 case TOP_SRV:
1667 tipc_group_member_evt(tsk->group, skb, inputq, xmitq); 1684 tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf,
1685 skb, inputq, xmitq);
1668 skb = NULL; 1686 skb = NULL;
1669 break; 1687 break;
1670 default: 1688 default:
1671 break; 1689 break;
1672 } 1690 }
1673 1691
1692 if (wakeup)
1693 sk->sk_write_space(sk);
1694
1674 kfree_skb(skb); 1695 kfree_skb(skb);
1675} 1696}
1676 1697
@@ -1785,6 +1806,9 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
1785 struct tipc_sock *tsk = tipc_sk(sk); 1806 struct tipc_sock *tsk = tipc_sk(sk);
1786 struct tipc_msg *hdr = buf_msg(skb); 1807 struct tipc_msg *hdr = buf_msg(skb);
1787 1808
1809 if (unlikely(msg_in_group(hdr)))
1810 return sk->sk_rcvbuf;
1811
1788 if (unlikely(!msg_connected(hdr))) 1812 if (unlikely(!msg_connected(hdr)))
1789 return sk->sk_rcvbuf << msg_importance(hdr); 1813 return sk->sk_rcvbuf << msg_importance(hdr);
1790 1814