aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/server.c
diff options
context:
space:
mode:
authorJon Maloy <jon.maloy@ericsson.com>2018-02-15 04:40:51 -0500
committerDavid S. Miller <davem@davemloft.net>2018-02-16 15:26:34 -0500
commit026321c6d056a54b4145522492245d2b5913ee1d (patch)
tree7a0d42868ef29033baf596bfe766163cee4edeba /net/tipc/server.c
parent0ef897be12b8b4cf297b6016e79ec97ec90f2cf6 (diff)
tipc: rename tipc_server to tipc_topsrv
We rename struct tipc_server to struct tipc_topsrv. This reflects its now specialized role as topology server. Accordingly, we change or add function prefixes to make it clearer which functionality those belong to. There are no functional changes in this commit. Acked-by: Ying.Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/server.c')
-rw-r--r--net/tipc/server.c700
1 files changed, 0 insertions, 700 deletions
diff --git a/net/tipc/server.c b/net/tipc/server.c
deleted file mode 100644
index 0e351e81690e..000000000000
--- a/net/tipc/server.c
+++ /dev/null
@@ -1,700 +0,0 @@
1/*
2 * net/tipc/server.c: TIPC server infrastructure
3 *
4 * Copyright (c) 2012-2013, Wind River Systems
5 * Copyright (c) 2017, Ericsson AB
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are met:
10 *
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 * 3. Neither the names of the copyright holders nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * Alternatively, this software may be distributed under the terms of the
21 * GNU General Public License ("GPL") version 2 as published by the Free
22 * Software Foundation.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34 * POSSIBILITY OF SUCH DAMAGE.
35 */
36
37#include "subscr.h"
38#include "server.h"
39#include "core.h"
40#include "socket.h"
41#include "addr.h"
42#include "msg.h"
43#include <net/sock.h>
44#include <linux/module.h>
45
46/* Number of messages to send before rescheduling */
47#define MAX_SEND_MSG_COUNT 25
48#define MAX_RECV_MSG_COUNT 25
49#define CF_CONNECTED 1
50#define CF_SERVER 2
51
52#define TIPC_SERVER_NAME_LEN 32
53
/**
 * struct tipc_server - TIPC server structure
 * @conn_idr: identifier set of connection
 * @idr_lock: protect the connection identifier set
 * @idr_in_use: amount of allocated identifier entry
 * @net: network namespace instance
 * @awork: accept work item, queued when the listener socket signals data
 * @rcv_wq: receive workqueue
 * @send_wq: send workqueue
 * @max_rcvbuf_size: maximum permitted receive message length
 * @listener: topology server listening socket
 * @name: server name
 */
struct tipc_server {
	struct idr conn_idr;
	spinlock_t idr_lock; /* for idr list */
	int idr_in_use;
	struct net *net;
	struct work_struct awork;
	struct workqueue_struct *rcv_wq;
	struct workqueue_struct *send_wq;
	int max_rcvbuf_size;
	struct socket *listener;
	char name[TIPC_SERVER_NAME_LEN];
};
83
/**
 * struct tipc_conn - TIPC connection structure
 * @kref: reference counter to connection object
 * @conid: connection identifier
 * @sock: socket associated with connection; NULL for in-kernel subscribers
 * @flags: indicates connection state (CF_CONNECTED)
 * @server: pointer to connected server
 * @sub_list: list of all pertaining subscriptions
 * @sub_lock: lock protecting the subscription list
 * @rwork: receive work item
 * @outqueue: list of outbound event messages (struct outqueue_entry)
 * @outqueue_lock: control access to the outqueue
 * @swork: send work item
 */
struct tipc_conn {
	struct kref kref;
	int conid;
	struct socket *sock;
	unsigned long flags;
	struct tipc_server *server;
	struct list_head sub_list;
	spinlock_t sub_lock; /* for subscription list */
	struct work_struct rwork;
	struct list_head outqueue;
	spinlock_t outqueue_lock;
	struct work_struct swork;
};
113
/* An entry waiting to be sent on a connection's outqueue
 * @inactive: true for a TIPC_SUBSCR_TIMEOUT event, meaning the originating
 *            subscription must be deleted before the event is sent
 * @evt: the event to deliver, copied by value
 * @list: links the entry into struct tipc_conn::outqueue
 */
