author     Jon Paul Maloy <jon.maloy@ericsson.com>	2014-06-25 21:41:32 -0400
committer  David S. Miller <davem@davemloft.net>	2014-06-27 15:50:54 -0400
commit     4f1688b2c63cd86f0d7bcf95a9b3040e38bd3c1a (patch)
tree       757c43e51ba7260d1745a8d5b2e3bdfb6b0a005a /net/tipc
parent     e4de5fab806f74622600ab7fd6ed22b7f911a8c5 (diff)
tipc: introduce send functions for chained buffers in link
The current link implementation provides several different transmit functions, depending on the characteristics of the message to be sent: if it is an iovec or an sk_buff, if it needs fragmentation or not, if the caller holds the node_lock or not. The permutation of these options gives us an unwanted amount of unnecessarily complex code.

As a first step towards simplifying the send path for all messages, we introduce two new send functions at link level, tipc_link_xmit2() and __tipc_link_xmit2(). The former looks up a link to the message destination, and if one is found, it grabs the node lock and calls the second function, which works exclusively inside the node lock protection. If no link is found, and the destination is on the same node, it delivers the message directly to the local destination socket.

The new functions take a buffer chain where all packet headers are already prepared, and the correct MTU has been used. These two functions will later replace all other link-level transmit functions.

The functions are not backwards compatible, so we have added them as new functions with temporary names. They are tested, but have no users yet. Those will be added later in this series.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
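For orientation, here is a minimal sketch of the calling pattern the new API implies. The helper prepare_chain() is purely illustrative (the chain-building helpers arrive later in this series); only the tipc_link_xmit2() call and the -ELINKCONG handling reflect code in this patch:

/* Illustrative only -- how a future caller might use the new send path.
 * prepare_chain() is a hypothetical helper standing in for whatever code
 * builds a fully-headered buffer chain at the link MTU; it is not part
 * of this patch.
 */
static int example_send(u32 dnode, u32 selector)
{
	struct sk_buff *chain;
	int rc;

	chain = prepare_chain(dnode);	/* hypothetical: headers and MTU already set */
	if (!chain)
		return -ENOMEM;

	rc = tipc_link_xmit2(chain, dnode, selector);
	if (rc == -ELINKCONG) {
		/* The chain is NOT consumed in this case; a socket-level
		 * sender such as tipc_send_stream() would typically wait
		 * for the congestion to clear and retry.
		 */
	}
	return rc;
}

For all other return values the chain has been consumed, so the caller must not touch it again.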
Diffstat (limited to 'net/tipc')
-rw-r--r--	net/tipc/link.c	140
-rw-r--r--	net/tipc/link.h	  2
-rw-r--r--	net/tipc/msg.c	 96
-rw-r--r--	net/tipc/msg.h	 25
4 files changed, 250 insertions, 13 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ad2c57f5868d..68d2afb44f2f 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -850,6 +850,144 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector)
 	return res;
 }
 
+/* tipc_link_cong: determine return value and how to treat the
+ * sent buffer during link congestion.
+ * - For plain, errorless user data messages we keep the buffer and
+ *   return -ELINKCONG.
+ * - For all other messages we discard the buffer and return -EHOSTUNREACH
+ * - For TIPC internal messages we also reset the link
+ */
+static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
+{
+	struct tipc_msg *msg = buf_msg(buf);
+	uint psz = msg_size(msg);
+	uint imp = tipc_msg_tot_importance(msg);
+	u32 oport = msg_tot_origport(msg);
+
+	if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) {
+		if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) {
+			link_schedule_port(link, oport, psz);
+			return -ELINKCONG;
+		}
+	} else {
+		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
+		tipc_link_reset(link);
+	}
+	kfree_skb_list(buf);
+	return -EHOSTUNREACH;
+}
+
+/**
+ * __tipc_link_xmit2(): same as tipc_link_xmit2, but destlink is known & locked
+ * @link: link to use
+ * @buf: chain of buffers containing message
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
+ * user data messages) or -EHOSTUNREACH (all other messages/senders)
+ * Only the socket functions tipc_send_stream() and tipc_send_packet() need
+ * to act on the return value, since they may need to do more send attempts.
+ */
+int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf)
+{
+	struct tipc_msg *msg = buf_msg(buf);
+	uint psz = msg_size(msg);
+	uint qsz = link->out_queue_size;
+	uint sndlim = link->queue_limit[0];
+	uint imp = tipc_msg_tot_importance(msg);
+	uint mtu = link->max_pkt;
+	uint ack = mod(link->next_in_no - 1);
+	uint seqno = link->next_out_no;
+	uint bc_last_in = link->owner->bclink.last_in;
+	struct tipc_media_addr *addr = &link->media_addr;
+	struct sk_buff *next = buf->next;
+
+	/* Match queue limits against msg importance: */
+	if (unlikely(qsz >= link->queue_limit[imp]))
+		return tipc_link_cong(link, buf);
+
+	/* Has valid packet limit been used ? */
+	if (unlikely(psz > mtu)) {
+		kfree_skb_list(buf);
+		return -EMSGSIZE;
+	}
+
+	/* Prepare each packet for sending, and add to outqueue: */
+	while (buf) {
+		next = buf->next;
+		msg = buf_msg(buf);
+		msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
+		msg_set_bcast_ack(msg, bc_last_in);
+
+		if (!link->first_out) {
+			link->first_out = buf;
+		} else if (qsz < sndlim) {
+			link->last_out->next = buf;
+		} else if (tipc_msg_bundle(link->last_out, buf, mtu)) {
+			link->stats.sent_bundled++;
+			buf = next;
+			next = buf->next;
+			continue;
+		} else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) {
+			link->stats.sent_bundled++;
+			link->stats.sent_bundles++;
+			link->last_out->next = buf;
+			if (!link->next_out)
+				link->next_out = buf;
+		} else {
+			link->last_out->next = buf;
+			if (!link->next_out)
+				link->next_out = buf;
+		}
+
+		/* Send packet if possible: */
+		if (likely(++qsz <= sndlim)) {
+			tipc_bearer_send(link->bearer_id, buf, addr);
+			link->next_out = next;
+			link->unacked_window = 0;
+		}
+		seqno++;
+		link->last_out = buf;
+		buf = next;
+	}
+	link->next_out_no = seqno;
+	link->out_queue_size = qsz;
+	return 0;
+}
+
+/**
+ * tipc_link_xmit2() is the general link level function for message sending
+ * @buf: chain of buffers containing message
+ * @dsz: amount of user data to be sent
+ * @dnode: address of destination node
+ * @selector: a number used for deterministic link selection
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
+ */
+int tipc_link_xmit2(struct sk_buff *buf, u32 dnode, u32 selector)
+{
+	struct tipc_link *link = NULL;
+	struct tipc_node *node;
+	int rc = -EHOSTUNREACH;
+
+	node = tipc_node_find(dnode);
+	if (node) {
+		tipc_node_lock(node);
+		link = node->active_links[selector & 1];
+		if (link)
+			rc = __tipc_link_xmit2(link, buf);
+		tipc_node_unlock(node);
+	}
+
+	if (link)
+		return rc;
+
+	if (likely(in_own_node(dnode)))
+		return tipc_sk_rcv(buf);
+
+	kfree_skb_list(buf);
+	return rc;
+}
+
 /*
  * tipc_link_sync_xmit - synchronize broadcast link endpoints.
  *
@@ -1238,7 +1376,7 @@ static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
 		tipc_bearer_send(l_ptr->bearer_id, buf,
 				 &l_ptr->media_addr);
 		if (msg_user(msg) == MSG_BUNDLER)
-			msg_set_type(msg, CLOSED_MSG);
+			msg_set_type(msg, BUNDLE_CLOSED);
 		l_ptr->next_out = buf->next;
 		return 0;
 	}
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 200d518b218e..227ff8120897 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -227,8 +227,10 @@ void tipc_link_reset_all(struct tipc_node *node);
 void tipc_link_reset(struct tipc_link *l_ptr);
 void tipc_link_reset_list(unsigned int bearer_id);
 int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector);
+int tipc_link_xmit2(struct sk_buff *buf, u32 dest, u32 selector);
 void tipc_link_names_xmit(struct list_head *message_list, u32 dest);
 int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
+int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf);
 int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
 u32 tipc_link_get_max_pkt(u32 dest, u32 selector);
 int tipc_link_iovec_xmit_fast(struct tipc_port *sender,
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 8be6e94a1ca9..e02afc96edd7 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -37,20 +37,11 @@
 #include "core.h"
 #include "msg.h"
 
-u32 tipc_msg_tot_importance(struct tipc_msg *m)
+static unsigned int align(unsigned int i)
 {
-	if (likely(msg_isdata(m))) {
-		if (likely(msg_orignode(m) == tipc_own_addr))
-			return msg_importance(m);
-		return msg_importance(m) + 4;
-	}
-	if ((msg_user(m) == MSG_FRAGMENTER) &&
-	    (msg_type(m) == FIRST_FRAGMENT))
-		return msg_importance(msg_get_wrapped(m));
-	return msg_importance(m);
+	return (i + 3) & ~3u;
 }
 
-
 void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
 		   u32 destnode)
 {
@@ -152,3 +143,86 @@ out_free:
 	kfree_skb(*buf);
 	return 0;
 }
+
+/**
+ * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one
+ * @bbuf: the existing buffer ("bundle")
+ * @buf: buffer to be appended
+ * @mtu: max allowable size for the bundle buffer
+ * Consumes buffer if successful
+ * Returns true if bundling could be performed, otherwise false
+ */
+bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu)
+{
+	struct tipc_msg *bmsg = buf_msg(bbuf);
+	struct tipc_msg *msg = buf_msg(buf);
+	unsigned int bsz = msg_size(bmsg);
+	unsigned int msz = msg_size(msg);
+	u32 start = align(bsz);
+	u32 max = mtu - INT_H_SIZE;
+	u32 pad = start - bsz;
+
+	if (likely(msg_user(msg) == MSG_FRAGMENTER))
+		return false;
+	if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL))
+		return false;
+	if (unlikely(msg_user(msg) == BCAST_PROTOCOL))
+		return false;
+	if (likely(msg_user(bmsg) != MSG_BUNDLER))
+		return false;
+	if (likely(msg_type(bmsg) != BUNDLE_OPEN))
+		return false;
+	if (unlikely(skb_tailroom(bbuf) < (pad + msz)))
+		return false;
+	if (unlikely(max < (start + msz)))
+		return false;
+
+	skb_put(bbuf, pad + msz);
+	skb_copy_to_linear_data_offset(bbuf, start, buf->data, msz);
+	msg_set_size(bmsg, start + msz);
+	msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1);
+	bbuf->next = buf->next;
+	kfree_skb(buf);
+	return true;
+}
+
+/**
+ * tipc_msg_make_bundle(): Create bundle buf and append message to its tail
+ * @buf: buffer to be appended and replaced
+ * @mtu: max allowable size for the bundle buffer, inclusive of header
+ * @dnode: destination node for message. (Not always present in header)
+ * Replaces buffer if successful
+ * Returns true if success, otherwise false
+ */
+bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode)
+{
+	struct sk_buff *bbuf;
+	struct tipc_msg *bmsg;
+	struct tipc_msg *msg = buf_msg(*buf);
+	u32 msz = msg_size(msg);
+	u32 max = mtu - INT_H_SIZE;
+
+	if (msg_user(msg) == MSG_FRAGMENTER)
+		return false;
+	if (msg_user(msg) == CHANGEOVER_PROTOCOL)
+		return false;
+	if (msg_user(msg) == BCAST_PROTOCOL)
+		return false;
+	if (msz > (max / 2))
+		return false;
+
+	bbuf = tipc_buf_acquire(max);
+	if (!bbuf)
+		return false;
+
+	skb_trim(bbuf, INT_H_SIZE);
+	bmsg = buf_msg(bbuf);
+	tipc_msg_init(bmsg, MSG_BUNDLER, BUNDLE_OPEN, INT_H_SIZE, dnode);
+	msg_set_seqno(bmsg, msg_seqno(msg));
+	msg_set_ack(bmsg, msg_ack(msg));
+	msg_set_bcast_ack(bmsg, msg_bcast_ack(msg));
+	bbuf->next = (*buf)->next;
+	tipc_msg_bundle(bbuf, *buf, mtu);
+	*buf = bbuf;
+	return true;
+}
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 503511903d1d..41a05fa8d608 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -463,6 +463,11 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
 #define FRAGMENT	1
 #define LAST_FRAGMENT	2
 
+/* Bundling protocol message types
+ */
+#define BUNDLE_OPEN	0
+#define BUNDLE_CLOSED	1
+
 /*
  * Link management protocol message types
  */
@@ -706,12 +711,30 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
 	msg_set_bits(m, 9, 0, 0xffff, n);
 }
 
-u32 tipc_msg_tot_importance(struct tipc_msg *m);
+static inline u32 tipc_msg_tot_importance(struct tipc_msg *m)
+{
+	if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))
+		return msg_importance(msg_get_wrapped(m));
+	return msg_importance(m);
+}
+
+static inline u32 msg_tot_origport(struct tipc_msg *m)
+{
+	if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))
+		return msg_origport(msg_get_wrapped(m));
+	return msg_origport(m);
+}
+
 void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
 		   u32 destnode);
+
 int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
 		   unsigned int len, int max_size, struct sk_buff **buf);
 
 int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
 
+bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu);
+
+bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode);
+
 #endif
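As an aside, the 4-byte alignment and padding arithmetic that tipc_msg_bundle() relies on can be exercised in isolation. The stand-alone sketch below only mirrors the align() helper and the start/pad/max computation from the patch; the concrete MTU and header size values are assumptions chosen for illustration (INT_H_SIZE is taken to be 40 bytes here):

#include <stdio.h>

/* Mirrors the align() helper added to net/tipc/msg.c:
 * round a size up to the next 4-byte boundary.
 */
static unsigned int align(unsigned int i)
{
	return (i + 3) & ~3u;
}

int main(void)
{
	unsigned int mtu = 1500;	/* assumed link MTU, illustration only */
	unsigned int int_h_size = 40;	/* assumed INT_H_SIZE value */
	unsigned int bsz = 1022;	/* current size of the bundle message */
	unsigned int msz = 100;		/* size of the message to append */

	unsigned int start = align(bsz);	/* offset where the message is copied */
	unsigned int pad = start - bsz;		/* padding bytes inserted before it */
	unsigned int max = mtu - int_h_size;	/* room left for bundled payload */

	if (start + msz <= max)
		printf("append: pad %u bytes, copy at offset %u, bundle grows to %u\n",
		       pad, start, start + msz);
	else
		printf("message does not fit, a new bundle would be opened\n");
	return 0;
}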