aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/link.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--net/tipc/link.c232
1 files changed, 129 insertions, 103 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 18702f58d111..5ed4b4f7452d 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -2,7 +2,7 @@
2 * net/tipc/link.c: TIPC link code 2 * net/tipc/link.c: TIPC link code
3 * 3 *
4 * Copyright (c) 1996-2007, Ericsson AB 4 * Copyright (c) 1996-2007, Ericsson AB
5 * Copyright (c) 2004-2007, Wind River Systems 5 * Copyright (c) 2004-2007, 2010-2011, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
8 * Redistribution and use in source and binary forms, with or without 8 * Redistribution and use in source and binary forms, with or without
@@ -90,9 +90,10 @@ static void link_handle_out_of_seq_msg(struct link *l_ptr,
90static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf); 90static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf);
91static int link_recv_changeover_msg(struct link **l_ptr, struct sk_buff **buf); 91static int link_recv_changeover_msg(struct link **l_ptr, struct sk_buff **buf);
92static void link_set_supervision_props(struct link *l_ptr, u32 tolerance); 92static void link_set_supervision_props(struct link *l_ptr, u32 tolerance);
93static int link_send_sections_long(struct port *sender, 93static int link_send_sections_long(struct tipc_port *sender,
94 struct iovec const *msg_sect, 94 struct iovec const *msg_sect,
95 u32 num_sect, u32 destnode); 95 u32 num_sect, unsigned int total_len,
96 u32 destnode);
96static void link_check_defragm_bufs(struct link *l_ptr); 97static void link_check_defragm_bufs(struct link *l_ptr);
97static void link_state_event(struct link *l_ptr, u32 event); 98static void link_state_event(struct link *l_ptr, u32 event);
98static void link_reset_statistics(struct link *l_ptr); 99static void link_reset_statistics(struct link *l_ptr);
@@ -113,7 +114,7 @@ static void link_init_max_pkt(struct link *l_ptr)
113{ 114{
114 u32 max_pkt; 115 u32 max_pkt;
115 116
116 max_pkt = (l_ptr->b_ptr->publ.mtu & ~3); 117 max_pkt = (l_ptr->b_ptr->mtu & ~3);
117 if (max_pkt > MAX_MSG_SIZE) 118 if (max_pkt > MAX_MSG_SIZE)
118 max_pkt = MAX_MSG_SIZE; 119 max_pkt = MAX_MSG_SIZE;
119 120
@@ -246,9 +247,6 @@ static void link_timeout(struct link *l_ptr)
246 l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size; 247 l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
247 l_ptr->stats.queue_sz_counts++; 248 l_ptr->stats.queue_sz_counts++;
248 249
249 if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
250 l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
251
252 if (l_ptr->first_out) { 250 if (l_ptr->first_out) {
253 struct tipc_msg *msg = buf_msg(l_ptr->first_out); 251 struct tipc_msg *msg = buf_msg(l_ptr->first_out);
254 u32 length = msg_size(msg); 252 u32 length = msg_size(msg);
@@ -296,19 +294,35 @@ static void link_set_timer(struct link *l_ptr, u32 time)
296 294
297/** 295/**
298 * tipc_link_create - create a new link 296 * tipc_link_create - create a new link
297 * @n_ptr: pointer to associated node
299 * @b_ptr: pointer to associated bearer 298 * @b_ptr: pointer to associated bearer
300 * @peer: network address of node at other end of link
301 * @media_addr: media address to use when sending messages over link 299 * @media_addr: media address to use when sending messages over link
302 * 300 *
303 * Returns pointer to link. 301 * Returns pointer to link.
304 */ 302 */
305 303
306struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer, 304struct link *tipc_link_create(struct tipc_node *n_ptr,
305 struct tipc_bearer *b_ptr,
307 const struct tipc_media_addr *media_addr) 306 const struct tipc_media_addr *media_addr)
308{ 307{
309 struct link *l_ptr; 308 struct link *l_ptr;
310 struct tipc_msg *msg; 309 struct tipc_msg *msg;
311 char *if_name; 310 char *if_name;
311 char addr_string[16];
312 u32 peer = n_ptr->addr;
313
314 if (n_ptr->link_cnt >= 2) {
315 tipc_addr_string_fill(addr_string, n_ptr->addr);
316 err("Attempt to establish third link to %s\n", addr_string);
317 return NULL;
318 }
319
320 if (n_ptr->links[b_ptr->identity]) {
321 tipc_addr_string_fill(addr_string, n_ptr->addr);
322 err("Attempt to establish second link on <%s> to %s\n",
323 b_ptr->name, addr_string);
324 return NULL;
325 }
312 326
313 l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC); 327 l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
314 if (!l_ptr) { 328 if (!l_ptr) {
@@ -317,7 +331,7 @@ struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer,
317 } 331 }
318 332
319 l_ptr->addr = peer; 333 l_ptr->addr = peer;
320 if_name = strchr(b_ptr->publ.name, ':') + 1; 334 if_name = strchr(b_ptr->name, ':') + 1;
321 sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:", 335 sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:",
322 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr), 336 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
323 tipc_node(tipc_own_addr), 337 tipc_node(tipc_own_addr),
@@ -325,6 +339,7 @@ struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer,
325 tipc_zone(peer), tipc_cluster(peer), tipc_node(peer)); 339 tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
326 /* note: peer i/f is appended to link name by reset/activate */ 340 /* note: peer i/f is appended to link name by reset/activate */
327 memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr)); 341 memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
342 l_ptr->owner = n_ptr;
328 l_ptr->checkpoint = 1; 343 l_ptr->checkpoint = 1;
329 l_ptr->b_ptr = b_ptr; 344 l_ptr->b_ptr = b_ptr;
330 link_set_supervision_props(l_ptr, b_ptr->media->tolerance); 345 link_set_supervision_props(l_ptr, b_ptr->media->tolerance);
@@ -348,11 +363,7 @@ struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer,
348 363
349 link_reset_statistics(l_ptr); 364 link_reset_statistics(l_ptr);
350 365
351 l_ptr->owner = tipc_node_attach_link(l_ptr); 366 tipc_node_attach_link(n_ptr, l_ptr);
352 if (!l_ptr->owner) {
353 kfree(l_ptr);
354 return NULL;
355 }
356 367
357 k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr); 368 k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr);
358 list_add_tail(&l_ptr->link_list, &b_ptr->links); 369 list_add_tail(&l_ptr->link_list, &b_ptr->links);
@@ -391,7 +402,9 @@ void tipc_link_delete(struct link *l_ptr)
391 402
392static void link_start(struct link *l_ptr) 403static void link_start(struct link *l_ptr)
393{ 404{
405 tipc_node_lock(l_ptr->owner);
394 link_state_event(l_ptr, STARTING_EVT); 406 link_state_event(l_ptr, STARTING_EVT);
407 tipc_node_unlock(l_ptr->owner);
395} 408}
396 409
397/** 410/**
@@ -406,7 +419,7 @@ static void link_start(struct link *l_ptr)
406 419
407static int link_schedule_port(struct link *l_ptr, u32 origport, u32 sz) 420static int link_schedule_port(struct link *l_ptr, u32 origport, u32 sz)
408{ 421{
409 struct port *p_ptr; 422 struct tipc_port *p_ptr;
410 423
411 spin_lock_bh(&tipc_port_list_lock); 424 spin_lock_bh(&tipc_port_list_lock);
412 p_ptr = tipc_port_lock(origport); 425 p_ptr = tipc_port_lock(origport);
@@ -415,7 +428,7 @@ static int link_schedule_port(struct link *l_ptr, u32 origport, u32 sz)
415 goto exit; 428 goto exit;
416 if (!list_empty(&p_ptr->wait_list)) 429 if (!list_empty(&p_ptr->wait_list))
417 goto exit; 430 goto exit;
418 p_ptr->publ.congested = 1; 431 p_ptr->congested = 1;
419 p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt); 432 p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
420 list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports); 433 list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
421 l_ptr->stats.link_congs++; 434 l_ptr->stats.link_congs++;
@@ -428,8 +441,8 @@ exit:
428 441
429void tipc_link_wakeup_ports(struct link *l_ptr, int all) 442void tipc_link_wakeup_ports(struct link *l_ptr, int all)
430{ 443{
431 struct port *p_ptr; 444 struct tipc_port *p_ptr;
432 struct port *temp_p_ptr; 445 struct tipc_port *temp_p_ptr;
433 int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size; 446 int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
434 447
435 if (all) 448 if (all)
@@ -445,11 +458,11 @@ void tipc_link_wakeup_ports(struct link *l_ptr, int all)
445 if (win <= 0) 458 if (win <= 0)
446 break; 459 break;
447 list_del_init(&p_ptr->wait_list); 460 list_del_init(&p_ptr->wait_list);
448 spin_lock_bh(p_ptr->publ.lock); 461 spin_lock_bh(p_ptr->lock);
449 p_ptr->publ.congested = 0; 462 p_ptr->congested = 0;
450 p_ptr->wakeup(&p_ptr->publ); 463 p_ptr->wakeup(p_ptr);
451 win -= p_ptr->waiting_pkts; 464 win -= p_ptr->waiting_pkts;
452 spin_unlock_bh(p_ptr->publ.lock); 465 spin_unlock_bh(p_ptr->lock);
453 } 466 }
454 467
455exit: 468exit:
@@ -549,7 +562,7 @@ void tipc_link_reset(struct link *l_ptr)
549 tipc_node_link_down(l_ptr->owner, l_ptr); 562 tipc_node_link_down(l_ptr->owner, l_ptr);
550 tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr); 563 tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr);
551 564
552 if (was_active_link && tipc_node_has_active_links(l_ptr->owner) && 565 if (was_active_link && tipc_node_active_links(l_ptr->owner) &&
553 l_ptr->owner->permit_changeover) { 566 l_ptr->owner->permit_changeover) {
554 l_ptr->reset_checkpoint = checkpoint; 567 l_ptr->reset_checkpoint = checkpoint;
555 l_ptr->exp_msg_count = START_CHANGEOVER; 568 l_ptr->exp_msg_count = START_CHANGEOVER;
@@ -824,7 +837,29 @@ static void link_add_to_outqueue(struct link *l_ptr,
824 l_ptr->last_out = buf; 837 l_ptr->last_out = buf;
825 } else 838 } else
826 l_ptr->first_out = l_ptr->last_out = buf; 839 l_ptr->first_out = l_ptr->last_out = buf;
840
827 l_ptr->out_queue_size++; 841 l_ptr->out_queue_size++;
842 if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
843 l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
844}
845
846static void link_add_chain_to_outqueue(struct link *l_ptr,
847 struct sk_buff *buf_chain,
848 u32 long_msgno)
849{
850 struct sk_buff *buf;
851 struct tipc_msg *msg;
852
853 if (!l_ptr->next_out)
854 l_ptr->next_out = buf_chain;
855 while (buf_chain) {
856 buf = buf_chain;
857 buf_chain = buf_chain->next;
858
859 msg = buf_msg(buf);
860 msg_set_long_msgno(msg, long_msgno);
861 link_add_to_outqueue(l_ptr, buf, msg);
862 }
828} 863}
829 864
830/* 865/*
@@ -849,8 +884,9 @@ int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf)
849 884
850 if (unlikely(queue_size >= queue_limit)) { 885 if (unlikely(queue_size >= queue_limit)) {
851 if (imp <= TIPC_CRITICAL_IMPORTANCE) { 886 if (imp <= TIPC_CRITICAL_IMPORTANCE) {
852 return link_schedule_port(l_ptr, msg_origport(msg), 887 link_schedule_port(l_ptr, msg_origport(msg), size);
853 size); 888 buf_discard(buf);
889 return -ELINKCONG;
854 } 890 }
855 buf_discard(buf); 891 buf_discard(buf);
856 if (imp > CONN_MANAGER) { 892 if (imp > CONN_MANAGER) {
@@ -867,9 +903,6 @@ int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf)
867 903
868 /* Packet can be queued or sent: */ 904 /* Packet can be queued or sent: */
869 905
870 if (queue_size > l_ptr->stats.max_queue_sz)
871 l_ptr->stats.max_queue_sz = queue_size;
872
873 if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) && 906 if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) &&
874 !link_congested(l_ptr))) { 907 !link_congested(l_ptr))) {
875 link_add_to_outqueue(l_ptr, buf, msg); 908 link_add_to_outqueue(l_ptr, buf, msg);
@@ -1027,12 +1060,13 @@ int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode)
1027 * except for total message length. 1060 * except for total message length.
1028 * Returns user data length or errno. 1061 * Returns user data length or errno.
1029 */ 1062 */
1030int tipc_link_send_sections_fast(struct port *sender, 1063int tipc_link_send_sections_fast(struct tipc_port *sender,
1031 struct iovec const *msg_sect, 1064 struct iovec const *msg_sect,
1032 const u32 num_sect, 1065 const u32 num_sect,
1066 unsigned int total_len,
1033 u32 destaddr) 1067 u32 destaddr)
1034{ 1068{
1035 struct tipc_msg *hdr = &sender->publ.phdr; 1069 struct tipc_msg *hdr = &sender->phdr;
1036 struct link *l_ptr; 1070 struct link *l_ptr;
1037 struct sk_buff *buf; 1071 struct sk_buff *buf;
1038 struct tipc_node *node; 1072 struct tipc_node *node;
@@ -1045,8 +1079,8 @@ again:
1045 * (Must not hold any locks while building message.) 1079 * (Must not hold any locks while building message.)
1046 */ 1080 */
1047 1081
1048 res = tipc_msg_build(hdr, msg_sect, num_sect, sender->publ.max_pkt, 1082 res = tipc_msg_build(hdr, msg_sect, num_sect, total_len,
1049 !sender->user_port, &buf); 1083 sender->max_pkt, !sender->user_port, &buf);
1050 1084
1051 read_lock_bh(&tipc_net_lock); 1085 read_lock_bh(&tipc_net_lock);
1052 node = tipc_node_find(destaddr); 1086 node = tipc_node_find(destaddr);
@@ -1056,9 +1090,7 @@ again:
1056 if (likely(l_ptr)) { 1090 if (likely(l_ptr)) {
1057 if (likely(buf)) { 1091 if (likely(buf)) {
1058 res = link_send_buf_fast(l_ptr, buf, 1092 res = link_send_buf_fast(l_ptr, buf,
1059 &sender->publ.max_pkt); 1093 &sender->max_pkt);
1060 if (unlikely(res < 0))
1061 buf_discard(buf);
1062exit: 1094exit:
1063 tipc_node_unlock(node); 1095 tipc_node_unlock(node);
1064 read_unlock_bh(&tipc_net_lock); 1096 read_unlock_bh(&tipc_net_lock);
@@ -1075,7 +1107,7 @@ exit:
1075 if (link_congested(l_ptr) || 1107 if (link_congested(l_ptr) ||
1076 !list_empty(&l_ptr->b_ptr->cong_links)) { 1108 !list_empty(&l_ptr->b_ptr->cong_links)) {
1077 res = link_schedule_port(l_ptr, 1109 res = link_schedule_port(l_ptr,
1078 sender->publ.ref, res); 1110 sender->ref, res);
1079 goto exit; 1111 goto exit;
1080 } 1112 }
1081 1113
@@ -1084,16 +1116,17 @@ exit:
1084 * then re-try fast path or fragment the message 1116 * then re-try fast path or fragment the message
1085 */ 1117 */
1086 1118
1087 sender->publ.max_pkt = l_ptr->max_pkt; 1119 sender->max_pkt = l_ptr->max_pkt;
1088 tipc_node_unlock(node); 1120 tipc_node_unlock(node);
1089 read_unlock_bh(&tipc_net_lock); 1121 read_unlock_bh(&tipc_net_lock);
1090 1122
1091 1123
1092 if ((msg_hdr_sz(hdr) + res) <= sender->publ.max_pkt) 1124 if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt)
1093 goto again; 1125 goto again;
1094 1126
1095 return link_send_sections_long(sender, msg_sect, 1127 return link_send_sections_long(sender, msg_sect,
1096 num_sect, destaddr); 1128 num_sect, total_len,
1129 destaddr);
1097 } 1130 }
1098 tipc_node_unlock(node); 1131 tipc_node_unlock(node);
1099 } 1132 }
@@ -1105,7 +1138,7 @@ exit:
1105 return tipc_reject_msg(buf, TIPC_ERR_NO_NODE); 1138 return tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1106 if (res >= 0) 1139 if (res >= 0)
1107 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect, 1140 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1108 TIPC_ERR_NO_NODE); 1141 total_len, TIPC_ERR_NO_NODE);
1109 return res; 1142 return res;
1110} 1143}
1111 1144
@@ -1123,15 +1156,16 @@ exit:
1123 * 1156 *
1124 * Returns user data length or errno. 1157 * Returns user data length or errno.
1125 */ 1158 */
1126static int link_send_sections_long(struct port *sender, 1159static int link_send_sections_long(struct tipc_port *sender,
1127 struct iovec const *msg_sect, 1160 struct iovec const *msg_sect,
1128 u32 num_sect, 1161 u32 num_sect,
1162 unsigned int total_len,
1129 u32 destaddr) 1163 u32 destaddr)
1130{ 1164{
1131 struct link *l_ptr; 1165 struct link *l_ptr;
1132 struct tipc_node *node; 1166 struct tipc_node *node;
1133 struct tipc_msg *hdr = &sender->publ.phdr; 1167 struct tipc_msg *hdr = &sender->phdr;
1134 u32 dsz = msg_data_sz(hdr); 1168 u32 dsz = total_len;
1135 u32 max_pkt, fragm_sz, rest; 1169 u32 max_pkt, fragm_sz, rest;
1136 struct tipc_msg fragm_hdr; 1170 struct tipc_msg fragm_hdr;
1137 struct sk_buff *buf, *buf_chain, *prev; 1171 struct sk_buff *buf, *buf_chain, *prev;
@@ -1142,7 +1176,7 @@ static int link_send_sections_long(struct port *sender,
1142 1176
1143again: 1177again:
1144 fragm_no = 1; 1178 fragm_no = 1;
1145 max_pkt = sender->publ.max_pkt - INT_H_SIZE; 1179 max_pkt = sender->max_pkt - INT_H_SIZE;
1146 /* leave room for tunnel header in case of link changeover */ 1180 /* leave room for tunnel header in case of link changeover */
1147 fragm_sz = max_pkt - INT_H_SIZE; 1181 fragm_sz = max_pkt - INT_H_SIZE;
1148 /* leave room for fragmentation header in each fragment */ 1182 /* leave room for fragmentation header in each fragment */
@@ -1157,7 +1191,6 @@ again:
1157 1191
1158 tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT, 1192 tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
1159 INT_H_SIZE, msg_destnode(hdr)); 1193 INT_H_SIZE, msg_destnode(hdr));
1160 msg_set_link_selector(&fragm_hdr, sender->publ.ref);
1161 msg_set_size(&fragm_hdr, max_pkt); 1194 msg_set_size(&fragm_hdr, max_pkt);
1162 msg_set_fragm_no(&fragm_hdr, 1); 1195 msg_set_fragm_no(&fragm_hdr, 1);
1163 1196
@@ -1238,13 +1271,13 @@ error:
1238 node = tipc_node_find(destaddr); 1271 node = tipc_node_find(destaddr);
1239 if (likely(node)) { 1272 if (likely(node)) {
1240 tipc_node_lock(node); 1273 tipc_node_lock(node);
1241 l_ptr = node->active_links[sender->publ.ref & 1]; 1274 l_ptr = node->active_links[sender->ref & 1];
1242 if (!l_ptr) { 1275 if (!l_ptr) {
1243 tipc_node_unlock(node); 1276 tipc_node_unlock(node);
1244 goto reject; 1277 goto reject;
1245 } 1278 }
1246 if (l_ptr->max_pkt < max_pkt) { 1279 if (l_ptr->max_pkt < max_pkt) {
1247 sender->publ.max_pkt = l_ptr->max_pkt; 1280 sender->max_pkt = l_ptr->max_pkt;
1248 tipc_node_unlock(node); 1281 tipc_node_unlock(node);
1249 for (; buf_chain; buf_chain = buf) { 1282 for (; buf_chain; buf_chain = buf) {
1250 buf = buf_chain->next; 1283 buf = buf_chain->next;
@@ -1259,28 +1292,15 @@ reject:
1259 buf_discard(buf_chain); 1292 buf_discard(buf_chain);
1260 } 1293 }
1261 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect, 1294 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1262 TIPC_ERR_NO_NODE); 1295 total_len, TIPC_ERR_NO_NODE);
1263 } 1296 }
1264 1297
1265 /* Append whole chain to send queue: */ 1298 /* Append chain of fragments to send queue & send them */
1266 1299
1267 buf = buf_chain; 1300 l_ptr->long_msg_seq_no++;
1268 l_ptr->long_msg_seq_no = mod(l_ptr->long_msg_seq_no + 1); 1301 link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
1269 if (!l_ptr->next_out) 1302 l_ptr->stats.sent_fragments += fragm_no;
1270 l_ptr->next_out = buf_chain;
1271 l_ptr->stats.sent_fragmented++; 1303 l_ptr->stats.sent_fragmented++;
1272 while (buf) {
1273 struct sk_buff *next = buf->next;
1274 struct tipc_msg *msg = buf_msg(buf);
1275
1276 l_ptr->stats.sent_fragments++;
1277 msg_set_long_msgno(msg, l_ptr->long_msg_seq_no);
1278 link_add_to_outqueue(l_ptr, buf, msg);
1279 buf = next;
1280 }
1281
1282 /* Send it, if possible: */
1283
1284 tipc_link_push_queue(l_ptr); 1304 tipc_link_push_queue(l_ptr);
1285 tipc_node_unlock(node); 1305 tipc_node_unlock(node);
1286 return dsz; 1306 return dsz;
@@ -1441,7 +1461,7 @@ static void link_retransmit_failure(struct link *l_ptr, struct sk_buff *buf)
1441 info("Outstanding acks: %lu\n", 1461 info("Outstanding acks: %lu\n",
1442 (unsigned long) TIPC_SKB_CB(buf)->handle); 1462 (unsigned long) TIPC_SKB_CB(buf)->handle);
1443 1463
1444 n_ptr = l_ptr->owner->next; 1464 n_ptr = tipc_bclink_retransmit_to();
1445 tipc_node_lock(n_ptr); 1465 tipc_node_lock(n_ptr);
1446 1466
1447 tipc_addr_string_fill(addr_string, n_ptr->addr); 1467 tipc_addr_string_fill(addr_string, n_ptr->addr);
@@ -1595,11 +1615,10 @@ static int link_recv_buf_validate(struct sk_buff *buf)
1595 * structure (i.e. cannot be NULL), but bearer can be inactive. 1615 * structure (i.e. cannot be NULL), but bearer can be inactive.
1596 */ 1616 */
1597 1617
1598void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr) 1618void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
1599{ 1619{
1600 read_lock_bh(&tipc_net_lock); 1620 read_lock_bh(&tipc_net_lock);
1601 while (head) { 1621 while (head) {
1602 struct bearer *b_ptr = (struct bearer *)tb_ptr;
1603 struct tipc_node *n_ptr; 1622 struct tipc_node *n_ptr;
1604 struct link *l_ptr; 1623 struct link *l_ptr;
1605 struct sk_buff *crs; 1624 struct sk_buff *crs;
@@ -1735,10 +1754,6 @@ deliver:
1735 tipc_node_unlock(n_ptr); 1754 tipc_node_unlock(n_ptr);
1736 tipc_link_recv_bundle(buf); 1755 tipc_link_recv_bundle(buf);
1737 continue; 1756 continue;
1738 case ROUTE_DISTRIBUTOR:
1739 tipc_node_unlock(n_ptr);
1740 buf_discard(buf);
1741 continue;
1742 case NAME_DISTRIBUTOR: 1757 case NAME_DISTRIBUTOR:
1743 tipc_node_unlock(n_ptr); 1758 tipc_node_unlock(n_ptr);
1744 tipc_named_recv(buf); 1759 tipc_named_recv(buf);
@@ -1765,6 +1780,10 @@ deliver:
1765 goto protocol_check; 1780 goto protocol_check;
1766 } 1781 }
1767 break; 1782 break;
1783 default:
1784 buf_discard(buf);
1785 buf = NULL;
1786 break;
1768 } 1787 }
1769 } 1788 }
1770 tipc_node_unlock(n_ptr); 1789 tipc_node_unlock(n_ptr);
@@ -1900,6 +1919,7 @@ void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,
1900 struct sk_buff *buf = NULL; 1919 struct sk_buff *buf = NULL;
1901 struct tipc_msg *msg = l_ptr->pmsg; 1920 struct tipc_msg *msg = l_ptr->pmsg;
1902 u32 msg_size = sizeof(l_ptr->proto_msg); 1921 u32 msg_size = sizeof(l_ptr->proto_msg);
1922 int r_flag;
1903 1923
1904 if (link_blocked(l_ptr)) 1924 if (link_blocked(l_ptr))
1905 return; 1925 return;
@@ -1950,15 +1970,14 @@ void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,
1950 msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1)); 1970 msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
1951 msg_set_seq_gap(msg, 0); 1971 msg_set_seq_gap(msg, 0);
1952 msg_set_next_sent(msg, 1); 1972 msg_set_next_sent(msg, 1);
1973 msg_set_probe(msg, 0);
1953 msg_set_link_tolerance(msg, l_ptr->tolerance); 1974 msg_set_link_tolerance(msg, l_ptr->tolerance);
1954 msg_set_linkprio(msg, l_ptr->priority); 1975 msg_set_linkprio(msg, l_ptr->priority);
1955 msg_set_max_pkt(msg, l_ptr->max_pkt_target); 1976 msg_set_max_pkt(msg, l_ptr->max_pkt_target);
1956 } 1977 }
1957 1978
1958 if (tipc_node_has_redundant_links(l_ptr->owner)) 1979 r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr));
1959 msg_set_redundant_link(msg); 1980 msg_set_redundant_link(msg, r_flag);
1960 else
1961 msg_clear_redundant_link(msg);
1962 msg_set_linkprio(msg, l_ptr->priority); 1981 msg_set_linkprio(msg, l_ptr->priority);
1963 1982
1964 /* Ensure sequence number will not fit : */ 1983 /* Ensure sequence number will not fit : */
@@ -1978,7 +1997,6 @@ void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,
1978 skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); 1997 skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
1979 return; 1998 return;
1980 } 1999 }
1981 msg_set_timestamp(msg, jiffies_to_msecs(jiffies));
1982 2000
1983 /* Message can be sent */ 2001 /* Message can be sent */
1984 2002
@@ -2066,7 +2084,7 @@ static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf)
2066 l_ptr->peer_bearer_id = msg_bearer_id(msg); 2084 l_ptr->peer_bearer_id = msg_bearer_id(msg);
2067 2085
2068 /* Synchronize broadcast sequence numbers */ 2086 /* Synchronize broadcast sequence numbers */
2069 if (!tipc_node_has_redundant_links(l_ptr->owner)) 2087 if (!tipc_node_redundant_links(l_ptr->owner))
2070 l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg)); 2088 l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg));
2071 break; 2089 break;
2072 case STATE_MSG: 2090 case STATE_MSG:
@@ -2397,6 +2415,8 @@ void tipc_link_recv_bundle(struct sk_buff *buf)
2397 */ 2415 */
2398static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf) 2416static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
2399{ 2417{
2418 struct sk_buff *buf_chain = NULL;
2419 struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain;
2400 struct tipc_msg *inmsg = buf_msg(buf); 2420 struct tipc_msg *inmsg = buf_msg(buf);
2401 struct tipc_msg fragm_hdr; 2421 struct tipc_msg fragm_hdr;
2402 u32 insize = msg_size(inmsg); 2422 u32 insize = msg_size(inmsg);
@@ -2405,7 +2425,7 @@ static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
2405 u32 rest = insize; 2425 u32 rest = insize;
2406 u32 pack_sz = l_ptr->max_pkt; 2426 u32 pack_sz = l_ptr->max_pkt;
2407 u32 fragm_sz = pack_sz - INT_H_SIZE; 2427 u32 fragm_sz = pack_sz - INT_H_SIZE;
2408 u32 fragm_no = 1; 2428 u32 fragm_no = 0;
2409 u32 destaddr; 2429 u32 destaddr;
2410 2430
2411 if (msg_short(inmsg)) 2431 if (msg_short(inmsg))
@@ -2413,17 +2433,10 @@ static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
2413 else 2433 else
2414 destaddr = msg_destnode(inmsg); 2434 destaddr = msg_destnode(inmsg);
2415 2435
2416 if (msg_routed(inmsg))
2417 msg_set_prevnode(inmsg, tipc_own_addr);
2418
2419 /* Prepare reusable fragment header: */ 2436 /* Prepare reusable fragment header: */
2420 2437
2421 tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT, 2438 tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
2422 INT_H_SIZE, destaddr); 2439 INT_H_SIZE, destaddr);
2423 msg_set_link_selector(&fragm_hdr, msg_link_selector(inmsg));
2424 msg_set_long_msgno(&fragm_hdr, mod(l_ptr->long_msg_seq_no++));
2425 msg_set_fragm_no(&fragm_hdr, fragm_no);
2426 l_ptr->stats.sent_fragmented++;
2427 2440
2428 /* Chop up message: */ 2441 /* Chop up message: */
2429 2442
@@ -2436,27 +2449,37 @@ static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
2436 } 2449 }
2437 fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE); 2450 fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
2438 if (fragm == NULL) { 2451 if (fragm == NULL) {
2439 warn("Link unable to fragment message\n"); 2452 buf_discard(buf);
2440 dsz = -ENOMEM; 2453 while (buf_chain) {
2441 goto exit; 2454 buf = buf_chain;
2455 buf_chain = buf_chain->next;
2456 buf_discard(buf);
2457 }
2458 return -ENOMEM;
2442 } 2459 }
2443 msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE); 2460 msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
2461 fragm_no++;
2462 msg_set_fragm_no(&fragm_hdr, fragm_no);
2444 skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE); 2463 skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
2445 skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs, 2464 skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
2446 fragm_sz); 2465 fragm_sz);
2447 /* Send queued messages first, if any: */ 2466 buf_chain_tail->next = fragm;
2467 buf_chain_tail = fragm;
2448 2468
2449 l_ptr->stats.sent_fragments++;
2450 tipc_link_send_buf(l_ptr, fragm);
2451 if (!tipc_link_is_up(l_ptr))
2452 return dsz;
2453 msg_set_fragm_no(&fragm_hdr, ++fragm_no);
2454 rest -= fragm_sz; 2469 rest -= fragm_sz;
2455 crs += fragm_sz; 2470 crs += fragm_sz;
2456 msg_set_type(&fragm_hdr, FRAGMENT); 2471 msg_set_type(&fragm_hdr, FRAGMENT);
2457 } 2472 }
2458exit:
2459 buf_discard(buf); 2473 buf_discard(buf);
2474
2475 /* Append chain of fragments to send queue & send them */
2476
2477 l_ptr->long_msg_seq_no++;
2478 link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
2479 l_ptr->stats.sent_fragments += fragm_no;
2480 l_ptr->stats.sent_fragmented++;
2481 tipc_link_push_queue(l_ptr);
2482
2460 return dsz; 2483 return dsz;
2461} 2484}
2462 2485
@@ -2464,7 +2487,7 @@ exit:
2464 * A pending message being re-assembled must store certain values 2487 * A pending message being re-assembled must store certain values
2465 * to handle subsequent fragments correctly. The following functions 2488 * to handle subsequent fragments correctly. The following functions
2466 * help storing these values in unused, available fields in the 2489 * help storing these values in unused, available fields in the
2467 * pending message. This makes dynamic memory allocation unecessary. 2490 * pending message. This makes dynamic memory allocation unnecessary.
2468 */ 2491 */
2469 2492
2470static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno) 2493static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno)
@@ -2618,6 +2641,9 @@ static void link_check_defragm_bufs(struct link *l_ptr)
2618 2641
2619static void link_set_supervision_props(struct link *l_ptr, u32 tolerance) 2642static void link_set_supervision_props(struct link *l_ptr, u32 tolerance)
2620{ 2643{
2644 if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL))
2645 return;
2646
2621 l_ptr->tolerance = tolerance; 2647 l_ptr->tolerance = tolerance;
2622 l_ptr->continuity_interval = 2648 l_ptr->continuity_interval =
2623 ((tolerance / 4) > 500) ? 500 : tolerance / 4; 2649 ((tolerance / 4) > 500) ? 500 : tolerance / 4;
@@ -2658,7 +2684,7 @@ void tipc_link_set_queue_limits(struct link *l_ptr, u32 window)
2658static struct link *link_find_link(const char *name, struct tipc_node **node) 2684static struct link *link_find_link(const char *name, struct tipc_node **node)
2659{ 2685{
2660 struct link_name link_name_parts; 2686 struct link_name link_name_parts;
2661 struct bearer *b_ptr; 2687 struct tipc_bearer *b_ptr;
2662 struct link *l_ptr; 2688 struct link *l_ptr;
2663 2689
2664 if (!link_name_validate(name, &link_name_parts)) 2690 if (!link_name_validate(name, &link_name_parts))
@@ -2961,7 +2987,7 @@ static void link_print(struct link *l_ptr, const char *str)
2961 2987
2962 tipc_printf(buf, str); 2988 tipc_printf(buf, str);
2963 tipc_printf(buf, "Link %x<%s>:", 2989 tipc_printf(buf, "Link %x<%s>:",
2964 l_ptr->addr, l_ptr->b_ptr->publ.name); 2990 l_ptr->addr, l_ptr->b_ptr->name);
2965 2991
2966#ifdef CONFIG_TIPC_DEBUG 2992#ifdef CONFIG_TIPC_DEBUG
2967 if (link_reset_reset(l_ptr) || link_reset_unknown(l_ptr)) 2993 if (link_reset_reset(l_ptr) || link_reset_unknown(l_ptr))
@@ -2981,9 +3007,9 @@ static void link_print(struct link *l_ptr, const char *str)
2981 != (l_ptr->out_queue_size - 1)) || 3007 != (l_ptr->out_queue_size - 1)) ||
2982 (l_ptr->last_out->next != NULL)) { 3008 (l_ptr->last_out->next != NULL)) {
2983 tipc_printf(buf, "\nSend queue inconsistency\n"); 3009 tipc_printf(buf, "\nSend queue inconsistency\n");
2984 tipc_printf(buf, "first_out= %x ", l_ptr->first_out); 3010 tipc_printf(buf, "first_out= %p ", l_ptr->first_out);
2985 tipc_printf(buf, "next_out= %x ", l_ptr->next_out); 3011 tipc_printf(buf, "next_out= %p ", l_ptr->next_out);
2986 tipc_printf(buf, "last_out= %x ", l_ptr->last_out); 3012 tipc_printf(buf, "last_out= %p ", l_ptr->last_out);
2987 } 3013 }
2988 } else 3014 } else
2989 tipc_printf(buf, "[]"); 3015 tipc_printf(buf, "[]");