aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
author Jon Paul Maloy <jon.maloy@ericsson.com> 2014-06-25 21:41:42 -0400
committer David S. Miller <davem@davemloft.net> 2014-06-27 15:50:56 -0400
commit 60120526c26f42fd658e32bf4a6d548483d09da8 (patch)
tree d8d74d0ae3e05b1228498f5db04ff88a66b03035 /net
parent ac0074ee70ddb32f62d918b31cb20e3c947c75a1 (diff)
tipc: simplify connection congestion handling
As a consequence of the recently introduced serialized access to the socket in commit 8d94168a761819d10252bab1f8de6d7b202c3baa ("tipc: same receive code path for connection protocol and data messages") we can make a number of simplifications in the detection and handling of connection congestion situations. - We don't need to keep two counters, one for sent messages and one for acked messages. There is no longer any risk for races between acknowledge messages arriving in BH and data message sending running in user context. So we merge this into one counter, 'sent_unacked', which is incremented at sending and subtracted from at acknowledge reception. - We don't need to set the 'congested' field in tipc_port to true before we send the message, and clear it when sending is successful. (As a matter of fact, it was never necessary; the field was set in link_schedule_port() before any wakeup could arrive anyway.) - We keep the conditions for link congestion and connection congestion separated. There would otherwise be a risk that an arriving acknowledge message may wake up a user sleeping because of link congestion. - We can simplify reception of acknowledge messages. We also make some cosmetic/structural changes: - We rename the 'congested' field to the more correct 'link_cong'. - We rename 'conn_unacked' to 'rcv_unacked'. - We move the above mentioned fields from struct tipc_port to struct tipc_sock. Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Reviewed-by: Erik Hugne <erik.hugne@ericsson.com> Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net')
-rw-r--r-- net/tipc/link.c 10
-rw-r--r-- net/tipc/port.c 12
-rw-r--r-- net/tipc/port.h 16
-rw-r--r-- net/tipc/socket.c 48
-rw-r--r-- net/tipc/socket.h 11
5 files changed, 43 insertions, 54 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 96a8072f73cc..a081e7d08d22 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -332,13 +332,15 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
332static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz) 332static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
333{ 333{
334 struct tipc_port *p_ptr; 334 struct tipc_port *p_ptr;
335 struct tipc_sock *tsk;
335 336
336 spin_lock_bh(&tipc_port_list_lock); 337 spin_lock_bh(&tipc_port_list_lock);
337 p_ptr = tipc_port_lock(origport); 338 p_ptr = tipc_port_lock(origport);
338 if (p_ptr) { 339 if (p_ptr) {
339 if (!list_empty(&p_ptr->wait_list)) 340 if (!list_empty(&p_ptr->wait_list))
340 goto exit; 341 goto exit;
341 p_ptr->congested = 1; 342 tsk = tipc_port_to_sock(p_ptr);
343 tsk->link_cong = 1;
342 p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt); 344 p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
343 list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports); 345 list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
344 l_ptr->stats.link_congs++; 346 l_ptr->stats.link_congs++;
@@ -352,6 +354,7 @@ exit:
352void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all) 354void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
353{ 355{
354 struct tipc_port *p_ptr; 356 struct tipc_port *p_ptr;
357 struct tipc_sock *tsk;
355 struct tipc_port *temp_p_ptr; 358 struct tipc_port *temp_p_ptr;
356 int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size; 359 int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
357 360
@@ -367,10 +370,11 @@ void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
367 wait_list) { 370 wait_list) {
368 if (win <= 0) 371 if (win <= 0)
369 break; 372 break;
373 tsk = tipc_port_to_sock(p_ptr);
370 list_del_init(&p_ptr->wait_list); 374 list_del_init(&p_ptr->wait_list);
371 spin_lock_bh(p_ptr->lock); 375 spin_lock_bh(p_ptr->lock);
372 p_ptr->congested = 0; 376 tsk->link_cong = 0;
373 tipc_port_wakeup(p_ptr); 377 tipc_sock_wakeup(tsk);
374 win -= p_ptr->waiting_pkts; 378 win -= p_ptr->waiting_pkts;
375 spin_unlock_bh(p_ptr->lock); 379 spin_unlock_bh(p_ptr->lock);
376 } 380 }
diff --git a/net/tipc/port.c b/net/tipc/port.c
index 9f53d5ac35e1..0d09dcb6da18 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -186,12 +186,6 @@ exit:
186 tipc_port_list_free(dp); 186 tipc_port_list_free(dp);
187} 187}
188 188
189
190void tipc_port_wakeup(struct tipc_port *port)
191{
192 tipc_sock_wakeup(tipc_port_to_sock(port));
193}
194
195/* tipc_port_init - intiate TIPC port and lock it 189/* tipc_port_init - intiate TIPC port and lock it
196 * 190 *
197 * Returns obtained reference if initialization is successful, zero otherwise 191 * Returns obtained reference if initialization is successful, zero otherwise
@@ -209,7 +203,6 @@ u32 tipc_port_init(struct tipc_port *p_ptr,
209 } 203 }
210 204
211 p_ptr->max_pkt = MAX_PKT_DEFAULT; 205 p_ptr->max_pkt = MAX_PKT_DEFAULT;
212 p_ptr->sent = 1;
213 p_ptr->ref = ref; 206 p_ptr->ref = ref;
214 INIT_LIST_HEAD(&p_ptr->wait_list); 207 INIT_LIST_HEAD(&p_ptr->wait_list);
215 INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list); 208 INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
@@ -459,10 +452,9 @@ void tipc_acknowledge(u32 ref, u32 ack)
459 p_ptr = tipc_port_lock(ref); 452 p_ptr = tipc_port_lock(ref);
460 if (!p_ptr) 453 if (!p_ptr)
461 return; 454 return;
462 if (p_ptr->connected) { 455 if (p_ptr->connected)
463 p_ptr->conn_unacked -= ack;
464 buf = port_build_proto_msg(p_ptr, CONN_ACK, ack); 456 buf = port_build_proto_msg(p_ptr, CONN_ACK, ack);
465 } 457
466 tipc_port_unlock(p_ptr); 458 tipc_port_unlock(p_ptr);
467 if (!buf) 459 if (!buf)
468 return; 460 return;
diff --git a/net/tipc/port.h b/net/tipc/port.h
index 3a4f808135c3..0e47052daa29 100644
--- a/net/tipc/port.h
+++ b/net/tipc/port.h
@@ -53,17 +53,13 @@
53 * @connected: non-zero if port is currently connected to a peer port 53 * @connected: non-zero if port is currently connected to a peer port
54 * @conn_type: TIPC type used when connection was established 54 * @conn_type: TIPC type used when connection was established
55 * @conn_instance: TIPC instance used when connection was established 55 * @conn_instance: TIPC instance used when connection was established
56 * @conn_unacked: number of unacknowledged messages received from peer port
57 * @published: non-zero if port has one or more associated names 56 * @published: non-zero if port has one or more associated names
58 * @congested: non-zero if cannot send because of link or port congestion
59 * @max_pkt: maximum packet size "hint" used when building messages sent by port 57 * @max_pkt: maximum packet size "hint" used when building messages sent by port
60 * @ref: unique reference to port in TIPC object registry 58 * @ref: unique reference to port in TIPC object registry
61 * @phdr: preformatted message header used when sending messages 59 * @phdr: preformatted message header used when sending messages
62 * @port_list: adjacent ports in TIPC's global list of ports 60 * @port_list: adjacent ports in TIPC's global list of ports
63 * @wait_list: adjacent ports in list of ports waiting on link congestion 61 * @wait_list: adjacent ports in list of ports waiting on link congestion
64 * @waiting_pkts: 62 * @waiting_pkts:
65 * @sent: # of non-empty messages sent by port
66 * @acked: # of non-empty message acknowledgements from connected port's peer
67 * @publications: list of publications for port 63 * @publications: list of publications for port
68 * @pub_count: total # of publications port has made during its lifetime 64 * @pub_count: total # of publications port has made during its lifetime
69 * @probing_state: 65 * @probing_state:
@@ -76,17 +72,13 @@ struct tipc_port {
76 int connected; 72 int connected;
77 u32 conn_type; 73 u32 conn_type;
78 u32 conn_instance; 74 u32 conn_instance;
79 u32 conn_unacked;
80 int published; 75 int published;
81 u32 congested;
82 u32 max_pkt; 76 u32 max_pkt;
83 u32 ref; 77 u32 ref;
84 struct tipc_msg phdr; 78 struct tipc_msg phdr;
85 struct list_head port_list; 79 struct list_head port_list;
86 struct list_head wait_list; 80 struct list_head wait_list;
87 u32 waiting_pkts; 81 u32 waiting_pkts;
88 u32 sent;
89 u32 acked;
90 struct list_head publications; 82 struct list_head publications;
91 u32 pub_count; 83 u32 pub_count;
92 u32 probing_state; 84 u32 probing_state;
@@ -120,8 +112,6 @@ int tipc_port_disconnect(u32 portref);
120 112
121int tipc_port_shutdown(u32 ref); 113int tipc_port_shutdown(u32 ref);
122 114
123void tipc_port_wakeup(struct tipc_port *port);
124
125/* 115/*
126 * The following routines require that the port be locked on entry 116 * The following routines require that the port be locked on entry
127 */ 117 */
@@ -161,12 +151,6 @@ static inline void tipc_port_unlock(struct tipc_port *p_ptr)
161 spin_unlock_bh(p_ptr->lock); 151 spin_unlock_bh(p_ptr->lock);
162} 152}
163 153
164static inline int tipc_port_congested(struct tipc_port *p_ptr)
165{
166 return ((p_ptr->sent - p_ptr->acked) >= TIPC_FLOWCTRL_WIN);
167}
168
169
170static inline u32 tipc_port_peernode(struct tipc_port *p_ptr) 154static inline u32 tipc_port_peernode(struct tipc_port *p_ptr)
171{ 155{
172 return msg_destnode(&p_ptr->phdr); 156 return msg_destnode(&p_ptr->phdr);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 1762323156af..ede78b144dcf 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -207,7 +207,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
207 sk->sk_data_ready = tipc_data_ready; 207 sk->sk_data_ready = tipc_data_ready;
208 sk->sk_write_space = tipc_write_space; 208 sk->sk_write_space = tipc_write_space;
209 tsk->conn_timeout = CONN_TIMEOUT_DEFAULT; 209 tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
210 tsk->port.sent = 0; 210 tsk->sent_unacked = 0;
211 atomic_set(&tsk->dupl_rcvcnt, 0); 211 atomic_set(&tsk->dupl_rcvcnt, 0);
212 tipc_port_unlock(port); 212 tipc_port_unlock(port);
213 213
@@ -513,12 +513,12 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
513 513
514 switch ((int)sock->state) { 514 switch ((int)sock->state) {
515 case SS_UNCONNECTED: 515 case SS_UNCONNECTED:
516 if (!tsk->port.congested) 516 if (!tsk->link_cong)
517 mask |= POLLOUT; 517 mask |= POLLOUT;
518 break; 518 break;
519 case SS_READY: 519 case SS_READY:
520 case SS_CONNECTED: 520 case SS_CONNECTED:
521 if (!tsk->port.congested) 521 if (!tsk->link_cong && !tipc_sk_conn_cong(tsk))
522 mask |= POLLOUT; 522 mask |= POLLOUT;
523 /* fall thru' */ 523 /* fall thru' */
524 case SS_CONNECTING: 524 case SS_CONNECTING:
@@ -546,7 +546,7 @@ int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, struct sk_buff *buf)
546{ 546{
547 struct tipc_msg *msg = buf_msg(buf); 547 struct tipc_msg *msg = buf_msg(buf);
548 struct tipc_port *port = &tsk->port; 548 struct tipc_port *port = &tsk->port;
549 int wakeable; 549 int conn_cong;
550 550
551 /* Ignore if connection cannot be validated: */ 551 /* Ignore if connection cannot be validated: */
552 if (!port->connected || !tipc_port_peer_msg(port, msg)) 552 if (!port->connected || !tipc_port_peer_msg(port, msg))
@@ -555,13 +555,10 @@ int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, struct sk_buff *buf)
555 port->probing_state = TIPC_CONN_OK; 555 port->probing_state = TIPC_CONN_OK;
556 556
557 if (msg_type(msg) == CONN_ACK) { 557 if (msg_type(msg) == CONN_ACK) {
558 wakeable = tipc_port_congested(port) && port->congested; 558 conn_cong = tipc_sk_conn_cong(tsk);
559 port->acked += msg_msgcnt(msg); 559 tsk->sent_unacked -= msg_msgcnt(msg);
560 if (!tipc_port_congested(port)) { 560 if (conn_cong)
561 port->congested = 0; 561 tipc_sock_wakeup(tsk);
562 if (wakeable)
563 tipc_port_wakeup(port);
564 }
565 } else if (msg_type(msg) == CONN_PROBE) { 562 } else if (msg_type(msg) == CONN_PROBE) {
566 if (!tipc_msg_reverse(buf, dnode, TIPC_OK)) 563 if (!tipc_msg_reverse(buf, dnode, TIPC_OK))
567 return TIPC_OK; 564 return TIPC_OK;
@@ -626,7 +623,7 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
626 return sock_intr_errno(*timeo_p); 623 return sock_intr_errno(*timeo_p);
627 624
628 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); 625 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
629 done = sk_wait_event(sk, timeo_p, !tsk->port.congested); 626 done = sk_wait_event(sk, timeo_p, !tsk->link_cong);
630 finish_wait(sk_sleep(sk), &wait); 627 finish_wait(sk_sleep(sk), &wait);
631 } while (!done); 628 } while (!done);
632 return 0; 629 return 0;
@@ -800,7 +797,6 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
800{ 797{
801 struct sock *sk = sock->sk; 798 struct sock *sk = sock->sk;
802 struct tipc_sock *tsk = tipc_sk(sk); 799 struct tipc_sock *tsk = tipc_sk(sk);
803 struct tipc_port *port = &tsk->port;
804 DEFINE_WAIT(wait); 800 DEFINE_WAIT(wait);
805 int done; 801 int done;
806 802
@@ -819,7 +815,9 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
819 815
820 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); 816 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
821 done = sk_wait_event(sk, timeo_p, 817 done = sk_wait_event(sk, timeo_p,
822 (!port->congested || !port->connected)); 818 (!tsk->link_cong &&
819 !tipc_sk_conn_cong(tsk)) ||
820 !tsk->port.connected);
823 finish_wait(sk_sleep(sk), &wait); 821 finish_wait(sk_sleep(sk), &wait);
824 } while (!done); 822 } while (!done);
825 return 0; 823 return 0;
@@ -856,7 +854,7 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
856 if (unlikely(dest)) { 854 if (unlikely(dest)) {
857 rc = tipc_sendmsg(iocb, sock, m, dsz); 855 rc = tipc_sendmsg(iocb, sock, m, dsz);
858 if (dsz && (dsz == rc)) 856 if (dsz && (dsz == rc))
859 tsk->port.sent = 1; 857 tsk->sent_unacked = 1;
860 return rc; 858 return rc;
861 } 859 }
862 if (dsz > (uint)INT_MAX) 860 if (dsz > (uint)INT_MAX)
@@ -875,7 +873,6 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
875 873
876 timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); 874 timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
877 dnode = tipc_port_peernode(port); 875 dnode = tipc_port_peernode(port);
878 port->congested = 1;
879 876
880next: 877next:
881 mtu = port->max_pkt; 878 mtu = port->max_pkt;
@@ -884,11 +881,10 @@ next:
884 if (unlikely(rc < 0)) 881 if (unlikely(rc < 0))
885 goto exit; 882 goto exit;
886 do { 883 do {
887 port->congested = 1; 884 if (likely(!tipc_sk_conn_cong(tsk))) {
888 if (likely(!tipc_port_congested(port))) {
889 rc = tipc_link_xmit2(buf, dnode, ref); 885 rc = tipc_link_xmit2(buf, dnode, ref);
890 if (likely(!rc)) { 886 if (likely(!rc)) {
891 port->sent++; 887 tsk->sent_unacked++;
892 sent += send; 888 sent += send;
893 if (sent == dsz) 889 if (sent == dsz)
894 break; 890 break;
@@ -903,8 +899,6 @@ next:
903 } 899 }
904 rc = tipc_wait_for_sndpkt(sock, &timeo); 900 rc = tipc_wait_for_sndpkt(sock, &timeo);
905 } while (!rc); 901 } while (!rc);
906
907 port->congested = 0;
908exit: 902exit:
909 if (iocb) 903 if (iocb)
910 release_sock(sk); 904 release_sock(sk);
@@ -1169,8 +1163,10 @@ restart:
1169 /* Consume received message (optional) */ 1163 /* Consume received message (optional) */
1170 if (likely(!(flags & MSG_PEEK))) { 1164 if (likely(!(flags & MSG_PEEK))) {
1171 if ((sock->state != SS_READY) && 1165 if ((sock->state != SS_READY) &&
1172 (++port->conn_unacked >= TIPC_CONNACK_INTV)) 1166 (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1173 tipc_acknowledge(port->ref, port->conn_unacked); 1167 tipc_acknowledge(port->ref, tsk->rcv_unacked);
1168 tsk->rcv_unacked = 0;
1169 }
1174 advance_rx_queue(sk); 1170 advance_rx_queue(sk);
1175 } 1171 }
1176exit: 1172exit:
@@ -1278,8 +1274,10 @@ restart:
1278 1274
1279 /* Consume received message (optional) */ 1275 /* Consume received message (optional) */
1280 if (likely(!(flags & MSG_PEEK))) { 1276 if (likely(!(flags & MSG_PEEK))) {
1281 if (unlikely(++port->conn_unacked >= TIPC_CONNACK_INTV)) 1277 if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1282 tipc_acknowledge(port->ref, port->conn_unacked); 1278 tipc_acknowledge(port->ref, tsk->rcv_unacked);
1279 tsk->rcv_unacked = 0;
1280 }
1283 advance_rx_queue(sk); 1281 advance_rx_queue(sk);
1284 } 1282 }
1285 1283
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index 69fd06bce68a..2cdede9eda1b 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -48,6 +48,9 @@
48 * @peer_name: the peer of the connection, if any 48 * @peer_name: the peer of the connection, if any
49 * @conn_timeout: the time we can wait for an unresponded setup request 49 * @conn_timeout: the time we can wait for an unresponded setup request
50 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue 50 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
51 * @link_cong: non-zero if owner must sleep because of link congestion
52 * @sent_unacked: # messages sent by socket, and not yet acked by peer
53 * @rcv_unacked: # messages read by user, but not yet acked back to peer
51 */ 54 */
52 55
53struct tipc_sock { 56struct tipc_sock {
@@ -55,6 +58,9 @@ struct tipc_sock {
55 struct tipc_port port; 58 struct tipc_port port;
56 unsigned int conn_timeout; 59 unsigned int conn_timeout;
57 atomic_t dupl_rcvcnt; 60 atomic_t dupl_rcvcnt;
61 int link_cong;
62 uint sent_unacked;
63 uint rcv_unacked;
58}; 64};
59 65
60static inline struct tipc_sock *tipc_sk(const struct sock *sk) 66static inline struct tipc_sock *tipc_sk(const struct sock *sk)
@@ -72,6 +78,11 @@ static inline void tipc_sock_wakeup(struct tipc_sock *tsk)
72 tsk->sk.sk_write_space(&tsk->sk); 78 tsk->sk.sk_write_space(&tsk->sk);
73} 79}
74 80
81static inline int tipc_sk_conn_cong(struct tipc_sock *tsk)
82{
83 return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
84}
85
75int tipc_sk_rcv(struct sk_buff *buf); 86int tipc_sk_rcv(struct sk_buff *buf);
76 87
77#endif 88#endif