summaryrefslogtreecommitdiffstats
path: root/net/tipc/server.c
diff options
context:
space:
mode:
authorJon Maloy <jon.maloy@ericsson.com>2018-02-15 04:40:44 -0500
committerDavid S. Miller <davem@davemloft.net>2018-02-16 15:26:33 -0500
commitdf79d040dcd7d7e580c50edf40b82e677fe84801 (patch)
treef2d1629f209ee648d71ac4278ce490b0f648205f /net/tipc/server.c
parentc901d26d4a8137f3ad0e5865d331f7c63c42d9f9 (diff)
tipc: eliminate struct tipc_subscriber
It is unnecessary to keep two structures, struct tipc_conn and struct tipc_subscriber, with a one-to-one relationship and still with different life cycles. The fact that the two often run in different contexts, and still may access each other via direct pointers constitutes an additional hazard, something we have experienced at several occasions, and still see happening. We have identified at least two remaining problems that are easier to fix if we simplify the topology server data structure somewhat. - When there is a race between a subscription up/down event and a timeout event, it is fully possible that the former might be delivered after the latter, leading to confusion for the receiver. - The function tipc_subcrp_timeout() is executing in interrupt context, while the following call chain is at least theoretically possible: tipc_subscrp_timeout() tipc_subscrp_send_event() tipc_conn_sendmsg() conn_put() tipc_conn_kref_release() sock_release(sock) I.e., we end up calling a function that might try to sleep in interrupt context. To eliminate this, we need to ensure that the tipc_conn structure and the socket, as well as the subscription instances, only are deleted in work queue context, i.e., after the timeout event really has been sent out. We now remove this unnecessary complexity, by merging data and functionality of the subscriber structure into struct tipc_conn and the associated file server.c. We thereafter add a spinlock and a new 'inactive' state to the subscription structure. Using those, both problems described above can be easily solved. Acked-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/server.c')
-rw-r--r--net/tipc/server.c161
1 files changed, 105 insertions, 56 deletions
diff --git a/net/tipc/server.c b/net/tipc/server.c
index 8aa2a33b1e48..b8268c0882a7 100644
--- a/net/tipc/server.c
+++ b/net/tipc/server.c
@@ -2,6 +2,7 @@
2 * net/tipc/server.c: TIPC server infrastructure 2 * net/tipc/server.c: TIPC server infrastructure
3 * 3 *
4 * Copyright (c) 2012-2013, Wind River Systems 4 * Copyright (c) 2012-2013, Wind River Systems
5 * Copyright (c) 2017, Ericsson AB
5 * All rights reserved. 6 * All rights reserved.
6 * 7 *
7 * Redistribution and use in source and binary forms, with or without 8 * Redistribution and use in source and binary forms, with or without
@@ -57,12 +58,13 @@
57 * @sock: socket handler associated with connection 58 * @sock: socket handler associated with connection
58 * @flags: indicates connection state 59 * @flags: indicates connection state
59 * @server: pointer to connected server 60 * @server: pointer to connected server
61 * @sub_list: lsit to all pertaing subscriptions
62 * @sub_lock: lock protecting the subscription list
63 * @outqueue_lock: control access to the outqueue
60 * @rwork: receive work item 64 * @rwork: receive work item
61 * @usr_data: user-specified field
62 * @rx_action: what to do when connection socket is active 65 * @rx_action: what to do when connection socket is active
63 * @outqueue: pointer to first outbound message in queue 66 * @outqueue: pointer to first outbound message in queue
64 * @outqueue_lock: control access to the outqueue 67 * @outqueue_lock: control access to the outqueue
65 * @outqueue: list of connection objects for its server
66 * @swork: send work item 68 * @swork: send work item
67 */ 69 */
68struct tipc_conn { 70struct tipc_conn {
@@ -71,9 +73,10 @@ struct tipc_conn {
71 struct socket *sock; 73 struct socket *sock;
72 unsigned long flags; 74 unsigned long flags;
73 struct tipc_server *server; 75 struct tipc_server *server;
76 struct list_head sub_list;
77 spinlock_t sub_lock; /* for subscription list */
74 struct work_struct rwork; 78 struct work_struct rwork;
75 int (*rx_action) (struct tipc_conn *con); 79 int (*rx_action) (struct tipc_conn *con);
76 void *usr_data;
77 struct list_head outqueue; 80 struct list_head outqueue;
78 spinlock_t outqueue_lock; 81 spinlock_t outqueue_lock;
79 struct work_struct swork; 82 struct work_struct swork;
@@ -81,6 +84,7 @@ struct tipc_conn {
81 84
82/* An entry waiting to be sent */ 85/* An entry waiting to be sent */
83struct outqueue_entry { 86struct outqueue_entry {
87 u32 evt;
84 struct list_head list; 88 struct list_head list;
85 struct kvec iov; 89 struct kvec iov;
86}; 90};
@@ -89,18 +93,33 @@ static void tipc_recv_work(struct work_struct *work);
89static void tipc_send_work(struct work_struct *work); 93static void tipc_send_work(struct work_struct *work);
90static void tipc_clean_outqueues(struct tipc_conn *con); 94static void tipc_clean_outqueues(struct tipc_conn *con);
91 95
96static bool connected(struct tipc_conn *con)
97{
98 return con && test_bit(CF_CONNECTED, &con->flags);
99}
100
101/**
102 * htohl - convert value to endianness used by destination
103 * @in: value to convert
104 * @swap: non-zero if endianness must be reversed
105 *
106 * Returns converted value
107 */
108static u32 htohl(u32 in, int swap)
109{
110 return swap ? swab32(in) : in;
111}
112
92static void tipc_conn_kref_release(struct kref *kref) 113static void tipc_conn_kref_release(struct kref *kref)
93{ 114{
94 struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); 115 struct tipc_conn *con = container_of(kref, struct tipc_conn, kref);
95 struct tipc_server *s = con->server; 116 struct tipc_server *s = con->server;
96 struct socket *sock = con->sock; 117 struct socket *sock = con->sock;
97 struct sock *sk;
98 118
99 if (sock) { 119 if (sock) {
100 sk = sock->sk;
101 if (test_bit(CF_SERVER, &con->flags)) { 120 if (test_bit(CF_SERVER, &con->flags)) {
102 __module_get(sock->ops->owner); 121 __module_get(sock->ops->owner);
103 __module_get(sk->sk_prot_creator->owner); 122 __module_get(sock->sk->sk_prot_creator->owner);
104 } 123 }
105 sock_release(sock); 124 sock_release(sock);
106 con->sock = NULL; 125 con->sock = NULL;
@@ -129,11 +148,8 @@ static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid)
129 148
130 spin_lock_bh(&s->idr_lock); 149 spin_lock_bh(&s->idr_lock);
131 con = idr_find(&s->conn_idr, conid); 150 con = idr_find(&s->conn_idr, conid);
132 if (con) { 151 if (!connected(con) || !kref_get_unless_zero(&con->kref))
133 if (!test_bit(CF_CONNECTED, &con->flags) || 152 con = NULL;
134 !kref_get_unless_zero(&con->kref))
135 con = NULL;
136 }
137 spin_unlock_bh(&s->idr_lock); 153 spin_unlock_bh(&s->idr_lock);
138 return con; 154 return con;
139} 155}
@@ -144,7 +160,7 @@ static void sock_data_ready(struct sock *sk)
144 160
145 read_lock_bh(&sk->sk_callback_lock); 161 read_lock_bh(&sk->sk_callback_lock);
146 con = sock2con(sk); 162 con = sock2con(sk);
147 if (con && test_bit(CF_CONNECTED, &con->flags)) { 163 if (connected(con)) {
148 conn_get(con); 164 conn_get(con);
149 if (!queue_work(con->server->rcv_wq, &con->rwork)) 165 if (!queue_work(con->server->rcv_wq, &con->rwork))
150 conn_put(con); 166 conn_put(con);
@@ -158,7 +174,7 @@ static void sock_write_space(struct sock *sk)
158 174
159 read_lock_bh(&sk->sk_callback_lock); 175 read_lock_bh(&sk->sk_callback_lock);
160 con = sock2con(sk); 176 con = sock2con(sk);
161 if (con && test_bit(CF_CONNECTED, &con->flags)) { 177 if (connected(con)) {
162 conn_get(con); 178 conn_get(con);
163 if (!queue_work(con->server->send_wq, &con->swork)) 179 if (!queue_work(con->server->send_wq, &con->swork))
164 conn_put(con); 180 conn_put(con);
@@ -181,6 +197,24 @@ static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con)
181 write_unlock_bh(&sk->sk_callback_lock); 197 write_unlock_bh(&sk->sk_callback_lock);
182} 198}
183 199
200/* tipc_con_delete_sub - delete a specific or all subscriptions
201 * for a given subscriber
202 */
203static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s)
204{
205 struct list_head *sub_list = &con->sub_list;
206 struct tipc_subscription *sub, *tmp;
207
208 spin_lock_bh(&con->sub_lock);
209 list_for_each_entry_safe(sub, tmp, sub_list, subscrp_list) {
210 if (!s || !memcmp(s, &sub->evt.s, sizeof(*s)))
211 tipc_sub_delete(sub);
212 else if (s)
213 break;
214 }
215 spin_unlock_bh(&con->sub_lock);
216}
217
184static void tipc_close_conn(struct tipc_conn *con) 218static void tipc_close_conn(struct tipc_conn *con)
185{ 219{
186 struct sock *sk = con->sock->sk; 220 struct sock *sk = con->sock->sk;
@@ -188,10 +222,11 @@ static void tipc_close_conn(struct tipc_conn *con)
188 222
189 write_lock_bh(&sk->sk_callback_lock); 223 write_lock_bh(&sk->sk_callback_lock);
190 disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags); 224 disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags);
225
191 if (disconnect) { 226 if (disconnect) {
192 sk->sk_user_data = NULL; 227 sk->sk_user_data = NULL;
193 if (con->conid) 228 if (con->conid)
194 tipc_subscrb_delete(con->usr_data); 229 tipc_con_delete_sub(con, NULL);
195 } 230 }
196 write_unlock_bh(&sk->sk_callback_lock); 231 write_unlock_bh(&sk->sk_callback_lock);
197 232
@@ -215,7 +250,9 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
215 250
216 kref_init(&con->kref); 251 kref_init(&con->kref);
217 INIT_LIST_HEAD(&con->outqueue); 252 INIT_LIST_HEAD(&con->outqueue);
253 INIT_LIST_HEAD(&con->sub_list);
218 spin_lock_init(&con->outqueue_lock); 254 spin_lock_init(&con->outqueue_lock);
255 spin_lock_init(&con->sub_lock);
219 INIT_WORK(&con->swork, tipc_send_work); 256 INIT_WORK(&con->swork, tipc_send_work);
220 INIT_WORK(&con->rwork, tipc_recv_work); 257 INIT_WORK(&con->rwork, tipc_recv_work);
221 258
@@ -236,6 +273,35 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
236 return con; 273 return con;
237} 274}
238 275
276int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con,
277 void *buf, size_t len)
278{
279 struct tipc_subscr *s = (struct tipc_subscr *)buf;
280 struct tipc_subscription *sub;
281 bool status;
282 int swap;
283
284 /* Determine subscriber's endianness */
285 swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE |
286 TIPC_SUB_CANCEL));
287
288 /* Detect & process a subscription cancellation request */
289 if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
290 s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
291 tipc_con_delete_sub(con, s);
292 return 0;
293 }
294 status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
295 sub = tipc_subscrp_subscribe(net, s, conid, swap, status);
296 if (!sub)
297 return -1;
298
299 spin_lock_bh(&con->sub_lock);
300 list_add(&sub->subscrp_list, &con->sub_list);
301 spin_unlock_bh(&con->sub_lock);
302 return 0;
303}
304
239static int tipc_receive_from_sock(struct tipc_conn *con) 305static int tipc_receive_from_sock(struct tipc_conn *con)
240{ 306{
241 struct tipc_server *s = con->server; 307 struct tipc_server *s = con->server;
@@ -262,9 +328,7 @@ static int tipc_receive_from_sock(struct tipc_conn *con)
262 } 328 }
263 329
264 read_lock_bh(&sk->sk_callback_lock); 330 read_lock_bh(&sk->sk_callback_lock);
265 if (test_bit(CF_CONNECTED, &con->flags)) 331 ret = tipc_con_rcv_sub(s->net, con->conid, con, buf, ret);
266 ret = tipc_subscrb_rcv(sock_net(con->sock->sk), con->conid,
267 con->usr_data, buf, ret);
268 read_unlock_bh(&sk->sk_callback_lock); 332 read_unlock_bh(&sk->sk_callback_lock);
269 kmem_cache_free(s->rcvbuf_cache, buf); 333 kmem_cache_free(s->rcvbuf_cache, buf);
270 if (ret < 0) 334 if (ret < 0)
@@ -302,15 +366,6 @@ static int tipc_accept_from_sock(struct tipc_conn *con)
302 newcon->rx_action = tipc_receive_from_sock; 366 newcon->rx_action = tipc_receive_from_sock;
303 tipc_register_callbacks(newsock, newcon); 367 tipc_register_callbacks(newsock, newcon);
304 368
305 /* Notify that new connection is incoming */
306 newcon->usr_data = tipc_subscrb_create(newcon->conid);
307
308 if (!newcon->usr_data) {
309 sock_release(newsock);
310 conn_put(newcon);
311 return -ENOMEM;
312 }
313
314 /* Wake up receive process in case of 'SYN+' message */ 369 /* Wake up receive process in case of 'SYN+' message */
315 newsock->sk->sk_data_ready(newsock->sk); 370 newsock->sk->sk_data_ready(newsock->sk);
316 return ret; 371 return ret;
@@ -427,7 +482,7 @@ static void tipc_clean_outqueues(struct tipc_conn *con)
427} 482}
428 483
429int tipc_conn_sendmsg(struct tipc_server *s, int conid, 484int tipc_conn_sendmsg(struct tipc_server *s, int conid,
430 void *data, size_t len) 485 u32 evt, void *data, size_t len)
431{ 486{
432 struct outqueue_entry *e; 487 struct outqueue_entry *e;
433 struct tipc_conn *con; 488 struct tipc_conn *con;
@@ -436,7 +491,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid,
436 if (!con) 491 if (!con)
437 return -EINVAL; 492 return -EINVAL;
438 493
439 if (!test_bit(CF_CONNECTED, &con->flags)) { 494 if (!connected(con)) {
440 conn_put(con); 495 conn_put(con);
441 return 0; 496 return 0;
442 } 497 }
@@ -446,7 +501,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid,
446 conn_put(con); 501 conn_put(con);
447 return -ENOMEM; 502 return -ENOMEM;
448 } 503 }
449 504 e->evt = evt;
450 spin_lock_bh(&con->outqueue_lock); 505 spin_lock_bh(&con->outqueue_lock);
451 list_add_tail(&e->list, &con->outqueue); 506 list_add_tail(&e->list, &con->outqueue);
452 spin_unlock_bh(&con->outqueue_lock); 507 spin_unlock_bh(&con->outqueue_lock);
@@ -470,10 +525,9 @@ void tipc_conn_terminate(struct tipc_server *s, int conid)
470bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, 525bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
471 u32 upper, u32 filter, int *conid) 526 u32 upper, u32 filter, int *conid)
472{ 527{
473 struct tipc_subscriber *scbr;
474 struct tipc_subscr sub; 528 struct tipc_subscr sub;
475 struct tipc_server *s;
476 struct tipc_conn *con; 529 struct tipc_conn *con;
530 int rc;
477 531
478 sub.seq.type = type; 532 sub.seq.type = type;
479 sub.seq.lower = lower; 533 sub.seq.lower = lower;
@@ -487,32 +541,23 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
487 return false; 541 return false;
488 542
489 *conid = con->conid; 543 *conid = con->conid;
490 s = con->server;
491 scbr = tipc_subscrb_create(*conid);
492 if (!scbr) {
493 conn_put(con);
494 return false;
495 }
496
497 con->usr_data = scbr;
498 con->sock = NULL; 544 con->sock = NULL;
499 tipc_subscrb_rcv(net, *conid, scbr, &sub, sizeof(sub)); 545 rc = tipc_con_rcv_sub(net, *conid, con, &sub, sizeof(sub));
500 return true; 546 if (rc < 0)
547 tipc_close_conn(con);
548 return !rc;
501} 549}
502 550
503void tipc_topsrv_kern_unsubscr(struct net *net, int conid) 551void tipc_topsrv_kern_unsubscr(struct net *net, int conid)
504{ 552{
505 struct tipc_conn *con; 553 struct tipc_conn *con;
506 struct tipc_server *srv;
507 554
508 con = tipc_conn_lookup(tipc_topsrv(net), conid); 555 con = tipc_conn_lookup(tipc_topsrv(net), conid);
509 if (!con) 556 if (!con)
510 return; 557 return;
511 558
512 test_and_clear_bit(CF_CONNECTED, &con->flags); 559 test_and_clear_bit(CF_CONNECTED, &con->flags);
513 srv = con->server; 560 tipc_con_delete_sub(con, NULL);
514 if (con->conid)
515 tipc_subscrb_delete(con->usr_data);
516 conn_put(con); 561 conn_put(con);
517 conn_put(con); 562 conn_put(con);
518} 563}
@@ -537,7 +582,8 @@ static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt)
537 582
538static void tipc_send_to_sock(struct tipc_conn *con) 583static void tipc_send_to_sock(struct tipc_conn *con)
539{ 584{
540 struct tipc_server *s = con->server; 585 struct list_head *queue = &con->outqueue;
586 struct tipc_server *srv = con->server;
541 struct outqueue_entry *e; 587 struct outqueue_entry *e;
542 struct tipc_event *evt; 588 struct tipc_event *evt;
543 struct msghdr msg; 589 struct msghdr msg;
@@ -545,16 +591,20 @@ static void tipc_send_to_sock(struct tipc_conn *con)
545 int ret; 591 int ret;
546 592
547 spin_lock_bh(&con->outqueue_lock); 593 spin_lock_bh(&con->outqueue_lock);
548 while (test_bit(CF_CONNECTED, &con->flags)) { 594
549 e = list_entry(con->outqueue.next, struct outqueue_entry, list); 595 while (!list_empty(queue)) {
550 if ((struct list_head *) e == &con->outqueue) 596 e = list_first_entry(queue, struct outqueue_entry, list);
551 break;
552 597
553 spin_unlock_bh(&con->outqueue_lock); 598 spin_unlock_bh(&con->outqueue_lock);
554 599
600 if (e->evt == TIPC_SUBSCR_TIMEOUT) {
601 evt = (struct tipc_event *)e->iov.iov_base;
602 tipc_con_delete_sub(con, &evt->s);
603 }
604 memset(&msg, 0, sizeof(msg));
605 msg.msg_flags = MSG_DONTWAIT;
606
555 if (con->sock) { 607 if (con->sock) {
556 memset(&msg, 0, sizeof(msg));
557 msg.msg_flags = MSG_DONTWAIT;
558 ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, 608 ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
559 e->iov.iov_len); 609 e->iov.iov_len);
560 if (ret == -EWOULDBLOCK || ret == 0) { 610 if (ret == -EWOULDBLOCK || ret == 0) {
@@ -565,7 +615,7 @@ static void tipc_send_to_sock(struct tipc_conn *con)
565 } 615 }
566 } else { 616 } else {
567 evt = e->iov.iov_base; 617 evt = e->iov.iov_base;
568 tipc_send_kern_top_evt(s->net, evt); 618 tipc_send_kern_top_evt(srv->net, evt);
569 } 619 }
570 620
571 /* Don't starve users filling buffers */ 621 /* Don't starve users filling buffers */
@@ -573,7 +623,6 @@ static void tipc_send_to_sock(struct tipc_conn *con)
573 cond_resched(); 623 cond_resched();
574 count = 0; 624 count = 0;
575 } 625 }
576
577 spin_lock_bh(&con->outqueue_lock); 626 spin_lock_bh(&con->outqueue_lock);
578 list_del(&e->list); 627 list_del(&e->list);
579 tipc_free_entry(e); 628 tipc_free_entry(e);
@@ -591,7 +640,7 @@ static void tipc_recv_work(struct work_struct *work)
591 struct tipc_conn *con = container_of(work, struct tipc_conn, rwork); 640 struct tipc_conn *con = container_of(work, struct tipc_conn, rwork);
592 int count = 0; 641 int count = 0;
593 642
594 while (test_bit(CF_CONNECTED, &con->flags)) { 643 while (connected(con)) {
595 if (con->rx_action(con)) 644 if (con->rx_action(con))
596 break; 645 break;
597 646
@@ -608,7 +657,7 @@ static void tipc_send_work(struct work_struct *work)
608{ 657{
609 struct tipc_conn *con = container_of(work, struct tipc_conn, swork); 658 struct tipc_conn *con = container_of(work, struct tipc_conn, swork);
610 659
611 if (test_bit(CF_CONNECTED, &con->flags)) 660 if (connected(con))
612 tipc_send_to_sock(con); 661 tipc_send_to_sock(con);
613 662
614 conn_put(con); 663 conn_put(con);