struct outqueue_entry {
	bool inactive;
	struct tipc_event evt;
	struct list_head list;
};
120
121static void tipc_recv_work(struct work_struct *work);
122static void tipc_send_work(struct work_struct *work);
123
124static bool connected(struct tipc_conn *con)
125{
126 return con && test_bit(CF_CONNECTED, &con->flags);
127}
128
/* tipc_conn_kref_release - destroy a connection once its last reference
 * is dropped: unlink it from the server's idr, release its socket (if
 * any), purge any still-queued outbound events, and free the object.
 */
static void tipc_conn_kref_release(struct kref *kref)
{
	struct tipc_conn *con = container_of(kref, struct tipc_conn, kref);
	struct tipc_server *s = con->server;
	struct outqueue_entry *e, *safe;

	/* Make the connection id unreachable before tearing anything down */
	spin_lock_bh(&s->idr_lock);
	idr_remove(&s->conn_idr, con->conid);
	s->idr_in_use--;
	spin_unlock_bh(&s->idr_lock);
	if (con->sock)
		sock_release(con->sock);

	/* Free events that never made it out */
	spin_lock_bh(&con->outqueue_lock);
	list_for_each_entry_safe(e, safe, &con->outqueue, list) {
		list_del(&e->list);
		kfree(e);
	}
	spin_unlock_bh(&con->outqueue_lock);
	kfree(con);
}
150
/* conn_put - drop a reference; frees the connection via
 * tipc_conn_kref_release() when it was the last one
 */
static void conn_put(struct tipc_conn *con)
{
	kref_put(&con->kref, tipc_conn_kref_release);
}
155
/* conn_get - take an additional reference on an already-referenced
 * connection; use tipc_conn_lookup() when holding none
 */
static void conn_get(struct tipc_conn *con)
{
	kref_get(&con->kref);
}
160
/* tipc_conn_lookup - map a connection id to its object, taking a
 * reference on success.  Returns NULL if the id is unknown, the
 * connection is no longer CF_CONNECTED, or its refcount already hit
 * zero (teardown in progress).
 */
static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid)
{
	struct tipc_conn *con;

	spin_lock_bh(&s->idr_lock);
	con = idr_find(&s->conn_idr, conid);
	/* kref_get_unless_zero() avoids resurrecting a dying connection */
	if (!connected(con) || !kref_get_unless_zero(&con->kref))
		con = NULL;
	spin_unlock_bh(&s->idr_lock);
	return con;
}
172
/* sock_data_ready - interrupt callback indicating the socket has data to read
 * The queued work is launched into tipc_recv_work()->tipc_receive_from_sock()
 */
static void sock_data_ready(struct sock *sk)
{
	struct tipc_conn *con;

	/* sk_callback_lock guards sk_user_data against tipc_close_conn(),
	 * which clears it under the write lock
	 */
	read_lock_bh(&sk->sk_callback_lock);
	con = sk->sk_user_data;
	if (connected(con)) {
		/* Reference is inherited by the queued work and dropped in
		 * tipc_recv_work(); drop it here if work was already pending
		 */
		conn_get(con);
		if (!queue_work(con->server->rcv_wq, &con->rwork))
			conn_put(con);
	}
	read_unlock_bh(&sk->sk_callback_lock);
}
189
/* sock_write_space - interrupt callback after a sendmsg EAGAIN
 * Indicates that there now is more space in the send buffer
 * The queued work is launched into tipc_send_work()->tipc_send_to_sock()
 */
