about summary refs log tree commit diff stats
path: root/net/tipc
diff options
context:
space:
mode:
authorDavid S. Miller <davem@davemloft.net>2019-04-04 21:29:25 -0400
committerDavid S. Miller <davem@davemloft.net>2019-04-04 21:29:25 -0400
commit8f4043f1253292495dbf9c8be0c1b07b4b9902b7 (patch)
treeeb2b99d9b5a9e4b67f8b110fed5b0106d6e548c0 /net/tipc
parent29502bb127b14841f4fdab8898c851a4a83e136f (diff)
parent58ee86b8c7750a6b67d665a031aa3ff13a9b6863 (diff)
Merge branch 'tipc-improve-TIPC-unicast-link-throughput'
Tuong Lien says: ==================== tipc: improve TIPC unicast link throughput The series introduces an algorithm to improve TIPC throughput especially in terms of packet loss, also tries to reduce packet duplication due to overactive NACK sending mechanism. The link failover situation is also covered by the patches. ==================== Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/link.c266
-rw-r--r--net/tipc/msg.h52
-rw-r--r--net/tipc/node.h6
3 files changed, 276 insertions, 48 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 52d23b3ffaf5..3cb9f326ee6f 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -151,6 +151,7 @@ struct tipc_link {
151 /* Failover/synch */ 151 /* Failover/synch */
152 u16 drop_point; 152 u16 drop_point;
153 struct sk_buff *failover_reasm_skb; 153 struct sk_buff *failover_reasm_skb;
154 struct sk_buff_head failover_deferdq;
154 155
155 /* Max packet negotiation */ 156 /* Max packet negotiation */
156 u16 mtu; 157 u16 mtu;
@@ -209,6 +210,7 @@ enum {
209}; 210};
210 211
211#define TIPC_BC_RETR_LIM msecs_to_jiffies(10) /* [ms] */ 212#define TIPC_BC_RETR_LIM msecs_to_jiffies(10) /* [ms] */
213#define TIPC_UC_RETR_TIME (jiffies + msecs_to_jiffies(1))
212 214
213/* 215/*
214 * Interval between NACKs when packets arrive out of order 216 * Interval between NACKs when packets arrive out of order
@@ -246,6 +248,10 @@ static int tipc_link_build_nack_msg(struct tipc_link *l,
246static void tipc_link_build_bc_init_msg(struct tipc_link *l, 248static void tipc_link_build_bc_init_msg(struct tipc_link *l,
247 struct sk_buff_head *xmitq); 249 struct sk_buff_head *xmitq);
248static bool tipc_link_release_pkts(struct tipc_link *l, u16 to); 250static bool tipc_link_release_pkts(struct tipc_link *l, u16 to);
251static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void *data);
252static void tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
253 struct tipc_gap_ack_blks *ga,
254 struct sk_buff_head *xmitq);
249 255
250/* 256/*
251 * Simple non-static link routines (i.e. referenced outside this file) 257 * Simple non-static link routines (i.e. referenced outside this file)
@@ -493,6 +499,7 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
493 __skb_queue_head_init(&l->transmq); 499 __skb_queue_head_init(&l->transmq);
494 __skb_queue_head_init(&l->backlogq); 500 __skb_queue_head_init(&l->backlogq);
495 __skb_queue_head_init(&l->deferdq); 501 __skb_queue_head_init(&l->deferdq);
502 __skb_queue_head_init(&l->failover_deferdq);
496 skb_queue_head_init(&l->wakeupq); 503 skb_queue_head_init(&l->wakeupq);
497 skb_queue_head_init(l->inputq); 504 skb_queue_head_init(l->inputq);
498 return true; 505 return true;
@@ -883,6 +890,7 @@ void tipc_link_reset(struct tipc_link *l)
883 __skb_queue_purge(&l->transmq); 890 __skb_queue_purge(&l->transmq);
884 __skb_queue_purge(&l->deferdq); 891 __skb_queue_purge(&l->deferdq);
885 __skb_queue_purge(&l->backlogq); 892 __skb_queue_purge(&l->backlogq);
893 __skb_queue_purge(&l->failover_deferdq);
886 l->backlog[TIPC_LOW_IMPORTANCE].len = 0; 894 l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
887 l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0; 895 l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
888 l->backlog[TIPC_HIGH_IMPORTANCE].len = 0; 896 l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
@@ -1154,34 +1162,14 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
1154 * Consumes buffer 1162 * Consumes buffer
1155 */ 1163 */
1156static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb, 1164static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
1157 struct sk_buff_head *inputq) 1165 struct sk_buff_head *inputq,
1166 struct sk_buff **reasm_skb)
1158{ 1167{
1159 struct tipc_msg *hdr = buf_msg(skb); 1168 struct tipc_msg *hdr = buf_msg(skb);
1160 struct sk_buff **reasm_skb = &l->reasm_buf;
1161 struct sk_buff *iskb; 1169 struct sk_buff *iskb;
1162 struct sk_buff_head tmpq; 1170 struct sk_buff_head tmpq;
1163 int usr = msg_user(hdr); 1171 int usr = msg_user(hdr);
1164 int rc = 0;
1165 int pos = 0; 1172 int pos = 0;
1166 int ipos = 0;
1167
1168 if (unlikely(usr == TUNNEL_PROTOCOL)) {
1169 if (msg_type(hdr) == SYNCH_MSG) {
1170 __skb_queue_purge(&l->deferdq);
1171 goto drop;
1172 }
1173 if (!tipc_msg_extract(skb, &iskb, &ipos))
1174 return rc;
1175 kfree_skb(skb);
1176 skb = iskb;
1177 hdr = buf_msg(skb);
1178 if (less(msg_seqno(hdr), l->drop_point))
1179 goto drop;
1180 if (tipc_data_input(l, skb, inputq))
1181 return rc;
1182 usr = msg_user(hdr);
1183 reasm_skb = &l->failover_reasm_skb;
1184 }
1185 1173
1186 if (usr == MSG_BUNDLER) { 1174 if (usr == MSG_BUNDLER) {
1187 skb_queue_head_init(&tmpq); 1175 skb_queue_head_init(&tmpq);
@@ -1206,11 +1194,66 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
1206 tipc_link_bc_init_rcv(l->bc_rcvlink, hdr); 1194 tipc_link_bc_init_rcv(l->bc_rcvlink, hdr);
1207 tipc_bcast_unlock(l->net); 1195 tipc_bcast_unlock(l->net);
1208 } 1196 }
1209drop: 1197
1210 kfree_skb(skb); 1198 kfree_skb(skb);
1211 return 0; 1199 return 0;
1212} 1200}
1213 1201
1202/* tipc_link_tnl_rcv() - receive TUNNEL_PROTOCOL message, drop or process the
1203 * inner message along with the ones in the old link's
1204 * deferdq
1205 * @l: tunnel link
1206 * @skb: TUNNEL_PROTOCOL message
1207 * @inputq: queue to put messages ready for delivery
1208 */
1209static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb,
1210 struct sk_buff_head *inputq)
1211{
1212 struct sk_buff **reasm_skb = &l->failover_reasm_skb;
1213 struct sk_buff_head *fdefq = &l->failover_deferdq;
1214 struct tipc_msg *hdr = buf_msg(skb);
1215 struct sk_buff *iskb;
1216 int ipos = 0;
1217 int rc = 0;
1218 u16 seqno;
1219
1220 /* SYNCH_MSG */
1221 if (msg_type(hdr) == SYNCH_MSG)
1222 goto drop;
1223
1224 /* FAILOVER_MSG */
1225 if (!tipc_msg_extract(skb, &iskb, &ipos)) {
1226 pr_warn_ratelimited("Cannot extract FAILOVER_MSG, defq: %d\n",
1227 skb_queue_len(fdefq));
1228 return rc;
1229 }
1230
1231 do {
1232 seqno = buf_seqno(iskb);
1233
1234 if (unlikely(less(seqno, l->drop_point))) {
1235 kfree_skb(iskb);
1236 continue;
1237 }
1238
1239 if (unlikely(seqno != l->drop_point)) {
1240 __tipc_skb_queue_sorted(fdefq, seqno, iskb);
1241 continue;
1242 }
1243
1244 l->drop_point++;
1245
1246 if (!tipc_data_input(l, iskb, inputq))
1247 rc |= tipc_link_input(l, iskb, inputq, reasm_skb);
1248 if (unlikely(rc))
1249 break;
1250 } while ((iskb = __tipc_skb_dequeue(fdefq, l->drop_point)));
1251
1252drop:
1253 kfree_skb(skb);
1254 return rc;
1255}
1256
1214static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked) 1257static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
1215{ 1258{
1216 bool released = false; 1259 bool released = false;
@@ -1226,6 +1269,106 @@ static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
1226 return released; 1269 return released;
1227} 1270}
1228 1271
1272/* tipc_build_gap_ack_blks - build Gap ACK blocks
1273 * @l: tipc link that data have come with gaps in sequence if any
1274 * @data: data buffer to store the Gap ACK blocks after built
1275 *
1276 * returns the actual allocated memory size
1277 */
1278static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void *data)
1279{
1280 struct sk_buff *skb = skb_peek(&l->deferdq);
1281 struct tipc_gap_ack_blks *ga = data;
1282 u16 len, expect, seqno = 0;
1283 u8 n = 0;
1284
1285 if (!skb)
1286 goto exit;
1287
1288 expect = buf_seqno(skb);
1289 skb_queue_walk(&l->deferdq, skb) {
1290 seqno = buf_seqno(skb);
1291 if (unlikely(more(seqno, expect))) {
1292 ga->gacks[n].ack = htons(expect - 1);
1293 ga->gacks[n].gap = htons(seqno - expect);
1294 if (++n >= MAX_GAP_ACK_BLKS) {
1295 pr_info_ratelimited("Too few Gap ACK blocks!\n");
1296 goto exit;
1297 }
1298 } else if (unlikely(less(seqno, expect))) {
1299 pr_warn("Unexpected skb in deferdq!\n");
1300 continue;
1301 }
1302 expect = seqno + 1;
1303 }
1304
1305 /* last block */
1306 ga->gacks[n].ack = htons(seqno);
1307 ga->gacks[n].gap = 0;
1308 n++;
1309
1310exit:
1311 len = tipc_gap_ack_blks_sz(n);
1312 ga->len = htons(len);
1313 ga->gack_cnt = n;
1314 return len;
1315}
1316
1317/* tipc_link_advance_transmq - advance TIPC link transmq queue by releasing
1318 * acked packets, also doing retransmissions if
1319 * gaps found
1320 * @l: tipc link with transmq queue to be advanced
1321 * @acked: seqno of last packet acked by peer without any gaps before
1322 * @gap: # of gap packets
1323 * @ga: buffer pointer to Gap ACK blocks from peer
1324 * @xmitq: queue for accumulating the retransmitted packets if any
1325 */
1326static void tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
1327 struct tipc_gap_ack_blks *ga,
1328 struct sk_buff_head *xmitq)
1329{
1330 struct sk_buff *skb, *_skb, *tmp;
1331 struct tipc_msg *hdr;
1332 u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
1333 u16 ack = l->rcv_nxt - 1;
1334 u16 seqno;
1335 u16 n = 0;
1336
1337 skb_queue_walk_safe(&l->transmq, skb, tmp) {
1338 seqno = buf_seqno(skb);
1339
1340next_gap_ack:
1341 if (less_eq(seqno, acked)) {
1342 /* release skb */
1343 __skb_unlink(skb, &l->transmq);
1344 kfree_skb(skb);
1345 } else if (less_eq(seqno, acked + gap)) {
1346 /* retransmit skb */
1347 if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr))
1348 continue;
1349 TIPC_SKB_CB(skb)->nxt_retr = TIPC_UC_RETR_TIME;
1350
1351 _skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC);
1352 if (!_skb)
1353 continue;
1354 hdr = buf_msg(_skb);
1355 msg_set_ack(hdr, ack);
1356 msg_set_bcast_ack(hdr, bc_ack);
1357 _skb->priority = TC_PRIO_CONTROL;
1358 __skb_queue_tail(xmitq, _skb);
1359 l->stats.retransmitted++;
1360 } else {
1361 /* retry with Gap ACK blocks if any */
1362 if (!ga || n >= ga->gack_cnt)
1363 break;
1364 acked = ntohs(ga->gacks[n].ack);
1365 gap = ntohs(ga->gacks[n].gap);
1366 n++;
1367 goto next_gap_ack;
1368 }
1369 }
1370}
1371
1229/* tipc_link_build_state_msg: prepare link state message for transmission 1372/* tipc_link_build_state_msg: prepare link state message for transmission
1230 * 1373 *
1231 * Note that sending of broadcast ack is coordinated among nodes, to reduce 1374 * Note that sending of broadcast ack is coordinated among nodes, to reduce
@@ -1280,6 +1423,7 @@ static int tipc_link_build_nack_msg(struct tipc_link *l,
1280 struct sk_buff_head *xmitq) 1423 struct sk_buff_head *xmitq)
1281{ 1424{
1282 u32 def_cnt = ++l->stats.deferred_recv; 1425 u32 def_cnt = ++l->stats.deferred_recv;
1426 u32 defq_len = skb_queue_len(&l->deferdq);
1283 int match1, match2; 1427 int match1, match2;
1284 1428
1285 if (link_is_bc_rcvlink(l)) { 1429 if (link_is_bc_rcvlink(l)) {
@@ -1290,7 +1434,7 @@ static int tipc_link_build_nack_msg(struct tipc_link *l,
1290 return 0; 1434 return 0;
1291 } 1435 }
1292 1436
1293 if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV)) 1437 if (defq_len >= 3 && !((defq_len - 3) % 16))
1294 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, 0, xmitq); 1438 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, 0, xmitq);
1295 return 0; 1439 return 0;
1296} 1440}
@@ -1304,29 +1448,29 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
1304 struct sk_buff_head *xmitq) 1448 struct sk_buff_head *xmitq)
1305{ 1449{
1306 struct sk_buff_head *defq = &l->deferdq; 1450 struct sk_buff_head *defq = &l->deferdq;
1307 struct tipc_msg *hdr; 1451 struct tipc_msg *hdr = buf_msg(skb);
1308 u16 seqno, rcv_nxt, win_lim; 1452 u16 seqno, rcv_nxt, win_lim;
1309 int rc = 0; 1453 int rc = 0;
1310 1454
1455 /* Verify and update link state */
1456 if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
1457 return tipc_link_proto_rcv(l, skb, xmitq);
1458
1459 /* Don't send probe at next timeout expiration */
1460 l->silent_intv_cnt = 0;
1461
1311 do { 1462 do {
1312 hdr = buf_msg(skb); 1463 hdr = buf_msg(skb);
1313 seqno = msg_seqno(hdr); 1464 seqno = msg_seqno(hdr);
1314 rcv_nxt = l->rcv_nxt; 1465 rcv_nxt = l->rcv_nxt;
1315 win_lim = rcv_nxt + TIPC_MAX_LINK_WIN; 1466 win_lim = rcv_nxt + TIPC_MAX_LINK_WIN;
1316 1467
1317 /* Verify and update link state */
1318 if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
1319 return tipc_link_proto_rcv(l, skb, xmitq);
1320
1321 if (unlikely(!link_is_up(l))) { 1468 if (unlikely(!link_is_up(l))) {
1322 if (l->state == LINK_ESTABLISHING) 1469 if (l->state == LINK_ESTABLISHING)
1323 rc = TIPC_LINK_UP_EVT; 1470 rc = TIPC_LINK_UP_EVT;
1324 goto drop; 1471 goto drop;
1325 } 1472 }
1326 1473
1327 /* Don't send probe at next timeout expiration */
1328 l->silent_intv_cnt = 0;
1329
1330 /* Drop if outside receive window */ 1474 /* Drop if outside receive window */
1331 if (unlikely(less(seqno, rcv_nxt) || more(seqno, win_lim))) { 1475 if (unlikely(less(seqno, rcv_nxt) || more(seqno, win_lim))) {
1332 l->stats.duplicates++; 1476 l->stats.duplicates++;
@@ -1351,13 +1495,16 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
1351 /* Deliver packet */ 1495 /* Deliver packet */
1352 l->rcv_nxt++; 1496 l->rcv_nxt++;
1353 l->stats.recv_pkts++; 1497 l->stats.recv_pkts++;
1354 if (!tipc_data_input(l, skb, l->inputq)) 1498
1355 rc |= tipc_link_input(l, skb, l->inputq); 1499 if (unlikely(msg_user(hdr) == TUNNEL_PROTOCOL))
1500 rc |= tipc_link_tnl_rcv(l, skb, l->inputq);
1501 else if (!tipc_data_input(l, skb, l->inputq))
1502 rc |= tipc_link_input(l, skb, l->inputq, &l->reasm_buf);
1356 if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) 1503 if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN))
1357 rc |= tipc_link_build_state_msg(l, xmitq); 1504 rc |= tipc_link_build_state_msg(l, xmitq);
1358 if (unlikely(rc & ~TIPC_LINK_SND_STATE)) 1505 if (unlikely(rc & ~TIPC_LINK_SND_STATE))
1359 break; 1506 break;
1360 } while ((skb = __skb_dequeue(defq))); 1507 } while ((skb = __tipc_skb_dequeue(defq, l->rcv_nxt)));
1361 1508
1362 return rc; 1509 return rc;
1363drop: 1510drop:
@@ -1378,6 +1525,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
1378 struct tipc_mon_state *mstate = &l->mon_state; 1525 struct tipc_mon_state *mstate = &l->mon_state;
1379 int dlen = 0; 1526 int dlen = 0;
1380 void *data; 1527 void *data;
1528 u16 glen = 0;
1381 1529
1382 /* Don't send protocol message during reset or link failover */ 1530 /* Don't send protocol message during reset or link failover */
1383 if (tipc_link_is_blocked(l)) 1531 if (tipc_link_is_blocked(l))
@@ -1390,8 +1538,8 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
1390 rcvgap = buf_seqno(skb_peek(dfq)) - l->rcv_nxt; 1538 rcvgap = buf_seqno(skb_peek(dfq)) - l->rcv_nxt;
1391 1539
1392 skb = tipc_msg_create(LINK_PROTOCOL, mtyp, INT_H_SIZE, 1540 skb = tipc_msg_create(LINK_PROTOCOL, mtyp, INT_H_SIZE,
1393 tipc_max_domain_size, l->addr, 1541 tipc_max_domain_size + MAX_GAP_ACK_BLKS_SZ,
1394 tipc_own_addr(l->net), 0, 0, 0); 1542 l->addr, tipc_own_addr(l->net), 0, 0, 0);
1395 if (!skb) 1543 if (!skb)
1396 return; 1544 return;
1397 1545
@@ -1418,9 +1566,11 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
1418 msg_set_bc_gap(hdr, link_bc_rcv_gap(bcl)); 1566 msg_set_bc_gap(hdr, link_bc_rcv_gap(bcl));
1419 msg_set_probe(hdr, probe); 1567 msg_set_probe(hdr, probe);
1420 msg_set_is_keepalive(hdr, probe || probe_reply); 1568 msg_set_is_keepalive(hdr, probe || probe_reply);
1421 tipc_mon_prep(l->net, data, &dlen, mstate, l->bearer_id); 1569 if (l->peer_caps & TIPC_GAP_ACK_BLOCK)
1422 msg_set_size(hdr, INT_H_SIZE + dlen); 1570 glen = tipc_build_gap_ack_blks(l, data);
1423 skb_trim(skb, INT_H_SIZE + dlen); 1571 tipc_mon_prep(l->net, data + glen, &dlen, mstate, l->bearer_id);
1572 msg_set_size(hdr, INT_H_SIZE + glen + dlen);
1573 skb_trim(skb, INT_H_SIZE + glen + dlen);
1424 l->stats.sent_states++; 1574 l->stats.sent_states++;
1425 l->rcv_unacked = 0; 1575 l->rcv_unacked = 0;
1426 } else { 1576 } else {
@@ -1479,6 +1629,7 @@ void tipc_link_create_dummy_tnl_msg(struct tipc_link *l,
1479void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, 1629void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
1480 int mtyp, struct sk_buff_head *xmitq) 1630 int mtyp, struct sk_buff_head *xmitq)
1481{ 1631{
1632 struct sk_buff_head *fdefq = &tnl->failover_deferdq;
1482 struct sk_buff *skb, *tnlskb; 1633 struct sk_buff *skb, *tnlskb;
1483 struct tipc_msg *hdr, tnlhdr; 1634 struct tipc_msg *hdr, tnlhdr;
1484 struct sk_buff_head *queue = &l->transmq; 1635 struct sk_buff_head *queue = &l->transmq;
@@ -1506,7 +1657,11 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
1506 /* Initialize reusable tunnel packet header */ 1657 /* Initialize reusable tunnel packet header */
1507 tipc_msg_init(tipc_own_addr(l->net), &tnlhdr, TUNNEL_PROTOCOL, 1658 tipc_msg_init(tipc_own_addr(l->net), &tnlhdr, TUNNEL_PROTOCOL,
1508 mtyp, INT_H_SIZE, l->addr); 1659 mtyp, INT_H_SIZE, l->addr);
1509 pktcnt = skb_queue_len(&l->transmq) + skb_queue_len(&l->backlogq); 1660 if (mtyp == SYNCH_MSG)
1661 pktcnt = l->snd_nxt - buf_seqno(skb_peek(&l->transmq));
1662 else
1663 pktcnt = skb_queue_len(&l->transmq);
1664 pktcnt += skb_queue_len(&l->backlogq);
1510 msg_set_msgcnt(&tnlhdr, pktcnt); 1665 msg_set_msgcnt(&tnlhdr, pktcnt);
1511 msg_set_bearer_id(&tnlhdr, l->peer_bearer_id); 1666 msg_set_bearer_id(&tnlhdr, l->peer_bearer_id);
1512tnl: 1667tnl:
@@ -1537,6 +1692,14 @@ tnl:
1537 tnl->drop_point = l->rcv_nxt; 1692 tnl->drop_point = l->rcv_nxt;
1538 tnl->failover_reasm_skb = l->reasm_buf; 1693 tnl->failover_reasm_skb = l->reasm_buf;
1539 l->reasm_buf = NULL; 1694 l->reasm_buf = NULL;
1695
1696 /* Failover the link's deferdq */
1697 if (unlikely(!skb_queue_empty(fdefq))) {
1698 pr_warn("Link failover deferdq not empty: %d!\n",
1699 skb_queue_len(fdefq));
1700 __skb_queue_purge(fdefq);
1701 }
1702 skb_queue_splice_init(&l->deferdq, fdefq);
1540 } 1703 }
1541} 1704}
1542 1705
@@ -1590,6 +1753,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
1590 struct sk_buff_head *xmitq) 1753 struct sk_buff_head *xmitq)
1591{ 1754{
1592 struct tipc_msg *hdr = buf_msg(skb); 1755 struct tipc_msg *hdr = buf_msg(skb);
1756 struct tipc_gap_ack_blks *ga = NULL;
1593 u16 rcvgap = 0; 1757 u16 rcvgap = 0;
1594 u16 ack = msg_ack(hdr); 1758 u16 ack = msg_ack(hdr);
1595 u16 gap = msg_seq_gap(hdr); 1759 u16 gap = msg_seq_gap(hdr);
@@ -1600,6 +1764,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
1600 u16 dlen = msg_data_sz(hdr); 1764 u16 dlen = msg_data_sz(hdr);
1601 int mtyp = msg_type(hdr); 1765 int mtyp = msg_type(hdr);
1602 bool reply = msg_probe(hdr); 1766 bool reply = msg_probe(hdr);
1767 u16 glen = 0;
1603 void *data; 1768 void *data;
1604 char *if_name; 1769 char *if_name;
1605 int rc = 0; 1770 int rc = 0;
@@ -1697,7 +1862,17 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
1697 rc = TIPC_LINK_UP_EVT; 1862 rc = TIPC_LINK_UP_EVT;
1698 break; 1863 break;
1699 } 1864 }
1700 tipc_mon_rcv(l->net, data, dlen, l->addr, 1865
1866 /* Receive Gap ACK blocks from peer if any */
1867 if (l->peer_caps & TIPC_GAP_ACK_BLOCK) {
1868 ga = (struct tipc_gap_ack_blks *)data;
1869 glen = ntohs(ga->len);
1870 /* sanity check: if failed, ignore Gap ACK blocks */
1871 if (glen != tipc_gap_ack_blks_sz(ga->gack_cnt))
1872 ga = NULL;
1873 }
1874
1875 tipc_mon_rcv(l->net, data + glen, dlen - glen, l->addr,
1701 &l->mon_state, l->bearer_id); 1876 &l->mon_state, l->bearer_id);
1702 1877
1703 /* Send NACK if peer has sent pkts we haven't received yet */ 1878 /* Send NACK if peer has sent pkts we haven't received yet */
@@ -1706,13 +1881,12 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
1706 if (rcvgap || reply) 1881 if (rcvgap || reply)
1707 tipc_link_build_proto_msg(l, STATE_MSG, 0, reply, 1882 tipc_link_build_proto_msg(l, STATE_MSG, 0, reply,
1708 rcvgap, 0, 0, xmitq); 1883 rcvgap, 0, 0, xmitq);
1709 tipc_link_release_pkts(l, ack); 1884
1885 tipc_link_advance_transmq(l, ack, gap, ga, xmitq);
1710 1886
1711 /* If NACK, retransmit will now start at right position */ 1887 /* If NACK, retransmit will now start at right position */
1712 if (gap) { 1888 if (gap)
1713 rc = tipc_link_retrans(l, l, ack + 1, ack + gap, xmitq);
1714 l->stats.recv_nacks++; 1889 l->stats.recv_nacks++;
1715 }
1716 1890
1717 tipc_link_advance_backlog(l, xmitq); 1891 tipc_link_advance_backlog(l, xmitq);
1718 if (unlikely(!skb_queue_empty(&l->wakeupq))) 1892 if (unlikely(!skb_queue_empty(&l->wakeupq)))
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 528ba9241acc..8de02ad6e352 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -117,6 +117,37 @@ struct tipc_msg {
117 __be32 hdr[15]; 117 __be32 hdr[15];
118}; 118};
119 119
120/* struct tipc_gap_ack - TIPC Gap ACK block
121 * @ack: seqno of the last consecutive packet in link deferdq
122 * @gap: number of gap packets since the last ack
123 *
124 * E.g:
125 * link deferdq: 1 2 3 4 10 11 13 14 15 20
126 * --> Gap ACK blocks: <4, 5>, <11, 1>, <15, 4>, <20, 0>
127 */
128struct tipc_gap_ack {
129 __be16 ack;
130 __be16 gap;
131};
132
133/* struct tipc_gap_ack_blks
134 * @len: actual length of the record
135 * @gack_cnt: number of Gap ACK blocks in the record
136 * @gacks: array of Gap ACK blocks
137 */
138struct tipc_gap_ack_blks {
139 __be16 len;
140 u8 gack_cnt;
141 u8 reserved;
142 struct tipc_gap_ack gacks[];
143};
144
145#define tipc_gap_ack_blks_sz(n) (sizeof(struct tipc_gap_ack_blks) + \
146 sizeof(struct tipc_gap_ack) * (n))
147
148#define MAX_GAP_ACK_BLKS 32
149#define MAX_GAP_ACK_BLKS_SZ tipc_gap_ack_blks_sz(MAX_GAP_ACK_BLKS)
150
120static inline struct tipc_msg *buf_msg(struct sk_buff *skb) 151static inline struct tipc_msg *buf_msg(struct sk_buff *skb)
121{ 152{
122 return (struct tipc_msg *)skb->data; 153 return (struct tipc_msg *)skb->data;
@@ -1120,4 +1151,25 @@ static inline void tipc_skb_queue_splice_tail_init(struct sk_buff_head *list,
1120 tipc_skb_queue_splice_tail(&tmp, head); 1151 tipc_skb_queue_splice_tail(&tmp, head);
1121} 1152}
1122 1153
1154/* __tipc_skb_dequeue() - dequeue the head skb according to expected seqno
1155 * @list: list to be dequeued from
1156 * @seqno: seqno of the expected msg
1157 *
1158 * returns skb dequeued from the list if its seqno is less than or equal to
1159 * the expected one, otherwise the skb is still hold
1160 *
1161 * Note: must be used with appropriate locks held only
1162 */
1163static inline struct sk_buff *__tipc_skb_dequeue(struct sk_buff_head *list,
1164 u16 seqno)
1165{
1166 struct sk_buff *skb = skb_peek(list);
1167
1168 if (skb && less_eq(buf_seqno(skb), seqno)) {
1169 __skb_unlink(skb, list);
1170 return skb;
1171 }
1172 return NULL;
1173}
1174
1123#endif 1175#endif
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 2404225c5d58..c0bf49ea3de4 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -52,7 +52,8 @@ enum {
52 TIPC_BCAST_RCAST = (1 << 4), 52 TIPC_BCAST_RCAST = (1 << 4),
53 TIPC_NODE_ID128 = (1 << 5), 53 TIPC_NODE_ID128 = (1 << 5),
54 TIPC_LINK_PROTO_SEQNO = (1 << 6), 54 TIPC_LINK_PROTO_SEQNO = (1 << 6),
55 TIPC_MCAST_RBCTL = (1 << 7) 55 TIPC_MCAST_RBCTL = (1 << 7),
56 TIPC_GAP_ACK_BLOCK = (1 << 8)
56}; 57};
57 58
58#define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT | \ 59#define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT | \
@@ -62,7 +63,8 @@ enum {
62 TIPC_BLOCK_FLOWCTL | \ 63 TIPC_BLOCK_FLOWCTL | \
63 TIPC_NODE_ID128 | \ 64 TIPC_NODE_ID128 | \
64 TIPC_LINK_PROTO_SEQNO | \ 65 TIPC_LINK_PROTO_SEQNO | \
65 TIPC_MCAST_RBCTL) 66 TIPC_MCAST_RBCTL | \
67 TIPC_GAP_ACK_BLOCK)
66#define INVALID_BEARER_ID -1 68#define INVALID_BEARER_ID -1
67 69
68void tipc_node_stop(struct net *net); 70void tipc_node_stop(struct net *net);