summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Maloy <jon.maloy@ericsson.com>2018-02-15 04:40:44 -0500
committerDavid S. Miller <davem@davemloft.net>2018-02-16 15:26:33 -0500
commitdf79d040dcd7d7e580c50edf40b82e677fe84801 (patch)
treef2d1629f209ee648d71ac4278ce490b0f648205f
parentc901d26d4a8137f3ad0e5865d331f7c63c42d9f9 (diff)
tipc: eliminate struct tipc_subscriber
It is unnecessary to keep two structures, struct tipc_conn and struct tipc_subscriber, with a one-to-one relationship and still with different life cycles. The fact that the two often run in different contexts, and still may access each other via direct pointers constitutes an additional hazard, something we have experienced at several occasions, and still see happening. We have identified at least two remaining problems that are easier to fix if we simplify the topology server data structure somewhat. - When there is a race between a subscription up/down event and a timeout event, it is fully possible that the former might be delivered after the latter, leading to confusion for the receiver. - The function tipc_subcrp_timeout() is executing in interrupt context, while the following call chain is at least theoretically possible: tipc_subscrp_timeout() tipc_subscrp_send_event() tipc_conn_sendmsg() conn_put() tipc_conn_kref_release() sock_release(sock) I.e., we end up calling a function that might try to sleep in interrupt context. To eliminate this, we need to ensure that the tipc_conn structure and the socket, as well as the subscription instances, only are deleted in work queue context, i.e., after the timeout event really has been sent out. We now remove this unnecessary complexity, by merging data and functionality of the subscriber structure into struct tipc_conn and the associated file server.c. We thereafter add a spinlock and a new 'inactive' state to the subscription structure. Using those, both problems described above can be easily solved. Acked-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r--net/tipc/server.c161
-rw-r--r--net/tipc/server.h2
-rw-r--r--net/tipc/subscr.c173
-rw-r--r--net/tipc/subscr.h17
4 files changed, 146 insertions, 207 deletions
diff --git a/net/tipc/server.c b/net/tipc/server.c
index 8aa2a33b1e48..b8268c0882a7 100644
--- a/net/tipc/server.c
+++ b/net/tipc/server.c
@@ -2,6 +2,7 @@
2 * net/tipc/server.c: TIPC server infrastructure 2 * net/tipc/server.c: TIPC server infrastructure
3 * 3 *
4 * Copyright (c) 2012-2013, Wind River Systems 4 * Copyright (c) 2012-2013, Wind River Systems
5 * Copyright (c) 2017, Ericsson AB
5 * All rights reserved. 6 * All rights reserved.
6 * 7 *
7 * Redistribution and use in source and binary forms, with or without 8 * Redistribution and use in source and binary forms, with or without
@@ -57,12 +58,13 @@
57 * @sock: socket handler associated with connection 58 * @sock: socket handler associated with connection
58 * @flags: indicates connection state 59 * @flags: indicates connection state
59 * @server: pointer to connected server 60 * @server: pointer to connected server
61 * @sub_list: lsit to all pertaing subscriptions
62 * @sub_lock: lock protecting the subscription list
63 * @outqueue_lock: control access to the outqueue
60 * @rwork: receive work item 64 * @rwork: receive work item
61 * @usr_data: user-specified field
62 * @rx_action: what to do when connection socket is active 65 * @rx_action: what to do when connection socket is active
63 * @outqueue: pointer to first outbound message in queue 66 * @outqueue: pointer to first outbound message in queue
64 * @outqueue_lock: control access to the outqueue 67 * @outqueue_lock: control access to the outqueue
65 * @outqueue: list of connection objects for its server
66 * @swork: send work item 68 * @swork: send work item
67 */ 69 */
68struct tipc_conn { 70struct tipc_conn {
@@ -71,9 +73,10 @@ struct tipc_conn {
71 struct socket *sock; 73 struct socket *sock;
72 unsigned long flags; 74 unsigned long flags;
73 struct tipc_server *server; 75 struct tipc_server *server;
76 struct list_head sub_list;
77 spinlock_t sub_lock; /* for subscription list */
74 struct work_struct rwork; 78 struct work_struct rwork;
75 int (*rx_action) (struct tipc_conn *con); 79 int (*rx_action) (struct tipc_conn *con);
76 void *usr_data;
77 struct list_head outqueue; 80 struct list_head outqueue;
78 spinlock_t outqueue_lock; 81 spinlock_t outqueue_lock;
79 struct work_struct swork; 82 struct work_struct swork;
@@ -81,6 +84,7 @@ struct tipc_conn {
81 84
82/* An entry waiting to be sent */ 85/* An entry waiting to be sent */
83struct outqueue_entry { 86struct outqueue_entry {
87 u32 evt;
84 struct list_head list; 88 struct list_head list;
85 struct kvec iov; 89 struct kvec iov;
86}; 90};
@@ -89,18 +93,33 @@ static void tipc_recv_work(struct work_struct *work);
89static void tipc_send_work(struct work_struct *work); 93static void tipc_send_work(struct work_struct *work);
90static void tipc_clean_outqueues(struct tipc_conn *con); 94static void tipc_clean_outqueues(struct tipc_conn *con);
91 95
96static bool connected(struct tipc_conn *con)
97{
98 return con && test_bit(CF_CONNECTED, &con->flags);
99}
100
101/**
102 * htohl - convert value to endianness used by destination
103 * @in: value to convert
104 * @swap: non-zero if endianness must be reversed
105 *
106 * Returns converted value
107 */
108static u32 htohl(u32 in, int swap)
109{
110 return swap ? swab32(in) : in;
111}
112
92static void tipc_conn_kref_release(struct kref *kref) 113static void tipc_conn_kref_release(struct kref *kref)
93{ 114{
94 struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); 115 struct tipc_conn *con = container_of(kref, struct tipc_conn, kref);
95 struct tipc_server *s = con->server; 116 struct tipc_server *s = con->server;
96 struct socket *sock = con->sock; 117 struct socket *sock = con->sock;
97 struct sock *sk;
98 118
99 if (sock) { 119 if (sock) {
100 sk = sock->sk;
101 if (test_bit(CF_SERVER, &con->flags)) { 120 if (test_bit(CF_SERVER, &con->flags)) {
102 __module_get(sock->ops->owner); 121 __module_get(sock->ops->owner);
103 __module_get(sk->sk_prot_creator->owner); 122 __module_get(sock->sk->sk_prot_creator->owner);
104 } 123 }
105 sock_release(sock); 124 sock_release(sock);
106 con->sock = NULL; 125 con->sock = NULL;
@@ -129,11 +148,8 @@ static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid)
129 148
130 spin_lock_bh(&s->idr_lock); 149 spin_lock_bh(&s->idr_lock);
131 con = idr_find(&s->conn_idr, conid); 150 con = idr_find(&s->conn_idr, conid);
132 if (con) { 151 if (!connected(con) || !kref_get_unless_zero(&con->kref))
133 if (!test_bit(CF_CONNECTED, &con->flags) || 152 con = NULL;
134 !kref_get_unless_zero(&con->kref))
135 con = NULL;
136 }
137 spin_unlock_bh(&s->idr_lock); 153 spin_unlock_bh(&s->idr_lock);
138 return con; 154 return con;
139} 155}
@@ -144,7 +160,7 @@ static void sock_data_ready(struct sock *sk)
144 160
145 read_lock_bh(&sk->sk_callback_lock); 161 read_lock_bh(&sk->sk_callback_lock);
146 con = sock2con(sk); 162 con = sock2con(sk);
147 if (con && test_bit(CF_CONNECTED, &con->flags)) { 163 if (connected(con)) {
148 conn_get(con); 164 conn_get(con);
149 if (!queue_work(con->server->rcv_wq, &con->rwork)) 165 if (!queue_work(con->server->rcv_wq, &con->rwork))
150 conn_put(con); 166 conn_put(con);
@@ -158,7 +174,7 @@ static void sock_write_space(struct sock *sk)
158 174
159 read_lock_bh(&sk->sk_callback_lock); 175 read_lock_bh(&sk->sk_callback_lock);
160 con = sock2con(sk); 176 con = sock2con(sk);
161 if (con && test_bit(CF_CONNECTED, &con->flags)) { 177 if (connected(con)) {
162 conn_get(con); 178 conn_get(con);
163 if (!queue_work(con->server->send_wq, &con->swork)) 179 if (!queue_work(con->server->send_wq, &con->swork))
164 conn_put(con); 180 conn_put(con);
@@ -181,6 +197,24 @@ static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con)
181 write_unlock_bh(&sk->sk_callback_lock); 197 write_unlock_bh(&sk->sk_callback_lock);
182} 198}
183 199
200/* tipc_con_delete_sub - delete a specific or all subscriptions
201 * for a given subscriber
202 */
203static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s)
204{
205 struct list_head *sub_list = &con->sub_list;
206 struct tipc_subscription *sub, *tmp;
207
208 spin_lock_bh(&con->sub_lock);
209 list_for_each_entry_safe(sub, tmp, sub_list, subscrp_list) {
210 if (!s || !memcmp(s, &sub->evt.s, sizeof(*s)))
211 tipc_sub_delete(sub);
212 else if (s)
213 break;
214 }
215 spin_unlock_bh(&con->sub_lock);
216}
217
184static void tipc_close_conn(struct tipc_conn *con) 218static void tipc_close_conn(struct tipc_conn *con)
185{ 219{
186 struct sock *sk = con->sock->sk; 220 struct sock *sk = con->sock->sk;
@@ -188,10 +222,11 @@ static void tipc_close_conn(struct tipc_conn *con)
188 222
189 write_lock_bh(&sk->sk_callback_lock); 223 write_lock_bh(&sk->sk_callback_lock);
190 disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags); 224 disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags);
225
191 if (disconnect) { 226 if (disconnect) {
192 sk->sk_user_data = NULL; 227 sk->sk_user_data = NULL;
193 if (con->conid) 228 if (con->conid)
194 tipc_subscrb_delete(con->usr_data); 229 tipc_con_delete_sub(con, NULL);
195 } 230 }
196 write_unlock_bh(&sk->sk_callback_lock); 231 write_unlock_bh(&sk->sk_callback_lock);
197 232
@@ -215,7 +250,9 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
215 250
216 kref_init(&con->kref); 251 kref_init(&con->kref);
217 INIT_LIST_HEAD(&con->outqueue); 252 INIT_LIST_HEAD(&con->outqueue);
253 INIT_LIST_HEAD(&con->sub_list);
218 spin_lock_init(&con->outqueue_lock); 254 spin_lock_init(&con->outqueue_lock);
255 spin_lock_init(&con->sub_lock);
219 INIT_WORK(&con->swork, tipc_send_work); 256 INIT_WORK(&con->swork, tipc_send_work);
220 INIT_WORK(&con->rwork, tipc_recv_work); 257 INIT_WORK(&con->rwork, tipc_recv_work);
221 258
@@ -236,6 +273,35 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
236 return con; 273 return con;
237} 274}
238 275
276int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con,
277 void *buf, size_t len)
278{
279 struct tipc_subscr *s = (struct tipc_subscr *)buf;
280 struct tipc_subscription *sub;
281 bool status;
282 int swap;
283
284 /* Determine subscriber's endianness */
285 swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE |
286 TIPC_SUB_CANCEL));
287
288 /* Detect & process a subscription cancellation request */
289 if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
290 s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
291 tipc_con_delete_sub(con, s);
292 return 0;
293 }
294 status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
295 sub = tipc_subscrp_subscribe(net, s, conid, swap, status);
296 if (!sub)
297 return -1;
298
299 spin_lock_bh(&con->sub_lock);
300 list_add(&sub->subscrp_list, &con->sub_list);
301 spin_unlock_bh(&con->sub_lock);
302 return 0;
303}
304
239static int tipc_receive_from_sock(struct tipc_conn *con) 305static int tipc_receive_from_sock(struct tipc_conn *con)
240{ 306{
241 struct tipc_server *s = con->server; 307 struct tipc_server *s = con->server;
@@ -262,9 +328,7 @@ static int tipc_receive_from_sock(struct tipc_conn *con)
262 } 328 }
263 329
264 read_lock_bh(&sk->sk_callback_lock); 330 read_lock_bh(&sk->sk_callback_lock);
265 if (test_bit(CF_CONNECTED, &con->flags)) 331 ret = tipc_con_rcv_sub(s->net, con->conid, con, buf, ret);
266 ret = tipc_subscrb_rcv(sock_net(con->sock->sk), con->conid,
267 con->usr_data, buf, ret);
268 read_unlock_bh(&sk->sk_callback_lock); 332 read_unlock_bh(&sk->sk_callback_lock);
269 kmem_cache_free(s->rcvbuf_cache, buf); 333 kmem_cache_free(s->rcvbuf_cache, buf);
270 if (ret < 0) 334 if (ret < 0)
@@ -302,15 +366,6 @@ static int tipc_accept_from_sock(struct tipc_conn *con)
302 newcon->rx_action = tipc_receive_from_sock; 366 newcon->rx_action = tipc_receive_from_sock;
303 tipc_register_callbacks(newsock, newcon); 367 tipc_register_callbacks(newsock, newcon);
304 368
305 /* Notify that new connection is incoming */
306 newcon->usr_data = tipc_subscrb_create(newcon->conid);
307
308 if (!newcon->usr_data) {
309 sock_release(newsock);
310 conn_put(newcon);
311 return -ENOMEM;
312 }
313
314 /* Wake up receive process in case of 'SYN+' message */ 369 /* Wake up receive process in case of 'SYN+' message */
315 newsock->sk->sk_data_ready(newsock->sk); 370 newsock->sk->sk_data_ready(newsock->sk);
316 return ret; 371 return ret;
@@ -427,7 +482,7 @@ static void tipc_clean_outqueues(struct tipc_conn *con)
427} 482}
428 483
429int tipc_conn_sendmsg(struct tipc_server *s, int conid, 484int tipc_conn_sendmsg(struct tipc_server *s, int conid,
430 void *data, size_t len) 485 u32 evt, void *data, size_t len)
431{ 486{
432 struct outqueue_entry *e; 487 struct outqueue_entry *e;
433 struct tipc_conn *con; 488 struct tipc_conn *con;
@@ -436,7 +491,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid,
436 if (!con) 491 if (!con)
437 return -EINVAL; 492 return -EINVAL;
438 493
439 if (!test_bit(CF_CONNECTED, &con->flags)) { 494 if (!connected(con)) {
440 conn_put(con); 495 conn_put(con);
441 return 0; 496 return 0;
442 } 497 }
@@ -446,7 +501,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid,
446 conn_put(con); 501 conn_put(con);
447 return -ENOMEM; 502 return -ENOMEM;
448 } 503 }
449 504 e->evt = evt;
450 spin_lock_bh(&con->outqueue_lock); 505 spin_lock_bh(&con->outqueue_lock);
451 list_add_tail(&e->list, &con->outqueue); 506 list_add_tail(&e->list, &con->outqueue);
452 spin_unlock_bh(&con->outqueue_lock); 507 spin_unlock_bh(&con->outqueue_lock);
@@ -470,10 +525,9 @@ void tipc_conn_terminate(struct tipc_server *s, int conid)
470bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, 525bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
471 u32 upper, u32 filter, int *conid) 526 u32 upper, u32 filter, int *conid)
472{ 527{
473 struct tipc_subscriber *scbr;
474 struct tipc_subscr sub; 528 struct tipc_subscr sub;
475 struct tipc_server *s;
476 struct tipc_conn *con; 529 struct tipc_conn *con;
530 int rc;
477 531
478 sub.seq.type = type; 532 sub.seq.type = type;
479 sub.seq.lower = lower; 533 sub.seq.lower = lower;
@@ -487,32 +541,23 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
487 return false; 541 return false;
488 542
489 *conid = con->conid; 543 *conid = con->conid;
490 s = con->server;
491 scbr = tipc_subscrb_create(*conid);
492 if (!scbr) {
493 conn_put(con);
494 return false;
495 }
496
497 con->usr_data = scbr;
498 con->sock = NULL; 544 con->sock = NULL;
499 tipc_subscrb_rcv(net, *conid, scbr, &sub, sizeof(sub)); 545 rc = tipc_con_rcv_sub(net, *conid, con, &sub, sizeof(sub));
500 return true; 546 if (rc < 0)
547 tipc_close_conn(con);
548 return !rc;
501} 549}
502 550
503void tipc_topsrv_kern_unsubscr(struct net *net, int conid) 551void tipc_topsrv_kern_unsubscr(struct net *net, int conid)
504{ 552{
505 struct tipc_conn *con; 553 struct tipc_conn *con;
506 struct tipc_server *srv;
507 554
508 con = tipc_conn_lookup(tipc_topsrv(net), conid); 555 con = tipc_conn_lookup(tipc_topsrv(net), conid);
509 if (!con) 556 if (!con)
510 return; 557 return;
511 558
512 test_and_clear_bit(CF_CONNECTED, &con->flags); 559 test_and_clear_bit(CF_CONNECTED, &con->flags);
513 srv = con->server; 560 tipc_con_delete_sub(con, NULL);
514 if (con->conid)
515 tipc_subscrb_delete(con->usr_data);
516 conn_put(con); 561 conn_put(con);
517 conn_put(con); 562 conn_put(con);
518} 563}
@@ -537,7 +582,8 @@ static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt)
537 582
538static void tipc_send_to_sock(struct tipc_conn *con) 583static void tipc_send_to_sock(struct tipc_conn *con)
539{ 584{
540 struct tipc_server *s = con->server; 585 struct list_head *queue = &con->outqueue;
586 struct tipc_server *srv = con->server;
541 struct outqueue_entry *e; 587 struct outqueue_entry *e;
542 struct tipc_event *evt; 588 struct tipc_event *evt;
543 struct msghdr msg; 589 struct msghdr msg;
@@ -545,16 +591,20 @@ static void tipc_send_to_sock(struct tipc_conn *con)
545 int ret; 591 int ret;
546 592
547 spin_lock_bh(&con->outqueue_lock); 593 spin_lock_bh(&con->outqueue_lock);
548 while (test_bit(CF_CONNECTED, &con->flags)) { 594
549 e = list_entry(con->outqueue.next, struct outqueue_entry, list); 595 while (!list_empty(queue)) {
550 if ((struct list_head *) e == &con->outqueue) 596 e = list_first_entry(queue, struct outqueue_entry, list);
551 break;
552 597
553 spin_unlock_bh(&con->outqueue_lock); 598 spin_unlock_bh(&con->outqueue_lock);
554 599
600 if (e->evt == TIPC_SUBSCR_TIMEOUT) {
601 evt = (struct tipc_event *)e->iov.iov_base;
602 tipc_con_delete_sub(con, &evt->s);
603 }
604 memset(&msg, 0, sizeof(msg));
605 msg.msg_flags = MSG_DONTWAIT;
606
555 if (con->sock) { 607 if (con->sock) {
556 memset(&msg, 0, sizeof(msg));
557 msg.msg_flags = MSG_DONTWAIT;
558 ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, 608 ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
559 e->iov.iov_len); 609 e->iov.iov_len);
560 if (ret == -EWOULDBLOCK || ret == 0) { 610 if (ret == -EWOULDBLOCK || ret == 0) {
@@ -565,7 +615,7 @@ static void tipc_send_to_sock(struct tipc_conn *con)
565 } 615 }
566 } else { 616 } else {
567 evt = e->iov.iov_base; 617 evt = e->iov.iov_base;
568 tipc_send_kern_top_evt(s->net, evt); 618 tipc_send_kern_top_evt(srv->net, evt);
569 } 619 }
570 620
571 /* Don't starve users filling buffers */ 621 /* Don't starve users filling buffers */
@@ -573,7 +623,6 @@ static void tipc_send_to_sock(struct tipc_conn *con)
573 cond_resched(); 623 cond_resched();
574 count = 0; 624 count = 0;
575 } 625 }
576
577 spin_lock_bh(&con->outqueue_lock); 626 spin_lock_bh(&con->outqueue_lock);
578 list_del(&e->list); 627 list_del(&e->list);
579 tipc_free_entry(e); 628 tipc_free_entry(e);
@@ -591,7 +640,7 @@ static void tipc_recv_work(struct work_struct *work)
591 struct tipc_conn *con = container_of(work, struct tipc_conn, rwork); 640 struct tipc_conn *con = container_of(work, struct tipc_conn, rwork);
592 int count = 0; 641 int count = 0;
593 642
594 while (test_bit(CF_CONNECTED, &con->flags)) { 643 while (connected(con)) {
595 if (con->rx_action(con)) 644 if (con->rx_action(con))
596 break; 645 break;
597 646
@@ -608,7 +657,7 @@ static void tipc_send_work(struct work_struct *work)
608{ 657{
609 struct tipc_conn *con = container_of(work, struct tipc_conn, swork); 658 struct tipc_conn *con = container_of(work, struct tipc_conn, swork);
610 659
611 if (test_bit(CF_CONNECTED, &con->flags)) 660 if (connected(con))
612 tipc_send_to_sock(con); 661 tipc_send_to_sock(con);
613 662
614 conn_put(con); 663 conn_put(con);
diff --git a/net/tipc/server.h b/net/tipc/server.h
index b4b83bdc8b20..fcc72321362d 100644
--- a/net/tipc/server.h
+++ b/net/tipc/server.h
@@ -77,7 +77,7 @@ struct tipc_server {
77}; 77};
78 78
79int tipc_conn_sendmsg(struct tipc_server *s, int conid, 79int tipc_conn_sendmsg(struct tipc_server *s, int conid,
80 void *data, size_t len); 80 u32 evt, void *data, size_t len);
81 81
82bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, 82bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
83 u32 upper, u32 filter, int *conid); 83 u32 upper, u32 filter, int *conid);
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index b86fbbf7a0b9..c6de1452db69 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/subscr.c: TIPC network topology service 2 * net/tipc/subscr.c: TIPC network topology service
3 * 3 *
4 * Copyright (c) 2000-2006, Ericsson AB 4 * Copyright (c) 2000-2017, Ericsson AB
5 * Copyright (c) 2005-2007, 2010-2013, Wind River Systems 5 * Copyright (c) 2005-2007, 2010-2013, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -39,22 +39,6 @@
39#include "subscr.h" 39#include "subscr.h"
40 40
41/** 41/**
42 * struct tipc_subscriber - TIPC network topology subscriber
43 * @kref: reference counter to tipc_subscription object
44 * @conid: connection identifier to server connecting to subscriber
45 * @lock: control access to subscriber
46 * @subscrp_list: list of subscription objects for this subscriber
47 */
48struct tipc_subscriber {
49 struct kref kref;
50 int conid;
51 spinlock_t lock;
52 struct list_head subscrp_list;
53};
54
55static void tipc_subscrb_put(struct tipc_subscriber *subscriber);
56
57/**
58 * htohl - convert value to endianness used by destination 42 * htohl - convert value to endianness used by destination
59 * @in: value to convert 43 * @in: value to convert
60 * @swap: non-zero if endianness must be reversed 44 * @swap: non-zero if endianness must be reversed
@@ -71,9 +55,10 @@ static void tipc_subscrp_send_event(struct tipc_subscription *sub,
71 u32 event, u32 port_ref, u32 node) 55 u32 event, u32 port_ref, u32 node)
72{ 56{
73 struct tipc_net *tn = net_generic(sub->net, tipc_net_id); 57 struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
74 struct tipc_subscriber *subscriber = sub->subscriber;
75 struct kvec msg_sect; 58 struct kvec msg_sect;
76 59
60 if (sub->inactive)
61 return;
77 msg_sect.iov_base = (void *)&sub->evt; 62 msg_sect.iov_base = (void *)&sub->evt;
78 msg_sect.iov_len = sizeof(struct tipc_event); 63 msg_sect.iov_len = sizeof(struct tipc_event);
79 sub->evt.event = htohl(event, sub->swap); 64 sub->evt.event = htohl(event, sub->swap);
@@ -81,7 +66,7 @@ static void tipc_subscrp_send_event(struct tipc_subscription *sub,
81 sub->evt.found_upper = htohl(found_upper, sub->swap); 66 sub->evt.found_upper = htohl(found_upper, sub->swap);
82 sub->evt.port.ref = htohl(port_ref, sub->swap); 67 sub->evt.port.ref = htohl(port_ref, sub->swap);
83 sub->evt.port.node = htohl(node, sub->swap); 68 sub->evt.port.node = htohl(node, sub->swap);
84 tipc_conn_sendmsg(tn->topsrv, subscriber->conid, 69 tipc_conn_sendmsg(tn->topsrv, sub->conid, event,
85 msg_sect.iov_base, msg_sect.iov_len); 70 msg_sect.iov_base, msg_sect.iov_len);
86} 71}
87 72
@@ -132,41 +117,22 @@ void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower,
132 return; 117 return;
133 if (filter & TIPC_SUB_NODE_SCOPE && scope != TIPC_NODE_SCOPE) 118 if (filter & TIPC_SUB_NODE_SCOPE && scope != TIPC_NODE_SCOPE)
134 return; 119 return;
135 120 spin_lock(&sub->lock);
136 tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref, 121 tipc_subscrp_send_event(sub, found_lower, found_upper,
137 node); 122 event, port_ref, node);
123 spin_unlock(&sub->lock);
138} 124}
139 125
140static void tipc_subscrp_timeout(struct timer_list *t) 126static void tipc_subscrp_timeout(struct timer_list *t)
141{ 127{
142 struct tipc_subscription *sub = from_timer(sub, t, timer); 128 struct tipc_subscription *sub = from_timer(sub, t, timer);
143 struct tipc_subscriber *subscriber = sub->subscriber; 129 struct tipc_subscr *s = &sub->evt.s;
144
145 spin_lock_bh(&subscriber->lock);
146 tipc_nametbl_unsubscribe(sub);
147 list_del(&sub->subscrp_list);
148 spin_unlock_bh(&subscriber->lock);
149 130
150 /* Notify subscriber of timeout */ 131 spin_lock(&sub->lock);
151 tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper, 132 tipc_subscrp_send_event(sub, s->seq.lower, s->seq.upper,
152 TIPC_SUBSCR_TIMEOUT, 0, 0); 133 TIPC_SUBSCR_TIMEOUT, 0, 0);
153 134 sub->inactive = true;
154 tipc_subscrp_put(sub); 135 spin_unlock(&sub->lock);
155}
156
157static void tipc_subscrb_kref_release(struct kref *kref)
158{
159 kfree(container_of(kref,struct tipc_subscriber, kref));
160}
161
162static void tipc_subscrb_put(struct tipc_subscriber *subscriber)
163{
164 kref_put(&subscriber->kref, tipc_subscrb_kref_release);
165}
166
167static void tipc_subscrb_get(struct tipc_subscriber *subscriber)
168{
169 kref_get(&subscriber->kref);
170} 136}
171 137
172static void tipc_subscrp_kref_release(struct kref *kref) 138static void tipc_subscrp_kref_release(struct kref *kref)
@@ -175,11 +141,9 @@ static void tipc_subscrp_kref_release(struct kref *kref)
175 struct tipc_subscription, 141 struct tipc_subscription,
176 kref); 142 kref);
177 struct tipc_net *tn = net_generic(sub->net, tipc_net_id); 143 struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
178 struct tipc_subscriber *subscriber = sub->subscriber;
179 144
180 atomic_dec(&tn->subscription_count); 145 atomic_dec(&tn->subscription_count);
181 kfree(sub); 146 kfree(sub);
182 tipc_subscrb_put(subscriber);
183} 147}
184 148
185void tipc_subscrp_put(struct tipc_subscription *subscription) 149void tipc_subscrp_put(struct tipc_subscription *subscription)
@@ -192,68 +156,9 @@ void tipc_subscrp_get(struct tipc_subscription *subscription)
192 kref_get(&subscription->kref); 156 kref_get(&subscription->kref);
193} 157}
194 158
195/* tipc_subscrb_subscrp_delete - delete a specific subscription or all
196 * subscriptions for a given subscriber.
197 */
198static void tipc_subscrb_subscrp_delete(struct tipc_subscriber *subscriber,
199 struct tipc_subscr *s)
200{
201 struct list_head *subscription_list = &subscriber->subscrp_list;
202 struct tipc_subscription *sub, *temp;
203 u32 timeout;
204
205 spin_lock_bh(&subscriber->lock);
206 list_for_each_entry_safe(sub, temp, subscription_list, subscrp_list) {
207 if (s && memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr)))
208 continue;
209
210 timeout = htohl(sub->evt.s.timeout, sub->swap);
211 if (timeout == TIPC_WAIT_FOREVER || del_timer(&sub->timer)) {
212 tipc_nametbl_unsubscribe(sub);
213 list_del(&sub->subscrp_list);
214 tipc_subscrp_put(sub);
215 }
216
217 if (s)
218 break;
219 }
220 spin_unlock_bh(&subscriber->lock);
221}
222
223struct tipc_subscriber *tipc_subscrb_create(int conid)
224{
225 struct tipc_subscriber *subscriber;
226
227 subscriber = kzalloc(sizeof(*subscriber), GFP_ATOMIC);
228 if (!subscriber) {
229 pr_warn("Subscriber rejected, no memory\n");
230 return NULL;
231 }
232 INIT_LIST_HEAD(&subscriber->subscrp_list);
233 kref_init(&subscriber->kref);
234 subscriber->conid = conid;
235 spin_lock_init(&subscriber->lock);
236
237 return subscriber;
238}
239
240void tipc_subscrb_delete(struct tipc_subscriber *subscriber)
241{
242 tipc_subscrb_subscrp_delete(subscriber, NULL);
243 tipc_subscrb_put(subscriber);
244}
245
246static void tipc_subscrp_cancel(struct tipc_subscr *s,
247 struct tipc_subscriber *subscriber)
248{
249 tipc_subscrb_get(subscriber);
250 tipc_subscrb_subscrp_delete(subscriber, s);
251 tipc_subscrb_put(subscriber);
252}
253
254static struct tipc_subscription *tipc_subscrp_create(struct net *net, 159static struct tipc_subscription *tipc_subscrp_create(struct net *net,
255 struct tipc_subscr *s, 160 struct tipc_subscr *s,
256 int swap) 161 int conid, bool swap)
257{ 162{
258 struct tipc_net *tn = net_generic(net, tipc_net_id); 163 struct tipc_net *tn = net_generic(net, tipc_net_id);
259 struct tipc_subscription *sub; 164 struct tipc_subscription *sub;
@@ -275,6 +180,8 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net,
275 180
276 /* Initialize subscription object */ 181 /* Initialize subscription object */
277 sub->net = net; 182 sub->net = net;
183 sub->conid = conid;
184 sub->inactive = false;
278 if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) || 185 if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) ||
279 (htohl(s->seq.lower, swap) > htohl(s->seq.upper, swap))) { 186 (htohl(s->seq.lower, swap) > htohl(s->seq.upper, swap))) {
280 pr_warn("Subscription rejected, illegal request\n"); 187 pr_warn("Subscription rejected, illegal request\n");
@@ -284,59 +191,39 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net,
284 191
285 sub->swap = swap; 192 sub->swap = swap;
286 memcpy(&sub->evt.s, s, sizeof(*s)); 193 memcpy(&sub->evt.s, s, sizeof(*s));
194 spin_lock_init(&sub->lock);
287 atomic_inc(&tn->subscription_count); 195 atomic_inc(&tn->subscription_count);
288 kref_init(&sub->kref); 196 kref_init(&sub->kref);
289 return sub; 197 return sub;
290} 198}
291 199
292static int tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s, 200struct tipc_subscription *tipc_subscrp_subscribe(struct net *net,
293 struct tipc_subscriber *subscriber, int swap, 201 struct tipc_subscr *s,
294 bool status) 202 int conid, bool swap,
203 bool status)
295{ 204{
296 struct tipc_subscription *sub = NULL; 205 struct tipc_subscription *sub = NULL;
297 u32 timeout; 206 u32 timeout;
298 207
299 sub = tipc_subscrp_create(net, s, swap); 208 sub = tipc_subscrp_create(net, s, conid, swap);
300 if (!sub) 209 if (!sub)
301 return -1; 210 return NULL;
302 211
303 spin_lock_bh(&subscriber->lock);
304 list_add(&sub->subscrp_list, &subscriber->subscrp_list);
305 sub->subscriber = subscriber;
306 tipc_nametbl_subscribe(sub, status); 212 tipc_nametbl_subscribe(sub, status);
307 tipc_subscrb_get(subscriber);
308 spin_unlock_bh(&subscriber->lock);
309
310 timer_setup(&sub->timer, tipc_subscrp_timeout, 0); 213 timer_setup(&sub->timer, tipc_subscrp_timeout, 0);
311 timeout = htohl(sub->evt.s.timeout, swap); 214 timeout = htohl(sub->evt.s.timeout, swap);
312
313 if (timeout != TIPC_WAIT_FOREVER) 215 if (timeout != TIPC_WAIT_FOREVER)
314 mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout)); 216 mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout));
315 return 0; 217 return sub;
316} 218}
317 219
318/* Handle one request to create a new subscription for the subscriber 220void tipc_sub_delete(struct tipc_subscription *sub)
319 */
320int tipc_subscrb_rcv(struct net *net, int conid, void *usr_data,
321 void *buf, size_t len)
322{ 221{
323 struct tipc_subscriber *subscriber = usr_data; 222 tipc_nametbl_unsubscribe(sub);
324 struct tipc_subscr *s = (struct tipc_subscr *)buf; 223 if (sub->evt.s.timeout != TIPC_WAIT_FOREVER)
325 bool status; 224 del_timer_sync(&sub->timer);
326 int swap; 225 list_del(&sub->subscrp_list);
327 226 tipc_subscrp_put(sub);
328 /* Determine subscriber's endianness */
329 swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE |
330 TIPC_SUB_CANCEL));
331
332 /* Detect & process a subscription cancellation request */
333 if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
334 s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
335 tipc_subscrp_cancel(s, subscriber);
336 return 0;
337 }
338 status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
339 return tipc_subscrp_subscribe(net, s, subscriber, swap, status);
340} 227}
341 228
342int tipc_topsrv_start(struct net *net) 229int tipc_topsrv_start(struct net *net)
diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
index a736f29ba9ab..cfd0cb3a1da8 100644
--- a/net/tipc/subscr.h
+++ b/net/tipc/subscr.h
@@ -43,7 +43,7 @@
43#define TIPC_MAX_PUBLICATIONS 65535 43#define TIPC_MAX_PUBLICATIONS 65535
44 44
45struct tipc_subscription; 45struct tipc_subscription;
46struct tipc_subscriber; 46struct tipc_conn;
47 47
48/** 48/**
49 * struct tipc_subscription - TIPC network topology subscription object 49 * struct tipc_subscription - TIPC network topology subscription object
@@ -58,19 +58,22 @@ struct tipc_subscriber;
58 */ 58 */
59struct tipc_subscription { 59struct tipc_subscription {
60 struct kref kref; 60 struct kref kref;
61 struct tipc_subscriber *subscriber;
62 struct net *net; 61 struct net *net;
63 struct timer_list timer; 62 struct timer_list timer;
64 struct list_head nameseq_list; 63 struct list_head nameseq_list;
65 struct list_head subscrp_list; 64 struct list_head subscrp_list;
66 int swap;
67 struct tipc_event evt; 65 struct tipc_event evt;
66 int conid;
67 bool swap;
68 bool inactive;
69 spinlock_t lock; /* serialize up/down and timer events */
68}; 70};
69 71
70struct tipc_subscriber *tipc_subscrb_create(int conid); 72struct tipc_subscription *tipc_subscrp_subscribe(struct net *net,
71void tipc_subscrb_delete(struct tipc_subscriber *subscriber); 73 struct tipc_subscr *s,
72int tipc_subscrb_rcv(struct net *net, int conid, void *usr_data, 74 int conid, bool swap,
73 void *buf, size_t len); 75 bool status);
76void tipc_sub_delete(struct tipc_subscription *sub);
74int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower, 77int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower,
75 u32 found_upper); 78 u32 found_upper);
76void tipc_subscrp_report_overlap(struct tipc_subscription *sub, 79void tipc_subscrp_report_overlap(struct tipc_subscription *sub,