static void sock_write_space(struct sock *sk)
{
	struct tipc_conn *con;

	read_lock_bh(&sk->sk_callback_lock);
	con = sk->sk_user_data;
	if (connected(con)) {
		/* Reference is inherited by the queued work and dropped in
		 * tipc_send_work(); drop it here if work was already pending
		 */
		conn_get(con);
		if (!queue_work(con->server->send_wq, &con->swork))
			conn_put(con);
	}
	read_unlock_bh(&sk->sk_callback_lock);
}
207
/* tipc_con_delete_sub - delete a specific or all subscriptions
 * for a given subscriber
 * @con: connection owning the subscription list
 * @s: subscription to cancel, or NULL to cancel all of them
 */
static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s)
{
	struct list_head *sub_list = &con->sub_list;
	struct tipc_net *tn = tipc_net(con->server->net);
	struct tipc_subscription *sub, *tmp;

	spin_lock_bh(&con->sub_lock);
	list_for_each_entry_safe(sub, tmp, sub_list, sub_list) {
		if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) {
			tipc_sub_unsubscribe(sub);
			atomic_dec(&tn->subscription_count);
		} else if (s) {
			/* NOTE(review): when a specific @s is given the scan
			 * stops at the first non-matching entry; this appears
			 * to rely on the target being near the head (entries
			 * are list_add()ed in tipc_con_rcv_sub) - confirm
			 */
			break;
		}
	}
	spin_unlock_bh(&con->sub_lock);
}
228
/* tipc_close_conn - shut down a connection exactly once
 * Clears CF_CONNECTED under the socket callback write lock, so only one
 * of potentially concurrent callers performs the teardown: detach the
 * socket's user data, drop all subscriptions, shut the socket down and
 * release the connection's initial reference.
 * NOTE(review): dereferences con->sock->sk, so this must not be called
 * for in-kernel (sockless) connections.
 */
static void tipc_close_conn(struct tipc_conn *con)
{
	struct sock *sk = con->sock->sk;
	bool disconnect = false;

	write_lock_bh(&sk->sk_callback_lock);
	disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags);

	if (disconnect) {
		sk->sk_user_data = NULL;
		tipc_con_delete_sub(con, NULL);
	}
	write_unlock_bh(&sk->sk_callback_lock);

	/* Handle concurrent calls from sending and receiving threads */
	if (!disconnect)
		return;

	/* Don't flush pending works, -just let them expire */
	kernel_sock_shutdown(con->sock, SHUT_RDWR);

	/* Drop the initial reference taken in tipc_alloc_conn() */
	conn_put(con);
}
252
253static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
254{
255 struct tipc_conn *con;
256 int ret;
257
258 con = kzalloc(sizeof(struct tipc_conn), GFP_ATOMIC);
259 if (!con)
260 return ERR_PTR(-ENOMEM);
261
262 kref_init(&con->kref);
263 INIT_LIST_HEAD(&con->outqueue);
264 INIT_LIST_HEAD(&con->sub_list);
265 spin_lock_init(&con->outqueue_lock);
266 spin_lock_init(&con->sub_lock);
267 INIT_WORK(&con->swork, tipc_send_work);
268 INIT_WORK(&con->rwork, tipc_recv_work);
269
270 spin_lock_bh(&s->idr_lock);
271 ret = idr_alloc(&s->conn_idr, con, 0, 0, GFP_ATOMIC);
272 if (ret < 0) {
273 kfree(con);
274 spin_unlock_bh(&s->idr_lock);
275 return ERR_PTR(-ENOMEM);
276 }
277 con->conid = ret;
278 s->idr_in_use++;
279 spin_unlock_bh(&s->idr_lock);
280
281 set_bit(CF_CONNECTED, &con->flags);
282 con->server = s;
283
284 return con;
285}
286
/* tipc_con_rcv_sub - process one subscription request received on @con
 * A request with TIPC_SUB_CANCEL set cancels the matching subscription;
 * otherwise a new subscription is created and linked to the connection.
 * Returns 0 on success, -1 on overload or subscription failure.
 */
static int tipc_con_rcv_sub(struct tipc_server *srv,
			    struct tipc_conn *con,
			    struct tipc_subscr *s)
{
	struct tipc_net *tn = tipc_net(srv->net);
	struct tipc_subscription *sub;

