aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/link.c140
-rw-r--r--net/tipc/link.h2
-rw-r--r--net/tipc/msg.c96
-rw-r--r--net/tipc/msg.h25
4 files changed, 250 insertions, 13 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ad2c57f5868d..68d2afb44f2f 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -850,6 +850,144 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector)
850 return res; 850 return res;
851} 851}
852 852
853/* tipc_link_cong: determine return value and how to treat the
854 * sent buffer during link congestion.
855 * - For plain, errorless user data messages we keep the buffer and
856 * return -ELINKONG.
857 * - For all other messages we discard the buffer and return -EHOSTUNREACH
858 * - For TIPC internal messages we also reset the link
859 */
860static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
861{
862 struct tipc_msg *msg = buf_msg(buf);
863 uint psz = msg_size(msg);
864 uint imp = tipc_msg_tot_importance(msg);
865 u32 oport = msg_tot_origport(msg);
866
867 if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) {
868 if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) {
869 link_schedule_port(link, oport, psz);
870 return -ELINKCONG;
871 }
872 } else {
873 pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
874 tipc_link_reset(link);
875 }
876 kfree_skb_list(buf);
877 return -EHOSTUNREACH;
878}
879
880/**
881 * __tipc_link_xmit2(): same as tipc_link_xmit2, but destlink is known & locked
882 * @link: link to use
883 * @buf: chain of buffers containing message
884 * Consumes the buffer chain, except when returning -ELINKCONG
885 * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
886 * user data messages) or -EHOSTUNREACH (all other messages/senders)
887 * Only the socket functions tipc_send_stream() and tipc_send_packet() need
888 * to act on the return value, since they may need to do more send attempts.
889 */
890int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf)
891{
892 struct tipc_msg *msg = buf_msg(buf);
893 uint psz = msg_size(msg);
894 uint qsz = link->out_queue_size;
895 uint sndlim = link->queue_limit[0];
896 uint imp = tipc_msg_tot_importance(msg);
897 uint mtu = link->max_pkt;
898 uint ack = mod(link->next_in_no - 1);
899 uint seqno = link->next_out_no;
900 uint bc_last_in = link->owner->bclink.last_in;
901 struct tipc_media_addr *addr = &link->media_addr;
902 struct sk_buff *next = buf->next;
903
904 /* Match queue limits against msg importance: */
905 if (unlikely(qsz >= link->queue_limit[imp]))
906 return tipc_link_cong(link, buf);
907
908 /* Has valid packet limit been used ? */
909 if (unlikely(psz > mtu)) {
910 kfree_skb_list(buf);
911 return -EMSGSIZE;
912 }
913
914 /* Prepare each packet for sending, and add to outqueue: */
915 while (buf) {
916 next = buf->next;
917 msg = buf_msg(buf);
918 msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
919 msg_set_bcast_ack(msg, bc_last_in);
920
921 if (!link->first_out) {
922 link->first_out = buf;
923 } else if (qsz < sndlim) {
924 link->last_out->next = buf;
925 } else if (tipc_msg_bundle(link->last_out, buf, mtu)) {
926 link->stats.sent_bundled++;
927 buf = next;
928 next = buf->next;
929 continue;
930 } else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) {
931 link->stats.sent_bundled++;
932 link->stats.sent_bundles++;
933 link->last_out->next = buf;
934 if (!link->next_out)
935 link->next_out = buf;
936 } else {
937 link->last_out->next = buf;
938 if (!link->next_out)
939 link->next_out = buf;
940 }
941
942 /* Send packet if possible: */
943 if (likely(++qsz <= sndlim)) {
944 tipc_bearer_send(link->bearer_id, buf, addr);
945 link->next_out = next;
946 link->unacked_window = 0;
947 }
948 seqno++;
949 link->last_out = buf;
950 buf = next;
951 }
952 link->next_out_no = seqno;
953 link->out_queue_size = qsz;
954 return 0;
955}
956
957/**
958 * tipc_link_xmit2() is the general link level function for message sending
959 * @buf: chain of buffers containing message
960 * @dsz: amount of user data to be sent
961 * @dnode: address of destination node
962 * @selector: a number used for deterministic link selection
963 * Consumes the buffer chain, except when returning -ELINKCONG
964 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
965 */
966int tipc_link_xmit2(struct sk_buff *buf, u32 dnode, u32 selector)
967{
968 struct tipc_link *link = NULL;
969 struct tipc_node *node;
970 int rc = -EHOSTUNREACH;
971
972 node = tipc_node_find(dnode);
973 if (node) {
974 tipc_node_lock(node);
975 link = node->active_links[selector & 1];
976 if (link)
977 rc = __tipc_link_xmit2(link, buf);
978 tipc_node_unlock(node);
979 }
980
981 if (link)
982 return rc;
983
984 if (likely(in_own_node(dnode)))
985 return tipc_sk_rcv(buf);
986
987 kfree_skb_list(buf);
988 return rc;
989}
990
853/* 991/*
854 * tipc_link_sync_xmit - synchronize broadcast link endpoints. 992 * tipc_link_sync_xmit - synchronize broadcast link endpoints.
855 * 993 *
@@ -1238,7 +1376,7 @@ static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
1238 tipc_bearer_send(l_ptr->bearer_id, buf, 1376 tipc_bearer_send(l_ptr->bearer_id, buf,
1239 &l_ptr->media_addr); 1377 &l_ptr->media_addr);
1240 if (msg_user(msg) == MSG_BUNDLER) 1378 if (msg_user(msg) == MSG_BUNDLER)
1241 msg_set_type(msg, CLOSED_MSG); 1379 msg_set_type(msg, BUNDLE_CLOSED);
1242 l_ptr->next_out = buf->next; 1380 l_ptr->next_out = buf->next;
1243 return 0; 1381 return 0;
1244 } 1382 }
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 200d518b218e..227ff8120897 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -227,8 +227,10 @@ void tipc_link_reset_all(struct tipc_node *node);
227void tipc_link_reset(struct tipc_link *l_ptr); 227void tipc_link_reset(struct tipc_link *l_ptr);
228void tipc_link_reset_list(unsigned int bearer_id); 228void tipc_link_reset_list(unsigned int bearer_id);
229int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector); 229int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector);
230int tipc_link_xmit2(struct sk_buff *buf, u32 dest, u32 selector);
230void tipc_link_names_xmit(struct list_head *message_list, u32 dest); 231void tipc_link_names_xmit(struct list_head *message_list, u32 dest);
231int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf); 232int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
233int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf);
232int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf); 234int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
233u32 tipc_link_get_max_pkt(u32 dest, u32 selector); 235u32 tipc_link_get_max_pkt(u32 dest, u32 selector);
234int tipc_link_iovec_xmit_fast(struct tipc_port *sender, 236int tipc_link_iovec_xmit_fast(struct tipc_port *sender,
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 8be6e94a1ca9..e02afc96edd7 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -37,20 +37,11 @@
37#include "core.h" 37#include "core.h"
38#include "msg.h" 38#include "msg.h"
39 39
40u32 tipc_msg_tot_importance(struct tipc_msg *m) 40static unsigned int align(unsigned int i)
41{ 41{
42 if (likely(msg_isdata(m))) { 42 return (i + 3) & ~3u;
43 if (likely(msg_orignode(m) == tipc_own_addr))
44 return msg_importance(m);
45 return msg_importance(m) + 4;
46 }
47 if ((msg_user(m) == MSG_FRAGMENTER) &&
48 (msg_type(m) == FIRST_FRAGMENT))
49 return msg_importance(msg_get_wrapped(m));
50 return msg_importance(m);
51} 43}
52 44
53
54void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize, 45void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
55 u32 destnode) 46 u32 destnode)
56{ 47{
@@ -152,3 +143,86 @@ out_free:
152 kfree_skb(*buf); 143 kfree_skb(*buf);
153 return 0; 144 return 0;
154} 145}
146
147/**
148 * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one
149 * @bbuf: the existing buffer ("bundle")
150 * @buf: buffer to be appended
151 * @mtu: max allowable size for the bundle buffer
152 * Consumes buffer if successful
153 * Returns true if bundling could be performed, otherwise false
154 */
155bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu)
156{
157 struct tipc_msg *bmsg = buf_msg(bbuf);
158 struct tipc_msg *msg = buf_msg(buf);
159 unsigned int bsz = msg_size(bmsg);
160 unsigned int msz = msg_size(msg);
161 u32 start = align(bsz);
162 u32 max = mtu - INT_H_SIZE;
163 u32 pad = start - bsz;
164
165 if (likely(msg_user(msg) == MSG_FRAGMENTER))
166 return false;
167 if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL))
168 return false;
169 if (unlikely(msg_user(msg) == BCAST_PROTOCOL))
170 return false;
171 if (likely(msg_user(bmsg) != MSG_BUNDLER))
172 return false;
173 if (likely(msg_type(bmsg) != BUNDLE_OPEN))
174 return false;
175 if (unlikely(skb_tailroom(bbuf) < (pad + msz)))
176 return false;
177 if (unlikely(max < (start + msz)))
178 return false;
179
180 skb_put(bbuf, pad + msz);
181 skb_copy_to_linear_data_offset(bbuf, start, buf->data, msz);
182 msg_set_size(bmsg, start + msz);
183 msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1);
184 bbuf->next = buf->next;
185 kfree_skb(buf);
186 return true;
187}
188
189/**
190 * tipc_msg_make_bundle(): Create bundle buf and append message to its tail
191 * @buf: buffer to be appended and replaced
192 * @mtu: max allowable size for the bundle buffer, inclusive header
193 * @dnode: destination node for message. (Not always present in header)
194 * Replaces buffer if successful
195 * Returns true if sucess, otherwise false
196 */
197bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode)
198{
199 struct sk_buff *bbuf;
200 struct tipc_msg *bmsg;
201 struct tipc_msg *msg = buf_msg(*buf);
202 u32 msz = msg_size(msg);
203 u32 max = mtu - INT_H_SIZE;
204
205 if (msg_user(msg) == MSG_FRAGMENTER)
206 return false;
207 if (msg_user(msg) == CHANGEOVER_PROTOCOL)
208 return false;
209 if (msg_user(msg) == BCAST_PROTOCOL)
210 return false;
211 if (msz > (max / 2))
212 return false;
213
214 bbuf = tipc_buf_acquire(max);
215 if (!bbuf)
216 return false;
217
218 skb_trim(bbuf, INT_H_SIZE);
219 bmsg = buf_msg(bbuf);
220 tipc_msg_init(bmsg, MSG_BUNDLER, BUNDLE_OPEN, INT_H_SIZE, dnode);
221 msg_set_seqno(bmsg, msg_seqno(msg));
222 msg_set_ack(bmsg, msg_ack(msg));
223 msg_set_bcast_ack(bmsg, msg_bcast_ack(msg));
224 bbuf->next = (*buf)->next;
225 tipc_msg_bundle(bbuf, *buf, mtu);
226 *buf = bbuf;
227 return true;
228}
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 503511903d1d..41a05fa8d608 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -463,6 +463,11 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
463#define FRAGMENT 1 463#define FRAGMENT 1
464#define LAST_FRAGMENT 2 464#define LAST_FRAGMENT 2
465 465
466/* Bundling protocol message types
467 */
468#define BUNDLE_OPEN 0
469#define BUNDLE_CLOSED 1
470
466/* 471/*
467 * Link management protocol message types 472 * Link management protocol message types
468 */ 473 */
@@ -706,12 +711,30 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
706 msg_set_bits(m, 9, 0, 0xffff, n); 711 msg_set_bits(m, 9, 0, 0xffff, n);
707} 712}
708 713
709u32 tipc_msg_tot_importance(struct tipc_msg *m); 714static inline u32 tipc_msg_tot_importance(struct tipc_msg *m)
715{
716 if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))
717 return msg_importance(msg_get_wrapped(m));
718 return msg_importance(m);
719}
720
721static inline u32 msg_tot_origport(struct tipc_msg *m)
722{
723 if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))
724 return msg_origport(msg_get_wrapped(m));
725 return msg_origport(m);
726}
727
710void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize, 728void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
711 u32 destnode); 729 u32 destnode);
730
712int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect, 731int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
713 unsigned int len, int max_size, struct sk_buff **buf); 732 unsigned int len, int max_size, struct sk_buff **buf);
714 733
715int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf); 734int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
716 735
736bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu);
737
738bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode);
739
717#endif 740#endif