aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/link.c
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2015-02-05 08:36:41 -0500
committerDavid S. Miller <davem@davemloft.net>2015-02-05 19:00:02 -0500
commitc637c1035534867b85b78b453c38c495b58e2c5a (patch)
tree77cd2a48a5b04e43b014da64168a6c1e209a1d40 /net/tipc/link.c
parent94153e36e709e78fc4e1f93dc4e4da785690c7d1 (diff)
tipc: resolve race problem at unicast message reception
TIPC handles message cardinality and sequencing at the link layer, before passing messages upwards to the destination sockets. During the upcall from link to socket no locks are held. It is therefore possible, and we see it happen occasionally, that messages arriving in different threads and delivered in sequence still bypass each other before they reach the destination socket. This must not happen, since it violates the sequentiality guarantee. We solve this by adding a new input buffer queue to the link structure. Arriving messages are added safely to the tail of that queue by the link, while the head of the queue is consumed, also safely, by the receiving socket. Sequentiality is secured per socket by only allowing buffers to be dequeued inside the socket lock. Since there may be multiple simultaneous readers of the queue, we use a 'filter' parameter to reduce the risk that they peek the same buffer from the queue, hence also reducing the risk of contention on the receiving socket locks. This solves the sequentiality problem, and seems to cause no measurable performance degradation. A nice side effect of this change is that lock handling in the functions tipc_rcv() and tipc_bcast_rcv() now becomes uniform, something that will enable future simplifications of those functions. Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--net/tipc/link.c247
1 file changed, 104 insertions, 143 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 41cb09aa41de..942491234099 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -113,10 +113,8 @@ static void link_reset_statistics(struct tipc_link *l_ptr);
113static void link_print(struct tipc_link *l_ptr, const char *str); 113static void link_print(struct tipc_link *l_ptr, const char *str);
114static void tipc_link_sync_xmit(struct tipc_link *l); 114static void tipc_link_sync_xmit(struct tipc_link *l);
115static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); 115static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
116static int tipc_link_input(struct net *net, struct tipc_link *l, 116static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
117 struct sk_buff *buf); 117static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
118static int tipc_link_prepare_input(struct net *net, struct tipc_link *l,
119 struct sk_buff **buf);
120 118
121/* 119/*
122 * Simple link routines 120 * Simple link routines
@@ -318,8 +316,9 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
318 l_ptr->next_out_no = 1; 316 l_ptr->next_out_no = 1;
319 __skb_queue_head_init(&l_ptr->outqueue); 317 __skb_queue_head_init(&l_ptr->outqueue);
320 __skb_queue_head_init(&l_ptr->deferred_queue); 318 __skb_queue_head_init(&l_ptr->deferred_queue);
321 skb_queue_head_init(&l_ptr->waiting_sks); 319 skb_queue_head_init(&l_ptr->wakeupq);
322 320 skb_queue_head_init(&l_ptr->inputq);
321 skb_queue_head_init(&l_ptr->namedq);
323 link_reset_statistics(l_ptr); 322 link_reset_statistics(l_ptr);
324 tipc_node_attach_link(n_ptr, l_ptr); 323 tipc_node_attach_link(n_ptr, l_ptr);
325 setup_timer(&l_ptr->timer, link_timeout, (unsigned long)l_ptr); 324 setup_timer(&l_ptr->timer, link_timeout, (unsigned long)l_ptr);
@@ -387,7 +386,7 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
387 return false; 386 return false;
388 TIPC_SKB_CB(buf)->chain_sz = chain_sz; 387 TIPC_SKB_CB(buf)->chain_sz = chain_sz;
389 TIPC_SKB_CB(buf)->chain_imp = imp; 388 TIPC_SKB_CB(buf)->chain_imp = imp;
390 skb_queue_tail(&link->waiting_sks, buf); 389 skb_queue_tail(&link->wakeupq, buf);
391 link->stats.link_congs++; 390 link->stats.link_congs++;
392 return true; 391 return true;
393} 392}
@@ -398,17 +397,19 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
398 * Move a number of waiting users, as permitted by available space in 397 * Move a number of waiting users, as permitted by available space in
399 * the send queue, from link wait queue to node wait queue for wakeup 398 * the send queue, from link wait queue to node wait queue for wakeup
400 */ 399 */
401static void link_prepare_wakeup(struct tipc_link *link) 400void link_prepare_wakeup(struct tipc_link *link)
402{ 401{
403 uint pend_qsz = skb_queue_len(&link->outqueue); 402 uint pend_qsz = skb_queue_len(&link->outqueue);
404 struct sk_buff *skb, *tmp; 403 struct sk_buff *skb, *tmp;
405 404
406 skb_queue_walk_safe(&link->waiting_sks, skb, tmp) { 405 skb_queue_walk_safe(&link->wakeupq, skb, tmp) {
407 if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(skb)->chain_imp]) 406 if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(skb)->chain_imp])
408 break; 407 break;
409 pend_qsz += TIPC_SKB_CB(skb)->chain_sz; 408 pend_qsz += TIPC_SKB_CB(skb)->chain_sz;
410 skb_unlink(skb, &link->waiting_sks); 409 skb_unlink(skb, &link->wakeupq);
411 skb_queue_tail(&link->owner->waiting_sks, skb); 410 skb_queue_tail(&link->inputq, skb);
411 link->owner->inputq = &link->inputq;
412 link->owner->action_flags |= TIPC_MSG_EVT;
412 } 413 }
413} 414}
414 415
@@ -461,13 +462,13 @@ void tipc_link_reset(struct tipc_link *l_ptr)
461 l_ptr->exp_msg_count = START_CHANGEOVER; 462 l_ptr->exp_msg_count = START_CHANGEOVER;
462 } 463 }
463 464
464 /* Clean up all queues: */ 465 /* Clean up all queues, except inputq: */
465 __skb_queue_purge(&l_ptr->outqueue); 466 __skb_queue_purge(&l_ptr->outqueue);
466 __skb_queue_purge(&l_ptr->deferred_queue); 467 __skb_queue_purge(&l_ptr->deferred_queue);
467 if (!skb_queue_empty(&l_ptr->waiting_sks)) { 468 skb_queue_splice_init(&l_ptr->wakeupq, &l_ptr->inputq);
468 skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks); 469 if (!skb_queue_empty(&l_ptr->inputq))
469 owner->action_flags |= TIPC_WAKEUP_USERS; 470 owner->action_flags |= TIPC_MSG_EVT;
470 } 471 owner->inputq = &l_ptr->inputq;
471 l_ptr->next_out = NULL; 472 l_ptr->next_out = NULL;
472 l_ptr->unacked_window = 0; 473 l_ptr->unacked_window = 0;
473 l_ptr->checkpoint = 1; 474 l_ptr->checkpoint = 1;
@@ -795,7 +796,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
795 796
796static void skb2list(struct sk_buff *skb, struct sk_buff_head *list) 797static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
797{ 798{
798 __skb_queue_head_init(list); 799 skb_queue_head_init(list);
799 __skb_queue_tail(list, skb); 800 __skb_queue_tail(list, skb);
800} 801}
801 802
@@ -841,19 +842,13 @@ int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
841 rc = __tipc_link_xmit(net, link, list); 842 rc = __tipc_link_xmit(net, link, list);
842 tipc_node_unlock(node); 843 tipc_node_unlock(node);
843 } 844 }
844
845 if (link) 845 if (link)
846 return rc; 846 return rc;
847 847
848 if (likely(in_own_node(net, dnode))) { 848 if (likely(in_own_node(net, dnode)))
849 /* As a node local message chain never contains more than one 849 return tipc_sk_rcv(net, list);
850 * buffer, we just need to dequeue one SKB buffer from the
851 * head list.
852 */
853 return tipc_sk_rcv(net, __skb_dequeue(list));
854 }
855 __skb_queue_purge(list);
856 850
851 __skb_queue_purge(list);
857 return rc; 852 return rc;
858} 853}
859 854
@@ -1162,7 +1157,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
1162 /* Locate unicast link endpoint that should handle message */ 1157 /* Locate unicast link endpoint that should handle message */
1163 l_ptr = n_ptr->links[b_ptr->identity]; 1158 l_ptr = n_ptr->links[b_ptr->identity];
1164 if (unlikely(!l_ptr)) 1159 if (unlikely(!l_ptr))
1165 goto unlock_discard; 1160 goto unlock;
1166 1161
1167 /* Verify that communication with node is currently allowed */ 1162 /* Verify that communication with node is currently allowed */
1168 if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) && 1163 if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) &&
@@ -1173,7 +1168,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
1173 n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN; 1168 n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN;
1174 1169
1175 if (tipc_node_blocked(n_ptr)) 1170 if (tipc_node_blocked(n_ptr))
1176 goto unlock_discard; 1171 goto unlock;
1177 1172
1178 /* Validate message sequence number info */ 1173 /* Validate message sequence number info */
1179 seq_no = msg_seqno(msg); 1174 seq_no = msg_seqno(msg);
@@ -1197,18 +1192,16 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
1197 if (unlikely(l_ptr->next_out)) 1192 if (unlikely(l_ptr->next_out))
1198 tipc_link_push_packets(l_ptr); 1193 tipc_link_push_packets(l_ptr);
1199 1194
1200 if (released && !skb_queue_empty(&l_ptr->waiting_sks)) { 1195 if (released && !skb_queue_empty(&l_ptr->wakeupq))
1201 link_prepare_wakeup(l_ptr); 1196 link_prepare_wakeup(l_ptr);
1202 l_ptr->owner->action_flags |= TIPC_WAKEUP_USERS;
1203 }
1204 1197
1205 /* Process the incoming packet */ 1198 /* Process the incoming packet */
1206 if (unlikely(!link_working_working(l_ptr))) { 1199 if (unlikely(!link_working_working(l_ptr))) {
1207 if (msg_user(msg) == LINK_PROTOCOL) { 1200 if (msg_user(msg) == LINK_PROTOCOL) {
1208 tipc_link_proto_rcv(l_ptr, skb); 1201 tipc_link_proto_rcv(l_ptr, skb);
1209 link_retrieve_defq(l_ptr, &head); 1202 link_retrieve_defq(l_ptr, &head);
1210 tipc_node_unlock(n_ptr); 1203 skb = NULL;
1211 continue; 1204 goto unlock;
1212 } 1205 }
1213 1206
1214 /* Traffic message. Conditionally activate link */ 1207 /* Traffic message. Conditionally activate link */
@@ -1217,18 +1210,18 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
1217 if (link_working_working(l_ptr)) { 1210 if (link_working_working(l_ptr)) {
1218 /* Re-insert buffer in front of queue */ 1211 /* Re-insert buffer in front of queue */
1219 __skb_queue_head(&head, skb); 1212 __skb_queue_head(&head, skb);
1220 tipc_node_unlock(n_ptr); 1213 skb = NULL;
1221 continue; 1214 goto unlock;
1222 } 1215 }
1223 goto unlock_discard; 1216 goto unlock;
1224 } 1217 }
1225 1218
1226 /* Link is now in state WORKING_WORKING */ 1219 /* Link is now in state WORKING_WORKING */
1227 if (unlikely(seq_no != mod(l_ptr->next_in_no))) { 1220 if (unlikely(seq_no != mod(l_ptr->next_in_no))) {
1228 link_handle_out_of_seq_msg(l_ptr, skb); 1221 link_handle_out_of_seq_msg(l_ptr, skb);
1229 link_retrieve_defq(l_ptr, &head); 1222 link_retrieve_defq(l_ptr, &head);
1230 tipc_node_unlock(n_ptr); 1223 skb = NULL;
1231 continue; 1224 goto unlock;
1232 } 1225 }
1233 l_ptr->next_in_no++; 1226 l_ptr->next_in_no++;
1234 if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue))) 1227 if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue)))
@@ -1238,97 +1231,102 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
1238 l_ptr->stats.sent_acks++; 1231 l_ptr->stats.sent_acks++;
1239 tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); 1232 tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1240 } 1233 }
1241 1234 tipc_link_input(l_ptr, skb);
1242 if (tipc_link_prepare_input(net, l_ptr, &skb)) { 1235 skb = NULL;
1243 tipc_node_unlock(n_ptr); 1236unlock:
1244 continue;
1245 }
1246 tipc_node_unlock(n_ptr);
1247
1248 if (tipc_link_input(net, l_ptr, skb) != 0)
1249 goto discard;
1250 continue;
1251unlock_discard:
1252 tipc_node_unlock(n_ptr); 1237 tipc_node_unlock(n_ptr);
1253discard: 1238discard:
1254 kfree_skb(skb); 1239 if (unlikely(skb))
1240 kfree_skb(skb);
1255 } 1241 }
1256} 1242}
1257 1243
1258/** 1244/* tipc_data_input - deliver data and name distr msgs to upper layer
1259 * tipc_link_prepare_input - process TIPC link messages
1260 *
1261 * returns nonzero if the message was consumed
1262 * 1245 *
1246 * Consumes buffer if message is of right type
1263 * Node lock must be held 1247 * Node lock must be held
1264 */ 1248 */
1265static int tipc_link_prepare_input(struct net *net, struct tipc_link *l, 1249static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb)
1266 struct sk_buff **buf)
1267{ 1250{
1268 struct tipc_node *n; 1251 struct tipc_node *node = link->owner;
1269 struct tipc_msg *msg; 1252 struct tipc_msg *msg = buf_msg(skb);
1270 int res = -EINVAL; 1253 u32 dport = msg_destport(msg);
1271 1254
1272 n = l->owner;
1273 msg = buf_msg(*buf);
1274 switch (msg_user(msg)) { 1255 switch (msg_user(msg)) {
1275 case CHANGEOVER_PROTOCOL: 1256 case TIPC_LOW_IMPORTANCE:
1276 if (tipc_link_tunnel_rcv(n, buf)) 1257 case TIPC_MEDIUM_IMPORTANCE:
1277 res = 0; 1258 case TIPC_HIGH_IMPORTANCE:
1278 break; 1259 case TIPC_CRITICAL_IMPORTANCE:
1279 case MSG_FRAGMENTER: 1260 case CONN_MANAGER:
1280 l->stats.recv_fragments++; 1261 if (tipc_skb_queue_tail(&link->inputq, skb, dport)) {
1281 if (tipc_buf_append(&l->reasm_buf, buf)) { 1262 node->inputq = &link->inputq;
1282 l->stats.recv_fragmented++; 1263 node->action_flags |= TIPC_MSG_EVT;
1283 res = 0;
1284 } else if (!l->reasm_buf) {
1285 tipc_link_reset(l);
1286 } 1264 }
1287 break; 1265 return true;
1288 case MSG_BUNDLER:
1289 l->stats.recv_bundles++;
1290 l->stats.recv_bundled += msg_msgcnt(msg);
1291 res = 0;
1292 break;
1293 case NAME_DISTRIBUTOR: 1266 case NAME_DISTRIBUTOR:
1294 n->bclink.recv_permitted = true; 1267 node->bclink.recv_permitted = true;
1295 res = 0; 1268 node->namedq = &link->namedq;
1296 break; 1269 skb_queue_tail(&link->namedq, skb);
1270 if (skb_queue_len(&link->namedq) == 1)
1271 node->action_flags |= TIPC_NAMED_MSG_EVT;
1272 return true;
1273 case MSG_BUNDLER:
1274 case CHANGEOVER_PROTOCOL:
1275 case MSG_FRAGMENTER:
1297 case BCAST_PROTOCOL: 1276 case BCAST_PROTOCOL:
1298 tipc_link_sync_rcv(n, *buf); 1277 return false;
1299 break;
1300 default: 1278 default:
1301 res = 0; 1279 pr_warn("Dropping received illegal msg type\n");
1302 } 1280 kfree_skb(skb);
1303 return res; 1281 return false;
1282 };
1304} 1283}
1305/** 1284
1306 * tipc_link_input - Deliver message too higher layers 1285/* tipc_link_input - process packet that has passed link protocol check
1286 *
1287 * Consumes buffer
1288 * Node lock must be held
1307 */ 1289 */
1308static int tipc_link_input(struct net *net, struct tipc_link *l, 1290static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
1309 struct sk_buff *buf)
1310{ 1291{
1311 struct tipc_msg *msg = buf_msg(buf); 1292 struct tipc_node *node = link->owner;
1312 int res = 0; 1293 struct tipc_msg *msg = buf_msg(skb);
1294 struct sk_buff *iskb;
1295 int pos = 0;
1296
1297 if (likely(tipc_data_input(link, skb)))
1298 return;
1313 1299
1314 switch (msg_user(msg)) { 1300 switch (msg_user(msg)) {
1315 case TIPC_LOW_IMPORTANCE: 1301 case CHANGEOVER_PROTOCOL:
1316 case TIPC_MEDIUM_IMPORTANCE: 1302 if (!tipc_link_tunnel_rcv(node, &skb))
1317 case TIPC_HIGH_IMPORTANCE: 1303 break;
1318 case TIPC_CRITICAL_IMPORTANCE: 1304 if (msg_user(buf_msg(skb)) != MSG_BUNDLER) {
1319 case CONN_MANAGER: 1305 tipc_data_input(link, skb);
1320 tipc_sk_rcv(net, buf); 1306 break;
1307 }
1308 case MSG_BUNDLER:
1309 link->stats.recv_bundles++;
1310 link->stats.recv_bundled += msg_msgcnt(msg);
1311
1312 while (tipc_msg_extract(skb, &iskb, &pos))
1313 tipc_data_input(link, iskb);
1321 break; 1314 break;
1322 case NAME_DISTRIBUTOR: 1315 case MSG_FRAGMENTER:
1323 tipc_named_rcv(net, buf); 1316 link->stats.recv_fragments++;
1317 if (tipc_buf_append(&link->reasm_buf, &skb)) {
1318 link->stats.recv_fragmented++;
1319 tipc_data_input(link, skb);
1320 } else if (!link->reasm_buf) {
1321 tipc_link_reset(link);
1322 }
1324 break; 1323 break;
1325 case MSG_BUNDLER: 1324 case BCAST_PROTOCOL:
1326 tipc_link_bundle_rcv(net, buf); 1325 tipc_link_sync_rcv(node, skb);
1327 break; 1326 break;
1328 default: 1327 default:
1329 res = -EINVAL; 1328 break;
1330 } 1329 };
1331 return res;
1332} 1330}
1333 1331
1334/** 1332/**
@@ -1779,7 +1777,7 @@ void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
1779 * @from_pos: offset to extract from 1777 * @from_pos: offset to extract from
1780 * 1778 *
1781 * Returns a new message buffer containing an embedded message. The 1779 * Returns a new message buffer containing an embedded message. The
1782 * encapsulating message itself is left unchanged. 1780 * encapsulating buffer is left unchanged.
1783 */ 1781 */
1784static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos) 1782static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
1785{ 1783{
@@ -1793,8 +1791,6 @@ static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
1793 return eb; 1791 return eb;
1794} 1792}
1795 1793
1796
1797
1798/* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet. 1794/* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.
1799 * Owner node is locked. 1795 * Owner node is locked.
1800 */ 1796 */
@@ -1893,41 +1889,6 @@ exit:
1893 return *buf != NULL; 1889 return *buf != NULL;
1894} 1890}
1895 1891
1896/*
1897 * Bundler functionality:
1898 */
1899void tipc_link_bundle_rcv(struct net *net, struct sk_buff *buf)
1900{
1901 u32 msgcount = msg_msgcnt(buf_msg(buf));
1902 u32 pos = INT_H_SIZE;
1903 struct sk_buff *obuf;
1904 struct tipc_msg *omsg;
1905
1906 while (msgcount--) {
1907 obuf = buf_extract(buf, pos);
1908 if (obuf == NULL) {
1909 pr_warn("Link unable to unbundle message(s)\n");
1910 break;
1911 }
1912 omsg = buf_msg(obuf);
1913 pos += align(msg_size(omsg));
1914 if (msg_isdata(omsg)) {
1915 if (unlikely(msg_type(omsg) == TIPC_MCAST_MSG))
1916 tipc_sk_mcast_rcv(net, obuf);
1917 else
1918 tipc_sk_rcv(net, obuf);
1919 } else if (msg_user(omsg) == CONN_MANAGER) {
1920 tipc_sk_rcv(net, obuf);
1921 } else if (msg_user(omsg) == NAME_DISTRIBUTOR) {
1922 tipc_named_rcv(net, obuf);
1923 } else {
1924 pr_warn("Illegal bundled msg: %u\n", msg_user(omsg));
1925 kfree_skb(obuf);
1926 }
1927 }
1928 kfree_skb(buf);
1929}
1930
1931static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol) 1892static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol)
1932{ 1893{
1933 unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4; 1894 unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4;