	if (tipc_sub_read(s, filter) & TIPC_SUB_CANCEL) {
		tipc_con_delete_sub(con, s);
		return 0;
	}
	/* Cap the total number of subscriptions in this name space */
	if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCR) {
		pr_warn("Subscription rejected, max (%u)\n", TIPC_MAX_SUBSCR);
		return -1;
	}
	sub = tipc_sub_subscribe(srv->net, s, con->conid);
	if (!sub)
		return -1;
	atomic_inc(&tn->subscription_count);
	spin_lock_bh(&con->sub_lock);
	list_add(&sub->sub_list, &con->sub_list);
	spin_unlock_bh(&con->sub_lock);
	return 0;
}
311
/* tipc_receive_from_sock - read one struct tipc_subscr from the
 * connection socket (non-blocking) and hand it to tipc_con_rcv_sub().
 * Returns -EWOULDBLOCK when no data is available; on any other failure
 * the connection is closed and a negative value returned.
 */
static int tipc_receive_from_sock(struct tipc_conn *con)
{
	struct tipc_server *srv = con->server;
	struct sock *sk = con->sock->sk;
	struct msghdr msg = {};
	struct tipc_subscr s;
	struct kvec iov;
	int ret;

	iov.iov_base = &s;
	iov.iov_len = sizeof(s);
	msg.msg_name = NULL;
	iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, iov.iov_len);
	ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT);
	if (ret == -EWOULDBLOCK)
		return -EWOULDBLOCK;
	if (ret > 0) {
		/* Serialize against tipc_close_conn() clearing state */
		read_lock_bh(&sk->sk_callback_lock);
		ret = tipc_con_rcv_sub(srv, con, &s);
		read_unlock_bh(&sk->sk_callback_lock);
	}
	if (ret < 0)
		tipc_close_conn(con);

	return ret;
}
338
/* tipc_conn_queue_evt() - interrupt level call from a subscription instance
 * The queued work is launched into tipc_send_work()->tipc_send_to_sock()
 * The reference taken by tipc_conn_lookup() is inherited by the queued
 * send work; on any failure it is dropped here via the err label.
 */
void tipc_conn_queue_evt(struct net *net, int conid,
			 u32 event, struct tipc_event *evt)
{
	struct tipc_server *srv = tipc_topsrv(net);
	struct outqueue_entry *e;
	struct tipc_conn *con;

	con = tipc_conn_lookup(srv, conid);
	if (!con)
		return;

	if (!connected(con))
		goto err;

	e = kmalloc(sizeof(*e), GFP_ATOMIC);
	if (!e)
		goto err;
	/* A timeout event also terminates the originating subscription */
	e->inactive = (event == TIPC_SUBSCR_TIMEOUT);
	memcpy(&e->evt, evt, sizeof(*evt));
	spin_lock_bh(&con->outqueue_lock);
	list_add_tail(&e->list, &con->outqueue);
	spin_unlock_bh(&con->outqueue_lock);

	/* If work was already pending, that instance owns its own reference,
	 * so ours must be dropped below
	 */
	if (queue_work(srv->send_wq, &con->swork))
		return;
err:
	conn_put(con);
}
370
371bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
372 u32 upper, u32 filter, int *conid)
373{
374 struct tipc_subscr sub;
375 struct tipc_conn *con;
376 int rc;
377
378 sub.seq.type = type;
379 sub.seq.lower = lower;
380 sub.seq.upper = upper;
381 sub.timeout = TIPC_WAIT_FOREVER;
382 sub.filter = filter;
383 *(u32 *)&sub.usr_handle = port;
384
385 con = tipc_alloc_conn(tipc_topsrv(net));
386 if (IS_ERR(con))
387 return false;
388
389 *conid = con->conid;
390 con->sock = NULL;
391 rc = tipc_con_rcv_sub(tipc_topsrv(net), con, &sub);
392 if (rc < 0)
393 tipc_close_conn(con);
394 return !rc;
395}
396
/* tipc_topsrv_kern_unsubscr - tear down an in-kernel subscriber connection
 * CF_CONNECTED is cleared directly instead of via tipc_close_conn(),
 * since the latter dereferences con->sock->sk and in-kernel subscribers
 * have no socket.  The first conn_put() drops the lookup reference, the
 * second drops the initial reference from tipc_alloc_conn().
 */
