aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorJon Maloy <jon.maloy@ericsson.com>2018-02-15 04:40:50 -0500
committerDavid S. Miller <davem@davemloft.net>2018-02-16 15:26:34 -0500
commit0ef897be12b8b4cf297b6016e79ec97ec90f2cf6 (patch)
treeb410be206ad34d2f4bc7f32bb9236d4d46141eb4 /net/tipc
parent5c45ab24ac77ea32eae7d3576cf37c3ddb259f80 (diff)
tipc: separate topology server listener socket from subcsriber sockets
We move the listener socket to struct tipc_server and give it its own work item. This makes it easier to follow the code, and entails some simplifications in the reception code in subscriber sockets. Acked-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/server.c328
1 files changed, 147 insertions, 181 deletions
diff --git a/net/tipc/server.c b/net/tipc/server.c
index 0abbdd698662..0e351e81690e 100644
--- a/net/tipc/server.c
+++ b/net/tipc/server.c
@@ -64,7 +64,6 @@
64 * @tipc_conn_new: callback will be called when new connection is incoming 64 * @tipc_conn_new: callback will be called when new connection is incoming
65 * @tipc_conn_release: callback will be called before releasing the connection 65 * @tipc_conn_release: callback will be called before releasing the connection
66 * @tipc_conn_recvmsg: callback will be called when message arrives 66 * @tipc_conn_recvmsg: callback will be called when message arrives
67 * @saddr: TIPC server address
68 * @name: server name 67 * @name: server name
69 * @imp: message importance 68 * @imp: message importance
70 * @type: socket type 69 * @type: socket type
@@ -74,10 +73,11 @@ struct tipc_server {
74 spinlock_t idr_lock; /* for idr list */ 73 spinlock_t idr_lock; /* for idr list */
75 int idr_in_use; 74 int idr_in_use;
76 struct net *net; 75 struct net *net;
76 struct work_struct awork;
77 struct workqueue_struct *rcv_wq; 77 struct workqueue_struct *rcv_wq;
78 struct workqueue_struct *send_wq; 78 struct workqueue_struct *send_wq;
79 int max_rcvbuf_size; 79 int max_rcvbuf_size;
80 struct sockaddr_tipc *saddr; 80 struct socket *listener;
81 char name[TIPC_SERVER_NAME_LEN]; 81 char name[TIPC_SERVER_NAME_LEN];
82}; 82};
83 83
@@ -106,7 +106,6 @@ struct tipc_conn {
106 struct list_head sub_list; 106 struct list_head sub_list;
107 spinlock_t sub_lock; /* for subscription list */ 107 spinlock_t sub_lock; /* for subscription list */
108 struct work_struct rwork; 108 struct work_struct rwork;
109 int (*rx_action) (struct tipc_conn *con);
110 struct list_head outqueue; 109 struct list_head outqueue;
111 spinlock_t outqueue_lock; 110 spinlock_t outqueue_lock;
112 struct work_struct swork; 111 struct work_struct swork;
@@ -121,12 +120,6 @@ struct outqueue_entry {
121 120
122static void tipc_recv_work(struct work_struct *work); 121static void tipc_recv_work(struct work_struct *work);
123static void tipc_send_work(struct work_struct *work); 122static void tipc_send_work(struct work_struct *work);
124static void tipc_clean_outqueues(struct tipc_conn *con);
125
126static struct tipc_conn *sock2con(struct sock *sk)
127{
128 return sk->sk_user_data;
129}
130 123
131static bool connected(struct tipc_conn *con) 124static bool connected(struct tipc_conn *con)
132{ 125{
@@ -137,21 +130,21 @@ static void tipc_conn_kref_release(struct kref *kref)
137{ 130{
138 struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); 131 struct tipc_conn *con = container_of(kref, struct tipc_conn, kref);
139 struct tipc_server *s = con->server; 132 struct tipc_server *s = con->server;
140 struct socket *sock = con->sock; 133 struct outqueue_entry *e, *safe;
141 134
142 if (sock) {
143 if (test_bit(CF_SERVER, &con->flags)) {
144 __module_get(sock->ops->owner);
145 __module_get(sock->sk->sk_prot_creator->owner);
146 }
147 sock_release(sock);
148 con->sock = NULL;
149 }
150 spin_lock_bh(&s->idr_lock); 135 spin_lock_bh(&s->idr_lock);
151 idr_remove(&s->conn_idr, con->conid); 136 idr_remove(&s->conn_idr, con->conid);
152 s->idr_in_use--; 137 s->idr_in_use--;
153 spin_unlock_bh(&s->idr_lock); 138 spin_unlock_bh(&s->idr_lock);
154 tipc_clean_outqueues(con); 139 if (con->sock)
140 sock_release(con->sock);
141
142 spin_lock_bh(&con->outqueue_lock);
143 list_for_each_entry_safe(e, safe, &con->outqueue, list) {
144 list_del(&e->list);
145 kfree(e);
146 }
147 spin_unlock_bh(&con->outqueue_lock);
155 kfree(con); 148 kfree(con);
156} 149}
157 150
@@ -178,14 +171,14 @@ static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid)
178} 171}
179 172
180/* sock_data_ready - interrupt callback indicating the socket has data to read 173/* sock_data_ready - interrupt callback indicating the socket has data to read
181 * The queued job is launched in tipc_recv_from_sock() 174 * The queued work is launched into tipc_recv_work()->tipc_recv_from_sock()
182 */ 175 */
183static void sock_data_ready(struct sock *sk) 176static void sock_data_ready(struct sock *sk)
184{ 177{
185 struct tipc_conn *con; 178 struct tipc_conn *con;
186 179
187 read_lock_bh(&sk->sk_callback_lock); 180 read_lock_bh(&sk->sk_callback_lock);
188 con = sock2con(sk); 181 con = sk->sk_user_data;
189 if (connected(con)) { 182 if (connected(con)) {
190 conn_get(con); 183 conn_get(con);
191 if (!queue_work(con->server->rcv_wq, &con->rwork)) 184 if (!queue_work(con->server->rcv_wq, &con->rwork))
@@ -195,15 +188,15 @@ static void sock_data_ready(struct sock *sk)
195} 188}
196 189
197/* sock_write_space - interrupt callback after a sendmsg EAGAIN 190/* sock_write_space - interrupt callback after a sendmsg EAGAIN
198 * Indicates that there now is more is space in the send buffer 191 * Indicates that there now is more space in the send buffer
199 * The queued job is launched in tipc_send_to_sock() 192 * The queued work is launched into tipc_send_work()->tipc_send_to_sock()
200 */ 193 */
201static void sock_write_space(struct sock *sk) 194static void sock_write_space(struct sock *sk)
202{ 195{
203 struct tipc_conn *con; 196 struct tipc_conn *con;
204 197
205 read_lock_bh(&sk->sk_callback_lock); 198 read_lock_bh(&sk->sk_callback_lock);
206 con = sock2con(sk); 199 con = sk->sk_user_data;
207 if (connected(con)) { 200 if (connected(con)) {
208 conn_get(con); 201 conn_get(con);
209 if (!queue_work(con->server->send_wq, &con->swork)) 202 if (!queue_work(con->server->send_wq, &con->swork))
@@ -212,23 +205,8 @@ static void sock_write_space(struct sock *sk)
212 read_unlock_bh(&sk->sk_callback_lock); 205 read_unlock_bh(&sk->sk_callback_lock);
213} 206}
214 207
215static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con)
216{
217 struct sock *sk = sock->sk;
218
219 write_lock_bh(&sk->sk_callback_lock);
220
221 sk->sk_data_ready = sock_data_ready;
222 sk->sk_write_space = sock_write_space;
223 sk->sk_user_data = con;
224
225 con->sock = sock;
226
227 write_unlock_bh(&sk->sk_callback_lock);
228}
229
230/* tipc_con_delete_sub - delete a specific or all subscriptions 208/* tipc_con_delete_sub - delete a specific or all subscriptions
231 * for a given subscriber 209 * for a given subscriber
232 */ 210 */
233static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s) 211static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s)
234{ 212{
@@ -268,6 +246,7 @@ static void tipc_close_conn(struct tipc_conn *con)
268 246
269 /* Don't flush pending works, -just let them expire */ 247 /* Don't flush pending works, -just let them expire */
270 kernel_sock_shutdown(con->sock, SHUT_RDWR); 248 kernel_sock_shutdown(con->sock, SHUT_RDWR);
249
271 conn_put(con); 250 conn_put(con);
272} 251}
273 252
@@ -357,117 +336,8 @@ static int tipc_receive_from_sock(struct tipc_conn *con)
357 return ret; 336 return ret;
358} 337}
359 338
360static int tipc_accept_from_sock(struct tipc_conn *con) 339/* tipc_conn_queue_evt() - interrupt level call from a subscription instance
361{ 340 * The queued work is launched into tipc_send_work()->tipc_send_to_sock()
362 struct socket *sock = con->sock;
363 struct socket *newsock;
364 struct tipc_conn *newcon;
365 int ret;
366
367 ret = kernel_accept(sock, &newsock, O_NONBLOCK);
368 if (ret < 0)
369 return ret;
370
371 newcon = tipc_alloc_conn(con->server);
372 if (IS_ERR(newcon)) {
373 ret = PTR_ERR(newcon);
374 sock_release(newsock);
375 return ret;
376 }
377
378 newcon->rx_action = tipc_receive_from_sock;
379 tipc_register_callbacks(newsock, newcon);
380
381 /* Wake up receive process in case of 'SYN+' message */
382 newsock->sk->sk_data_ready(newsock->sk);
383 return ret;
384}
385
386static struct socket *tipc_create_listen_sock(struct tipc_conn *con)
387{
388 struct tipc_server *s = con->server;
389 struct socket *sock = NULL;
390 int imp = TIPC_CRITICAL_IMPORTANCE;
391 int ret;
392
393 ret = sock_create_kern(s->net, AF_TIPC, SOCK_SEQPACKET, 0, &sock);
394 if (ret < 0)
395 return NULL;
396 ret = kernel_setsockopt(sock, SOL_TIPC, TIPC_IMPORTANCE,
397 (char *)&imp, sizeof(imp));
398 if (ret < 0)
399 goto create_err;
400 ret = kernel_bind(sock, (struct sockaddr *)s->saddr, sizeof(*s->saddr));
401 if (ret < 0)
402 goto create_err;
403
404 con->rx_action = tipc_accept_from_sock;
405 ret = kernel_listen(sock, 0);
406 if (ret < 0)
407 goto create_err;
408
409 /* As server's listening socket owner and creator is the same module,
410 * we have to decrease TIPC module reference count to guarantee that
411 * it remains zero after the server socket is created, otherwise,
412 * executing "rmmod" command is unable to make TIPC module deleted
413 * after TIPC module is inserted successfully.
414 *
415 * However, the reference count is ever increased twice in
416 * sock_create_kern(): one is to increase the reference count of owner
417 * of TIPC socket's proto_ops struct; another is to increment the
418 * reference count of owner of TIPC proto struct. Therefore, we must
419 * decrement the module reference count twice to ensure that it keeps
420 * zero after server's listening socket is created. Of course, we
421 * must bump the module reference count twice as well before the socket
422 * is closed.
423 */
424 module_put(sock->ops->owner);
425 module_put(sock->sk->sk_prot_creator->owner);
426 set_bit(CF_SERVER, &con->flags);
427
428 return sock;
429
430create_err:
431 kernel_sock_shutdown(sock, SHUT_RDWR);
432 sock_release(sock);
433 return NULL;
434}
435
436static int tipc_open_listening_sock(struct tipc_server *s)
437{
438 struct socket *sock;
439 struct tipc_conn *con;
440
441 con = tipc_alloc_conn(s);
442 if (IS_ERR(con))
443 return PTR_ERR(con);
444
445 sock = tipc_create_listen_sock(con);
446 if (!sock) {
447 idr_remove(&s->conn_idr, con->conid);
448 s->idr_in_use--;
449 kfree(con);
450 return -EINVAL;
451 }
452
453 tipc_register_callbacks(sock, con);
454 return 0;
455}
456
457static void tipc_clean_outqueues(struct tipc_conn *con)
458{
459 struct outqueue_entry *e, *safe;
460
461 spin_lock_bh(&con->outqueue_lock);
462 list_for_each_entry_safe(e, safe, &con->outqueue, list) {
463 list_del(&e->list);
464 kfree(e);
465 }
466 spin_unlock_bh(&con->outqueue_lock);
467}
468
469/* tipc_conn_queue_evt - interrupt level call from a subscription instance
470 * The queued job is launched in tipc_send_to_sock()
471 */ 341 */
472void tipc_conn_queue_evt(struct net *net, int conid, 342void tipc_conn_queue_evt(struct net *net, int conid,
473 u32 event, struct tipc_event *evt) 343 u32 event, struct tipc_event *evt)
@@ -588,9 +458,9 @@ static void tipc_send_to_sock(struct tipc_conn *con)
588 1, sizeof(*evt)); 458 1, sizeof(*evt));
589 if (ret == -EWOULDBLOCK || ret == 0) { 459 if (ret == -EWOULDBLOCK || ret == 0) {
590 cond_resched(); 460 cond_resched();
591 goto out; 461 return;
592 } else if (ret < 0) { 462 } else if (ret < 0) {
593 goto err; 463 return tipc_close_conn(con);
594 } 464 }
595 } else { 465 } else {
596 tipc_send_kern_top_evt(srv->net, evt); 466 tipc_send_kern_top_evt(srv->net, evt);
@@ -606,10 +476,6 @@ static void tipc_send_to_sock(struct tipc_conn *con)
606 kfree(e); 476 kfree(e);
607 } 477 }
608 spin_unlock_bh(&con->outqueue_lock); 478 spin_unlock_bh(&con->outqueue_lock);
609out:
610 return;
611err:
612 tipc_close_conn(con);
613} 479}
614 480
615static void tipc_recv_work(struct work_struct *work) 481static void tipc_recv_work(struct work_struct *work)
@@ -618,7 +484,7 @@ static void tipc_recv_work(struct work_struct *work)
618 int count = 0; 484 int count = 0;
619 485
620 while (connected(con)) { 486 while (connected(con)) {
621 if (con->rx_action(con)) 487 if (tipc_receive_from_sock(con))
622 break; 488 break;
623 489
624 /* Don't flood Rx machine */ 490 /* Don't flood Rx machine */
@@ -640,10 +506,113 @@ static void tipc_send_work(struct work_struct *work)
640 conn_put(con); 506 conn_put(con);
641} 507}
642 508
643static void tipc_work_stop(struct tipc_server *s) 509static void tipc_accept_from_sock(struct work_struct *work)
644{ 510{
645 destroy_workqueue(s->rcv_wq); 511 struct tipc_server *srv = container_of(work, struct tipc_server, awork);
646 destroy_workqueue(s->send_wq); 512 struct socket *lsock = srv->listener;
513 struct socket *newsock;
514 struct tipc_conn *con;
515 struct sock *newsk;
516 int ret;
517
518 while (1) {
519 ret = kernel_accept(lsock, &newsock, O_NONBLOCK);
520 if (ret < 0)
521 return;
522 con = tipc_alloc_conn(srv);
523 if (IS_ERR(con)) {
524 ret = PTR_ERR(con);
525 sock_release(newsock);
526 return;
527 }
528 /* Register callbacks */
529 newsk = newsock->sk;
530 write_lock_bh(&newsk->sk_callback_lock);
531 newsk->sk_data_ready = sock_data_ready;
532 newsk->sk_write_space = sock_write_space;
533 newsk->sk_user_data = con;
534 con->sock = newsock;
535 write_unlock_bh(&newsk->sk_callback_lock);
536
537 /* Wake up receive process in case of 'SYN+' message */
538 newsk->sk_data_ready(newsk);
539 }
540}
541
542/* listener_sock_data_ready - interrupt callback indicating new connection
543 * The queued job is launched into tipc_accept_from_sock()
544 */
545static void listener_sock_data_ready(struct sock *sk)
546{
547 struct tipc_server *srv;
548
549 read_lock_bh(&sk->sk_callback_lock);
550 srv = sk->sk_user_data;
551 if (srv->listener)
552 queue_work(srv->rcv_wq, &srv->awork);
553 read_unlock_bh(&sk->sk_callback_lock);
554}
555
556static int tipc_create_listener_sock(struct tipc_server *srv)
557{
558 int imp = TIPC_CRITICAL_IMPORTANCE;
559 struct socket *lsock = NULL;
560 struct sockaddr_tipc saddr;
561 struct sock *sk;
562 int rc;
563
564 rc = sock_create_kern(srv->net, AF_TIPC, SOCK_SEQPACKET, 0, &lsock);
565 if (rc < 0)
566 return rc;
567
568 srv->listener = lsock;
569 sk = lsock->sk;
570 write_lock_bh(&sk->sk_callback_lock);
571 sk->sk_data_ready = listener_sock_data_ready;
572 sk->sk_user_data = srv;
573 write_unlock_bh(&sk->sk_callback_lock);
574
575 rc = kernel_setsockopt(lsock, SOL_TIPC, TIPC_IMPORTANCE,
576 (char *)&imp, sizeof(imp));
577 if (rc < 0)
578 goto err;
579
580 saddr.family = AF_TIPC;
581 saddr.addrtype = TIPC_ADDR_NAMESEQ;
582 saddr.addr.nameseq.type = TIPC_TOP_SRV;
583 saddr.addr.nameseq.lower = TIPC_TOP_SRV;
584 saddr.addr.nameseq.upper = TIPC_TOP_SRV;
585 saddr.scope = TIPC_NODE_SCOPE;
586
587 rc = kernel_bind(lsock, (struct sockaddr *)&saddr, sizeof(saddr));
588 if (rc < 0)
589 goto err;
590 rc = kernel_listen(lsock, 0);
591 if (rc < 0)
592 goto err;
593
594 /* As server's listening socket owner and creator is the same module,
595 * we have to decrease TIPC module reference count to guarantee that
596 * it remains zero after the server socket is created, otherwise,
597 * executing "rmmod" command is unable to make TIPC module deleted
598 * after TIPC module is inserted successfully.
599 *
600 * However, the reference count is ever increased twice in
601 * sock_create_kern(): one is to increase the reference count of owner
602 * of TIPC socket's proto_ops struct; another is to increment the
603 * reference count of owner of TIPC proto struct. Therefore, we must
604 * decrement the module reference count twice to ensure that it keeps
605 * zero after server's listening socket is created. Of course, we
606 * must bump the module reference count twice as well before the socket
607 * is closed.
608 */
609 module_put(lsock->ops->owner);
610 module_put(sk->sk_prot_creator->owner);
611
612 return 0;
613err:
614 sock_release(lsock);
615 return -EINVAL;
647} 616}
648 617
649static int tipc_work_start(struct tipc_server *s) 618static int tipc_work_start(struct tipc_server *s)
@@ -664,32 +633,26 @@ static int tipc_work_start(struct tipc_server *s)
664 return 0; 633 return 0;
665} 634}
666 635
636static void tipc_work_stop(struct tipc_server *s)
637{
638 destroy_workqueue(s->rcv_wq);
639 destroy_workqueue(s->send_wq);
640}
641
667int tipc_topsrv_start(struct net *net) 642int tipc_topsrv_start(struct net *net)
668{ 643{
669 struct tipc_net *tn = tipc_net(net); 644 struct tipc_net *tn = tipc_net(net);
670 const char name[] = "topology_server"; 645 const char name[] = "topology_server";
671 struct sockaddr_tipc *saddr;
672 struct tipc_server *srv; 646 struct tipc_server *srv;
673 int ret; 647 int ret;
674 648
675 saddr = kzalloc(sizeof(*saddr), GFP_ATOMIC);
676 if (!saddr)
677 return -ENOMEM;
678 saddr->family = AF_TIPC;
679 saddr->addrtype = TIPC_ADDR_NAMESEQ;
680 saddr->addr.nameseq.type = TIPC_TOP_SRV;
681 saddr->addr.nameseq.lower = TIPC_TOP_SRV;
682 saddr->addr.nameseq.upper = TIPC_TOP_SRV;
683 saddr->scope = TIPC_NODE_SCOPE;
684
685 srv = kzalloc(sizeof(*srv), GFP_ATOMIC); 649 srv = kzalloc(sizeof(*srv), GFP_ATOMIC);
686 if (!srv) { 650 if (!srv)
687 kfree(saddr);
688 return -ENOMEM; 651 return -ENOMEM;
689 } 652
690 srv->net = net; 653 srv->net = net;
691 srv->saddr = saddr; 654 srv->max_rcvbuf_size = sizeof(struct tipc_subscr);
692 srv->max_rcvbuf_size = sizeof(struct tipc_subscr); 655 INIT_WORK(&srv->awork, tipc_accept_from_sock);
693 656
694 strncpy(srv->name, name, strlen(name) + 1); 657 strncpy(srv->name, name, strlen(name) + 1);
695 tn->topsrv = srv; 658 tn->topsrv = srv;
@@ -703,7 +666,7 @@ int tipc_topsrv_start(struct net *net)
703 if (ret < 0) 666 if (ret < 0)
704 return ret; 667 return ret;
705 668
706 ret = tipc_open_listening_sock(srv); 669 ret = tipc_create_listener_sock(srv);
707 if (ret < 0) 670 if (ret < 0)
708 tipc_work_stop(srv); 671 tipc_work_stop(srv);
709 672
@@ -713,6 +676,7 @@ int tipc_topsrv_start(struct net *net)
713void tipc_topsrv_stop(struct net *net) 676void tipc_topsrv_stop(struct net *net)
714{ 677{
715 struct tipc_server *srv = tipc_topsrv(net); 678 struct tipc_server *srv = tipc_topsrv(net);
679 struct socket *lsock = srv->listener;
716 struct tipc_conn *con; 680 struct tipc_conn *con;
717 int id; 681 int id;
718 682
@@ -725,10 +689,12 @@ void tipc_topsrv_stop(struct net *net)
725 spin_lock_bh(&srv->idr_lock); 689 spin_lock_bh(&srv->idr_lock);
726 } 690 }
727 } 691 }
692 __module_get(lsock->ops->owner);
693 __module_get(lsock->sk->sk_prot_creator->owner);
694 sock_release(lsock);
695 srv->listener = NULL;
728 spin_unlock_bh(&srv->idr_lock); 696 spin_unlock_bh(&srv->idr_lock);
729
730 tipc_work_stop(srv); 697 tipc_work_stop(srv);
731 idr_destroy(&srv->conn_idr); 698 idr_destroy(&srv->conn_idr);
732 kfree(srv->saddr);
733 kfree(srv); 699 kfree(srv);
734} 700}