aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorYing Xue <ying.xue@windriver.com>2013-06-17 10:54:39 -0400
committerDavid S. Miller <davem@davemloft.net>2013-06-17 18:53:00 -0400
commitc5fa7b3cf3cb22e4ac60485fc2dc187fe012910f (patch)
tree9cc84cfb44bd0962132f9506adec55fe8fb433d4 /net
parent5d21cb70db0122507cd18f58b4a9112583c1e075 (diff)
tipc: introduce new TIPC server infrastructure
TIPC has two internal servers, one providing a subscription service for topology events, and another providing the configuration interface. These servers have previously been running in BH context, accessing the TIPC-port (aka native) API directly. Apart from these servers, even the TIPC socket implementation is partially built on this API. As this API may simultaneously be called via different paths and in different contexts, a complex and costly lock policiy is required in order to protect TIPC internal resources. To eliminate the need for this complex lock policiy, we introduce a new, generic service API that uses kernel sockets for message passing instead of the native API. Once the toplogy and configuration servers are converted to use this new service, all code pertaining to the native API can be removed. This entails a significant reduction in code amount and complexity, and opens up for a complete rework of the locking policy in TIPC. The new service also solves another problem: As the current topology server works in BH context, it cannot easily be blocked when sending of events fails due to congestion. In such cases events may have to be silently dropped, something that is unacceptable. Therefore, the new service keeps a dedicated outbound queue receiving messages from BH context. Once messages are inserted into this queue, we will immediately schedule a work from a special workqueue. This way, messages/events from the topology server are in reality sent in process context, and the server can block if necessary. Analogously, there is a new workqueue for receiving messages. Once a notification about an arriving message is received in BH context, we schedule a work from the receive workqueue to do the job of receiving the message in process context. As both sending and receive messages are now finished in processes, subscribed events cannot be dropped any more. As of this commit, this new server infrastructure is built, but not actually yet called by the existing TIPC code, but since the conversion changes required in order to use it are significant, the addition is kept here as a separate commit. Signed-off-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: Paul Gortmaker <paul.gortmaker@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net')
-rw-r--r--net/tipc/Makefile2
-rw-r--r--net/tipc/core.h8
-rw-r--r--net/tipc/server.c596
-rw-r--r--net/tipc/server.h94
-rw-r--r--net/tipc/socket.c99
5 files changed, 789 insertions, 10 deletions
diff --git a/net/tipc/Makefile b/net/tipc/Makefile
index 02636d0a90cf..b282f7130d2b 100644
--- a/net/tipc/Makefile
+++ b/net/tipc/Makefile
@@ -8,7 +8,7 @@ tipc-y += addr.o bcast.o bearer.o config.o \
8 core.o handler.o link.o discover.o msg.o \ 8 core.o handler.o link.o discover.o msg.o \
9 name_distr.o subscr.o name_table.o net.o \ 9 name_distr.o subscr.o name_table.o net.o \
10 netlink.o node.o node_subscr.o port.o ref.o \ 10 netlink.o node.o node_subscr.o port.o ref.o \
11 socket.o log.o eth_media.o 11 socket.o log.o eth_media.o server.o
12 12
13tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o 13tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o
14tipc-$(CONFIG_SYSCTL) += sysctl.o 14tipc-$(CONFIG_SYSCTL) += sysctl.o
diff --git a/net/tipc/core.h b/net/tipc/core.h
index fe7f2b7c19fe..be72f8cebc53 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -1,8 +1,8 @@
1/* 1/*
2 * net/tipc/core.h: Include file for TIPC global declarations 2 * net/tipc/core.h: Include file for TIPC global declarations
3 * 3 *
4 * Copyright (c) 2005-2006, Ericsson AB 4 * Copyright (c) 2005-2006, 2013 Ericsson AB
5 * Copyright (c) 2005-2007, 2010-2011, Wind River Systems 5 * Copyright (c) 2005-2007, 2010-2013, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
8 * Redistribution and use in source and binary forms, with or without 8 * Redistribution and use in source and binary forms, with or without
@@ -97,6 +97,10 @@ extern int tipc_netlink_start(void);
97extern void tipc_netlink_stop(void); 97extern void tipc_netlink_stop(void);
98extern int tipc_socket_init(void); 98extern int tipc_socket_init(void);
99extern void tipc_socket_stop(void); 99extern void tipc_socket_stop(void);
100extern int tipc_sock_create_local(int type, struct socket **res);
101extern void tipc_sock_release_local(struct socket *sock);
102extern int tipc_sock_accept_local(struct socket *sock,
103 struct socket **newsock, int flags);
100 104
101#ifdef CONFIG_SYSCTL 105#ifdef CONFIG_SYSCTL
102extern int tipc_register_sysctl(void); 106extern int tipc_register_sysctl(void);
diff --git a/net/tipc/server.c b/net/tipc/server.c
new file mode 100644
index 000000000000..19da5abe0fa6
--- /dev/null
+++ b/net/tipc/server.c
@@ -0,0 +1,596 @@
1/*
2 * net/tipc/server.c: TIPC server infrastructure
3 *
4 * Copyright (c) 2012-2013, Wind River Systems
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * 3. Neither the names of the copyright holders nor the names of its
16 * contributors may be used to endorse or promote products derived from
17 * this software without specific prior written permission.
18 *
19 * Alternatively, this software may be distributed under the terms of the
20 * GNU General Public License ("GPL") version 2 as published by the Free
21 * Software Foundation.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33 * POSSIBILITY OF SUCH DAMAGE.
34 */
35
36#include "server.h"
37#include "core.h"
38#include <net/sock.h>
39
40/* Number of messages to send before rescheduling */
41#define MAX_SEND_MSG_COUNT 25
42#define MAX_RECV_MSG_COUNT 25
43#define CF_CONNECTED 1
44
45#define sock2con(x) ((struct tipc_conn *)(x)->sk_user_data)
46
47/**
48 * struct tipc_conn - TIPC connection structure
49 * @kref: reference counter to connection object
50 * @conid: connection identifier
51 * @sock: socket handler associated with connection
52 * @flags: indicates connection state
53 * @server: pointer to connected server
54 * @rwork: receive work item
55 * @usr_data: user-specified field
56 * @rx_action: what to do when connection socket is active
57 * @outqueue: pointer to first outbound message in queue
58 * @outqueue_lock: controll access to the outqueue
59 * @outqueue: list of connection objects for its server
60 * @swork: send work item
61 */
62struct tipc_conn {
63 struct kref kref;
64 int conid;
65 struct socket *sock;
66 unsigned long flags;
67 struct tipc_server *server;
68 struct work_struct rwork;
69 int (*rx_action) (struct tipc_conn *con);
70 void *usr_data;
71 struct list_head outqueue;
72 spinlock_t outqueue_lock;
73 struct work_struct swork;
74};
75
76/* An entry waiting to be sent */
77struct outqueue_entry {
78 struct list_head list;
79 struct kvec iov;
80 struct sockaddr_tipc dest;
81};
82
83static void tipc_recv_work(struct work_struct *work);
84static void tipc_send_work(struct work_struct *work);
85static void tipc_clean_outqueues(struct tipc_conn *con);
86
87static void tipc_conn_kref_release(struct kref *kref)
88{
89 struct tipc_conn *con = container_of(kref, struct tipc_conn, kref);
90 struct tipc_server *s = con->server;
91
92 if (con->sock) {
93 tipc_sock_release_local(con->sock);
94 con->sock = NULL;
95 }
96
97 tipc_clean_outqueues(con);
98
99 if (con->conid)
100 s->tipc_conn_shutdown(con->conid, con->usr_data);
101
102 kfree(con);
103}
104
105static void conn_put(struct tipc_conn *con)
106{
107 kref_put(&con->kref, tipc_conn_kref_release);
108}
109
110static void conn_get(struct tipc_conn *con)
111{
112 kref_get(&con->kref);
113}
114
115static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid)
116{
117 struct tipc_conn *con;
118
119 spin_lock_bh(&s->idr_lock);
120 con = idr_find(&s->conn_idr, conid);
121 if (con)
122 conn_get(con);
123 spin_unlock_bh(&s->idr_lock);
124 return con;
125}
126
127static void sock_data_ready(struct sock *sk, int unused)
128{
129 struct tipc_conn *con;
130
131 read_lock(&sk->sk_callback_lock);
132 con = sock2con(sk);
133 if (con && test_bit(CF_CONNECTED, &con->flags)) {
134 conn_get(con);
135 if (!queue_work(con->server->rcv_wq, &con->rwork))
136 conn_put(con);
137 }
138 read_unlock(&sk->sk_callback_lock);
139}
140
141static void sock_write_space(struct sock *sk)
142{
143 struct tipc_conn *con;
144
145 read_lock(&sk->sk_callback_lock);
146 con = sock2con(sk);
147 if (con && test_bit(CF_CONNECTED, &con->flags)) {
148 conn_get(con);
149 if (!queue_work(con->server->send_wq, &con->swork))
150 conn_put(con);
151 }
152 read_unlock(&sk->sk_callback_lock);
153}
154
155static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con)
156{
157 struct sock *sk = sock->sk;
158
159 write_lock_bh(&sk->sk_callback_lock);
160
161 sk->sk_data_ready = sock_data_ready;
162 sk->sk_write_space = sock_write_space;
163 sk->sk_user_data = con;
164
165 con->sock = sock;
166
167 write_unlock_bh(&sk->sk_callback_lock);
168}
169
170static void tipc_unregister_callbacks(struct tipc_conn *con)
171{
172 struct sock *sk = con->sock->sk;
173
174 write_lock_bh(&sk->sk_callback_lock);
175 sk->sk_user_data = NULL;
176 write_unlock_bh(&sk->sk_callback_lock);
177}
178
179static void tipc_close_conn(struct tipc_conn *con)
180{
181 struct tipc_server *s = con->server;
182
183 if (test_and_clear_bit(CF_CONNECTED, &con->flags)) {
184 spin_lock_bh(&s->idr_lock);
185 idr_remove(&s->conn_idr, con->conid);
186 s->idr_in_use--;
187 spin_unlock_bh(&s->idr_lock);
188
189 tipc_unregister_callbacks(con);
190
191 /* We shouldn't flush pending works as we may be in the
192 * thread. In fact the races with pending rx/tx work structs
193 * are harmless for us here as we have already deleted this
194 * connection from server connection list and set
195 * sk->sk_user_data to 0 before releasing connection object.
196 */
197 kernel_sock_shutdown(con->sock, SHUT_RDWR);
198
199 conn_put(con);
200 }
201}
202
203static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
204{
205 struct tipc_conn *con;
206 int ret;
207
208 con = kzalloc(sizeof(struct tipc_conn), GFP_ATOMIC);
209 if (!con)
210 return ERR_PTR(-ENOMEM);
211
212 kref_init(&con->kref);
213 INIT_LIST_HEAD(&con->outqueue);
214 spin_lock_init(&con->outqueue_lock);
215 INIT_WORK(&con->swork, tipc_send_work);
216 INIT_WORK(&con->rwork, tipc_recv_work);
217
218 spin_lock_bh(&s->idr_lock);
219 ret = idr_alloc(&s->conn_idr, con, 0, 0, GFP_ATOMIC);
220 if (ret < 0) {
221 kfree(con);
222 spin_unlock_bh(&s->idr_lock);
223 return ERR_PTR(-ENOMEM);
224 }
225 con->conid = ret;
226 s->idr_in_use++;
227 spin_unlock_bh(&s->idr_lock);
228
229 set_bit(CF_CONNECTED, &con->flags);
230 con->server = s;
231
232 return con;
233}
234
235static int tipc_receive_from_sock(struct tipc_conn *con)
236{
237 struct msghdr msg = {};
238 struct tipc_server *s = con->server;
239 struct sockaddr_tipc addr;
240 struct kvec iov;
241 void *buf;
242 int ret;
243
244 buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC);
245 if (!buf) {
246 ret = -ENOMEM;
247 goto out_close;
248 }
249
250 iov.iov_base = buf;
251 iov.iov_len = s->max_rcvbuf_size;
252 msg.msg_name = &addr;
253 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
254 MSG_DONTWAIT);
255 if (ret <= 0) {
256 kmem_cache_free(s->rcvbuf_cache, buf);
257 goto out_close;
258 }
259
260 s->tipc_conn_recvmsg(con->conid, &addr, con->usr_data, buf, ret);
261
262 kmem_cache_free(s->rcvbuf_cache, buf);
263
264 return 0;
265
266out_close:
267 if (ret != -EWOULDBLOCK)
268 tipc_close_conn(con);
269 else if (ret == 0)
270 /* Don't return success if we really got EOF */
271 ret = -EAGAIN;
272
273 return ret;
274}
275
276static int tipc_accept_from_sock(struct tipc_conn *con)
277{
278 struct tipc_server *s = con->server;
279 struct socket *sock = con->sock;
280 struct socket *newsock;
281 struct tipc_conn *newcon;
282 int ret;
283
284 ret = tipc_sock_accept_local(sock, &newsock, O_NONBLOCK);
285 if (ret < 0)
286 return ret;
287
288 newcon = tipc_alloc_conn(con->server);
289 if (IS_ERR(newcon)) {
290 ret = PTR_ERR(newcon);
291 sock_release(newsock);
292 return ret;
293 }
294
295 newcon->rx_action = tipc_receive_from_sock;
296 tipc_register_callbacks(newsock, newcon);
297
298 /* Notify that new connection is incoming */
299 newcon->usr_data = s->tipc_conn_new(newcon->conid);
300
301 /* Wake up receive process in case of 'SYN+' message */
302 newsock->sk->sk_data_ready(newsock->sk, 0);
303 return ret;
304}
305
306static struct socket *tipc_create_listen_sock(struct tipc_conn *con)
307{
308 struct tipc_server *s = con->server;
309 struct socket *sock = NULL;
310 int ret;
311
312 ret = tipc_sock_create_local(s->type, &sock);
313 if (ret < 0)
314 return NULL;
315 ret = kernel_setsockopt(sock, SOL_TIPC, TIPC_IMPORTANCE,
316 (char *)&s->imp, sizeof(s->imp));
317 if (ret < 0)
318 goto create_err;
319 ret = kernel_bind(sock, (struct sockaddr *)s->saddr, sizeof(*s->saddr));
320 if (ret < 0)
321 goto create_err;
322
323 switch (s->type) {
324 case SOCK_STREAM:
325 case SOCK_SEQPACKET:
326 con->rx_action = tipc_accept_from_sock;
327
328 ret = kernel_listen(sock, 0);
329 if (ret < 0)
330 goto create_err;
331 break;
332 case SOCK_DGRAM:
333 case SOCK_RDM:
334 con->rx_action = tipc_receive_from_sock;
335 break;
336 default:
337 pr_err("Unknown socket type %d\n", s->type);
338 goto create_err;
339 }
340 return sock;
341
342create_err:
343 sock_release(sock);
344 con->sock = NULL;
345 return NULL;
346}
347
348static int tipc_open_listening_sock(struct tipc_server *s)
349{
350 struct socket *sock;
351 struct tipc_conn *con;
352
353 con = tipc_alloc_conn(s);
354 if (IS_ERR(con))
355 return PTR_ERR(con);
356
357 sock = tipc_create_listen_sock(con);
358 if (!sock)
359 return -EINVAL;
360
361 tipc_register_callbacks(sock, con);
362 return 0;
363}
364
365static struct outqueue_entry *tipc_alloc_entry(void *data, int len)
366{
367 struct outqueue_entry *entry;
368 void *buf;
369
370 entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC);
371 if (!entry)
372 return NULL;
373
374 buf = kmalloc(len, GFP_ATOMIC);
375 if (!buf) {
376 kfree(entry);
377 return NULL;
378 }
379
380 memcpy(buf, data, len);
381 entry->iov.iov_base = buf;
382 entry->iov.iov_len = len;
383
384 return entry;
385}
386
387static void tipc_free_entry(struct outqueue_entry *e)
388{
389 kfree(e->iov.iov_base);
390 kfree(e);
391}
392
393static void tipc_clean_outqueues(struct tipc_conn *con)
394{
395 struct outqueue_entry *e, *safe;
396
397 spin_lock_bh(&con->outqueue_lock);
398 list_for_each_entry_safe(e, safe, &con->outqueue, list) {
399 list_del(&e->list);
400 tipc_free_entry(e);
401 }
402 spin_unlock_bh(&con->outqueue_lock);
403}
404
405int tipc_conn_sendmsg(struct tipc_server *s, int conid,
406 struct sockaddr_tipc *addr, void *data, size_t len)
407{
408 struct outqueue_entry *e;
409 struct tipc_conn *con;
410
411 con = tipc_conn_lookup(s, conid);
412 if (!con)
413 return -EINVAL;
414
415 e = tipc_alloc_entry(data, len);
416 if (!e) {
417 conn_put(con);
418 return -ENOMEM;
419 }
420
421 if (addr)
422 memcpy(&e->dest, addr, sizeof(struct sockaddr_tipc));
423
424 spin_lock_bh(&con->outqueue_lock);
425 list_add_tail(&e->list, &con->outqueue);
426 spin_unlock_bh(&con->outqueue_lock);
427
428 if (test_bit(CF_CONNECTED, &con->flags))
429 if (!queue_work(s->send_wq, &con->swork))
430 conn_put(con);
431
432 return 0;
433}
434
435void tipc_conn_terminate(struct tipc_server *s, int conid)
436{
437 struct tipc_conn *con;
438
439 con = tipc_conn_lookup(s, conid);
440 if (con) {
441 tipc_close_conn(con);
442 conn_put(con);
443 }
444}
445
446static void tipc_send_to_sock(struct tipc_conn *con)
447{
448 int count = 0;
449 struct tipc_server *s = con->server;
450 struct outqueue_entry *e;
451 struct msghdr msg;
452 int ret;
453
454 spin_lock_bh(&con->outqueue_lock);
455 while (1) {
456 e = list_entry(con->outqueue.next, struct outqueue_entry,
457 list);
458 if ((struct list_head *) e == &con->outqueue)
459 break;
460 spin_unlock_bh(&con->outqueue_lock);
461
462 memset(&msg, 0, sizeof(msg));
463 msg.msg_flags = MSG_DONTWAIT;
464
465 if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) {
466 msg.msg_name = &e->dest;
467 msg.msg_namelen = sizeof(struct sockaddr_tipc);
468 }
469 ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
470 e->iov.iov_len);
471 if (ret == -EWOULDBLOCK || ret == 0) {
472 cond_resched();
473 goto out;
474 } else if (ret < 0) {
475 goto send_err;
476 }
477
478 /* Don't starve users filling buffers */
479 if (++count >= MAX_SEND_MSG_COUNT) {
480 cond_resched();
481 count = 0;
482 }
483
484 spin_lock_bh(&con->outqueue_lock);
485 list_del(&e->list);
486 tipc_free_entry(e);
487 }
488 spin_unlock_bh(&con->outqueue_lock);
489out:
490 return;
491
492send_err:
493 tipc_close_conn(con);
494}
495
496static void tipc_recv_work(struct work_struct *work)
497{
498 struct tipc_conn *con = container_of(work, struct tipc_conn, rwork);
499 int count = 0;
500
501 while (test_bit(CF_CONNECTED, &con->flags)) {
502 if (con->rx_action(con))
503 break;
504
505 /* Don't flood Rx machine */
506 if (++count >= MAX_RECV_MSG_COUNT) {
507 cond_resched();
508 count = 0;
509 }
510 }
511 conn_put(con);
512}
513
514static void tipc_send_work(struct work_struct *work)
515{
516 struct tipc_conn *con = container_of(work, struct tipc_conn, swork);
517
518 if (test_bit(CF_CONNECTED, &con->flags))
519 tipc_send_to_sock(con);
520
521 conn_put(con);
522}
523
524static void tipc_work_stop(struct tipc_server *s)
525{
526 destroy_workqueue(s->rcv_wq);
527 destroy_workqueue(s->send_wq);
528}
529
530static int tipc_work_start(struct tipc_server *s)
531{
532 s->rcv_wq = alloc_workqueue("tipc_rcv", WQ_UNBOUND, 1);
533 if (!s->rcv_wq) {
534 pr_err("can't start tipc receive workqueue\n");
535 return -ENOMEM;
536 }
537
538 s->send_wq = alloc_workqueue("tipc_send", WQ_UNBOUND, 1);
539 if (!s->send_wq) {
540 pr_err("can't start tipc send workqueue\n");
541 destroy_workqueue(s->rcv_wq);
542 return -ENOMEM;
543 }
544
545 return 0;
546}
547
548int tipc_server_start(struct tipc_server *s)
549{
550 int ret;
551
552 spin_lock_init(&s->idr_lock);
553 idr_init(&s->conn_idr);
554 s->idr_in_use = 0;
555
556 s->rcvbuf_cache = kmem_cache_create(s->name, s->max_rcvbuf_size,
557 0, SLAB_HWCACHE_ALIGN, NULL);
558 if (!s->rcvbuf_cache)
559 return -ENOMEM;
560
561 ret = tipc_work_start(s);
562 if (ret < 0) {
563 kmem_cache_destroy(s->rcvbuf_cache);
564 return ret;
565 }
566 s->enabled = 1;
567
568 return tipc_open_listening_sock(s);
569}
570
571void tipc_server_stop(struct tipc_server *s)
572{
573 struct tipc_conn *con;
574 int total = 0;
575 int id;
576
577 if (!s->enabled)
578 return;
579
580 s->enabled = 0;
581 spin_lock_bh(&s->idr_lock);
582 for (id = 0; total < s->idr_in_use; id++) {
583 con = idr_find(&s->conn_idr, id);
584 if (con) {
585 total++;
586 spin_unlock_bh(&s->idr_lock);
587 tipc_close_conn(con);
588 spin_lock_bh(&s->idr_lock);
589 }
590 }
591 spin_unlock_bh(&s->idr_lock);
592
593 tipc_work_stop(s);
594 kmem_cache_destroy(s->rcvbuf_cache);
595 idr_destroy(&s->conn_idr);
596}
diff --git a/net/tipc/server.h b/net/tipc/server.h
new file mode 100644
index 000000000000..98b23f20bc0f
--- /dev/null
+++ b/net/tipc/server.h
@@ -0,0 +1,94 @@
1/*
2 * net/tipc/server.h: Include file for TIPC server code
3 *
4 * Copyright (c) 2012-2013, Wind River Systems
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * 3. Neither the names of the copyright holders nor the names of its
16 * contributors may be used to endorse or promote products derived from
17 * this software without specific prior written permission.
18 *
19 * Alternatively, this software may be distributed under the terms of the
20 * GNU General Public License ("GPL") version 2 as published by the Free
21 * Software Foundation.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33 * POSSIBILITY OF SUCH DAMAGE.
34 */
35
36#ifndef _TIPC_SERVER_H
37#define _TIPC_SERVER_H
38
39#include "core.h"
40
41#define TIPC_SERVER_NAME_LEN 32
42
43/**
44 * struct tipc_server - TIPC server structure
45 * @conn_idr: identifier set of connection
46 * @idr_lock: protect the connection identifier set
47 * @idr_in_use: amount of allocated identifier entry
48 * @rcvbuf_cache: memory cache of server receive buffer
49 * @rcv_wq: receive workqueue
50 * @send_wq: send workqueue
51 * @max_rcvbuf_size: maximum permitted receive message length
52 * @tipc_conn_new: callback will be called when new connection is incoming
53 * @tipc_conn_shutdown: callback will be called when connection is shut down
54 * @tipc_conn_recvmsg: callback will be called when message arrives
55 * @saddr: TIPC server address
56 * @name: server name
57 * @imp: message importance
58 * @type: socket type
59 * @enabled: identify whether server is launched or not
60 */
61struct tipc_server {
62 struct idr conn_idr;
63 spinlock_t idr_lock;
64 int idr_in_use;
65 struct kmem_cache *rcvbuf_cache;
66 struct workqueue_struct *rcv_wq;
67 struct workqueue_struct *send_wq;
68 int max_rcvbuf_size;
69 void *(*tipc_conn_new) (int conid);
70 void (*tipc_conn_shutdown) (int conid, void *usr_data);
71 void (*tipc_conn_recvmsg) (int conid, struct sockaddr_tipc *addr,
72 void *usr_data, void *buf, size_t len);
73 struct sockaddr_tipc *saddr;
74 const char name[TIPC_SERVER_NAME_LEN];
75 int imp;
76 int type;
77 int enabled;
78};
79
80int tipc_conn_sendmsg(struct tipc_server *s, int conid,
81 struct sockaddr_tipc *addr, void *data, size_t len);
82
83/**
84 * tipc_conn_terminate - terminate connection with server
85 *
86 * Note: Must call it in process context since it might sleep
87 */
88void tipc_conn_terminate(struct tipc_server *s, int conid);
89
90int tipc_server_start(struct tipc_server *s);
91
92void tipc_server_stop(struct tipc_server *s);
93
94#endif
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index d5fa708f037d..bd8e2cdeceef 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -2,7 +2,7 @@
2 * net/tipc/socket.c: TIPC socket API 2 * net/tipc/socket.c: TIPC socket API
3 * 3 *
4 * Copyright (c) 2001-2007, 2012 Ericsson AB 4 * Copyright (c) 2001-2007, 2012 Ericsson AB
5 * Copyright (c) 2004-2008, 2010-2012, Wind River Systems 5 * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
8 * Redistribution and use in source and binary forms, with or without 8 * Redistribution and use in source and binary forms, with or without
@@ -63,12 +63,15 @@ static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
63static void wakeupdispatch(struct tipc_port *tport); 63static void wakeupdispatch(struct tipc_port *tport);
64static void tipc_data_ready(struct sock *sk, int len); 64static void tipc_data_ready(struct sock *sk, int len);
65static void tipc_write_space(struct sock *sk); 65static void tipc_write_space(struct sock *sk);
66static int release(struct socket *sock);
67static int accept(struct socket *sock, struct socket *new_sock, int flags);
66 68
67static const struct proto_ops packet_ops; 69static const struct proto_ops packet_ops;
68static const struct proto_ops stream_ops; 70static const struct proto_ops stream_ops;
69static const struct proto_ops msg_ops; 71static const struct proto_ops msg_ops;
70 72
71static struct proto tipc_proto; 73static struct proto tipc_proto;
74static struct proto tipc_proto_kern;
72 75
73static int sockets_enabled; 76static int sockets_enabled;
74 77
@@ -141,7 +144,7 @@ static void reject_rx_queue(struct sock *sk)
141} 144}
142 145
143/** 146/**
144 * tipc_create - create a TIPC socket 147 * tipc_sk_create - create a TIPC socket
145 * @net: network namespace (must be default network) 148 * @net: network namespace (must be default network)
146 * @sock: pre-allocated socket structure 149 * @sock: pre-allocated socket structure
147 * @protocol: protocol indicator (must be 0) 150 * @protocol: protocol indicator (must be 0)
@@ -152,8 +155,8 @@ static void reject_rx_queue(struct sock *sk)
152 * 155 *
153 * Returns 0 on success, errno otherwise 156 * Returns 0 on success, errno otherwise
154 */ 157 */
155static int tipc_create(struct net *net, struct socket *sock, int protocol, 158static int tipc_sk_create(struct net *net, struct socket *sock, int protocol,
156 int kern) 159 int kern)
157{ 160{
158 const struct proto_ops *ops; 161 const struct proto_ops *ops;
159 socket_state state; 162 socket_state state;
@@ -183,7 +186,11 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,
183 } 186 }
184 187
185 /* Allocate socket's protocol area */ 188 /* Allocate socket's protocol area */
186 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto); 189 if (!kern)
190 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
191 else
192 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto_kern);
193
187 if (sk == NULL) 194 if (sk == NULL)
188 return -ENOMEM; 195 return -ENOMEM;
189 196
@@ -219,6 +226,78 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,
219} 226}
220 227
221/** 228/**
229 * tipc_sock_create_local - create TIPC socket from inside TIPC module
230 * @type: socket type - SOCK_RDM or SOCK_SEQPACKET
231 *
232 * We cannot use sock_creat_kern here because it bumps module user count.
233 * Since socket owner and creator is the same module we must make sure
234 * that module count remains zero for module local sockets, otherwise
235 * we cannot do rmmod.
236 *
237 * Returns 0 on success, errno otherwise
238 */
239int tipc_sock_create_local(int type, struct socket **res)
240{
241 int rc;
242 struct sock *sk;
243
244 rc = sock_create_lite(AF_TIPC, type, 0, res);
245 if (rc < 0) {
246 pr_err("Failed to create kernel socket\n");
247 return rc;
248 }
249 tipc_sk_create(&init_net, *res, 0, 1);
250
251 sk = (*res)->sk;
252
253 return 0;
254}
255
256/**
257 * tipc_sock_release_local - release socket created by tipc_sock_create_local
258 * @sock: the socket to be released.
259 *
260 * Module reference count is not incremented when such sockets are created,
261 * so we must keep it from being decremented when they are released.
262 */
263void tipc_sock_release_local(struct socket *sock)
264{
265 release(sock);
266 sock->ops = NULL;
267 sock_release(sock);
268}
269
270/**
271 * tipc_sock_accept_local - accept a connection on a socket created
272 * with tipc_sock_create_local. Use this function to avoid that
273 * module reference count is inadvertently incremented.
274 *
275 * @sock: the accepting socket
276 * @newsock: reference to the new socket to be created
277 * @flags: socket flags
278 */
279
280int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
281 int flags)
282{
283 struct sock *sk = sock->sk;
284 int ret;
285
286 ret = sock_create_lite(sk->sk_family, sk->sk_type,
287 sk->sk_protocol, newsock);
288 if (ret < 0)
289 return ret;
290
291 ret = accept(sock, *newsock, flags);
292 if (ret < 0) {
293 sock_release(*newsock);
294 return ret;
295 }
296 (*newsock)->ops = sock->ops;
297 return ret;
298}
299
300/**
222 * release - destroy a TIPC socket 301 * release - destroy a TIPC socket
223 * @sock: socket to destroy 302 * @sock: socket to destroy
224 * 303 *
@@ -1529,7 +1608,7 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)
1529 1608
1530 buf = skb_peek(&sk->sk_receive_queue); 1609 buf = skb_peek(&sk->sk_receive_queue);
1531 1610
1532 res = tipc_create(sock_net(sock->sk), new_sock, 0, 0); 1611 res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
1533 if (res) 1612 if (res)
1534 goto exit; 1613 goto exit;
1535 1614
@@ -1839,7 +1918,7 @@ static const struct proto_ops stream_ops = {
1839static const struct net_proto_family tipc_family_ops = { 1918static const struct net_proto_family tipc_family_ops = {
1840 .owner = THIS_MODULE, 1919 .owner = THIS_MODULE,
1841 .family = AF_TIPC, 1920 .family = AF_TIPC,
1842 .create = tipc_create 1921 .create = tipc_sk_create
1843}; 1922};
1844 1923
1845static struct proto tipc_proto = { 1924static struct proto tipc_proto = {
@@ -1849,6 +1928,12 @@ static struct proto tipc_proto = {
1849 .sysctl_rmem = sysctl_tipc_rmem 1928 .sysctl_rmem = sysctl_tipc_rmem
1850}; 1929};
1851 1930
1931static struct proto tipc_proto_kern = {
1932 .name = "TIPC",
1933 .obj_size = sizeof(struct tipc_sock),
1934 .sysctl_rmem = sysctl_tipc_rmem
1935};
1936
1852/** 1937/**
1853 * tipc_socket_init - initialize TIPC socket interface 1938 * tipc_socket_init - initialize TIPC socket interface
1854 * 1939 *