void tipc_topsrv_kern_unsubscr(struct net *net, int conid)
{
	struct tipc_conn *con;

	con = tipc_conn_lookup(tipc_topsrv(net), conid);
	if (!con)
		return;

	test_and_clear_bit(CF_CONNECTED, &con->flags);
	tipc_con_delete_sub(con, NULL);
	conn_put(con);
	conn_put(con);
}
410
411static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt)
412{
413 u32 port = *(u32 *)&evt->s.usr_handle;
414 u32 self = tipc_own_addr(net);
415 struct sk_buff_head evtq;
416 struct sk_buff *skb;
417
418 skb = tipc_msg_create(TOP_SRV, 0, INT_H_SIZE, sizeof(*evt),
419 self, self, port, port, 0);
420 if (!skb)
421 return;
422 msg_set_dest_droppable(buf_msg(skb), true);
423 memcpy(msg_data(buf_msg(skb)), evt, sizeof(*evt));
424 skb_queue_head_init(&evtq);
425 __skb_queue_tail(&evtq, skb);
426 tipc_sk_rcv(net, &evtq);
427}
428
/* tipc_send_to_sock - drain a connection's outqueue
 * Each event is sent over the connection socket, or, for an in-kernel
 * subscriber (con->sock == NULL), injected via tipc_send_kern_top_evt().
 * On -EWOULDBLOCK the entry is left at the head of the queue, so that
 * sock_write_space() can reschedule us to resume with the same event.
 */
static void tipc_send_to_sock(struct tipc_conn *con)
{
	struct list_head *queue = &con->outqueue;
	struct tipc_server *srv = con->server;
	struct outqueue_entry *e;
	struct tipc_event *evt;
	struct msghdr msg;
	struct kvec iov;
	int count = 0;
	int ret;

	spin_lock_bh(&con->outqueue_lock);

	while (!list_empty(queue)) {
		e = list_first_entry(queue, struct outqueue_entry, list);
		evt = &e->evt;
		/* Drop the lock while sending; the entry stays queued */
		spin_unlock_bh(&con->outqueue_lock);

		/* A timeout event also terminates the subscription */
		if (e->inactive)
			tipc_con_delete_sub(con, &evt->s);

		memset(&msg, 0, sizeof(msg));
		msg.msg_flags = MSG_DONTWAIT;
		iov.iov_base = evt;
		iov.iov_len = sizeof(*evt);
		msg.msg_name = NULL;

		if (con->sock) {
			ret = kernel_sendmsg(con->sock, &msg, &iov,
					     1, sizeof(*evt));
			if (ret == -EWOULDBLOCK || ret == 0) {
				cond_resched();
				return;
			} else if (ret < 0) {
				return tipc_close_conn(con);
			}
		} else {
			tipc_send_kern_top_evt(srv->net, evt);
		}

		/* Don't starve users filling buffers */
		if (++count >= MAX_SEND_MSG_COUNT) {
			cond_resched();
			count = 0;
		}
		/* Only now that the event is out, unlink and free the entry */
		spin_lock_bh(&con->outqueue_lock);
		list_del(&e->list);
		kfree(e);
	}
	spin_unlock_bh(&con->outqueue_lock);
}
480
481static void tipc_recv_work(struct work_struct *work)
482{
483 struct tipc_conn *con = container_of(work, struct tipc_conn, rwork);
484 int count = 0;
485
486 while (connected(con)) {
487 if (tipc_receive_from_sock(con))
488 break;
489
490 /* Don't flood Rx machine */
491 if (++count >= MAX_RECV_MSG_COUNT) {
492 cond_resched();
493 count = 0;
494 }
495 }
496 conn_put(con);
497}
498
/* tipc_send_work - send worker: flush the connection's outqueue, then
 * drop the reference taken by whoever queued the work
 */
