author	Ying Xue <ying.xue@windriver.com>	2013-06-17 10:54:39 -0400
committer	David S. Miller <davem@davemloft.net>	2013-06-17 18:53:00 -0400
commit	c5fa7b3cf3cb22e4ac60485fc2dc187fe012910f (patch)
tree	9cc84cfb44bd0962132f9506adec55fe8fb433d4 /net/tipc/server.c
parent	5d21cb70db0122507cd18f58b4a9112583c1e075 (diff)
tipc: introduce new TIPC server infrastructure
TIPC has two internal servers, one providing a subscription service for topology events, and another providing the configuration interface. These servers have previously been running in BH context, accessing the TIPC-port (aka native) API directly. Apart from these servers, even the TIPC socket implementation is partially built on this API. As this API may be called simultaneously via different paths and in different contexts, a complex and costly lock policy is required in order to protect TIPC internal resources.

To eliminate the need for this complex lock policy, we introduce a new, generic service API that uses kernel sockets for message passing instead of the native API. Once the topology and configuration servers are converted to use this new service, all code pertaining to the native API can be removed. This entails a significant reduction in code amount and complexity, and opens the way for a complete rework of the locking policy in TIPC.

The new service also solves another problem: because the current topology server works in BH context, it cannot easily be blocked when sending of events fails due to congestion. In such cases events may have to be silently dropped, which is unacceptable. Therefore, the new service keeps a dedicated outbound queue receiving messages from BH context. Once messages are inserted into this queue, we immediately schedule a work item from a special workqueue. This way, messages/events from the topology server are in reality sent in process context, and the server can block if necessary. Analogously, there is a new workqueue for receiving messages: once a notification about an arriving message is received in BH context, we schedule a work item from the receive workqueue to do the job of receiving the message in process context. As both sending and receiving of messages are now completed in process context, subscribed events can no longer be dropped.

As of this commit, this new server infrastructure is built, but not yet called by the existing TIPC code. Since the conversion changes required in order to use it are significant, the addition is kept here as a separate commit.

Signed-off-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: Paul Gortmaker <paul.gortmaker@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
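[Editor's note] For orientation when reading the new file below: server.c relies on a struct tipc_server handle declared in server.h, which this diff does not show. A minimal sketch, reconstructed from the fields and callbacks the file actually dereferences (the connection idr, the receive-buffer cache, the two workqueues, and the three service callbacks), might look as follows; member order and exact types are assumptions, not the authoritative server.h definition:

	/* Assumed shape of struct tipc_server, inferred from its uses
	 * in server.c below; consult server.h for the real definition.
	 */
	struct tipc_server {
		struct idr conn_idr;		/* maps conid to tipc_conn */
		spinlock_t idr_lock;		/* protects conn_idr/idr_in_use */
		int idr_in_use;			/* number of live connections */
		int max_rcvbuf_size;		/* size of one receive buffer */

		/* Callbacks into the service using this server; all are
		 * invoked from workqueue handlers, i.e. in process context.
		 */
		void *(*tipc_conn_new)(int conid);
		void (*tipc_conn_shutdown)(int conid, void *usr_data);
		void (*tipc_conn_recvmsg)(int conid, struct sockaddr_tipc *addr,
					  void *usr_data, void *buf, size_t len);

		struct sockaddr_tipc *saddr;	/* address to bind/listen on */
		const char *name;		/* also names rcvbuf_cache */
		int imp;			/* TIPC_IMPORTANCE setting */
		int type;			/* SOCK_SEQPACKET, SOCK_RDM, ... */
		int enabled;			/* set by tipc_server_start() */
		struct kmem_cache *rcvbuf_cache;	/* receive buffer allocator */
		struct workqueue_struct *rcv_wq;	/* receive work */
		struct workqueue_struct *send_wq;	/* send work */
	};

The three function pointers are what a service such as the topology server supplies; since they are only invoked from the workqueue handlers, they always run in process context and may block.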
Diffstat (limited to 'net/tipc/server.c')
-rw-r--r--	net/tipc/server.c	596
1 file changed, 596 insertions, 0 deletions
diff --git a/net/tipc/server.c b/net/tipc/server.c
new file mode 100644
index 000000000000..19da5abe0fa6
--- /dev/null
+++ b/net/tipc/server.c
@@ -0,0 +1,596 @@
/*
 * net/tipc/server.c: TIPC server infrastructure
 *
 * Copyright (c) 2012-2013, Wind River Systems
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "server.h"
#include "core.h"
#include <net/sock.h>

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT	25
#define MAX_RECV_MSG_COUNT	25
#define CF_CONNECTED		1

#define sock2con(x) ((struct tipc_conn *)(x)->sk_user_data)

/**
 * struct tipc_conn - TIPC connection structure
 * @kref: reference counter to connection object
 * @conid: connection identifier
 * @sock: socket handler associated with connection
 * @flags: indicates connection state
 * @server: pointer to connected server
 * @rwork: receive work item
 * @rx_action: what to do when connection socket is active
 * @usr_data: user-specified field
 * @outqueue: pointer to first outbound message in queue
 * @outqueue_lock: controls access to the outqueue
 * @swork: send work item
 */
struct tipc_conn {
	struct kref kref;
	int conid;
	struct socket *sock;
	unsigned long flags;
	struct tipc_server *server;
	struct work_struct rwork;
	int (*rx_action) (struct tipc_conn *con);
	void *usr_data;
	struct list_head outqueue;
	spinlock_t outqueue_lock;
	struct work_struct swork;
};

/* An entry waiting to be sent */
struct outqueue_entry {
	struct list_head list;
	struct kvec iov;
	struct sockaddr_tipc dest;
};

static void tipc_recv_work(struct work_struct *work);
static void tipc_send_work(struct work_struct *work);
static void tipc_clean_outqueues(struct tipc_conn *con);

static void tipc_conn_kref_release(struct kref *kref)
{
	struct tipc_conn *con = container_of(kref, struct tipc_conn, kref);
	struct tipc_server *s = con->server;

	if (con->sock) {
		tipc_sock_release_local(con->sock);
		con->sock = NULL;
	}

	tipc_clean_outqueues(con);

	if (con->conid)
		s->tipc_conn_shutdown(con->conid, con->usr_data);

	kfree(con);
}

static void conn_put(struct tipc_conn *con)
{
	kref_put(&con->kref, tipc_conn_kref_release);
}

static void conn_get(struct tipc_conn *con)
{
	kref_get(&con->kref);
}

static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid)
{
	struct tipc_conn *con;

	spin_lock_bh(&s->idr_lock);
	con = idr_find(&s->conn_idr, conid);
	if (con)
		conn_get(con);
	spin_unlock_bh(&s->idr_lock);
	return con;
}

static void sock_data_ready(struct sock *sk, int unused)
{
	struct tipc_conn *con;

	read_lock(&sk->sk_callback_lock);
	con = sock2con(sk);
	if (con && test_bit(CF_CONNECTED, &con->flags)) {
		conn_get(con);
		if (!queue_work(con->server->rcv_wq, &con->rwork))
			conn_put(con);
	}
	read_unlock(&sk->sk_callback_lock);
}

static void sock_write_space(struct sock *sk)
{
	struct tipc_conn *con;

	read_lock(&sk->sk_callback_lock);
	con = sock2con(sk);
	if (con && test_bit(CF_CONNECTED, &con->flags)) {
		conn_get(con);
		if (!queue_work(con->server->send_wq, &con->swork))
			conn_put(con);
	}
	read_unlock(&sk->sk_callback_lock);
}

static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con)
{
	struct sock *sk = sock->sk;

	write_lock_bh(&sk->sk_callback_lock);

	sk->sk_data_ready = sock_data_ready;
	sk->sk_write_space = sock_write_space;
	sk->sk_user_data = con;

	con->sock = sock;

	write_unlock_bh(&sk->sk_callback_lock);
}

static void tipc_unregister_callbacks(struct tipc_conn *con)
{
	struct sock *sk = con->sock->sk;

	write_lock_bh(&sk->sk_callback_lock);
	sk->sk_user_data = NULL;
	write_unlock_bh(&sk->sk_callback_lock);
}

static void tipc_close_conn(struct tipc_conn *con)
{
	struct tipc_server *s = con->server;

	if (test_and_clear_bit(CF_CONNECTED, &con->flags)) {
		spin_lock_bh(&s->idr_lock);
		idr_remove(&s->conn_idr, con->conid);
		s->idr_in_use--;
		spin_unlock_bh(&s->idr_lock);

		tipc_unregister_callbacks(con);

		/* We shouldn't flush pending works, as we may be running
		 * inside one of them. The races with pending rx/tx work
		 * structs are harmless here, since we have already removed
		 * this connection from the server connection list and
		 * cleared sk->sk_user_data before releasing the connection
		 * object.
		 */
		kernel_sock_shutdown(con->sock, SHUT_RDWR);

		conn_put(con);
	}
}

static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
{
	struct tipc_conn *con;
	int ret;

	con = kzalloc(sizeof(struct tipc_conn), GFP_ATOMIC);
	if (!con)
		return ERR_PTR(-ENOMEM);

	kref_init(&con->kref);
	INIT_LIST_HEAD(&con->outqueue);
	spin_lock_init(&con->outqueue_lock);
	INIT_WORK(&con->swork, tipc_send_work);
	INIT_WORK(&con->rwork, tipc_recv_work);

	spin_lock_bh(&s->idr_lock);
	ret = idr_alloc(&s->conn_idr, con, 0, 0, GFP_ATOMIC);
	if (ret < 0) {
		kfree(con);
		spin_unlock_bh(&s->idr_lock);
		return ERR_PTR(-ENOMEM);
	}
	con->conid = ret;
	s->idr_in_use++;
	spin_unlock_bh(&s->idr_lock);

	set_bit(CF_CONNECTED, &con->flags);
	con->server = s;

	return con;
}

static int tipc_receive_from_sock(struct tipc_conn *con)
{
	struct msghdr msg = {};
	struct tipc_server *s = con->server;
	struct sockaddr_tipc addr;
	struct kvec iov;
	void *buf;
	int ret;

	buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC);
	if (!buf) {
		ret = -ENOMEM;
		goto out_close;
	}

	iov.iov_base = buf;
	iov.iov_len = s->max_rcvbuf_size;
	msg.msg_name = &addr;
	ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
			     MSG_DONTWAIT);
	if (ret <= 0) {
		kmem_cache_free(s->rcvbuf_cache, buf);
		goto out_close;
	}

	s->tipc_conn_recvmsg(con->conid, &addr, con->usr_data, buf, ret);

	kmem_cache_free(s->rcvbuf_cache, buf);

	return 0;

out_close:
	if (ret != -EWOULDBLOCK)
		tipc_close_conn(con);
	if (ret == 0)
		/* Don't return success if we really got EOF */
		ret = -EAGAIN;

	return ret;
}

static int tipc_accept_from_sock(struct tipc_conn *con)
{
	struct tipc_server *s = con->server;
	struct socket *sock = con->sock;
	struct socket *newsock;
	struct tipc_conn *newcon;
	int ret;

	ret = tipc_sock_accept_local(sock, &newsock, O_NONBLOCK);
	if (ret < 0)
		return ret;

	newcon = tipc_alloc_conn(con->server);
	if (IS_ERR(newcon)) {
		ret = PTR_ERR(newcon);
		sock_release(newsock);
		return ret;
	}

	newcon->rx_action = tipc_receive_from_sock;
	tipc_register_callbacks(newsock, newcon);

	/* Notify that new connection is incoming */
	newcon->usr_data = s->tipc_conn_new(newcon->conid);

	/* Wake up receive process in case of 'SYN+' message */
	newsock->sk->sk_data_ready(newsock->sk, 0);
	return ret;
}

static struct socket *tipc_create_listen_sock(struct tipc_conn *con)
{
	struct tipc_server *s = con->server;
	struct socket *sock = NULL;
	int ret;

	ret = tipc_sock_create_local(s->type, &sock);
	if (ret < 0)
		return NULL;
	ret = kernel_setsockopt(sock, SOL_TIPC, TIPC_IMPORTANCE,
				(char *)&s->imp, sizeof(s->imp));
	if (ret < 0)
		goto create_err;
	ret = kernel_bind(sock, (struct sockaddr *)s->saddr, sizeof(*s->saddr));
	if (ret < 0)
		goto create_err;

	switch (s->type) {
	case SOCK_STREAM:
	case SOCK_SEQPACKET:
		con->rx_action = tipc_accept_from_sock;

		ret = kernel_listen(sock, 0);
		if (ret < 0)
			goto create_err;
		break;
	case SOCK_DGRAM:
	case SOCK_RDM:
		con->rx_action = tipc_receive_from_sock;
		break;
	default:
		pr_err("Unknown socket type %d\n", s->type);
		goto create_err;
	}
	return sock;

create_err:
	sock_release(sock);
	con->sock = NULL;
	return NULL;
}

static int tipc_open_listening_sock(struct tipc_server *s)
{
	struct socket *sock;
	struct tipc_conn *con;

	con = tipc_alloc_conn(s);
	if (IS_ERR(con))
		return PTR_ERR(con);

	sock = tipc_create_listen_sock(con);
	if (!sock)
		return -EINVAL;

	tipc_register_callbacks(sock, con);
	return 0;
}

static struct outqueue_entry *tipc_alloc_entry(void *data, int len)
{
	struct outqueue_entry *entry;
	void *buf;

	entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC);
	if (!entry)
		return NULL;

	buf = kmalloc(len, GFP_ATOMIC);
	if (!buf) {
		kfree(entry);
		return NULL;
	}

	memcpy(buf, data, len);
	entry->iov.iov_base = buf;
	entry->iov.iov_len = len;

	return entry;
}

static void tipc_free_entry(struct outqueue_entry *e)
{
	kfree(e->iov.iov_base);
	kfree(e);
}

static void tipc_clean_outqueues(struct tipc_conn *con)
{
	struct outqueue_entry *e, *safe;

	spin_lock_bh(&con->outqueue_lock);
	list_for_each_entry_safe(e, safe, &con->outqueue, list) {
		list_del(&e->list);
		tipc_free_entry(e);
	}
	spin_unlock_bh(&con->outqueue_lock);
}

int tipc_conn_sendmsg(struct tipc_server *s, int conid,
		      struct sockaddr_tipc *addr, void *data, size_t len)
{
	struct outqueue_entry *e;
	struct tipc_conn *con;

	con = tipc_conn_lookup(s, conid);
	if (!con)
		return -EINVAL;

	e = tipc_alloc_entry(data, len);
	if (!e) {
		conn_put(con);
		return -ENOMEM;
	}

	if (addr)
		memcpy(&e->dest, addr, sizeof(struct sockaddr_tipc));

	spin_lock_bh(&con->outqueue_lock);
	list_add_tail(&e->list, &con->outqueue);
	spin_unlock_bh(&con->outqueue_lock);

	if (test_bit(CF_CONNECTED, &con->flags))
		if (!queue_work(s->send_wq, &con->swork))
			conn_put(con);

	return 0;
}

void tipc_conn_terminate(struct tipc_server *s, int conid)
{
	struct tipc_conn *con;

	con = tipc_conn_lookup(s, conid);
	if (con) {
		tipc_close_conn(con);
		conn_put(con);
	}
}

static void tipc_send_to_sock(struct tipc_conn *con)
{
	int count = 0;
	struct tipc_server *s = con->server;
	struct outqueue_entry *e;
	struct msghdr msg;
	int ret;

	spin_lock_bh(&con->outqueue_lock);
	while (1) {
		e = list_entry(con->outqueue.next, struct outqueue_entry,
			       list);
		if ((struct list_head *) e == &con->outqueue)
			break;
		spin_unlock_bh(&con->outqueue_lock);

		memset(&msg, 0, sizeof(msg));
		msg.msg_flags = MSG_DONTWAIT;

		if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) {
			msg.msg_name = &e->dest;
			msg.msg_namelen = sizeof(struct sockaddr_tipc);
		}
		ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
				     e->iov.iov_len);
		if (ret == -EWOULDBLOCK || ret == 0) {
			cond_resched();
			goto out;
		} else if (ret < 0) {
			goto send_err;
		}

		/* Don't starve users filling buffers */
		if (++count >= MAX_SEND_MSG_COUNT) {
			cond_resched();
			count = 0;
		}

		spin_lock_bh(&con->outqueue_lock);
		list_del(&e->list);
		tipc_free_entry(e);
	}
	spin_unlock_bh(&con->outqueue_lock);
out:
	return;

send_err:
	tipc_close_conn(con);
}

static void tipc_recv_work(struct work_struct *work)
{
	struct tipc_conn *con = container_of(work, struct tipc_conn, rwork);
	int count = 0;

	while (test_bit(CF_CONNECTED, &con->flags)) {
		if (con->rx_action(con))
			break;

		/* Don't flood Rx machine */
		if (++count >= MAX_RECV_MSG_COUNT) {
			cond_resched();
			count = 0;
		}
	}
	conn_put(con);
}

static void tipc_send_work(struct work_struct *work)
{
	struct tipc_conn *con = container_of(work, struct tipc_conn, swork);

	if (test_bit(CF_CONNECTED, &con->flags))
		tipc_send_to_sock(con);

	conn_put(con);
}

static void tipc_work_stop(struct tipc_server *s)
{
	destroy_workqueue(s->rcv_wq);
	destroy_workqueue(s->send_wq);
}

static int tipc_work_start(struct tipc_server *s)
{
	s->rcv_wq = alloc_workqueue("tipc_rcv", WQ_UNBOUND, 1);
	if (!s->rcv_wq) {
		pr_err("can't start tipc receive workqueue\n");
		return -ENOMEM;
	}

	s->send_wq = alloc_workqueue("tipc_send", WQ_UNBOUND, 1);
	if (!s->send_wq) {
		pr_err("can't start tipc send workqueue\n");
		destroy_workqueue(s->rcv_wq);
		return -ENOMEM;
	}

	return 0;
}

int tipc_server_start(struct tipc_server *s)
{
	int ret;

	spin_lock_init(&s->idr_lock);
	idr_init(&s->conn_idr);
	s->idr_in_use = 0;

	s->rcvbuf_cache = kmem_cache_create(s->name, s->max_rcvbuf_size,
					    0, SLAB_HWCACHE_ALIGN, NULL);
	if (!s->rcvbuf_cache)
		return -ENOMEM;

	ret = tipc_work_start(s);
	if (ret < 0) {
		kmem_cache_destroy(s->rcvbuf_cache);
		return ret;
	}
	s->enabled = 1;

	return tipc_open_listening_sock(s);
}

void tipc_server_stop(struct tipc_server *s)
{
	struct tipc_conn *con;
	int total = 0;
	int id;

	if (!s->enabled)
		return;

	s->enabled = 0;
	spin_lock_bh(&s->idr_lock);
	for (id = 0; total < s->idr_in_use; id++) {
		con = idr_find(&s->conn_idr, id);
		if (con) {
			total++;
			spin_unlock_bh(&s->idr_lock);
			tipc_close_conn(con);
			spin_lock_bh(&s->idr_lock);
		}
	}
	spin_unlock_bh(&s->idr_lock);

	tipc_work_stop(s);
	kmem_cache_destroy(s->rcvbuf_cache);
	idr_destroy(&s->conn_idr);
}
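
[Editor's note] To illustrate how a service would plug into this infrastructure once converted (the conversion itself is deliberately left to later commits), here is a hypothetical setup. All callback bodies, names, and address values below are invented for illustration and are not part of this patch:

	/* Sketch of a hypothetical user of the TIPC server infrastructure.
	 * Assumes "server.h" from this patch; identifiers prefixed subscr_
	 * are illustrative only.
	 */
	#include "server.h"

	static void *subscr_conn_new(int conid)
	{
		/* Allocate per-connection state; the return value is handed
		 * back to the other callbacks as usr_data.
		 */
		return NULL;
	}

	static void subscr_conn_shutdown(int conid, void *usr_data)
	{
		/* Release whatever subscr_conn_new() allocated */
	}

	static void subscr_conn_recvmsg(int conid, struct sockaddr_tipc *addr,
					void *usr_data, void *buf, size_t len)
	{
		/* Handle an inbound message; runs in process context via the
		 * receive workqueue, so blocking is allowed here.
		 */
	}

	static struct sockaddr_tipc subscr_addr = {
		.family		= AF_TIPC,
		.addrtype	= TIPC_ADDR_NAMESEQ,
		/* a real service would also fill in addr.nameseq here */
	};

	static struct tipc_server subscr_server = {
		.saddr			= &subscr_addr,
		.imp			= TIPC_CRITICAL_IMPORTANCE,
		.type			= SOCK_SEQPACKET,
		.max_rcvbuf_size	= 64 * 1024,
		.name			= "tipc_topology",
		.tipc_conn_new		= subscr_conn_new,
		.tipc_conn_shutdown	= subscr_conn_shutdown,
		.tipc_conn_recvmsg	= subscr_conn_recvmsg,
	};

In this sketch, tipc_server_start(&subscr_server) would create the receive-buffer cache and both workqueues, then open the listening socket. From then on, tipc_conn_sendmsg(&subscr_server, conid, NULL, buf, len) may be called even from BH context: it only copies the message into the connection's outqueue and schedules the send work, while the potentially blocking kernel_sendmsg() happens later in tipc_send_work(). tipc_server_stop(&subscr_server) closes all connections and tears the queues down.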