summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--net/tipc/server.c161
-rw-r--r--net/tipc/server.h2
-rw-r--r--net/tipc/subscr.c173
-rw-r--r--net/tipc/subscr.h17
4 files changed, 146 insertions, 207 deletions
diff --git a/net/tipc/server.c b/net/tipc/server.c
index 8aa2a33b1e48..b8268c0882a7 100644
--- a/net/tipc/server.c
+++ b/net/tipc/server.c
@@ -2,6 +2,7 @@
2 * net/tipc/server.c: TIPC server infrastructure 2 * net/tipc/server.c: TIPC server infrastructure
3 * 3 *
4 * Copyright (c) 2012-2013, Wind River Systems 4 * Copyright (c) 2012-2013, Wind River Systems
5 * Copyright (c) 2017, Ericsson AB
5 * All rights reserved. 6 * All rights reserved.
6 * 7 *
7 * Redistribution and use in source and binary forms, with or without 8 * Redistribution and use in source and binary forms, with or without
@@ -57,12 +58,13 @@
57 * @sock: socket handler associated with connection 58 * @sock: socket handler associated with connection
58 * @flags: indicates connection state 59 * @flags: indicates connection state
59 * @server: pointer to connected server 60 * @server: pointer to connected server
61 * @sub_list: lsit to all pertaing subscriptions
62 * @sub_lock: lock protecting the subscription list
63 * @outqueue_lock: control access to the outqueue
60 * @rwork: receive work item 64 * @rwork: receive work item
61 * @usr_data: user-specified field
62 * @rx_action: what to do when connection socket is active 65 * @rx_action: what to do when connection socket is active
63 * @outqueue: pointer to first outbound message in queue 66 * @outqueue: pointer to first outbound message in queue
64 * @outqueue_lock: control access to the outqueue 67 * @outqueue_lock: control access to the outqueue
65 * @outqueue: list of connection objects for its server
66 * @swork: send work item 68 * @swork: send work item
67 */ 69 */
68struct tipc_conn { 70struct tipc_conn {
@@ -71,9 +73,10 @@ struct tipc_conn {
71 struct socket *sock; 73 struct socket *sock;
72 unsigned long flags; 74 unsigned long flags;
73 struct tipc_server *server; 75 struct tipc_server *server;
76 struct list_head sub_list;
77 spinlock_t sub_lock; /* for subscription list */
74 struct work_struct rwork; 78 struct work_struct rwork;
75 int (*rx_action) (struct tipc_conn *con); 79 int (*rx_action) (struct tipc_conn *con);
76 void *usr_data;
77 struct list_head outqueue; 80 struct list_head outqueue;
78 spinlock_t outqueue_lock; 81 spinlock_t outqueue_lock;
79 struct work_struct swork; 82 struct work_struct swork;
@@ -81,6 +84,7 @@ struct tipc_conn {
81 84
82/* An entry waiting to be sent */ 85/* An entry waiting to be sent */
83struct outqueue_entry { 86struct outqueue_entry {
87 u32 evt;
84 struct list_head list; 88 struct list_head list;
85 struct kvec iov; 89 struct kvec iov;
86}; 90};
@@ -89,18 +93,33 @@ static void tipc_recv_work(struct work_struct *work);
89static void tipc_send_work(struct work_struct *work); 93static void tipc_send_work(struct work_struct *work);
90static void tipc_clean_outqueues(struct tipc_conn *con); 94static void tipc_clean_outqueues(struct tipc_conn *con);
91 95
96static bool connected(struct tipc_conn *con)
97{
98 return con && test_bit(CF_CONNECTED, &con->flags);
99}
100
101/**
102 * htohl - convert value to endianness used by destination
103 * @in: value to convert
104 * @swap: non-zero if endianness must be reversed
105 *
106 * Returns converted value
107 */
108static u32 htohl(u32 in, int swap)
109{
110 return swap ? swab32(in) : in;
111}
112
92static void tipc_conn_kref_release(struct kref *kref) 113static void tipc_conn_kref_release(struct kref *kref)
93{ 114{
94 struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); 115 struct tipc_conn *con = container_of(kref, struct tipc_conn, kref);
95 struct tipc_server *s = con->server; 116 struct tipc_server *s = con->server;
96 struct socket *sock = con->sock; 117 struct socket *sock = con->sock;
97 struct sock *sk;
98 118
99 if (sock) { 119 if (sock) {
100 sk = sock->sk;
101 if (test_bit(CF_SERVER, &con->flags)) { 120 if (test_bit(CF_SERVER, &con->flags)) {
102 __module_get(sock->ops->owner); 121 __module_get(sock->ops->owner);
103 __module_get(sk->sk_prot_creator->owner); 122 __module_get(sock->sk->sk_prot_creator->owner);
104 } 123 }
105 sock_release(sock); 124 sock_release(sock);
106 con->sock = NULL; 125 con->sock = NULL;
@@ -129,11 +148,8 @@ static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid)
129 148
130 spin_lock_bh(&s->idr_lock); 149 spin_lock_bh(&s->idr_lock);
131 con = idr_find(&s->conn_idr, conid); 150 con = idr_find(&s->conn_idr, conid);
132 if (con) { 151 if (!connected(con) || !kref_get_unless_zero(&con->kref))
133 if (!test_bit(CF_CONNECTED, &con->flags) || 152 con = NULL;
134 !kref_get_unless_zero(&con->kref))
135 con = NULL;
136 }
137 spin_unlock_bh(&s->idr_lock); 153 spin_unlock_bh(&s->idr_lock);
138 return con; 154 return con;
139} 155}
@@ -144,7 +160,7 @@ static void sock_data_ready(struct sock *sk)
144 160
145 read_lock_bh(&sk->sk_callback_lock); 161 read_lock_bh(&sk->sk_callback_lock);
146 con = sock2con(sk); 162 con = sock2con(sk);
147 if (con && test_bit(CF_CONNECTED, &con->flags)) { 163 if (connected(con)) {
148 conn_get(con); 164 conn_get(con);
149 if (!queue_work(con->server->rcv_wq, &con->rwork)) 165 if (!queue_work(con->server->rcv_wq, &con->rwork))
150 conn_put(con); 166 conn_put(con);
@@ -158,7 +174,7 @@ static void sock_write_space(struct sock *sk)
158 174
159 read_lock_bh(&sk->sk_callback_lock); 175 read_lock_bh(&sk->sk_callback_lock);
160 con = sock2con(sk); 176 con = sock2con(sk);
161 if (con && test_bit(CF_CONNECTED, &con->flags)) { 177 if (connected(con)) {
162 conn_get(con); 178 conn_get(con);
163 if (!queue_work(con->server->send_wq, &con->swork)) 179 if (!queue_work(con->server->send_wq, &con->swork))
164 conn_put(con); 180 conn_put(con);
@@ -181,6 +197,24 @@ static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con)
181 write_unlock_bh(&sk->sk_callback_lock); 197 write_unlock_bh(&sk->sk_callback_lock);
182} 198}
183 199
200/* tipc_con_delete_sub - delete a specific or all subscriptions
201 * for a given subscriber
202 */
203static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s)
204{
205 struct list_head *sub_list = &con->sub_list;
206 struct tipc_subscription *sub, *tmp;
207
208 spin_lock_bh(&con->sub_lock);
209 list_for_each_entry_safe(sub, tmp, sub_list, subscrp_list) {
210 if (!s || !memcmp(s, &sub->evt.s, sizeof(*s)))
211 tipc_sub_delete(sub);
212 else if (s)
213 break;
214 }
215 spin_unlock_bh(&con->sub_lock);
216}
217
184static void tipc_close_conn(struct tipc_conn *con) 218static void tipc_close_conn(struct tipc_conn *con)
185{ 219{
186 struct sock *sk = con->sock->sk; 220 struct sock *sk = con->sock->sk;
@@ -188,10 +222,11 @@ static void tipc_close_conn(struct tipc_conn *con)
188 222
189 write_lock_bh(&sk->sk_callback_lock); 223 write_lock_bh(&sk->sk_callback_lock);
190 disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags); 224 disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags);
225
191 if (disconnect) { 226 if (disconnect) {
192 sk->sk_user_data = NULL; 227 sk->sk_user_data = NULL;
193 if (con->conid) 228 if (con->conid)
194 tipc_subscrb_delete(con->usr_data); 229 tipc_con_delete_sub(con, NULL);
195 } 230 }
196 write_unlock_bh(&sk->sk_callback_lock); 231 write_unlock_bh(&sk->sk_callback_lock);
197 232
@@ -215,7 +250,9 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
215 250
216 kref_init(&con->kref); 251 kref_init(&con->kref);
217 INIT_LIST_HEAD(&con->outqueue); 252 INIT_LIST_HEAD(&con->outqueue);
253 INIT_LIST_HEAD(&con->sub_list);
218 spin_lock_init(&con->outqueue_lock); 254 spin_lock_init(&con->outqueue_lock);
255 spin_lock_init(&con->sub_lock);
219 INIT_WORK(&con->swork, tipc_send_work); 256 INIT_WORK(&con->swork, tipc_send_work);
220 INIT_WORK(&con->rwork, tipc_recv_work); 257 INIT_WORK(&con->rwork, tipc_recv_work);
221 258
@@ -236,6 +273,35 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
236 return con; 273 return con;
237} 274}
238 275
276int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con,
277 void *buf, size_t len)
278{
279 struct tipc_subscr *s = (struct tipc_subscr *)buf;
280 struct tipc_subscription *sub;
281 bool status;
282 int swap;
283
284 /* Determine subscriber's endianness */
285 swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE |
286 TIPC_SUB_CANCEL));
287
288 /* Detect & process a subscription cancellation request */
289 if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
290 s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
291 tipc_con_delete_sub(con, s);
292 return 0;
293 }
294 status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
295 sub = tipc_subscrp_subscribe(net, s, conid, swap, status);
296 if (!sub)
297 return -1;
298
299 spin_lock_bh(&con->sub_lock);
300 list_add(&sub->subscrp_list, &con->sub_list);
301 spin_unlock_bh(&con->sub_lock);
302 return 0;
303}
304
239static int tipc_receive_from_sock(struct tipc_conn *con) 305static int tipc_receive_from_sock(struct tipc_conn *con)
240{ 306{
241 struct tipc_server *s = con->server; 307 struct tipc_server *s = con->server;
@@ -262,9 +328,7 @@ static int tipc_receive_from_sock(struct tipc_conn *con)
262 } 328 }
263 329
264 read_lock_bh(&sk->sk_callback_lock); 330 read_lock_bh(&sk->sk_callback_lock);
265 if (test_bit(CF_CONNECTED, &con->flags)) 331 ret = tipc_con_rcv_sub(s->net, con->conid, con, buf, ret);
266 ret = tipc_subscrb_rcv(sock_net(con->sock->sk), con->conid,
267 con->usr_data, buf, ret);
268 read_unlock_bh(&sk->sk_callback_lock); 332 read_unlock_bh(&sk->sk_callback_lock);
269 kmem_cache_free(s->rcvbuf_cache, buf); 333 kmem_cache_free(s->rcvbuf_cache, buf);
270 if (ret < 0) 334 if (ret < 0)
@@ -302,15 +366,6 @@ static int tipc_accept_from_sock(struct tipc_conn *con)
302 newcon->rx_action = tipc_receive_from_sock; 366 newcon->rx_action = tipc_receive_from_sock;
303 tipc_register_callbacks(newsock, newcon); 367 tipc_register_callbacks(newsock, newcon);
304 368
305 /* Notify that new connection is incoming */
306 newcon->usr_data = tipc_subscrb_create(newcon->conid);
307
308 if (!newcon->usr_data) {
309 sock_release(newsock);
310 conn_put(newcon);
311 return -ENOMEM;
312 }
313
314 /* Wake up receive process in case of 'SYN+' message */ 369 /* Wake up receive process in case of 'SYN+' message */
315 newsock->sk->sk_data_ready(newsock->sk); 370 newsock->sk->sk_data_ready(newsock->sk);
316 return ret; 371 return ret;
@@ -427,7 +482,7 @@ static void tipc_clean_outqueues(struct tipc_conn *con)
427} 482}
428 483
429int tipc_conn_sendmsg(struct tipc_server *s, int conid, 484int tipc_conn_sendmsg(struct tipc_server *s, int conid,
430 void *data, size_t len) 485 u32 evt, void *data, size_t len)
431{ 486{
432 struct outqueue_entry *e; 487 struct outqueue_entry *e;
433 struct tipc_conn *con; 488 struct tipc_conn *con;
@@ -436,7 +491,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid,
436 if (!con) 491 if (!con)
437 return -EINVAL; 492 return -EINVAL;
438 493
439 if (!test_bit(CF_CONNECTED, &con->flags)) { 494 if (!connected(con)) {
440 conn_put(con); 495 conn_put(con);
441 return 0; 496 return 0;
442 } 497 }
@@ -446,7 +501,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid,
446 conn_put(con); 501 conn_put(con);
447 return -ENOMEM; 502 return -ENOMEM;
448 } 503 }
449 504 e->evt = evt;
450 spin_lock_bh(&con->outqueue_lock); 505 spin_lock_bh(&con->outqueue_lock);
451 list_add_tail(&e->list, &con->outqueue); 506 list_add_tail(&e->list, &con->outqueue);
452 spin_unlock_bh(&con->outqueue_lock); 507 spin_unlock_bh(&con->outqueue_lock);
@@ -470,10 +525,9 @@ void tipc_conn_terminate(struct tipc_server *s, int conid)
470bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, 525bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
471 u32 upper, u32 filter, int *conid) 526 u32 upper, u32 filter, int *conid)
472{ 527{
473 struct tipc_subscriber *scbr;
474 struct tipc_subscr sub; 528 struct tipc_subscr sub;
475 struct tipc_server *s;
476 struct tipc_conn *con; 529 struct tipc_conn *con;
530 int rc;
477 531
478 sub.seq.type = type; 532 sub.seq.type = type;
479 sub.seq.lower = lower; 533 sub.seq.lower = lower;
@@ -487,32 +541,23 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
487 return false; 541 return false;
488 542
489 *conid = con->conid; 543 *conid = con->conid;
490 s = con->server;
491 scbr = tipc_subscrb_create(*conid);
492 if (!scbr) {
493 conn_put(con);
494 return false;
495 }
496
497 con->usr_data = scbr;
498 con->sock = NULL; 544 con->sock = NULL;
499 tipc_subscrb_rcv(net, *conid, scbr, &sub, sizeof(sub)); 545 rc = tipc_con_rcv_sub(net, *conid, con, &sub, sizeof(sub));
500 return true; 546 if (rc < 0)
547 tipc_close_conn(con);
548 return !rc;
501} 549}
502 550
503void tipc_topsrv_kern_unsubscr(struct net *net, int conid) 551void tipc_topsrv_kern_unsubscr(struct net *net, int conid)
504{ 552{
505 struct tipc_conn *con; 553 struct tipc_conn *con;
506 struct tipc_server *srv;
507 554
508 con = tipc_conn_lookup(tipc_topsrv(net), conid); 555 con = tipc_conn_lookup(tipc_topsrv(net), conid);
509 if (!con) 556 if (!con)
510 return; 557 return;
511 558
512 test_and_clear_bit(CF_CONNECTED, &con->flags); 559 test_and_clear_bit(CF_CONNECTED, &con->flags);
513 srv = con->server; 560 tipc_con_delete_sub(con, NULL);
514 if (con->conid)
515 tipc_subscrb_delete(con->usr_data);
516 conn_put(con); 561 conn_put(con);
517 conn_put(con); 562 conn_put(con);
518} 563}
@@ -537,7 +582,8 @@ static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt)
537 582
538static void tipc_send_to_sock(struct tipc_conn *con) 583static void tipc_send_to_sock(struct tipc_conn *con)
539{ 584{
540 struct tipc_server *s = con->server; 585 struct list_head *queue = &con->outqueue;
586 struct tipc_server *srv = con->server;
541 struct outqueue_entry *e; 587 struct outqueue_entry *e;
542 struct tipc_event *evt; 588 struct tipc_event *evt;
543 struct msghdr msg; 589 struct msghdr msg;
@@ -545,16 +591,20 @@ static void tipc_send_to_sock(struct tipc_conn *con)
545 int ret; 591 int ret;
546 592
547 spin_lock_bh(&con->outqueue_lock); 593 spin_lock_bh(&con->outqueue_lock);
548 while (test_bit(CF_CONNECTED, &con->flags)) { 594
549 e = list_entry(con->outqueue.next, struct outqueue_entry, list); 595 while (!list_empty(queue)) {
550 if ((struct list_head *) e == &con->outqueue) 596 e = list_first_entry(queue, struct outqueue_entry, list);
551 break;
552 597
553 spin_unlock_bh(&con->outqueue_lock); 598 spin_unlock_bh(&con->outqueue_lock);
554 599
600 if (e->evt == TIPC_SUBSCR_TIMEOUT) {
601 evt = (struct tipc_event *)e->iov.iov_base;
602 tipc_con_delete_sub(con, &evt->s);
603 }
604 memset(&msg, 0, sizeof(msg));
605 msg.msg_flags = MSG_DONTWAIT;
606
555 if (con->sock) { 607 if (con->sock) {
556 memset(&msg, 0, sizeof(msg));
557 msg.msg_flags = MSG_DONTWAIT;
558 ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, 608 ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
559 e->iov.iov_len); 609 e->iov.iov_len);
560 if (ret == -EWOULDBLOCK || ret == 0) { 610 if (ret == -EWOULDBLOCK || ret == 0) {
@@ -565,7 +615,7 @@ static void tipc_send_to_sock(struct tipc_conn *con)
565 } 615 }
566 } else { 616 } else {
567 evt = e->iov.iov_base; 617 evt = e->iov.iov_base;
568 tipc_send_kern_top_evt(s->net, evt); 618 tipc_send_kern_top_evt(srv->net, evt);
569 } 619 }
570 620
571 /* Don't starve users filling buffers */ 621 /* Don't starve users filling buffers */
@@ -573,7 +623,6 @@ static void tipc_send_to_sock(struct tipc_conn *con)
573 cond_resched(); 623 cond_resched();
574 count = 0; 624 count = 0;
575 } 625 }
576
577 spin_lock_bh(&con->outqueue_lock); 626 spin_lock_bh(&con->outqueue_lock);
578 list_del(&e->list); 627 list_del(&e->list);
579 tipc_free_entry(e); 628 tipc_free_entry(e);
@@ -591,7 +640,7 @@ static void tipc_recv_work(struct work_struct *work)
591 struct tipc_conn *con = container_of(work, struct tipc_conn, rwork); 640 struct tipc_conn *con = container_of(work, struct tipc_conn, rwork);
592 int count = 0; 641 int count = 0;
593 642
594 while (test_bit(CF_CONNECTED, &con->flags)) { 643 while (connected(con)) {
595 if (con->rx_action(con)) 644 if (con->rx_action(con))
596 break; 645 break;
597 646
@@ -608,7 +657,7 @@ static void tipc_send_work(struct work_struct *work)
608{ 657{
609 struct tipc_conn *con = container_of(work, struct tipc_conn, swork); 658 struct tipc_conn *con = container_of(work, struct tipc_conn, swork);
610 659
611 if (test_bit(CF_CONNECTED, &con->flags)) 660 if (connected(con))
612 tipc_send_to_sock(con); 661 tipc_send_to_sock(con);
613 662
614 conn_put(con); 663 conn_put(con);
diff --git a/net/tipc/server.h b/net/tipc/server.h
index b4b83bdc8b20..fcc72321362d 100644
--- a/net/tipc/server.h
+++ b/net/tipc/server.h
@@ -77,7 +77,7 @@ struct tipc_server {
77}; 77};
78 78
79int tipc_conn_sendmsg(struct tipc_server *s, int conid, 79int tipc_conn_sendmsg(struct tipc_server *s, int conid,
80 void *data, size_t len); 80 u32 evt, void *data, size_t len);
81 81
82bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, 82bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
83 u32 upper, u32 filter, int *conid); 83 u32 upper, u32 filter, int *conid);
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index b86fbbf7a0b9..c6de1452db69 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/subscr.c: TIPC network topology service 2 * net/tipc/subscr.c: TIPC network topology service
3 * 3 *
4 * Copyright (c) 2000-2006, Ericsson AB 4 * Copyright (c) 2000-2017, Ericsson AB
5 * Copyright (c) 2005-2007, 2010-2013, Wind River Systems 5 * Copyright (c) 2005-2007, 2010-2013, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -39,22 +39,6 @@
39#include "subscr.h" 39#include "subscr.h"
40 40
41/** 41/**
42 * struct tipc_subscriber - TIPC network topology subscriber
43 * @kref: reference counter to tipc_subscription object
44 * @conid: connection identifier to server connecting to subscriber
45 * @lock: control access to subscriber
46 * @subscrp_list: list of subscription objects for this subscriber
47 */
48struct tipc_subscriber {
49 struct kref kref;
50 int conid;
51 spinlock_t lock;
52 struct list_head subscrp_list;
53};
54
55static void tipc_subscrb_put(struct tipc_subscriber *subscriber);
56
57/**
58 * htohl - convert value to endianness used by destination 42 * htohl - convert value to endianness used by destination
59 * @in: value to convert 43 * @in: value to convert
60 * @swap: non-zero if endianness must be reversed 44 * @swap: non-zero if endianness must be reversed
@@ -71,9 +55,10 @@ static void tipc_subscrp_send_event(struct tipc_subscription *sub,
71 u32 event, u32 port_ref, u32 node) 55 u32 event, u32 port_ref, u32 node)
72{ 56{
73 struct tipc_net *tn = net_generic(sub->net, tipc_net_id); 57 struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
74 struct tipc_subscriber *subscriber = sub->subscriber;
75 struct kvec msg_sect; 58 struct kvec msg_sect;
76 59
60 if (sub->inactive)
61 return;
77 msg_sect.iov_base = (void *)&sub->evt; 62 msg_sect.iov_base = (void *)&sub->evt;
78 msg_sect.iov_len = sizeof(struct tipc_event); 63 msg_sect.iov_len = sizeof(struct tipc_event);
79 sub->evt.event = htohl(event, sub->swap); 64 sub->evt.event = htohl(event, sub->swap);
@@ -81,7 +66,7 @@ static void tipc_subscrp_send_event(struct tipc_subscription *sub,
81 sub->evt.found_upper = htohl(found_upper, sub->swap); 66 sub->evt.found_upper = htohl(found_upper, sub->swap);
82 sub->evt.port.ref = htohl(port_ref, sub->swap); 67 sub->evt.port.ref = htohl(port_ref, sub->swap);
83 sub->evt.port.node = htohl(node, sub->swap); 68 sub->evt.port.node = htohl(node, sub->swap);
84 tipc_conn_sendmsg(tn->topsrv, subscriber->conid, 69 tipc_conn_sendmsg(tn->topsrv, sub->conid, event,
85 msg_sect.iov_base, msg_sect.iov_len); 70 msg_sect.iov_base, msg_sect.iov_len);
86} 71}
87 72
@@ -132,41 +117,22 @@ void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower,
132 return; 117 return;
133 if (filter & TIPC_SUB_NODE_SCOPE && scope != TIPC_NODE_SCOPE) 118 if (filter & TIPC_SUB_NODE_SCOPE && scope != TIPC_NODE_SCOPE)
134 return; 119 return;
135 120 spin_lock(&sub->lock);
136 tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref, 121 tipc_subscrp_send_event(sub, found_lower, found_upper,
137 node); 122 event, port_ref, node);
123 spin_unlock(&sub->lock);
138} 124}
139 125
140static void tipc_subscrp_timeout(struct timer_list *t) 126static void tipc_subscrp_timeout(struct timer_list *t)
141{ 127{
142 struct tipc_subscription *sub = from_timer(sub, t, timer); 128 struct tipc_subscription *sub = from_timer(sub, t, timer);
143 struct tipc_subscriber *subscriber = sub->subscriber; 129 struct tipc_subscr *s = &sub->evt.s;
144
145 spin_lock_bh(&subscriber->lock);
146 tipc_nametbl_unsubscribe(sub);
147 list_del(&sub->subscrp_list);
148 spin_unlock_bh(&subscriber->lock);
149 130
150 /* Notify subscriber of timeout */ 131 spin_lock(&sub->lock);
151 tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper, 132 tipc_subscrp_send_event(sub, s->seq.lower, s->seq.upper,
152 TIPC_SUBSCR_TIMEOUT, 0, 0); 133 TIPC_SUBSCR_TIMEOUT, 0, 0);
153 134 sub->inactive = true;
154 tipc_subscrp_put(sub); 135 spin_unlock(&sub->lock);
155}
156
157static void tipc_subscrb_kref_release(struct kref *kref)
158{
159 kfree(container_of(kref,struct tipc_subscriber, kref));
160}
161
162static void tipc_subscrb_put(struct tipc_subscriber *subscriber)
163{
164 kref_put(&subscriber->kref, tipc_subscrb_kref_release);
165}
166
167static void tipc_subscrb_get(struct tipc_subscriber *subscriber)
168{
169 kref_get(&subscriber->kref);
170} 136}
171 137
172static void tipc_subscrp_kref_release(struct kref *kref) 138static void tipc_subscrp_kref_release(struct kref *kref)
@@ -175,11 +141,9 @@ static void tipc_subscrp_kref_release(struct kref *kref)
175 struct tipc_subscription, 141 struct tipc_subscription,
176 kref); 142 kref);
177 struct tipc_net *tn = net_generic(sub->net, tipc_net_id); 143 struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
178 struct tipc_subscriber *subscriber = sub->subscriber;
179 144
180 atomic_dec(&tn->subscription_count); 145 atomic_dec(&tn->subscription_count);
181 kfree(sub); 146 kfree(sub);
182 tipc_subscrb_put(subscriber);
183} 147}
184 148
185void tipc_subscrp_put(struct tipc_subscription *subscription) 149void tipc_subscrp_put(struct tipc_subscription *subscription)
@@ -192,68 +156,9 @@ void tipc_subscrp_get(struct tipc_subscription *subscription)
192 kref_get(&subscription->kref); 156 kref_get(&subscription->kref);
193} 157}
194 158
195/* tipc_subscrb_subscrp_delete - delete a specific subscription or all
196 * subscriptions for a given subscriber.
197 */
198static void tipc_subscrb_subscrp_delete(struct tipc_subscriber *subscriber,
199 struct tipc_subscr *s)
200{
201 struct list_head *subscription_list = &subscriber->subscrp_list;
202 struct tipc_subscription *sub, *temp;
203 u32 timeout;
204
205 spin_lock_bh(&subscriber->lock);
206 list_for_each_entry_safe(sub, temp, subscription_list, subscrp_list) {
207 if (s && memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr)))
208 continue;
209
210 timeout = htohl(sub->evt.s.timeout, sub->swap);
211 if (timeout == TIPC_WAIT_FOREVER || del_timer(&sub->timer)) {
212 tipc_nametbl_unsubscribe(sub);
213 list_del(&sub->subscrp_list);
214 tipc_subscrp_put(sub);
215 }
216
217 if (s)
218 break;
219 }
220 spin_unlock_bh(&subscriber->lock);
221}
222
223struct tipc_subscriber *tipc_subscrb_create(int conid)
224{
225 struct tipc_subscriber *subscriber;
226
227 subscriber = kzalloc(sizeof(*subscriber), GFP_ATOMIC);
228 if (!subscriber) {
229 pr_warn("Subscriber rejected, no memory\n");
230 return NULL;
231 }
232 INIT_LIST_HEAD(&subscriber->subscrp_list);
233 kref_init(&subscriber->kref);
234 subscriber->conid = conid;
235 spin_lock_init(&subscriber->lock);
236
237 return subscriber;
238}
239
240void tipc_subscrb_delete(struct tipc_subscriber *subscriber)
241{
242 tipc_subscrb_subscrp_delete(subscriber, NULL);
243 tipc_subscrb_put(subscriber);
244}
245
246static void tipc_subscrp_cancel(struct tipc_subscr *s,
247 struct tipc_subscriber *subscriber)
248{
249 tipc_subscrb_get(subscriber);
250 tipc_subscrb_subscrp_delete(subscriber, s);
251 tipc_subscrb_put(subscriber);
252}
253
254static struct tipc_subscription *tipc_subscrp_create(struct net *net, 159static struct tipc_subscription *tipc_subscrp_create(struct net *net,
255 struct tipc_subscr *s, 160 struct tipc_subscr *s,
256 int swap) 161 int conid, bool swap)
257{ 162{
258 struct tipc_net *tn = net_generic(net, tipc_net_id); 163 struct tipc_net *tn = net_generic(net, tipc_net_id);
259 struct tipc_subscription *sub; 164 struct tipc_subscription *sub;
@@ -275,6 +180,8 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net,
275 180
276 /* Initialize subscription object */ 181 /* Initialize subscription object */
277 sub->net = net; 182 sub->net = net;
183 sub->conid = conid;
184 sub->inactive = false;
278 if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) || 185 if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) ||
279 (htohl(s->seq.lower, swap) > htohl(s->seq.upper, swap))) { 186 (htohl(s->seq.lower, swap) > htohl(s->seq.upper, swap))) {
280 pr_warn("Subscription rejected, illegal request\n"); 187 pr_warn("Subscription rejected, illegal request\n");
@@ -284,59 +191,39 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net,
284 191
285 sub->swap = swap; 192 sub->swap = swap;
286 memcpy(&sub->evt.s, s, sizeof(*s)); 193 memcpy(&sub->evt.s, s, sizeof(*s));
194 spin_lock_init(&sub->lock);
287 atomic_inc(&tn->subscription_count); 195 atomic_inc(&tn->subscription_count);
288 kref_init(&sub->kref); 196 kref_init(&sub->kref);
289 return sub; 197 return sub;
290} 198}
291 199
292static int tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s, 200struct tipc_subscription *tipc_subscrp_subscribe(struct net *net,
293 struct tipc_subscriber *subscriber, int swap, 201 struct tipc_subscr *s,
294 bool status) 202 int conid, bool swap,
203 bool status)
295{ 204{
296 struct tipc_subscription *sub = NULL; 205 struct tipc_subscription *sub = NULL;
297 u32 timeout; 206 u32 timeout;
298 207
299 sub = tipc_subscrp_create(net, s, swap); 208 sub = tipc_subscrp_create(net, s, conid, swap);
300 if (!sub) 209 if (!sub)
301 return -1; 210 return NULL;
302 211
303 spin_lock_bh(&subscriber->lock);
304 list_add(&sub->subscrp_list, &subscriber->subscrp_list);
305 sub->subscriber = subscriber;
306 tipc_nametbl_subscribe(sub, status); 212 tipc_nametbl_subscribe(sub, status);
307 tipc_subscrb_get(subscriber);
308 spin_unlock_bh(&subscriber->lock);
309
310 timer_setup(&sub->timer, tipc_subscrp_timeout, 0); 213 timer_setup(&sub->timer, tipc_subscrp_timeout, 0);
311 timeout = htohl(sub->evt.s.timeout, swap); 214 timeout = htohl(sub->evt.s.timeout, swap);
312
313 if (timeout != TIPC_WAIT_FOREVER) 215 if (timeout != TIPC_WAIT_FOREVER)
314 mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout)); 216 mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout));
315 return 0; 217 return sub;
316} 218}
317 219
318/* Handle one request to create a new subscription for the subscriber 220void tipc_sub_delete(struct tipc_subscription *sub)
319 */
320int tipc_subscrb_rcv(struct net *net, int conid, void *usr_data,
321 void *buf, size_t len)
322{ 221{
323 struct tipc_subscriber *subscriber = usr_data; 222 tipc_nametbl_unsubscribe(sub);
324 struct tipc_subscr *s = (struct tipc_subscr *)buf; 223 if (sub->evt.s.timeout != TIPC_WAIT_FOREVER)
325 bool status; 224 del_timer_sync(&sub->timer);
326 int swap; 225 list_del(&sub->subscrp_list);
327 226 tipc_subscrp_put(sub);
328 /* Determine subscriber's endianness */
329 swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE |
330 TIPC_SUB_CANCEL));
331
332 /* Detect & process a subscription cancellation request */
333 if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
334 s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
335 tipc_subscrp_cancel(s, subscriber);
336 return 0;
337 }
338 status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
339 return tipc_subscrp_subscribe(net, s, subscriber, swap, status);
340} 227}
341 228
342int tipc_topsrv_start(struct net *net) 229int tipc_topsrv_start(struct net *net)
diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
index a736f29ba9ab..cfd0cb3a1da8 100644
--- a/net/tipc/subscr.h
+++ b/net/tipc/subscr.h
@@ -43,7 +43,7 @@
43#define TIPC_MAX_PUBLICATIONS 65535 43#define TIPC_MAX_PUBLICATIONS 65535
44 44
45struct tipc_subscription; 45struct tipc_subscription;
46struct tipc_subscriber; 46struct tipc_conn;
47 47
48/** 48/**
49 * struct tipc_subscription - TIPC network topology subscription object 49 * struct tipc_subscription - TIPC network topology subscription object
@@ -58,19 +58,22 @@ struct tipc_subscriber;
58 */ 58 */
59struct tipc_subscription { 59struct tipc_subscription {
60 struct kref kref; 60 struct kref kref;
61 struct tipc_subscriber *subscriber;
62 struct net *net; 61 struct net *net;
63 struct timer_list timer; 62 struct timer_list timer;
64 struct list_head nameseq_list; 63 struct list_head nameseq_list;
65 struct list_head subscrp_list; 64 struct list_head subscrp_list;
66 int swap;
67 struct tipc_event evt; 65 struct tipc_event evt;
66 int conid;
67 bool swap;
68 bool inactive;
69 spinlock_t lock; /* serialize up/down and timer events */
68}; 70};
69 71
70struct tipc_subscriber *tipc_subscrb_create(int conid); 72struct tipc_subscription *tipc_subscrp_subscribe(struct net *net,
71void tipc_subscrb_delete(struct tipc_subscriber *subscriber); 73 struct tipc_subscr *s,
72int tipc_subscrb_rcv(struct net *net, int conid, void *usr_data, 74 int conid, bool swap,
73 void *buf, size_t len); 75 bool status);
76void tipc_sub_delete(struct tipc_subscription *sub);
74int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower, 77int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower,
75 u32 found_upper); 78 u32 found_upper);
76void tipc_subscrp_report_overlap(struct tipc_subscription *sub, 79void tipc_subscrp_report_overlap(struct tipc_subscription *sub,