static void tipc_send_work(struct work_struct *work)
{
	struct tipc_conn *con = container_of(work, struct tipc_conn, swork);

	if (connected(con))
		tipc_send_to_sock(con);

	conn_put(con);
}
508
/* tipc_accept_from_sock - accept worker for the listening socket
 * Accepts all pending connections, allocating a tipc_conn for each and
 * wiring up the per-socket data-ready/write-space callbacks.  Returns
 * when kernel_accept() would block or on any failure.
 */
static void tipc_accept_from_sock(struct work_struct *work)
{
	struct tipc_server *srv = container_of(work, struct tipc_server, awork);
	struct socket *lsock = srv->listener;
	struct socket *newsock;
	struct tipc_conn *con;
	struct sock *newsk;
	int ret;

	while (1) {
		ret = kernel_accept(lsock, &newsock, O_NONBLOCK);
		if (ret < 0)
			return;
		con = tipc_alloc_conn(srv);
		if (IS_ERR(con)) {
			ret = PTR_ERR(con);
			sock_release(newsock);
			return;
		}
		/* Register callbacks */
		newsk = newsock->sk;
		write_lock_bh(&newsk->sk_callback_lock);
		newsk->sk_data_ready = sock_data_ready;
		newsk->sk_write_space = sock_write_space;
		newsk->sk_user_data = con;
		con->sock = newsock;
		write_unlock_bh(&newsk->sk_callback_lock);

		/* Wake up receive process in case of 'SYN+' message */
		newsk->sk_data_ready(newsk);
	}
}
541
/* listener_sock_data_ready - interrupt callback indicating new connection
 * The queued job is launched into tipc_accept_from_sock()
 */
