author     Tuong Lien <tuong.t.lien@dektech.com.au>   2019-04-04 00:09:53 -0400
committer  David S. Miller <davem@davemloft.net>      2019-04-04 21:29:25 -0400
commit     58ee86b8c7750a6b67d665a031aa3ff13a9b6863 (patch)
tree       eb2b99d9b5a9e4b67f8b110fed5b0106d6e548c0 /net/tipc
parent     382f598fb66b14a8451f2794abf70ea7b5826c96 (diff)
tipc: adapt link failover for new Gap-ACK algorithm
In commit 0ae955e2656d ("tipc: improve TIPC throughput by Gap ACK blocks"), we enhanced the link transmq by releasing as many packets as possible based on the multi-ACKs from the peer node. This also means the queue is now non-linear, and the peer link's deferdq becomes vital.

However, in the case of link failover, all messages in the link transmq need to be transmitted as tunnel messages in such a way that message sequentiality and cardinality per sender is preserved. This requires us to maintain the link deferdq somehow, so that when the tunnel messages arrive, the inner user messages, along with the ones in the deferdq, will be delivered to the upper layer correctly.

This commit accomplishes that by defining a new queue in the TIPC link structure to hold the old link's deferdq when link failover happens, and by processing it upon receipt of tunnel messages. Also, in the case of link syncing, the link deferdq will not be purged, to avoid unnecessary retransmissions that in the worst case would fail because the packets might already have been freed on the sending side.

Acked-by: Ying Xue <ying.xue@windriver.com>
Acked-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: Tuong Lien <tuong.t.lien@dektech.com.au>
Signed-off-by: David S. Miller <davem@davemloft.net>
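For readers unfamiliar with the mechanism, here is a minimal userspace model (illustrative only, not kernel code) of the in-order delivery loop this patch adds in tipc_link_tnl_rcv(): an inner message extracted from a FAILOVER_MSG is delivered only when its sequence number matches the expected "drop_point"; older duplicates are dropped, and later arrivals wait in a sorted defer queue until the gap is filled. The helpers defer_enqueue()/defer_dequeue() are hypothetical stand-ins for the kernel's __tipc_skb_queue_sorted()/__tipc_skb_dequeue(), and plain '<' replaces the wrap-aware less():

/* Userspace sketch of the failover deferdq logic (assumptions noted
 * above). "drop_point" is the next expected sequence number.
 */
#include <stdio.h>
#include <stdlib.h>

struct pkt {
        unsigned short seqno;
        struct pkt *next;
};

/* Sorted insert; stand-in for __tipc_skb_queue_sorted() */
static void defer_enqueue(struct pkt **q, struct pkt *p)
{
        while (*q && (*q)->seqno < p->seqno)
                q = &(*q)->next;
        p->next = *q;
        *q = p;
}

/* Pop the head only if it carries the expected seqno;
 * stand-in for __tipc_skb_dequeue()
 */
static struct pkt *defer_dequeue(struct pkt **q, unsigned short seqno)
{
        struct pkt *p = *q;

        if (!p || p->seqno != seqno)
                return NULL;
        *q = p->next;
        return p;
}

/* Mirrors the do/while loop in tipc_link_tnl_rcv(); the kernel uses
 * the wrap-aware less() where this sketch uses plain '<'.
 */
static void tnl_rcv(struct pkt **fdefq, unsigned short *drop_point,
                    struct pkt *p)
{
        do {
                if (p->seqno < *drop_point) {   /* duplicate: drop */
                        free(p);
                        continue;
                }
                if (p->seqno != *drop_point) {  /* gap: defer */
                        defer_enqueue(fdefq, p);
                        continue;
                }
                (*drop_point)++;                /* in order: deliver */
                printf("deliver seqno %u\n", p->seqno);
                free(p);
                /* 'continue' in a do/while jumps to the condition, so a
                 * deferred packet that now matches drop_point is drained
                 */
        } while ((p = defer_dequeue(fdefq, *drop_point)));
}

int main(void)
{
        unsigned short arrivals[] = { 3, 2, 1, 4 };     /* 1, 2 arrive late */
        struct pkt *fdefq = NULL;
        unsigned short drop_point = 1;
        size_t i;

        for (i = 0; i < sizeof(arrivals) / sizeof(arrivals[0]); i++) {
                struct pkt *p = malloc(sizeof(*p));

                if (!p)
                        return 1;
                p->seqno = arrivals[i];
                p->next = NULL;
                tnl_rcv(&fdefq, &drop_point, p);
        }
        return 0;       /* prints "deliver seqno 1" ... "deliver seqno 4" */
}

With arrivals {3, 2, 1, 4}, packets 3 and 2 sit in the defer queue until 1 arrives, after which all three are delivered in order; this is how failover_deferdq is drained as drop_point advances. Note also the pktcnt change in tipc_link_tnl_prepare() below: because Gap-ACKs can leave holes in the transmq, the SYNCH_MSG count is derived from the sequence-number span (l->snd_nxt - buf_seqno(skb_peek(&l->transmq))) rather than from the queue length.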
Diffstat (limited to 'net/tipc')
-rw-r--r--  net/tipc/link.c  106
1 file changed, 80 insertions(+), 26 deletions(-)
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 1f2cde0d025f..3cb9f326ee6f 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -151,6 +151,7 @@ struct tipc_link {
 	/* Failover/synch */
 	u16 drop_point;
 	struct sk_buff *failover_reasm_skb;
+	struct sk_buff_head failover_deferdq;
 
 	/* Max packet negotiation */
 	u16 mtu;
@@ -498,6 +499,7 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
 	__skb_queue_head_init(&l->transmq);
 	__skb_queue_head_init(&l->backlogq);
 	__skb_queue_head_init(&l->deferdq);
+	__skb_queue_head_init(&l->failover_deferdq);
 	skb_queue_head_init(&l->wakeupq);
 	skb_queue_head_init(l->inputq);
 	return true;
@@ -888,6 +890,7 @@ void tipc_link_reset(struct tipc_link *l)
 	__skb_queue_purge(&l->transmq);
 	__skb_queue_purge(&l->deferdq);
 	__skb_queue_purge(&l->backlogq);
+	__skb_queue_purge(&l->failover_deferdq);
 	l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
 	l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
 	l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
@@ -1159,34 +1162,14 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
  * Consumes buffer
  */
 static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
-			   struct sk_buff_head *inputq)
+			   struct sk_buff_head *inputq,
+			   struct sk_buff **reasm_skb)
 {
 	struct tipc_msg *hdr = buf_msg(skb);
-	struct sk_buff **reasm_skb = &l->reasm_buf;
 	struct sk_buff *iskb;
 	struct sk_buff_head tmpq;
 	int usr = msg_user(hdr);
-	int rc = 0;
 	int pos = 0;
-	int ipos = 0;
-
-	if (unlikely(usr == TUNNEL_PROTOCOL)) {
-		if (msg_type(hdr) == SYNCH_MSG) {
-			__skb_queue_purge(&l->deferdq);
-			goto drop;
-		}
-		if (!tipc_msg_extract(skb, &iskb, &ipos))
-			return rc;
-		kfree_skb(skb);
-		skb = iskb;
-		hdr = buf_msg(skb);
-		if (less(msg_seqno(hdr), l->drop_point))
-			goto drop;
-		if (tipc_data_input(l, skb, inputq))
-			return rc;
-		usr = msg_user(hdr);
-		reasm_skb = &l->failover_reasm_skb;
-	}
 
 	if (usr == MSG_BUNDLER) {
 		skb_queue_head_init(&tmpq);
@@ -1211,11 +1194,66 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
 		tipc_link_bc_init_rcv(l->bc_rcvlink, hdr);
 		tipc_bcast_unlock(l->net);
 	}
-drop:
+
 	kfree_skb(skb);
 	return 0;
 }
 
+/* tipc_link_tnl_rcv() - receive TUNNEL_PROTOCOL message, drop or process the
+ *			 inner message along with the ones in the old link's
+ *			 deferdq
+ * @l: tunnel link
+ * @skb: TUNNEL_PROTOCOL message
+ * @inputq: queue to put messages ready for delivery
+ */
+static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb,
+			     struct sk_buff_head *inputq)
+{
+	struct sk_buff **reasm_skb = &l->failover_reasm_skb;
+	struct sk_buff_head *fdefq = &l->failover_deferdq;
+	struct tipc_msg *hdr = buf_msg(skb);
+	struct sk_buff *iskb;
+	int ipos = 0;
+	int rc = 0;
+	u16 seqno;
+
+	/* SYNCH_MSG */
+	if (msg_type(hdr) == SYNCH_MSG)
+		goto drop;
+
+	/* FAILOVER_MSG */
+	if (!tipc_msg_extract(skb, &iskb, &ipos)) {
+		pr_warn_ratelimited("Cannot extract FAILOVER_MSG, defq: %d\n",
+				    skb_queue_len(fdefq));
+		return rc;
+	}
+
+	do {
+		seqno = buf_seqno(iskb);
+
+		if (unlikely(less(seqno, l->drop_point))) {
+			kfree_skb(iskb);
+			continue;
+		}
+
+		if (unlikely(seqno != l->drop_point)) {
+			__tipc_skb_queue_sorted(fdefq, seqno, iskb);
+			continue;
+		}
+
+		l->drop_point++;
+
+		if (!tipc_data_input(l, iskb, inputq))
+			rc |= tipc_link_input(l, iskb, inputq, reasm_skb);
+		if (unlikely(rc))
+			break;
+	} while ((iskb = __tipc_skb_dequeue(fdefq, l->drop_point)));
+
+drop:
+	kfree_skb(skb);
+	return rc;
+}
+
 static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
 {
 	bool released = false;
@@ -1457,8 +1495,11 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
 		/* Deliver packet */
 		l->rcv_nxt++;
 		l->stats.recv_pkts++;
-		if (!tipc_data_input(l, skb, l->inputq))
-			rc |= tipc_link_input(l, skb, l->inputq);
+
+		if (unlikely(msg_user(hdr) == TUNNEL_PROTOCOL))
+			rc |= tipc_link_tnl_rcv(l, skb, l->inputq);
+		else if (!tipc_data_input(l, skb, l->inputq))
+			rc |= tipc_link_input(l, skb, l->inputq, &l->reasm_buf);
 		if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN))
 			rc |= tipc_link_build_state_msg(l, xmitq);
 		if (unlikely(rc & ~TIPC_LINK_SND_STATE))
@@ -1588,6 +1629,7 @@ void tipc_link_create_dummy_tnl_msg(struct tipc_link *l,
 void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
 			   int mtyp, struct sk_buff_head *xmitq)
 {
+	struct sk_buff_head *fdefq = &tnl->failover_deferdq;
 	struct sk_buff *skb, *tnlskb;
 	struct tipc_msg *hdr, tnlhdr;
 	struct sk_buff_head *queue = &l->transmq;
@@ -1615,7 +1657,11 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
 	/* Initialize reusable tunnel packet header */
 	tipc_msg_init(tipc_own_addr(l->net), &tnlhdr, TUNNEL_PROTOCOL,
 		      mtyp, INT_H_SIZE, l->addr);
-	pktcnt = skb_queue_len(&l->transmq) + skb_queue_len(&l->backlogq);
+	if (mtyp == SYNCH_MSG)
+		pktcnt = l->snd_nxt - buf_seqno(skb_peek(&l->transmq));
+	else
+		pktcnt = skb_queue_len(&l->transmq);
+	pktcnt += skb_queue_len(&l->backlogq);
 	msg_set_msgcnt(&tnlhdr, pktcnt);
 	msg_set_bearer_id(&tnlhdr, l->peer_bearer_id);
 tnl:
@@ -1646,6 +1692,14 @@ tnl:
 		tnl->drop_point = l->rcv_nxt;
 		tnl->failover_reasm_skb = l->reasm_buf;
 		l->reasm_buf = NULL;
+
+		/* Failover the link's deferdq */
+		if (unlikely(!skb_queue_empty(fdefq))) {
+			pr_warn("Link failover deferdq not empty: %d!\n",
+				skb_queue_len(fdefq));
+			__skb_queue_purge(fdefq);
+		}
+		skb_queue_splice_init(&l->deferdq, fdefq);
 	}
 }
 