path: root/net/tipc
author		Jon Paul Maloy <jon.maloy@ericsson.com>		2014-06-25 21:41:38 -0400
committer	David S. Miller <davem@davemloft.net>		2014-06-27 15:50:55 -0400
commit		4ccfe5e0419eefcab3010ff6a87ffb03aef86c5d (patch)
tree		6ed2040f7bbf6a0602ff8014f87ab203f19cd173 /net/tipc
parent		e2dafe87d328774a94fdd77718422b9cbd97ed47 (diff)
tipc: connection oriented transport uses new send functions
We move the message sending across established connections to use the
message preparation and send functions introduced earlier in this series.
We now do the message preparation and the call to the link send function
directly from the socket, instead of going via the port layer.

As a consequence of this change, the functions tipc_send(),
tipc_port_iovec_rcv(), tipc_port_iovec_reject() and tipc_reject_msg()
become unreferenced and can be eliminated from port.c. For the same
reason, the functions tipc_link_xmit_fast(), tipc_link_iovec_long_xmit()
and tipc_link_iovec_xmit_fast() can be eliminated from link.c.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
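In outline, the new connection-oriented path builds each chunk of user data directly in the socket layer and hands it straight to the link layer, sleeping on the socket while the link is congested. The fragment below is a simplified, non-compilable outline of the tipc_send_stream() loop introduced further down in this patch (locking, the implied-connect path and error cleanup are trimmed); all identifiers are taken from the diff itself:

next:
	mtu  = port->max_pkt;
	send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
	rc = tipc_msg_build2(mhdr, m->msg_iov, sent, send, mtu, &buf);	/* message built in the socket layer */
	if (unlikely(rc < 0))
		goto exit;
	do {
		if (likely(!tipc_port_congested(port))) {
			rc = tipc_link_xmit2(buf, dnode, ref);	/* handed directly to the link, no port-layer detour */
			if (likely(!rc)) {
				sent += send;
				if (sent == dsz)
					break;		/* all user data accepted */
				goto next;		/* build and send the next chunk */
			}
			if (rc == -EMSGSIZE) {
				port->max_pkt = tipc_node_get_mtu(dnode, ref);
				goto next;		/* MTU hint was stale, rebuild */
			}
			if (rc != -ELINKCONG)
				break;			/* hard error */
		}
		rc = tipc_wait_for_sndpkt(sock, &timeo);	/* wait for the link to drain */
	} while (!rc);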
Diffstat (limited to 'net/tipc')
-rw-r--r--	net/tipc/link.c		249
-rw-r--r--	net/tipc/port.c		142
-rw-r--r--	net/tipc/port.h		 12
-rw-r--r--	net/tipc/socket.c	180
4 files changed, 70 insertions(+), 513 deletions(-)
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 68d2afb44f2f..93a8033263c0 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -82,9 +82,6 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf);
 static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
 				struct sk_buff **buf);
 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
-static int tipc_link_iovec_long_xmit(struct tipc_port *sender,
-				     struct iovec const *msg_sect,
-				     unsigned int len, u32 destnode);
 static void link_state_event(struct tipc_link *l_ptr, u32 event);
 static void link_reset_statistics(struct tipc_link *l_ptr);
 static void link_print(struct tipc_link *l_ptr, const char *str);
@@ -1071,252 +1068,6 @@ void tipc_link_names_xmit(struct list_head *message_list, u32 dest)
 }
 
 /*
- * tipc_link_xmit_fast: Entry for data messages where the
- * destination link is known and the header is complete,
- * inclusive total message length. Very time critical.
- * Link is locked. Returns user data length.
- */
-static int tipc_link_xmit_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
-			       u32 *used_max_pkt)
-{
-	struct tipc_msg *msg = buf_msg(buf);
-	int res = msg_data_sz(msg);
-
-	if (likely(!link_congested(l_ptr))) {
-		if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
-			link_add_to_outqueue(l_ptr, buf, msg);
-			tipc_bearer_send(l_ptr->bearer_id, buf,
-					 &l_ptr->media_addr);
-			l_ptr->unacked_window = 0;
-			return res;
-		}
-		else
-			*used_max_pkt = l_ptr->max_pkt;
-	}
-	return __tipc_link_xmit(l_ptr, buf);  /* All other cases */
-}
-
-/*
- * tipc_link_iovec_xmit_fast: Entry for messages where the
- * destination processor is known and the header is complete,
- * except for total message length.
- * Returns user data length or errno.
- */
-int tipc_link_iovec_xmit_fast(struct tipc_port *sender,
-			      struct iovec const *msg_sect,
-			      unsigned int len, u32 destaddr)
-{
-	struct tipc_msg *hdr = &sender->phdr;
-	struct tipc_link *l_ptr;
-	struct sk_buff *buf;
-	struct tipc_node *node;
-	int res;
-	u32 selector = msg_origport(hdr) & 1;
-
-again:
-	/*
-	 * Try building message using port's max_pkt hint.
-	 * (Must not hold any locks while building message.)
-	 */
-	res = tipc_msg_build(hdr, msg_sect, len, sender->max_pkt, &buf);
-	/* Exit if build request was invalid */
-	if (unlikely(res < 0))
-		return res;
-
-	node = tipc_node_find(destaddr);
-	if (likely(node)) {
-		tipc_node_lock(node);
-		l_ptr = node->active_links[selector];
-		if (likely(l_ptr)) {
-			if (likely(buf)) {
-				res = tipc_link_xmit_fast(l_ptr, buf,
-							  &sender->max_pkt);
-exit:
-				tipc_node_unlock(node);
-				return res;
-			}
-
-			/* Exit if link (or bearer) is congested */
-			if (link_congested(l_ptr)) {
-				res = link_schedule_port(l_ptr,
-							 sender->ref, res);
-				goto exit;
-			}
-
-			/*
-			 * Message size exceeds max_pkt hint; update hint,
-			 * then re-try fast path or fragment the message
-			 */
-			sender->max_pkt = l_ptr->max_pkt;
-			tipc_node_unlock(node);
-
-
-			if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt)
-				goto again;
-
-			return tipc_link_iovec_long_xmit(sender, msg_sect,
-							 len, destaddr);
-		}
-		tipc_node_unlock(node);
-	}
-
-	/* Couldn't find a link to the destination node */
-	kfree_skb(buf);
-	tipc_port_iovec_reject(sender, hdr, msg_sect, len, TIPC_ERR_NO_NODE);
-	return -ENETUNREACH;
-}
-
-/*
- * tipc_link_iovec_long_xmit(): Entry for long messages where the
- * destination node is known and the header is complete,
- * inclusive total message length.
- * Link and bearer congestion status have been checked to be ok,
- * and are ignored if they change.
- *
- * Note that fragments do not use the full link MTU so that they won't have
- * to undergo refragmentation if link changeover causes them to be sent
- * over another link with an additional tunnel header added as prefix.
- * (Refragmentation will still occur if the other link has a smaller MTU.)
- *
- * Returns user data length or errno.
- */
-static int tipc_link_iovec_long_xmit(struct tipc_port *sender,
-				     struct iovec const *msg_sect,
-				     unsigned int len, u32 destaddr)
-{
-	struct tipc_link *l_ptr;
-	struct tipc_node *node;
-	struct tipc_msg *hdr = &sender->phdr;
-	u32 dsz = len;
-	u32 max_pkt, fragm_sz, rest;
-	struct tipc_msg fragm_hdr;
-	struct sk_buff *buf, *buf_chain, *prev;
-	u32 fragm_crs, fragm_rest, hsz, sect_rest;
-	const unchar __user *sect_crs;
-	int curr_sect;
-	u32 fragm_no;
-	int res = 0;
-
-again:
-	fragm_no = 1;
-	max_pkt = sender->max_pkt - INT_H_SIZE;
-	/* leave room for tunnel header in case of link changeover */
-	fragm_sz = max_pkt - INT_H_SIZE;
-	/* leave room for fragmentation header in each fragment */
-	rest = dsz;
-	fragm_crs = 0;
-	fragm_rest = 0;
-	sect_rest = 0;
-	sect_crs = NULL;
-	curr_sect = -1;
-
-	/* Prepare reusable fragment header */
-	tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
-		      INT_H_SIZE, msg_destnode(hdr));
-	msg_set_size(&fragm_hdr, max_pkt);
-	msg_set_fragm_no(&fragm_hdr, 1);
-
-	/* Prepare header of first fragment */
-	buf_chain = buf = tipc_buf_acquire(max_pkt);
-	if (!buf)
-		return -ENOMEM;
-	buf->next = NULL;
-	skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
-	hsz = msg_hdr_sz(hdr);
-	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz);
-
-	/* Chop up message */
-	fragm_crs = INT_H_SIZE + hsz;
-	fragm_rest = fragm_sz - hsz;
-
-	do {		/* For all sections */
-		u32 sz;
-
-		if (!sect_rest) {
-			sect_rest = msg_sect[++curr_sect].iov_len;
-			sect_crs = msg_sect[curr_sect].iov_base;
-		}
-
-		if (sect_rest < fragm_rest)
-			sz = sect_rest;
-		else
-			sz = fragm_rest;
-
-		if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
-			res = -EFAULT;
-error:
-			kfree_skb_list(buf_chain);
-			return res;
-		}
-		sect_crs += sz;
-		sect_rest -= sz;
-		fragm_crs += sz;
-		fragm_rest -= sz;
-		rest -= sz;
-
-		if (!fragm_rest && rest) {
-
-			/* Initiate new fragment: */
-			if (rest <= fragm_sz) {
-				fragm_sz = rest;
-				msg_set_type(&fragm_hdr, LAST_FRAGMENT);
-			} else {
-				msg_set_type(&fragm_hdr, FRAGMENT);
-			}
-			msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
-			msg_set_fragm_no(&fragm_hdr, ++fragm_no);
-			prev = buf;
-			buf = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
-			if (!buf) {
-				res = -ENOMEM;
-				goto error;
-			}
-
-			buf->next = NULL;
-			prev->next = buf;
-			skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
-			fragm_crs = INT_H_SIZE;
-			fragm_rest = fragm_sz;
-		}
-	} while (rest > 0);
-
-	/*
-	 * Now we have a buffer chain. Select a link and check
-	 * that packet size is still OK
-	 */
-	node = tipc_node_find(destaddr);
-	if (likely(node)) {
-		tipc_node_lock(node);
-		l_ptr = node->active_links[sender->ref & 1];
-		if (!l_ptr) {
-			tipc_node_unlock(node);
-			goto reject;
-		}
-		if (l_ptr->max_pkt < max_pkt) {
-			sender->max_pkt = l_ptr->max_pkt;
-			tipc_node_unlock(node);
-			kfree_skb_list(buf_chain);
-			goto again;
-		}
-	} else {
-reject:
-		kfree_skb_list(buf_chain);
-		tipc_port_iovec_reject(sender, hdr, msg_sect, len,
-				       TIPC_ERR_NO_NODE);
-		return -ENETUNREACH;
-	}
-
-	/* Append chain of fragments to send queue & send them */
-	l_ptr->long_msg_seq_no++;
-	link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
-	l_ptr->stats.sent_fragments += fragm_no;
-	l_ptr->stats.sent_fragmented++;
-	tipc_link_push_queue(l_ptr);
-	tipc_node_unlock(node);
-	return dsz;
-}
-
-/*
  * tipc_link_push_packet: Push one unsent packet to the media
  */
 static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
diff --git a/net/tipc/port.c b/net/tipc/port.c
index 8350ec932514..606ff1a78e2b 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -211,6 +211,7 @@ u32 tipc_port_init(struct tipc_port *p_ptr,
 	}
 
 	p_ptr->max_pkt = MAX_PKT_DEFAULT;
+	p_ptr->sent = 1;
 	p_ptr->ref = ref;
 	INIT_LIST_HEAD(&p_ptr->wait_list);
 	INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
@@ -279,92 +280,6 @@ static struct sk_buff *port_build_proto_msg(struct tipc_port *p_ptr,
 	return buf;
 }
 
-int tipc_reject_msg(struct sk_buff *buf, u32 err)
-{
-	struct tipc_msg *msg = buf_msg(buf);
-	struct sk_buff *rbuf;
-	struct tipc_msg *rmsg;
-	int hdr_sz;
-	u32 imp;
-	u32 data_sz = msg_data_sz(msg);
-	u32 src_node;
-	u32 rmsg_sz;
-
-	/* discard rejected message if it shouldn't be returned to sender */
-	if (WARN(!msg_isdata(msg),
-		 "attempt to reject message with user=%u", msg_user(msg))) {
-		dump_stack();
-		goto exit;
-	}
-	if (msg_errcode(msg) || msg_dest_droppable(msg))
-		goto exit;
-
-	/*
-	 * construct returned message by copying rejected message header and
-	 * data (or subset), then updating header fields that need adjusting
-	 */
-	hdr_sz = msg_hdr_sz(msg);
-	rmsg_sz = hdr_sz + min_t(u32, data_sz, MAX_REJECT_SIZE);
-
-	rbuf = tipc_buf_acquire(rmsg_sz);
-	if (rbuf == NULL)
-		goto exit;
-
-	rmsg = buf_msg(rbuf);
-	skb_copy_to_linear_data(rbuf, msg, rmsg_sz);
-
-	if (msg_connected(rmsg)) {
-		imp = msg_importance(rmsg);
-		if (imp < TIPC_CRITICAL_IMPORTANCE)
-			msg_set_importance(rmsg, ++imp);
-	}
-	msg_set_non_seq(rmsg, 0);
-	msg_set_size(rmsg, rmsg_sz);
-	msg_set_errcode(rmsg, err);
-	msg_set_prevnode(rmsg, tipc_own_addr);
-	msg_swap_words(rmsg, 4, 5);
-	if (!msg_short(rmsg))
-		msg_swap_words(rmsg, 6, 7);
-
-	/* send self-abort message when rejecting on a connected port */
-	if (msg_connected(msg)) {
-		struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
-
-		if (p_ptr) {
-			struct sk_buff *abuf = NULL;
-
-			if (p_ptr->connected)
-				abuf = port_build_self_abort_msg(p_ptr, err);
-			tipc_port_unlock(p_ptr);
-			tipc_net_route_msg(abuf);
-		}
-	}
-
-	/* send returned message & dispose of rejected message */
-	src_node = msg_prevnode(msg);
-	if (in_own_node(src_node))
-		tipc_sk_rcv(rbuf);
-	else
-		tipc_link_xmit(rbuf, src_node, msg_link_selector(rmsg));
-exit:
-	kfree_skb(buf);
-	return data_sz;
-}
-
-int tipc_port_iovec_reject(struct tipc_port *p_ptr, struct tipc_msg *hdr,
-			   struct iovec const *msg_sect, unsigned int len,
-			   int err)
-{
-	struct sk_buff *buf;
-	int res;
-
-	res = tipc_msg_build(hdr, msg_sect, len, MAX_MSG_SIZE, &buf);
-	if (!buf)
-		return res;
-
-	return tipc_reject_msg(buf, err);
-}
-
 static void port_timeout(unsigned long ref)
 {
 	struct tipc_port *p_ptr = tipc_port_lock(ref);
@@ -698,7 +613,7 @@ int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr,
 			 (net_ev_handler)port_handle_node_down);
 	res = 0;
 exit:
-	p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref);
+	p_ptr->max_pkt = tipc_node_get_mtu(peer->node, ref);
 	return res;
 }
 
@@ -753,56 +668,3 @@ int tipc_port_shutdown(u32 ref)
 	tipc_net_route_msg(buf);
 	return tipc_port_disconnect(ref);
 }
-
-/*
- * tipc_port_iovec_rcv: Concatenate and deliver sectioned
- *                      message for this node.
- */
-static int tipc_port_iovec_rcv(struct tipc_port *sender,
-			       struct iovec const *msg_sect,
-			       unsigned int len)
-{
-	struct sk_buff *buf;
-	int res;
-
-	res = tipc_msg_build(&sender->phdr, msg_sect, len, MAX_MSG_SIZE, &buf);
-	if (likely(buf))
-		tipc_sk_rcv(buf);
-	return res;
-}
-
-/**
- * tipc_send - send message sections on connection
- */
-int tipc_send(struct tipc_port *p_ptr,
-	      struct iovec const *msg_sect,
-	      unsigned int len)
-{
-	u32 destnode;
-	int res;
-
-	if (!p_ptr->connected)
-		return -EINVAL;
-
-	p_ptr->congested = 1;
-	if (!tipc_port_congested(p_ptr)) {
-		destnode = tipc_port_peernode(p_ptr);
-		if (likely(!in_own_node(destnode)))
-			res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len,
-							destnode);
-		else
-			res = tipc_port_iovec_rcv(p_ptr, msg_sect, len);
-
-		if (likely(res != -ELINKCONG)) {
-			p_ptr->congested = 0;
-			if (res > 0)
-				p_ptr->sent++;
-			return res;
-		}
-	}
-	if (tipc_port_unreliable(p_ptr)) {
-		p_ptr->congested = 0;
-		return len;
-	}
-	return -ELINKCONG;
-}
diff --git a/net/tipc/port.h b/net/tipc/port.h
index e566d55e2655..231d9488189c 100644
--- a/net/tipc/port.h
+++ b/net/tipc/port.h
@@ -104,8 +104,6 @@ struct tipc_port_list;
 u32 tipc_port_init(struct tipc_port *p_ptr,
 		   const unsigned int importance);
 
-int tipc_reject_msg(struct sk_buff *buf, u32 err);
-
 void tipc_acknowledge(u32 port_ref, u32 ack);
 
 void tipc_port_destroy(struct tipc_port *p_ptr);
@@ -136,21 +134,11 @@ int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg);
  * TIPC messaging routines
  */
 
-int tipc_send(struct tipc_port *port,
-	      struct iovec const *msg_sect,
-	      unsigned int len);
-
 int tipc_port_mcast_xmit(struct tipc_port *port,
 			 struct tipc_name_seq const *seq,
 			 struct iovec const *msg,
 			 unsigned int len);
 
-int tipc_port_iovec_reject(struct tipc_port *p_ptr,
-			   struct tipc_msg *hdr,
-			   struct iovec const *msg_sect,
-			   unsigned int len,
-			   int err);
-
 struct sk_buff *tipc_port_get_ports(void);
 void tipc_port_proto_rcv(struct sk_buff *buf);
 void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 5690751cbfa5..bfe79bbd83a1 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -206,6 +206,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
 	sk->sk_data_ready = tipc_data_ready;
 	sk->sk_write_space = tipc_write_space;
 	tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
+	tsk->port.sent = 0;
 	atomic_set(&tsk->dupl_rcvcnt, 0);
 	tipc_port_unlock(port);
 
@@ -784,30 +785,40 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
 }
 
 /**
- * tipc_send_packet - send a connection-oriented message
- * @iocb: if NULL, indicates that socket lock is already held
+ * tipc_send_stream - send stream-oriented data
+ * @iocb: (unused)
  * @sock: socket structure
- * @m: message to send
- * @total_len: length of message
+ * @m: data to send
+ * @dsz: total length of data to be transmitted
  *
- * Used for SOCK_SEQPACKET messages and SOCK_STREAM data.
+ * Used for SOCK_STREAM data.
  *
- * Returns the number of bytes sent on success, or errno otherwise
+ * Returns the number of bytes sent on success (or partial success),
+ * or errno if no data sent
  */
-static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
-			    struct msghdr *m, size_t total_len)
+static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
+			    struct msghdr *m, size_t dsz)
 {
 	struct sock *sk = sock->sk;
 	struct tipc_sock *tsk = tipc_sk(sk);
+	struct tipc_port *port = &tsk->port;
+	struct tipc_msg *mhdr = &port->phdr;
+	struct sk_buff *buf;
 	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
-	int res = -EINVAL;
+	u32 ref = port->ref;
+	int rc = -EINVAL;
 	long timeo;
+	u32 dnode;
+	uint mtu, send, sent = 0;
 
 	/* Handle implied connection establishment */
-	if (unlikely(dest))
-		return tipc_sendmsg(iocb, sock, m, total_len);
-
-	if (total_len > TIPC_MAX_USER_MSG_SIZE)
+	if (unlikely(dest)) {
+		rc = tipc_sendmsg(iocb, sock, m, dsz);
+		if (dsz && (dsz == rc))
+			tsk->port.sent = 1;
+		return rc;
+	}
+	if (dsz > (uint)INT_MAX)
 		return -EMSGSIZE;
 
 	if (iocb)
@@ -815,123 +826,68 @@ static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
 
 	if (unlikely(sock->state != SS_CONNECTED)) {
 		if (sock->state == SS_DISCONNECTING)
-			res = -EPIPE;
+			rc = -EPIPE;
 		else
-			res = -ENOTCONN;
+			rc = -ENOTCONN;
 		goto exit;
 	}
 
 	timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
+	dnode = tipc_port_peernode(port);
+	port->congested = 1;
+
+next:
+	mtu = port->max_pkt;
+	send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
+	rc = tipc_msg_build2(mhdr, m->msg_iov, sent, send, mtu, &buf);
+	if (unlikely(rc < 0))
+		goto exit;
 	do {
-		res = tipc_send(&tsk->port, m->msg_iov, total_len);
-		if (likely(res != -ELINKCONG))
-			break;
-		res = tipc_wait_for_sndpkt(sock, &timeo);
-		if (res)
-			break;
-	} while (1);
+		port->congested = 1;
+		if (likely(!tipc_port_congested(port))) {
+			rc = tipc_link_xmit2(buf, dnode, ref);
+			if (likely(!rc)) {
+				port->sent++;
+				sent += send;
+				if (sent == dsz)
+					break;
+				goto next;
+			}
+			if (rc == -EMSGSIZE) {
+				port->max_pkt = tipc_node_get_mtu(dnode, ref);
+				goto next;
+			}
+			if (rc != -ELINKCONG)
+				break;
+		}
+		rc = tipc_wait_for_sndpkt(sock, &timeo);
+	} while (!rc);
+
+	port->congested = 0;
 exit:
 	if (iocb)
 		release_sock(sk);
-	return res;
+	return sent ? sent : rc;
 }
 
 /**
- * tipc_send_stream - send stream-oriented data
- * @iocb: (unused)
+ * tipc_send_packet - send a connection-oriented message
+ * @iocb: if NULL, indicates that socket lock is already held
  * @sock: socket structure
- * @m: data to send
- * @total_len: total length of data to be sent
+ * @m: message to send
+ * @dsz: length of data to be transmitted
  *
- * Used for SOCK_STREAM data.
+ * Used for SOCK_SEQPACKET messages.
  *
- * Returns the number of bytes sent on success (or partial success),
- * or errno if no data sent
+ * Returns the number of bytes sent on success, or errno otherwise
  */
-static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
-			    struct msghdr *m, size_t total_len)
+static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
+			    struct msghdr *m, size_t dsz)
 {
-	struct sock *sk = sock->sk;
-	struct tipc_sock *tsk = tipc_sk(sk);
+	if (dsz > TIPC_MAX_USER_MSG_SIZE)
+		return -EMSGSIZE;
-	struct msghdr my_msg;
-	struct iovec my_iov;
-	struct iovec *curr_iov;
-	int curr_iovlen;
-	char __user *curr_start;
-	u32 hdr_size;
-	int curr_left;
-	int bytes_to_send;
-	int bytes_sent;
-	int res;
-
-	lock_sock(sk);
-
-	/* Handle special cases where there is no connection */
-	if (unlikely(sock->state != SS_CONNECTED)) {
-		if (sock->state == SS_UNCONNECTED)
-			res = tipc_send_packet(NULL, sock, m, total_len);
-		else
-			res = sock->state == SS_DISCONNECTING ? -EPIPE : -ENOTCONN;
-		goto exit;
-	}
-
-	if (unlikely(m->msg_name)) {
-		res = -EISCONN;
-		goto exit;
-	}
-
-	if (total_len > (unsigned int)INT_MAX) {
-		res = -EMSGSIZE;
-		goto exit;
-	}
-
-	/*
-	 * Send each iovec entry using one or more messages
-	 *
-	 * Note: This algorithm is good for the most likely case
-	 * (i.e. one large iovec entry), but could be improved to pass sets
-	 * of small iovec entries into send_packet().
-	 */
-	curr_iov = m->msg_iov;
-	curr_iovlen = m->msg_iovlen;
-	my_msg.msg_iov = &my_iov;
-	my_msg.msg_iovlen = 1;
-	my_msg.msg_flags = m->msg_flags;
-	my_msg.msg_name = NULL;
-	bytes_sent = 0;
-
-	hdr_size = msg_hdr_sz(&tsk->port.phdr);
-
-	while (curr_iovlen--) {
-		curr_start = curr_iov->iov_base;
-		curr_left = curr_iov->iov_len;
-
-		while (curr_left) {
-			bytes_to_send = tsk->port.max_pkt - hdr_size;
-			if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE)
-				bytes_to_send = TIPC_MAX_USER_MSG_SIZE;
-			if (curr_left < bytes_to_send)
-				bytes_to_send = curr_left;
-			my_iov.iov_base = curr_start;
-			my_iov.iov_len = bytes_to_send;
-			res = tipc_send_packet(NULL, sock, &my_msg,
-					       bytes_to_send);
-			if (res < 0) {
-				if (bytes_sent)
-					res = bytes_sent;
-				goto exit;
-			}
-			curr_left -= bytes_to_send;
-			curr_start += bytes_to_send;
-			bytes_sent += bytes_to_send;
-		}
 
-		curr_iov++;
+	return tipc_send_stream(iocb, sock, m, dsz);
-	}
-	res = bytes_sent;
-exit:
-	release_sock(sk);
-	return res;
 }
 
 /**