static void listener_sock_data_ready(struct sock *sk)
{
	struct tipc_server *srv;

	read_lock_bh(&sk->sk_callback_lock);
	srv = sk->sk_user_data;
	/* A NULL listener means server shutdown is in progress */
	if (srv->listener)
		queue_work(srv->rcv_wq, &srv->awork);
	read_unlock_bh(&sk->sk_callback_lock);
}
555
556static int tipc_create_listener_sock(struct tipc_server *srv)
557{
558 int imp = TIPC_CRITICAL_IMPORTANCE;
559 struct socket *lsock = NULL;
560 struct sockaddr_tipc saddr;
561 struct sock *sk;
562 int rc;
563
564 rc = sock_create_kern(srv->net, AF_TIPC, SOCK_SEQPACKET, 0, &lsock);
565 if (rc < 0)
566 return rc;
567
568 srv->listener = lsock;
569 sk = lsock->sk;
570 write_lock_bh(&sk->sk_callback_lock);
571 sk->sk_data_ready = listener_sock_data_ready;
572 sk->sk_user_data = srv;
573 write_unlock_bh(&sk->sk_callback_lock);
574
575 rc = kernel_setsockopt(lsock, SOL_TIPC, TIPC_IMPORTANCE,
576 (char *)&imp, sizeof(imp));
577 if (rc < 0)
578 goto err;
579
580 saddr.family = AF_TIPC;
581 saddr.addrtype = TIPC_ADDR_NAMESEQ;
582 saddr.addr.nameseq.type = TIPC_TOP_SRV;
583 saddr.addr.nameseq.lower = TIPC_TOP_SRV;
584 saddr.addr.nameseq.upper = TIPC_TOP_SRV;
585 saddr.scope = TIPC_NODE_SCOPE;
586
587 rc = kernel_bind(lsock, (struct sockaddr *)&saddr, sizeof(saddr));
588 if (rc < 0)
589 goto err;
590 rc = kernel_listen(lsock, 0);
591 if (rc < 0)
592 goto err;
593
594 /* As server's listening socket owner and creator is the same module,
595 * we have to decrease TIPC module reference count to guarantee that
596 * it remains zero after the server socket is created, otherwise,
597 * executing "rmmod" command is unable to make TIPC module deleted
598 * after TIPC module is inserted successfully.
599 *
600 * However, the reference count is ever increased twice in
601 * sock_create_kern(): one is to increase the reference count of owner
602 * of TIPC socket's proto_ops struct; another is to increment the
603 * reference count of owner of TIPC proto struct. Therefore, we must
604 * decrement the module reference count twice to ensure that it keeps
605 * zero after server's listening socket is created. Of course, we
606 * must bump the module reference count twice as well before the socket
607 * is closed.
608 */
609 module_put(lsock->ops->owner);
610 module_put(sk->sk_prot_creator->owner);
611
612 return 0;
613err:
614 sock_release(lsock);
615 return -EINVAL;
616}
617
618static int tipc_work_start(struct tipc_server *s)
619{
620 s->rcv_wq = alloc_ordered_workqueue("tipc_rcv", 0);
621 if (!s->rcv_wq) {
622 pr_err("can't start tipc receive workqueue\n");
623 return -ENOMEM;
624 }
625
626 s->send_wq = alloc_ordered_workqueue("tipc_send", 0);
627 if (!s->send_wq) {
628 pr_err("can't start tipc send workqueue\n");
629 destroy_workqueue(s->rcv_wq);
630 return -ENOMEM;
631 }
632
633 return 0;
634}
635
/* tipc_work_stop - flush and destroy both server workqueues */
static void tipc_work_stop(struct tipc_server *s)
{
	destroy_workqueue(s->rcv_wq);
	destroy_workqueue(s->send_wq);
}
641
642int tipc_topsrv_start(struct net *net)
643{
644 struct tipc_net *tn = tipc_net(net);
645 const char name[] = "topology_server";
646 struct tipc_server *srv;
647 int ret;
648
649 srv = kzalloc(sizeof(*srv), GFP_ATOMIC);
650 if (!srv)
651 return -ENOMEM;
652
653 srv->net = net;
654 srv->max_rcvbuf_size = sizeof(struct tipc_subscr);
655 INIT_WORK(&srv->awork, tipc_accept_from_sock);
656
657 strncpy(srv->name, name, strlen(name) + 1);
658 tn->topsrv = srv;
659 atomic_set(&tn->subscription_count, 0);
660
661 spin_lock_init(&srv->idr_lock);
662 idr_init(&srv->conn_idr);
663 srv->idr_in_use = 0;
664
665 ret = tipc_work_start(srv);
666 if (ret < 0)
667 return ret;
668
669 ret = tipc_create_listener_sock(srv);
670 if (ret < 0)
671 tipc_work_stop(srv);
672
673 return ret;
674}
675
/* tipc_topsrv_stop - shut down the topology server for a namespace
 * Closes every live connection, then releases the listener socket and
 * frees all server resources.  The two __module_get() calls balance the
 * module_put() pair made in tipc_create_listener_sock() before the
 * socket is finally released.
 */
void tipc_topsrv_stop(struct net *net)
{
	struct tipc_server *srv = tipc_topsrv(net);
	struct socket *lsock = srv->listener;
	struct tipc_conn *con;
	int id;

	spin_lock_bh(&srv->idr_lock);
	/* idr_in_use drops as each connection's last reference is released */
	for (id = 0; srv->idr_in_use; id++) {
		con = idr_find(&srv->conn_idr, id);
		if (con) {
			/* Drop the lock: releasing the connection re-takes
			 * idr_lock in tipc_conn_kref_release()
			 */
			spin_unlock_bh(&srv->idr_lock);
			tipc_close_conn(con);
			spin_lock_bh(&srv->idr_lock);
		}
	}
	__module_get(lsock->ops->owner);
	__module_get(lsock->sk->sk_prot_creator->owner);
	sock_release(lsock);
	srv->listener = NULL;
	spin_unlock_bh(&srv->idr_lock);
	tipc_work_stop(srv);
	idr_destroy(&srv->conn_idr);
	kfree(srv);
}