aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/link.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--net/tipc/link.c104
1 files changed, 60 insertions, 44 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ebf338f7b14e..5ed4b4f7452d 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -92,7 +92,8 @@ static int link_recv_changeover_msg(struct link **l_ptr, struct sk_buff **buf);
92static void link_set_supervision_props(struct link *l_ptr, u32 tolerance); 92static void link_set_supervision_props(struct link *l_ptr, u32 tolerance);
93static int link_send_sections_long(struct tipc_port *sender, 93static int link_send_sections_long(struct tipc_port *sender,
94 struct iovec const *msg_sect, 94 struct iovec const *msg_sect,
95 u32 num_sect, u32 destnode); 95 u32 num_sect, unsigned int total_len,
96 u32 destnode);
96static void link_check_defragm_bufs(struct link *l_ptr); 97static void link_check_defragm_bufs(struct link *l_ptr);
97static void link_state_event(struct link *l_ptr, u32 event); 98static void link_state_event(struct link *l_ptr, u32 event);
98static void link_reset_statistics(struct link *l_ptr); 99static void link_reset_statistics(struct link *l_ptr);
@@ -842,6 +843,25 @@ static void link_add_to_outqueue(struct link *l_ptr,
842 l_ptr->stats.max_queue_sz = l_ptr->out_queue_size; 843 l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
843} 844}
844 845
846static void link_add_chain_to_outqueue(struct link *l_ptr,
847 struct sk_buff *buf_chain,
848 u32 long_msgno)
849{
850 struct sk_buff *buf;
851 struct tipc_msg *msg;
852
853 if (!l_ptr->next_out)
854 l_ptr->next_out = buf_chain;
855 while (buf_chain) {
856 buf = buf_chain;
857 buf_chain = buf_chain->next;
858
859 msg = buf_msg(buf);
860 msg_set_long_msgno(msg, long_msgno);
861 link_add_to_outqueue(l_ptr, buf, msg);
862 }
863}
864
845/* 865/*
846 * tipc_link_send_buf() is the 'full path' for messages, called from 866 * tipc_link_send_buf() is the 'full path' for messages, called from
847 * inside TIPC when the 'fast path' in tipc_send_buf 867 * inside TIPC when the 'fast path' in tipc_send_buf
@@ -864,8 +884,9 @@ int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf)
864 884
865 if (unlikely(queue_size >= queue_limit)) { 885 if (unlikely(queue_size >= queue_limit)) {
866 if (imp <= TIPC_CRITICAL_IMPORTANCE) { 886 if (imp <= TIPC_CRITICAL_IMPORTANCE) {
867 return link_schedule_port(l_ptr, msg_origport(msg), 887 link_schedule_port(l_ptr, msg_origport(msg), size);
868 size); 888 buf_discard(buf);
889 return -ELINKCONG;
869 } 890 }
870 buf_discard(buf); 891 buf_discard(buf);
871 if (imp > CONN_MANAGER) { 892 if (imp > CONN_MANAGER) {
@@ -1042,6 +1063,7 @@ int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode)
1042int tipc_link_send_sections_fast(struct tipc_port *sender, 1063int tipc_link_send_sections_fast(struct tipc_port *sender,
1043 struct iovec const *msg_sect, 1064 struct iovec const *msg_sect,
1044 const u32 num_sect, 1065 const u32 num_sect,
1066 unsigned int total_len,
1045 u32 destaddr) 1067 u32 destaddr)
1046{ 1068{
1047 struct tipc_msg *hdr = &sender->phdr; 1069 struct tipc_msg *hdr = &sender->phdr;
@@ -1057,8 +1079,8 @@ again:
1057 * (Must not hold any locks while building message.) 1079 * (Must not hold any locks while building message.)
1058 */ 1080 */
1059 1081
1060 res = tipc_msg_build(hdr, msg_sect, num_sect, sender->max_pkt, 1082 res = tipc_msg_build(hdr, msg_sect, num_sect, total_len,
1061 !sender->user_port, &buf); 1083 sender->max_pkt, !sender->user_port, &buf);
1062 1084
1063 read_lock_bh(&tipc_net_lock); 1085 read_lock_bh(&tipc_net_lock);
1064 node = tipc_node_find(destaddr); 1086 node = tipc_node_find(destaddr);
@@ -1069,8 +1091,6 @@ again:
1069 if (likely(buf)) { 1091 if (likely(buf)) {
1070 res = link_send_buf_fast(l_ptr, buf, 1092 res = link_send_buf_fast(l_ptr, buf,
1071 &sender->max_pkt); 1093 &sender->max_pkt);
1072 if (unlikely(res < 0))
1073 buf_discard(buf);
1074exit: 1094exit:
1075 tipc_node_unlock(node); 1095 tipc_node_unlock(node);
1076 read_unlock_bh(&tipc_net_lock); 1096 read_unlock_bh(&tipc_net_lock);
@@ -1105,7 +1125,8 @@ exit:
1105 goto again; 1125 goto again;
1106 1126
1107 return link_send_sections_long(sender, msg_sect, 1127 return link_send_sections_long(sender, msg_sect,
1108 num_sect, destaddr); 1128 num_sect, total_len,
1129 destaddr);
1109 } 1130 }
1110 tipc_node_unlock(node); 1131 tipc_node_unlock(node);
1111 } 1132 }
@@ -1117,7 +1138,7 @@ exit:
1117 return tipc_reject_msg(buf, TIPC_ERR_NO_NODE); 1138 return tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1118 if (res >= 0) 1139 if (res >= 0)
1119 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect, 1140 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1120 TIPC_ERR_NO_NODE); 1141 total_len, TIPC_ERR_NO_NODE);
1121 return res; 1142 return res;
1122} 1143}
1123 1144
@@ -1138,12 +1159,13 @@ exit:
1138static int link_send_sections_long(struct tipc_port *sender, 1159static int link_send_sections_long(struct tipc_port *sender,
1139 struct iovec const *msg_sect, 1160 struct iovec const *msg_sect,
1140 u32 num_sect, 1161 u32 num_sect,
1162 unsigned int total_len,
1141 u32 destaddr) 1163 u32 destaddr)
1142{ 1164{
1143 struct link *l_ptr; 1165 struct link *l_ptr;
1144 struct tipc_node *node; 1166 struct tipc_node *node;
1145 struct tipc_msg *hdr = &sender->phdr; 1167 struct tipc_msg *hdr = &sender->phdr;
1146 u32 dsz = msg_data_sz(hdr); 1168 u32 dsz = total_len;
1147 u32 max_pkt, fragm_sz, rest; 1169 u32 max_pkt, fragm_sz, rest;
1148 struct tipc_msg fragm_hdr; 1170 struct tipc_msg fragm_hdr;
1149 struct sk_buff *buf, *buf_chain, *prev; 1171 struct sk_buff *buf, *buf_chain, *prev;
@@ -1169,7 +1191,6 @@ again:
1169 1191
1170 tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT, 1192 tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
1171 INT_H_SIZE, msg_destnode(hdr)); 1193 INT_H_SIZE, msg_destnode(hdr));
1172 msg_set_link_selector(&fragm_hdr, sender->ref);
1173 msg_set_size(&fragm_hdr, max_pkt); 1194 msg_set_size(&fragm_hdr, max_pkt);
1174 msg_set_fragm_no(&fragm_hdr, 1); 1195 msg_set_fragm_no(&fragm_hdr, 1);
1175 1196
@@ -1271,28 +1292,15 @@ reject:
1271 buf_discard(buf_chain); 1292 buf_discard(buf_chain);
1272 } 1293 }
1273 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect, 1294 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1274 TIPC_ERR_NO_NODE); 1295 total_len, TIPC_ERR_NO_NODE);
1275 } 1296 }
1276 1297
1277 /* Append whole chain to send queue: */ 1298 /* Append chain of fragments to send queue & send them */
1278 1299
1279 buf = buf_chain; 1300 l_ptr->long_msg_seq_no++;
1280 l_ptr->long_msg_seq_no = mod(l_ptr->long_msg_seq_no + 1); 1301 link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
1281 if (!l_ptr->next_out) 1302 l_ptr->stats.sent_fragments += fragm_no;
1282 l_ptr->next_out = buf_chain;
1283 l_ptr->stats.sent_fragmented++; 1303 l_ptr->stats.sent_fragmented++;
1284 while (buf) {
1285 struct sk_buff *next = buf->next;
1286 struct tipc_msg *msg = buf_msg(buf);
1287
1288 l_ptr->stats.sent_fragments++;
1289 msg_set_long_msgno(msg, l_ptr->long_msg_seq_no);
1290 link_add_to_outqueue(l_ptr, buf, msg);
1291 buf = next;
1292 }
1293
1294 /* Send it, if possible: */
1295
1296 tipc_link_push_queue(l_ptr); 1304 tipc_link_push_queue(l_ptr);
1297 tipc_node_unlock(node); 1305 tipc_node_unlock(node);
1298 return dsz; 1306 return dsz;
@@ -2407,6 +2415,8 @@ void tipc_link_recv_bundle(struct sk_buff *buf)
2407 */ 2415 */
2408static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf) 2416static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
2409{ 2417{
2418 struct sk_buff *buf_chain = NULL;
2419 struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain;
2410 struct tipc_msg *inmsg = buf_msg(buf); 2420 struct tipc_msg *inmsg = buf_msg(buf);
2411 struct tipc_msg fragm_hdr; 2421 struct tipc_msg fragm_hdr;
2412 u32 insize = msg_size(inmsg); 2422 u32 insize = msg_size(inmsg);
@@ -2415,7 +2425,7 @@ static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
2415 u32 rest = insize; 2425 u32 rest = insize;
2416 u32 pack_sz = l_ptr->max_pkt; 2426 u32 pack_sz = l_ptr->max_pkt;
2417 u32 fragm_sz = pack_sz - INT_H_SIZE; 2427 u32 fragm_sz = pack_sz - INT_H_SIZE;
2418 u32 fragm_no = 1; 2428 u32 fragm_no = 0;
2419 u32 destaddr; 2429 u32 destaddr;
2420 2430
2421 if (msg_short(inmsg)) 2431 if (msg_short(inmsg))
@@ -2427,10 +2437,6 @@ static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
2427 2437
2428 tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT, 2438 tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
2429 INT_H_SIZE, destaddr); 2439 INT_H_SIZE, destaddr);
2430 msg_set_link_selector(&fragm_hdr, msg_link_selector(inmsg));
2431 msg_set_long_msgno(&fragm_hdr, mod(l_ptr->long_msg_seq_no++));
2432 msg_set_fragm_no(&fragm_hdr, fragm_no);
2433 l_ptr->stats.sent_fragmented++;
2434 2440
2435 /* Chop up message: */ 2441 /* Chop up message: */
2436 2442
@@ -2443,27 +2449,37 @@ static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
2443 } 2449 }
2444 fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE); 2450 fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
2445 if (fragm == NULL) { 2451 if (fragm == NULL) {
2446 warn("Link unable to fragment message\n"); 2452 buf_discard(buf);
2447 dsz = -ENOMEM; 2453 while (buf_chain) {
2448 goto exit; 2454 buf = buf_chain;
2455 buf_chain = buf_chain->next;
2456 buf_discard(buf);
2457 }
2458 return -ENOMEM;
2449 } 2459 }
2450 msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE); 2460 msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
2461 fragm_no++;
2462 msg_set_fragm_no(&fragm_hdr, fragm_no);
2451 skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE); 2463 skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
2452 skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs, 2464 skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
2453 fragm_sz); 2465 fragm_sz);
2454 /* Send queued messages first, if any: */ 2466 buf_chain_tail->next = fragm;
2467 buf_chain_tail = fragm;
2455 2468
2456 l_ptr->stats.sent_fragments++;
2457 tipc_link_send_buf(l_ptr, fragm);
2458 if (!tipc_link_is_up(l_ptr))
2459 return dsz;
2460 msg_set_fragm_no(&fragm_hdr, ++fragm_no);
2461 rest -= fragm_sz; 2469 rest -= fragm_sz;
2462 crs += fragm_sz; 2470 crs += fragm_sz;
2463 msg_set_type(&fragm_hdr, FRAGMENT); 2471 msg_set_type(&fragm_hdr, FRAGMENT);
2464 } 2472 }
2465exit:
2466 buf_discard(buf); 2473 buf_discard(buf);
2474
2475 /* Append chain of fragments to send queue & send them */
2476
2477 l_ptr->long_msg_seq_no++;
2478 link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
2479 l_ptr->stats.sent_fragments += fragm_no;
2480 l_ptr->stats.sent_fragmented++;
2481 tipc_link_push_queue(l_ptr);
2482
2467 return dsz; 2483 return dsz;
2468} 2484}
2469 2485