Diffstat:
 -rw-r--r--  net/tipc/Makefile                                   |    2
 -rw-r--r--  net/tipc/core.h                                     |    6
 -rw-r--r--  net/tipc/group.c                                    |    2
 -rw-r--r--  net/tipc/name_table.c                               |   73
 -rw-r--r--  net/tipc/name_table.h                               |    2
 -rw-r--r--  net/tipc/server.c                                   |  710
 -rw-r--r--  net/tipc/subscr.c                                   |  361
 -rw-r--r--  net/tipc/subscr.h                                   |   66
 -rw-r--r--  net/tipc/topsrv.c                                   |  702
 -rw-r--r--  net/tipc/topsrv.h (renamed from net/tipc/server.h)  |   57
 10 files changed, 862 insertions(+), 1119 deletions(-)
diff --git a/net/tipc/Makefile b/net/tipc/Makefile
index 37bb0bfbd936..1edb7192aa2f 100644
--- a/net/tipc/Makefile
+++ b/net/tipc/Makefile
@@ -9,7 +9,7 @@ tipc-y += addr.o bcast.o bearer.o \
 	   core.o link.o discover.o msg.o \
 	   name_distr.o subscr.o monitor.o name_table.o net.o \
 	   netlink.o netlink_compat.o node.o socket.o eth_media.o \
-	   server.o socket.o group.o
+	   topsrv.o socket.o group.o
 
 tipc-$(CONFIG_TIPC_MEDIA_UDP) += udp_media.o
 tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 20b21af2ff14..ff8b071654f5 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -64,7 +64,7 @@ struct tipc_bearer;
 struct tipc_bc_base;
 struct tipc_link;
 struct tipc_name_table;
-struct tipc_server;
+struct tipc_topsrv;
 struct tipc_monitor;
 
 #define TIPC_MOD_VER "2.0.0"
@@ -112,7 +112,7 @@ struct tipc_net {
 	struct list_head dist_queue;
 
 	/* Topology subscription server */
-	struct tipc_server *topsrv;
+	struct tipc_topsrv *topsrv;
 	atomic_t subscription_count;
 };
 
@@ -131,7 +131,7 @@ static inline struct list_head *tipc_nodes(struct net *net)
 	return &tipc_net(net)->node_list;
 }
 
-static inline struct tipc_server *tipc_topsrv(struct net *net)
+static inline struct tipc_topsrv *tipc_topsrv(struct net *net)
 {
 	return tipc_net(net)->topsrv;
 }
diff --git a/net/tipc/group.c b/net/tipc/group.c
index 122162a31816..03086ccb7746 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -37,7 +37,7 @@
 #include "addr.h"
 #include "group.h"
 #include "bcast.h"
-#include "server.h"
+#include "topsrv.h"
 #include "msg.h"
 #include "socket.h"
 #include "node.h"
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index ed0457cc99d6..e01c9c691ba2 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -326,10 +326,10 @@ static struct publication *tipc_nameseq_insert_publ(struct net *net,
 
 	/* Any subscriptions waiting for notification? */
 	list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
-		tipc_subscrp_report_overlap(s, publ->lower, publ->upper,
-					    TIPC_PUBLISHED, publ->ref,
-					    publ->node, publ->scope,
-					    created_subseq);
+		tipc_sub_report_overlap(s, publ->lower, publ->upper,
+					TIPC_PUBLISHED, publ->ref,
+					publ->node, publ->scope,
+					created_subseq);
 	}
 	return publ;
 }
@@ -397,10 +397,9 @@ found:
 
 	/* Notify any waiting subscriptions */
 	list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
-		tipc_subscrp_report_overlap(s, publ->lower, publ->upper,
-					    TIPC_WITHDRAWN, publ->ref,
-					    publ->node, publ->scope,
-					    removed_subseq);
+		tipc_sub_report_overlap(s, publ->lower, publ->upper,
+					TIPC_WITHDRAWN, publ->ref, publ->node,
+					publ->scope, removed_subseq);
 	}
 
 	return publ;
@@ -412,33 +411,37 @@ found:
  * sequence overlapping with the requested sequence
  */
 static void tipc_nameseq_subscribe(struct name_seq *nseq,
-				   struct tipc_subscription *s,
-				   bool status)
+				   struct tipc_subscription *sub)
 {
 	struct sub_seq *sseq = nseq->sseqs;
 	struct tipc_name_seq ns;
+	struct tipc_subscr *s = &sub->evt.s;
+	bool no_status;
 
-	tipc_subscrp_convert_seq(&s->evt.s.seq, s->swap, &ns);
+	ns.type = tipc_sub_read(s, seq.type);
+	ns.lower = tipc_sub_read(s, seq.lower);
+	ns.upper = tipc_sub_read(s, seq.upper);
+	no_status = tipc_sub_read(s, filter) & TIPC_SUB_NO_STATUS;
 
-	tipc_subscrp_get(s);
-	list_add(&s->nameseq_list, &nseq->subscriptions);
+	tipc_sub_get(sub);
+	list_add(&sub->nameseq_list, &nseq->subscriptions);
 
-	if (!status || !sseq)
+	if (no_status || !sseq)
 		return;
 
 	while (sseq != &nseq->sseqs[nseq->first_free]) {
-		if (tipc_subscrp_check_overlap(&ns, sseq->lower, sseq->upper)) {
+		if (tipc_sub_check_overlap(&ns, sseq->lower, sseq->upper)) {
 			struct publication *crs;
 			struct name_info *info = sseq->info;
 			int must_report = 1;
 
 			list_for_each_entry(crs, &info->zone_list, zone_list) {
-				tipc_subscrp_report_overlap(s, sseq->lower,
-							    sseq->upper,
-							    TIPC_PUBLISHED,
-							    crs->ref, crs->node,
-							    crs->scope,
-							    must_report);
+				tipc_sub_report_overlap(sub, sseq->lower,
+							sseq->upper,
+							TIPC_PUBLISHED,
+							crs->ref, crs->node,
+							crs->scope,
+							must_report);
 				must_report = 0;
 			}
 		}
@@ -808,24 +811,27 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref,
 /**
  * tipc_nametbl_subscribe - add a subscription object to the name table
  */
-void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status)
+void tipc_nametbl_subscribe(struct tipc_subscription *sub)
 {
-	struct tipc_net *tn = net_generic(s->net, tipc_net_id);
-	u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap);
+	struct tipc_net *tn = tipc_net(sub->net);
+	struct tipc_subscr *s = &sub->evt.s;
+	u32 type = tipc_sub_read(s, seq.type);
 	int index = hash(type);
 	struct name_seq *seq;
 	struct tipc_name_seq ns;
 
 	spin_lock_bh(&tn->nametbl_lock);
-	seq = nametbl_find_seq(s->net, type);
+	seq = nametbl_find_seq(sub->net, type);
 	if (!seq)
 		seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]);
 	if (seq) {
 		spin_lock_bh(&seq->lock);
-		tipc_nameseq_subscribe(seq, s, status);
+		tipc_nameseq_subscribe(seq, sub);
 		spin_unlock_bh(&seq->lock);
 	} else {
-		tipc_subscrp_convert_seq(&s->evt.s.seq, s->swap, &ns);
+		ns.type = tipc_sub_read(s, seq.type);
+		ns.lower = tipc_sub_read(s, seq.lower);
+		ns.upper = tipc_sub_read(s, seq.upper);
 		pr_warn("Failed to create subscription for {%u,%u,%u}\n",
 			ns.type, ns.lower, ns.upper);
 	}
@@ -835,18 +841,19 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status)
 /**
  * tipc_nametbl_unsubscribe - remove a subscription object from name table
  */
-void tipc_nametbl_unsubscribe(struct tipc_subscription *s)
+void tipc_nametbl_unsubscribe(struct tipc_subscription *sub)
 {
-	struct tipc_net *tn = net_generic(s->net, tipc_net_id);
+	struct tipc_subscr *s = &sub->evt.s;
+	struct tipc_net *tn = tipc_net(sub->net);
 	struct name_seq *seq;
-	u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap);
+	u32 type = tipc_sub_read(s, seq.type);
 
 	spin_lock_bh(&tn->nametbl_lock);
-	seq = nametbl_find_seq(s->net, type);
+	seq = nametbl_find_seq(sub->net, type);
 	if (seq != NULL) {
 		spin_lock_bh(&seq->lock);
-		list_del_init(&s->nameseq_list);
-		tipc_subscrp_put(s);
+		list_del_init(&sub->nameseq_list);
+		tipc_sub_put(sub);
 		if (!seq->first_free && list_empty(&seq->subscriptions)) {
 			hlist_del_init_rcu(&seq->ns_list);
 			kfree(seq->sseqs);
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index f56e7cb3d436..17652602d5e2 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -120,7 +120,7 @@ struct publication *tipc_nametbl_insert_publ(struct net *net, u32 type,
 struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type,
 					      u32 lower, u32 node, u32 ref,
 					      u32 key);
-void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status);
+void tipc_nametbl_subscribe(struct tipc_subscription *s);
 void tipc_nametbl_unsubscribe(struct tipc_subscription *s);
 int tipc_nametbl_init(struct net *net);
 void tipc_nametbl_stop(struct net *net);
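
The name-table part of the change is mechanical: tipc_nametbl_subscribe() loses its bool status argument because the callee can now derive that flag from the subscription request itself, in the subscriber's own byte order. A minimal sketch of the before/after call pattern, using only names that appear in the hunks above and below (a fragment for orientation, not a complete function):

	/* Before: the caller (old subscr.c) computed the flag and passed it down. */
	status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
	tipc_nametbl_subscribe(sub, status);

	/* After: callers just pass the subscription; tipc_nameseq_subscribe()
	 * recovers the flag from the request in the subscriber's endianness.
	 */
	tipc_nametbl_subscribe(sub);
	no_status = tipc_sub_read(s, filter) & TIPC_SUB_NO_STATUS;
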
diff --git a/net/tipc/server.c b/net/tipc/server.c
deleted file mode 100644
index df0c563c90cd..000000000000
--- a/net/tipc/server.c
+++ /dev/null
@@ -1,710 +0,0 @@
1/*
2 * net/tipc/server.c: TIPC server infrastructure
3 *
4 * Copyright (c) 2012-2013, Wind River Systems
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * 3. Neither the names of the copyright holders nor the names of its
16 * contributors may be used to endorse or promote products derived from
17 * this software without specific prior written permission.
18 *
19 * Alternatively, this software may be distributed under the terms of the
20 * GNU General Public License ("GPL") version 2 as published by the Free
21 * Software Foundation.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33 * POSSIBILITY OF SUCH DAMAGE.
34 */
35
36#include "server.h"
37#include "core.h"
38#include "socket.h"
39#include "addr.h"
40#include "msg.h"
41#include <net/sock.h>
42#include <linux/module.h>
43
44/* Number of messages to send before rescheduling */
45#define MAX_SEND_MSG_COUNT 25
46#define MAX_RECV_MSG_COUNT 25
47#define CF_CONNECTED 1
48#define CF_SERVER 2
49
50#define sock2con(x) ((struct tipc_conn *)(x)->sk_user_data)
51
52/**
53 * struct tipc_conn - TIPC connection structure
54 * @kref: reference counter to connection object
55 * @conid: connection identifier
56 * @sock: socket handler associated with connection
57 * @flags: indicates connection state
58 * @server: pointer to connected server
59 * @rwork: receive work item
60 * @usr_data: user-specified field
61 * @rx_action: what to do when connection socket is active
62 * @outqueue: pointer to first outbound message in queue
63 * @outqueue_lock: control access to the outqueue
64 * @outqueue: list of connection objects for its server
65 * @swork: send work item
66 */
67struct tipc_conn {
68 struct kref kref;
69 int conid;
70 struct socket *sock;
71 unsigned long flags;
72 struct tipc_server *server;
73 struct work_struct rwork;
74 int (*rx_action) (struct tipc_conn *con);
75 void *usr_data;
76 struct list_head outqueue;
77 spinlock_t outqueue_lock;
78 struct work_struct swork;
79};
80
81/* An entry waiting to be sent */
82struct outqueue_entry {
83 struct list_head list;
84 struct kvec iov;
85 struct sockaddr_tipc dest;
86};
87
88static void tipc_recv_work(struct work_struct *work);
89static void tipc_send_work(struct work_struct *work);
90static void tipc_clean_outqueues(struct tipc_conn *con);
91
92static void tipc_conn_kref_release(struct kref *kref)
93{
94 struct tipc_conn *con = container_of(kref, struct tipc_conn, kref);
95 struct tipc_server *s = con->server;
96 struct sockaddr_tipc *saddr = s->saddr;
97 struct socket *sock = con->sock;
98 struct sock *sk;
99
100 if (sock) {
101 sk = sock->sk;
102 if (test_bit(CF_SERVER, &con->flags)) {
103 __module_get(sock->ops->owner);
104 __module_get(sk->sk_prot_creator->owner);
105 }
106 saddr->scope = -TIPC_NODE_SCOPE;
107 kernel_bind(sock, (struct sockaddr *)saddr, sizeof(*saddr));
108 sock_release(sock);
109 con->sock = NULL;
110 }
111 spin_lock_bh(&s->idr_lock);
112 idr_remove(&s->conn_idr, con->conid);
113 s->idr_in_use--;
114 spin_unlock_bh(&s->idr_lock);
115 tipc_clean_outqueues(con);
116 kfree(con);
117}
118
119static void conn_put(struct tipc_conn *con)
120{
121 kref_put(&con->kref, tipc_conn_kref_release);
122}
123
124static void conn_get(struct tipc_conn *con)
125{
126 kref_get(&con->kref);
127}
128
129static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid)
130{
131 struct tipc_conn *con;
132
133 spin_lock_bh(&s->idr_lock);
134 con = idr_find(&s->conn_idr, conid);
135 if (con) {
136 if (!test_bit(CF_CONNECTED, &con->flags) ||
137 !kref_get_unless_zero(&con->kref))
138 con = NULL;
139 }
140 spin_unlock_bh(&s->idr_lock);
141 return con;
142}
143
144static void sock_data_ready(struct sock *sk)
145{
146 struct tipc_conn *con;
147
148 read_lock_bh(&sk->sk_callback_lock);
149 con = sock2con(sk);
150 if (con && test_bit(CF_CONNECTED, &con->flags)) {
151 conn_get(con);
152 if (!queue_work(con->server->rcv_wq, &con->rwork))
153 conn_put(con);
154 }
155 read_unlock_bh(&sk->sk_callback_lock);
156}
157
158static void sock_write_space(struct sock *sk)
159{
160 struct tipc_conn *con;
161
162 read_lock_bh(&sk->sk_callback_lock);
163 con = sock2con(sk);
164 if (con && test_bit(CF_CONNECTED, &con->flags)) {
165 conn_get(con);
166 if (!queue_work(con->server->send_wq, &con->swork))
167 conn_put(con);
168 }
169 read_unlock_bh(&sk->sk_callback_lock);
170}
171
172static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con)
173{
174 struct sock *sk = sock->sk;
175
176 write_lock_bh(&sk->sk_callback_lock);
177
178 sk->sk_data_ready = sock_data_ready;
179 sk->sk_write_space = sock_write_space;
180 sk->sk_user_data = con;
181
182 con->sock = sock;
183
184 write_unlock_bh(&sk->sk_callback_lock);
185}
186
187static void tipc_close_conn(struct tipc_conn *con)
188{
189 struct tipc_server *s = con->server;
190 struct sock *sk = con->sock->sk;
191 bool disconnect = false;
192
193 write_lock_bh(&sk->sk_callback_lock);
194 disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags);
195 if (disconnect) {
196 sk->sk_user_data = NULL;
197 if (con->conid)
198 s->tipc_conn_release(con->conid, con->usr_data);
199 }
200 write_unlock_bh(&sk->sk_callback_lock);
201
202 /* Handle concurrent calls from sending and receiving threads */
203 if (!disconnect)
204 return;
205
206 /* Don't flush pending works, -just let them expire */
207 kernel_sock_shutdown(con->sock, SHUT_RDWR);
208 conn_put(con);
209}
210
211static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
212{
213 struct tipc_conn *con;
214 int ret;
215
216 con = kzalloc(sizeof(struct tipc_conn), GFP_ATOMIC);
217 if (!con)
218 return ERR_PTR(-ENOMEM);
219
220 kref_init(&con->kref);
221 INIT_LIST_HEAD(&con->outqueue);
222 spin_lock_init(&con->outqueue_lock);
223 INIT_WORK(&con->swork, tipc_send_work);
224 INIT_WORK(&con->rwork, tipc_recv_work);
225
226 spin_lock_bh(&s->idr_lock);
227 ret = idr_alloc(&s->conn_idr, con, 0, 0, GFP_ATOMIC);
228 if (ret < 0) {
229 kfree(con);
230 spin_unlock_bh(&s->idr_lock);
231 return ERR_PTR(-ENOMEM);
232 }
233 con->conid = ret;
234 s->idr_in_use++;
235 spin_unlock_bh(&s->idr_lock);
236
237 set_bit(CF_CONNECTED, &con->flags);
238 con->server = s;
239
240 return con;
241}
242
243static int tipc_receive_from_sock(struct tipc_conn *con)
244{
245 struct tipc_server *s = con->server;
246 struct sock *sk = con->sock->sk;
247 struct sockaddr_tipc addr;
248 struct msghdr msg = {};
249 struct kvec iov;
250 void *buf;
251 int ret;
252
253 buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC);
254 if (!buf) {
255 ret = -ENOMEM;
256 goto out_close;
257 }
258
259 iov.iov_base = buf;
260 iov.iov_len = s->max_rcvbuf_size;
261 msg.msg_name = &addr;
262 iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, iov.iov_len);
263 ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT);
264 if (ret <= 0) {
265 kmem_cache_free(s->rcvbuf_cache, buf);
266 goto out_close;
267 }
268
269 read_lock_bh(&sk->sk_callback_lock);
270 if (test_bit(CF_CONNECTED, &con->flags))
271 ret = s->tipc_conn_recvmsg(sock_net(con->sock->sk), con->conid,
272 &addr, con->usr_data, buf, ret);
273 read_unlock_bh(&sk->sk_callback_lock);
274 kmem_cache_free(s->rcvbuf_cache, buf);
275 if (ret < 0)
276 tipc_conn_terminate(s, con->conid);
277 return ret;
278
279out_close:
280 if (ret != -EWOULDBLOCK)
281 tipc_close_conn(con);
282 else if (ret == 0)
283 /* Don't return success if we really got EOF */
284 ret = -EAGAIN;
285
286 return ret;
287}
288
289static int tipc_accept_from_sock(struct tipc_conn *con)
290{
291 struct tipc_server *s = con->server;
292 struct socket *sock = con->sock;
293 struct socket *newsock;
294 struct tipc_conn *newcon;
295 int ret;
296
297 ret = kernel_accept(sock, &newsock, O_NONBLOCK);
298 if (ret < 0)
299 return ret;
300
301 newcon = tipc_alloc_conn(con->server);
302 if (IS_ERR(newcon)) {
303 ret = PTR_ERR(newcon);
304 sock_release(newsock);
305 return ret;
306 }
307
308 newcon->rx_action = tipc_receive_from_sock;
309 tipc_register_callbacks(newsock, newcon);
310
311 /* Notify that new connection is incoming */
312 newcon->usr_data = s->tipc_conn_new(newcon->conid);
313 if (!newcon->usr_data) {
314 sock_release(newsock);
315 conn_put(newcon);
316 return -ENOMEM;
317 }
318
319 /* Wake up receive process in case of 'SYN+' message */
320 newsock->sk->sk_data_ready(newsock->sk);
321 return ret;
322}
323
324static struct socket *tipc_create_listen_sock(struct tipc_conn *con)
325{
326 struct tipc_server *s = con->server;
327 struct socket *sock = NULL;
328 int ret;
329
330 ret = sock_create_kern(s->net, AF_TIPC, SOCK_SEQPACKET, 0, &sock);
331 if (ret < 0)
332 return NULL;
333 ret = kernel_setsockopt(sock, SOL_TIPC, TIPC_IMPORTANCE,
334 (char *)&s->imp, sizeof(s->imp));
335 if (ret < 0)
336 goto create_err;
337 ret = kernel_bind(sock, (struct sockaddr *)s->saddr, sizeof(*s->saddr));
338 if (ret < 0)
339 goto create_err;
340
341 switch (s->type) {
342 case SOCK_STREAM:
343 case SOCK_SEQPACKET:
344 con->rx_action = tipc_accept_from_sock;
345
346 ret = kernel_listen(sock, 0);
347 if (ret < 0)
348 goto create_err;
349 break;
350 case SOCK_DGRAM:
351 case SOCK_RDM:
352 con->rx_action = tipc_receive_from_sock;
353 break;
354 default:
355 pr_err("Unknown socket type %d\n", s->type);
356 goto create_err;
357 }
358
359 /* As server's listening socket owner and creator is the same module,
360 * we have to decrease TIPC module reference count to guarantee that
361 * it remains zero after the server socket is created, otherwise,
362 * executing "rmmod" command is unable to make TIPC module deleted
363 * after TIPC module is inserted successfully.
364 *
365 * However, the reference count is ever increased twice in
366 * sock_create_kern(): one is to increase the reference count of owner
367 * of TIPC socket's proto_ops struct; another is to increment the
368 * reference count of owner of TIPC proto struct. Therefore, we must
369 * decrement the module reference count twice to ensure that it keeps
370 * zero after server's listening socket is created. Of course, we
371 * must bump the module reference count twice as well before the socket
372 * is closed.
373 */
374 module_put(sock->ops->owner);
375 module_put(sock->sk->sk_prot_creator->owner);
376 set_bit(CF_SERVER, &con->flags);
377
378 return sock;
379
380create_err:
381 kernel_sock_shutdown(sock, SHUT_RDWR);
382 sock_release(sock);
383 return NULL;
384}
385
386static int tipc_open_listening_sock(struct tipc_server *s)
387{
388 struct socket *sock;
389 struct tipc_conn *con;
390
391 con = tipc_alloc_conn(s);
392 if (IS_ERR(con))
393 return PTR_ERR(con);
394
395 sock = tipc_create_listen_sock(con);
396 if (!sock) {
397 idr_remove(&s->conn_idr, con->conid);
398 s->idr_in_use--;
399 kfree(con);
400 return -EINVAL;
401 }
402
403 tipc_register_callbacks(sock, con);
404 return 0;
405}
406
407static struct outqueue_entry *tipc_alloc_entry(void *data, int len)
408{
409 struct outqueue_entry *entry;
410 void *buf;
411
412 entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC);
413 if (!entry)
414 return NULL;
415
416 buf = kmemdup(data, len, GFP_ATOMIC);
417 if (!buf) {
418 kfree(entry);
419 return NULL;
420 }
421
422 entry->iov.iov_base = buf;
423 entry->iov.iov_len = len;
424
425 return entry;
426}
427
428static void tipc_free_entry(struct outqueue_entry *e)
429{
430 kfree(e->iov.iov_base);
431 kfree(e);
432}
433
434static void tipc_clean_outqueues(struct tipc_conn *con)
435{
436 struct outqueue_entry *e, *safe;
437
438 spin_lock_bh(&con->outqueue_lock);
439 list_for_each_entry_safe(e, safe, &con->outqueue, list) {
440 list_del(&e->list);
441 tipc_free_entry(e);
442 }
443 spin_unlock_bh(&con->outqueue_lock);
444}
445
446int tipc_conn_sendmsg(struct tipc_server *s, int conid,
447 struct sockaddr_tipc *addr, void *data, size_t len)
448{
449 struct outqueue_entry *e;
450 struct tipc_conn *con;
451
452 con = tipc_conn_lookup(s, conid);
453 if (!con)
454 return -EINVAL;
455
456 if (!test_bit(CF_CONNECTED, &con->flags)) {
457 conn_put(con);
458 return 0;
459 }
460
461 e = tipc_alloc_entry(data, len);
462 if (!e) {
463 conn_put(con);
464 return -ENOMEM;
465 }
466
467 if (addr)
468 memcpy(&e->dest, addr, sizeof(struct sockaddr_tipc));
469
470 spin_lock_bh(&con->outqueue_lock);
471 list_add_tail(&e->list, &con->outqueue);
472 spin_unlock_bh(&con->outqueue_lock);
473
474 if (!queue_work(s->send_wq, &con->swork))
475 conn_put(con);
476 return 0;
477}
478
479void tipc_conn_terminate(struct tipc_server *s, int conid)
480{
481 struct tipc_conn *con;
482
483 con = tipc_conn_lookup(s, conid);
484 if (con) {
485 tipc_close_conn(con);
486 conn_put(con);
487 }
488}
489
490bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
491 u32 upper, u32 filter, int *conid)
492{
493 struct tipc_subscriber *scbr;
494 struct tipc_subscr sub;
495 struct tipc_server *s;
496 struct tipc_conn *con;
497
498 sub.seq.type = type;
499 sub.seq.lower = lower;
500 sub.seq.upper = upper;
501 sub.timeout = TIPC_WAIT_FOREVER;
502 sub.filter = filter;
503 *(u32 *)&sub.usr_handle = port;
504
505 con = tipc_alloc_conn(tipc_topsrv(net));
506 if (IS_ERR(con))
507 return false;
508
509 *conid = con->conid;
510 s = con->server;
511 scbr = s->tipc_conn_new(*conid);
512 if (!scbr) {
513 conn_put(con);
514 return false;
515 }
516
517 con->usr_data = scbr;
518 con->sock = NULL;
519 s->tipc_conn_recvmsg(net, *conid, NULL, scbr, &sub, sizeof(sub));
520 return true;
521}
522
523void tipc_topsrv_kern_unsubscr(struct net *net, int conid)
524{
525 struct tipc_conn *con;
526 struct tipc_server *srv;
527
528 con = tipc_conn_lookup(tipc_topsrv(net), conid);
529 if (!con)
530 return;
531
532 test_and_clear_bit(CF_CONNECTED, &con->flags);
533 srv = con->server;
534 if (con->conid)
535 srv->tipc_conn_release(con->conid, con->usr_data);
536 conn_put(con);
537 conn_put(con);
538}
539
540static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt)
541{
542 u32 port = *(u32 *)&evt->s.usr_handle;
543 u32 self = tipc_own_addr(net);
544 struct sk_buff_head evtq;
545 struct sk_buff *skb;
546
547 skb = tipc_msg_create(TOP_SRV, 0, INT_H_SIZE, sizeof(*evt),
548 self, self, port, port, 0);
549 if (!skb)
550 return;
551 msg_set_dest_droppable(buf_msg(skb), true);
552 memcpy(msg_data(buf_msg(skb)), evt, sizeof(*evt));
553 skb_queue_head_init(&evtq);
554 __skb_queue_tail(&evtq, skb);
555 tipc_sk_rcv(net, &evtq);
556}
557
558static void tipc_send_to_sock(struct tipc_conn *con)
559{
560 struct tipc_server *s = con->server;
561 struct outqueue_entry *e;
562 struct tipc_event *evt;
563 struct msghdr msg;
564 int count = 0;
565 int ret;
566
567 spin_lock_bh(&con->outqueue_lock);
568 while (test_bit(CF_CONNECTED, &con->flags)) {
569 e = list_entry(con->outqueue.next, struct outqueue_entry, list);
570 if ((struct list_head *) e == &con->outqueue)
571 break;
572
573 spin_unlock_bh(&con->outqueue_lock);
574
575 if (con->sock) {
576 memset(&msg, 0, sizeof(msg));
577 msg.msg_flags = MSG_DONTWAIT;
578 if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) {
579 msg.msg_name = &e->dest;
580 msg.msg_namelen = sizeof(struct sockaddr_tipc);
581 }
582 ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
583 e->iov.iov_len);
584 if (ret == -EWOULDBLOCK || ret == 0) {
585 cond_resched();
586 goto out;
587 } else if (ret < 0) {
588 goto send_err;
589 }
590 } else {
591 evt = e->iov.iov_base;
592 tipc_send_kern_top_evt(s->net, evt);
593 }
594 /* Don't starve users filling buffers */
595 if (++count >= MAX_SEND_MSG_COUNT) {
596 cond_resched();
597 count = 0;
598 }
599
600 spin_lock_bh(&con->outqueue_lock);
601 list_del(&e->list);
602 tipc_free_entry(e);
603 }
604 spin_unlock_bh(&con->outqueue_lock);
605out:
606 return;
607
608send_err:
609 tipc_close_conn(con);
610}
611
612static void tipc_recv_work(struct work_struct *work)
613{
614 struct tipc_conn *con = container_of(work, struct tipc_conn, rwork);
615 int count = 0;
616
617 while (test_bit(CF_CONNECTED, &con->flags)) {
618 if (con->rx_action(con))
619 break;
620
621 /* Don't flood Rx machine */
622 if (++count >= MAX_RECV_MSG_COUNT) {
623 cond_resched();
624 count = 0;
625 }
626 }
627 conn_put(con);
628}
629
630static void tipc_send_work(struct work_struct *work)
631{
632 struct tipc_conn *con = container_of(work, struct tipc_conn, swork);
633
634 if (test_bit(CF_CONNECTED, &con->flags))
635 tipc_send_to_sock(con);
636
637 conn_put(con);
638}
639
640static void tipc_work_stop(struct tipc_server *s)
641{
642 destroy_workqueue(s->rcv_wq);
643 destroy_workqueue(s->send_wq);
644}
645
646static int tipc_work_start(struct tipc_server *s)
647{
648 s->rcv_wq = alloc_ordered_workqueue("tipc_rcv", 0);
649 if (!s->rcv_wq) {
650 pr_err("can't start tipc receive workqueue\n");
651 return -ENOMEM;
652 }
653
654 s->send_wq = alloc_ordered_workqueue("tipc_send", 0);
655 if (!s->send_wq) {
656 pr_err("can't start tipc send workqueue\n");
657 destroy_workqueue(s->rcv_wq);
658 return -ENOMEM;
659 }
660
661 return 0;
662}
663
664int tipc_server_start(struct tipc_server *s)
665{
666 int ret;
667
668 spin_lock_init(&s->idr_lock);
669 idr_init(&s->conn_idr);
670 s->idr_in_use = 0;
671
672 s->rcvbuf_cache = kmem_cache_create(s->name, s->max_rcvbuf_size,
673 0, SLAB_HWCACHE_ALIGN, NULL);
674 if (!s->rcvbuf_cache)
675 return -ENOMEM;
676
677 ret = tipc_work_start(s);
678 if (ret < 0) {
679 kmem_cache_destroy(s->rcvbuf_cache);
680 return ret;
681 }
682 ret = tipc_open_listening_sock(s);
683 if (ret < 0) {
684 tipc_work_stop(s);
685 kmem_cache_destroy(s->rcvbuf_cache);
686 return ret;
687 }
688 return ret;
689}
690
691void tipc_server_stop(struct tipc_server *s)
692{
693 struct tipc_conn *con;
694 int id;
695
696 spin_lock_bh(&s->idr_lock);
697 for (id = 0; s->idr_in_use; id++) {
698 con = idr_find(&s->conn_idr, id);
699 if (con) {
700 spin_unlock_bh(&s->idr_lock);
701 tipc_close_conn(con);
702 spin_lock_bh(&s->idr_lock);
703 }
704 }
705 spin_unlock_bh(&s->idr_lock);
706
707 tipc_work_stop(s);
708 kmem_cache_destroy(s->rcvbuf_cache);
709 idr_destroy(&s->conn_idr);
710}
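
One detail of the deleted file worth keeping in mind when reading the new topsrv.c: the listener socket's module reference counting. As the long comment in tipc_create_listen_sock() explains, sock_create_kern() takes two references on the TIPC module (the owner of the socket's proto_ops and the owner of its proto), which would make "rmmod tipc" impossible, so the server drops both after creating the listener and re-takes them in the connection's kref release path just before sock_release(). Condensed from the code above purely for orientation (kernel-internal APIs, not runnable on its own):

	/* tipc_create_listen_sock(): give back the two references taken by
	 * sock_create_kern() so the module refcount stays at zero.
	 */
	module_put(sock->ops->owner);
	module_put(sock->sk->sk_prot_creator->owner);
	set_bit(CF_SERVER, &con->flags);

	/* tipc_conn_kref_release(): take them again right before sock_release(),
	 * which drops them as part of normal socket teardown.
	 */
	if (test_bit(CF_SERVER, &con->flags)) {
		__module_get(sock->ops->owner);
		__module_get(sk->sk_prot_creator->owner);
	}
	sock_release(sock);
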
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index 68e26470c516..6925a989569b 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -1,7 +1,7 @@
 /*
  * net/tipc/subscr.c: TIPC network topology service
  *
- * Copyright (c) 2000-2006, Ericsson AB
+ * Copyright (c) 2000-2017, Ericsson AB
  * Copyright (c) 2005-2007, 2010-2013, Wind River Systems
  * All rights reserved.
  *
@@ -38,61 +38,30 @@
 #include "name_table.h"
 #include "subscr.h"
 
-/**
- * struct tipc_subscriber - TIPC network topology subscriber
- * @kref: reference counter to tipc_subscription object
- * @conid: connection identifier to server connecting to subscriber
- * @lock: control access to subscriber
- * @subscrp_list: list of subscription objects for this subscriber
- */
-struct tipc_subscriber {
-	struct kref kref;
-	int conid;
-	spinlock_t lock;
-	struct list_head subscrp_list;
-};
-
-static void tipc_subscrb_put(struct tipc_subscriber *subscriber);
-
-/**
- * htohl - convert value to endianness used by destination
- * @in: value to convert
- * @swap: non-zero if endianness must be reversed
- *
- * Returns converted value
- */
-static u32 htohl(u32 in, int swap)
-{
-	return swap ? swab32(in) : in;
-}
-
-static void tipc_subscrp_send_event(struct tipc_subscription *sub,
-				    u32 found_lower, u32 found_upper,
-				    u32 event, u32 port_ref, u32 node)
+static void tipc_sub_send_event(struct tipc_subscription *sub,
+				u32 found_lower, u32 found_upper,
+				u32 event, u32 port, u32 node)
 {
-	struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
-	struct tipc_subscriber *subscriber = sub->subscriber;
-	struct kvec msg_sect;
+	struct tipc_event *evt = &sub->evt;
 
-	msg_sect.iov_base = (void *)&sub->evt;
-	msg_sect.iov_len = sizeof(struct tipc_event);
-	sub->evt.event = htohl(event, sub->swap);
-	sub->evt.found_lower = htohl(found_lower, sub->swap);
-	sub->evt.found_upper = htohl(found_upper, sub->swap);
-	sub->evt.port.ref = htohl(port_ref, sub->swap);
-	sub->evt.port.node = htohl(node, sub->swap);
-	tipc_conn_sendmsg(tn->topsrv, subscriber->conid, NULL,
-			  msg_sect.iov_base, msg_sect.iov_len);
+	if (sub->inactive)
+		return;
+	tipc_evt_write(evt, event, event);
+	tipc_evt_write(evt, found_lower, found_lower);
+	tipc_evt_write(evt, found_upper, found_upper);
+	tipc_evt_write(evt, port.ref, port);
+	tipc_evt_write(evt, port.node, node);
+	tipc_topsrv_queue_evt(sub->net, sub->conid, event, evt);
 }
 
 /**
- * tipc_subscrp_check_overlap - test for subscription overlap with the
+ * tipc_sub_check_overlap - test for subscription overlap with the
  * given values
  *
  * Returns 1 if there is overlap, otherwise 0.
  */
-int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower,
-			       u32 found_upper)
+int tipc_sub_check_overlap(struct tipc_name_seq *seq, u32 found_lower,
+			   u32 found_upper)
 {
 	if (found_lower < seq->lower)
 		found_lower = seq->lower;
@@ -103,298 +72,98 @@ int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower,
 	return 1;
 }
 
-u32 tipc_subscrp_convert_seq_type(u32 type, int swap)
-{
-	return htohl(type, swap);
-}
-
-void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap,
-			      struct tipc_name_seq *out)
-{
-	out->type = htohl(in->type, swap);
-	out->lower = htohl(in->lower, swap);
-	out->upper = htohl(in->upper, swap);
-}
-
-void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower,
-				 u32 found_upper, u32 event, u32 port_ref,
-				 u32 node, u32 scope, int must)
+void tipc_sub_report_overlap(struct tipc_subscription *sub,
+			     u32 found_lower, u32 found_upper,
+			     u32 event, u32 port, u32 node,
+			     u32 scope, int must)
 {
-	u32 filter = htohl(sub->evt.s.filter, sub->swap);
+	struct tipc_subscr *s = &sub->evt.s;
+	u32 filter = tipc_sub_read(s, filter);
 	struct tipc_name_seq seq;
 
-	tipc_subscrp_convert_seq(&sub->evt.s.seq, sub->swap, &seq);
-	if (!tipc_subscrp_check_overlap(&seq, found_lower, found_upper))
+	seq.type = tipc_sub_read(s, seq.type);
+	seq.lower = tipc_sub_read(s, seq.lower);
+	seq.upper = tipc_sub_read(s, seq.upper);
+
+	if (!tipc_sub_check_overlap(&seq, found_lower, found_upper))
 		return;
+
 	if (!must && !(filter & TIPC_SUB_PORTS))
 		return;
 	if (filter & TIPC_SUB_CLUSTER_SCOPE && scope == TIPC_NODE_SCOPE)
 		return;
 	if (filter & TIPC_SUB_NODE_SCOPE && scope != TIPC_NODE_SCOPE)
 		return;
-
-	tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref,
-				node);
+	spin_lock(&sub->lock);
+	tipc_sub_send_event(sub, found_lower, found_upper,
+			    event, port, node);
+	spin_unlock(&sub->lock);
 }
 
-static void tipc_subscrp_timeout(struct timer_list *t)
+static void tipc_sub_timeout(struct timer_list *t)
 {
 	struct tipc_subscription *sub = from_timer(sub, t, timer);
-	struct tipc_subscriber *subscriber = sub->subscriber;
-
-	spin_lock_bh(&subscriber->lock);
-	tipc_nametbl_unsubscribe(sub);
-	list_del(&sub->subscrp_list);
-	spin_unlock_bh(&subscriber->lock);
-
-	/* Notify subscriber of timeout */
-	tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
-				TIPC_SUBSCR_TIMEOUT, 0, 0);
-
-	tipc_subscrp_put(sub);
-}
-
-static void tipc_subscrb_kref_release(struct kref *kref)
-{
-	kfree(container_of(kref,struct tipc_subscriber, kref));
-}
-
-static void tipc_subscrb_put(struct tipc_subscriber *subscriber)
-{
-	kref_put(&subscriber->kref, tipc_subscrb_kref_release);
-}
+	struct tipc_subscr *s = &sub->evt.s;
 
-static void tipc_subscrb_get(struct tipc_subscriber *subscriber)
-{
-	kref_get(&subscriber->kref);
+	spin_lock(&sub->lock);
+	tipc_sub_send_event(sub, s->seq.lower, s->seq.upper,
+			    TIPC_SUBSCR_TIMEOUT, 0, 0);
+	sub->inactive = true;
+	spin_unlock(&sub->lock);
 }
 
-static void tipc_subscrp_kref_release(struct kref *kref)
+static void tipc_sub_kref_release(struct kref *kref)
 {
-	struct tipc_subscription *sub = container_of(kref,
-						     struct tipc_subscription,
-						     kref);
-	struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
-	struct tipc_subscriber *subscriber = sub->subscriber;
-
-	atomic_dec(&tn->subscription_count);
-	kfree(sub);
-	tipc_subscrb_put(subscriber);
+	kfree(container_of(kref, struct tipc_subscription, kref));
 }
 
-void tipc_subscrp_put(struct tipc_subscription *subscription)
+void tipc_sub_put(struct tipc_subscription *subscription)
 {
-	kref_put(&subscription->kref, tipc_subscrp_kref_release);
+	kref_put(&subscription->kref, tipc_sub_kref_release);
 }
 
-void tipc_subscrp_get(struct tipc_subscription *subscription)
+void tipc_sub_get(struct tipc_subscription *subscription)
 {
 	kref_get(&subscription->kref);
 }
 
-/* tipc_subscrb_subscrp_delete - delete a specific subscription or all
- * subscriptions for a given subscriber.
- */
-static void tipc_subscrb_subscrp_delete(struct tipc_subscriber *subscriber,
-					struct tipc_subscr *s)
-{
-	struct list_head *subscription_list = &subscriber->subscrp_list;
-	struct tipc_subscription *sub, *temp;
-	u32 timeout;
-
-	spin_lock_bh(&subscriber->lock);
-	list_for_each_entry_safe(sub, temp, subscription_list, subscrp_list) {
-		if (s && memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr)))
-			continue;
-
-		timeout = htohl(sub->evt.s.timeout, sub->swap);
-		if (timeout == TIPC_WAIT_FOREVER || del_timer(&sub->timer)) {
-			tipc_nametbl_unsubscribe(sub);
-			list_del(&sub->subscrp_list);
-			tipc_subscrp_put(sub);
-		}
-
-		if (s)
-			break;
-	}
-	spin_unlock_bh(&subscriber->lock);
-}
-
-static struct tipc_subscriber *tipc_subscrb_create(int conid)
-{
-	struct tipc_subscriber *subscriber;
-
-	subscriber = kzalloc(sizeof(*subscriber), GFP_ATOMIC);
-	if (!subscriber) {
-		pr_warn("Subscriber rejected, no memory\n");
-		return NULL;
-	}
-	INIT_LIST_HEAD(&subscriber->subscrp_list);
-	kref_init(&subscriber->kref);
-	subscriber->conid = conid;
-	spin_lock_init(&subscriber->lock);
-
-	return subscriber;
-}
-
-static void tipc_subscrb_delete(struct tipc_subscriber *subscriber)
-{
-	tipc_subscrb_subscrp_delete(subscriber, NULL);
-	tipc_subscrb_put(subscriber);
-}
-
-static void tipc_subscrp_cancel(struct tipc_subscr *s,
-				struct tipc_subscriber *subscriber)
-{
-	tipc_subscrb_get(subscriber);
-	tipc_subscrb_subscrp_delete(subscriber, s);
-	tipc_subscrb_put(subscriber);
-}
-
-static struct tipc_subscription *tipc_subscrp_create(struct net *net,
-						      struct tipc_subscr *s,
-						      int swap)
+struct tipc_subscription *tipc_sub_subscribe(struct net *net,
+					     struct tipc_subscr *s,
+					     int conid)
 {
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
+	u32 filter = tipc_sub_read(s, filter);
 	struct tipc_subscription *sub;
-	u32 filter = htohl(s->filter, swap);
+	u32 timeout;
 
-	/* Refuse subscription if global limit exceeded */
-	if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) {
-		pr_warn("Subscription rejected, limit reached (%u)\n",
-			TIPC_MAX_SUBSCRIPTIONS);
+	if ((filter & TIPC_SUB_PORTS && filter & TIPC_SUB_SERVICE) ||
+	    (tipc_sub_read(s, seq.lower) > tipc_sub_read(s, seq.upper))) {
+		pr_warn("Subscription rejected, illegal request\n");
 		return NULL;
 	}
-
-	/* Allocate subscription object */
 	sub = kmalloc(sizeof(*sub), GFP_ATOMIC);
 	if (!sub) {
 		pr_warn("Subscription rejected, no memory\n");
 		return NULL;
 	}
-
-	/* Initialize subscription object */
 	sub->net = net;
-	if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) ||
-	    (htohl(s->seq.lower, swap) > htohl(s->seq.upper, swap))) {
-		pr_warn("Subscription rejected, illegal request\n");
-		kfree(sub);
-		return NULL;
-	}
-
-	sub->swap = swap;
+	sub->conid = conid;
+	sub->inactive = false;
 	memcpy(&sub->evt.s, s, sizeof(*s));
-	atomic_inc(&tn->subscription_count);
+	spin_lock_init(&sub->lock);
 	kref_init(&sub->kref);
-	return sub;
-}
-
-static int tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s,
-				  struct tipc_subscriber *subscriber, int swap,
-				  bool status)
-{
-	struct tipc_subscription *sub = NULL;
-	u32 timeout;
-
-	sub = tipc_subscrp_create(net, s, swap);
-	if (!sub)
-		return -1;
-
-	spin_lock_bh(&subscriber->lock);
-	list_add(&sub->subscrp_list, &subscriber->subscrp_list);
-	sub->subscriber = subscriber;
-	tipc_nametbl_subscribe(sub, status);
-	tipc_subscrb_get(subscriber);
-	spin_unlock_bh(&subscriber->lock);
-
-	timer_setup(&sub->timer, tipc_subscrp_timeout, 0);
-	timeout = htohl(sub->evt.s.timeout, swap);
-
+	tipc_nametbl_subscribe(sub);
+	timer_setup(&sub->timer, tipc_sub_timeout, 0);
+	timeout = tipc_sub_read(&sub->evt.s, timeout);
 	if (timeout != TIPC_WAIT_FOREVER)
 		mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout));
-	return 0;
-}
-
-/* Handle one termination request for the subscriber */
-static void tipc_subscrb_release_cb(int conid, void *usr_data)
-{
-	tipc_subscrb_delete((struct tipc_subscriber *)usr_data);
-}
-
-/* Handle one request to create a new subscription for the subscriber */
-static int tipc_subscrb_rcv_cb(struct net *net, int conid,
-			       struct sockaddr_tipc *addr, void *usr_data,
-			       void *buf, size_t len)
-{
-	struct tipc_subscriber *subscriber = usr_data;
-	struct tipc_subscr *s = (struct tipc_subscr *)buf;
-	bool status;
-	int swap;
-
-	/* Determine subscriber's endianness */
-	swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE |
-			      TIPC_SUB_CANCEL));
-
-	/* Detect & process a subscription cancellation request */
-	if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
-		s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
-		tipc_subscrp_cancel(s, subscriber);
-		return 0;
-	}
-	status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
-	return tipc_subscrp_subscribe(net, s, subscriber, swap, status);
-}
-
-/* Handle one request to establish a new subscriber */
-static void *tipc_subscrb_connect_cb(int conid)
-{
-	return (void *)tipc_subscrb_create(conid);
-}
-
-int tipc_topsrv_start(struct net *net)
-{
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	const char name[] = "topology_server";
-	struct tipc_server *topsrv;
-	struct sockaddr_tipc *saddr;
-
-	saddr = kzalloc(sizeof(*saddr), GFP_ATOMIC);
-	if (!saddr)
-		return -ENOMEM;
-	saddr->family = AF_TIPC;
-	saddr->addrtype = TIPC_ADDR_NAMESEQ;
-	saddr->addr.nameseq.type = TIPC_TOP_SRV;
-	saddr->addr.nameseq.lower = TIPC_TOP_SRV;
-	saddr->addr.nameseq.upper = TIPC_TOP_SRV;
-	saddr->scope = TIPC_NODE_SCOPE;
-
-	topsrv = kzalloc(sizeof(*topsrv), GFP_ATOMIC);
-	if (!topsrv) {
-		kfree(saddr);
-		return -ENOMEM;
-	}
-	topsrv->net = net;
-	topsrv->saddr = saddr;
-	topsrv->imp = TIPC_CRITICAL_IMPORTANCE;
-	topsrv->type = SOCK_SEQPACKET;
-	topsrv->max_rcvbuf_size = sizeof(struct tipc_subscr);
-	topsrv->tipc_conn_recvmsg = tipc_subscrb_rcv_cb;
-	topsrv->tipc_conn_new = tipc_subscrb_connect_cb;
-	topsrv->tipc_conn_release = tipc_subscrb_release_cb;
-
-	strncpy(topsrv->name, name, strlen(name) + 1);
-	tn->topsrv = topsrv;
-	atomic_set(&tn->subscription_count, 0);
-
-	return tipc_server_start(topsrv);
+	return sub;
 }
 
-void tipc_topsrv_stop(struct net *net)
+void tipc_sub_unsubscribe(struct tipc_subscription *sub)
 {
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_server *topsrv = tn->topsrv;
-
-	tipc_server_stop(topsrv);
-	kfree(topsrv->saddr);
-	kfree(topsrv);
+	tipc_nametbl_unsubscribe(sub);
+	if (sub->evt.s.timeout != TIPC_WAIT_FOREVER)
+		del_timer_sync(&sub->timer);
+	list_del(&sub->sub_list);
+	tipc_sub_put(sub);
 }
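
The overlap test itself survives the rewrite unchanged: clamp the reported range into the subscribed range and signal overlap only if something is left. A stand-alone user-space rendition of tipc_sub_check_overlap() with a small worked example (plain C; the struct name is invented for the sketch):

	#include <stdint.h>
	#include <stdio.h>

	struct name_seq_range {		/* stand-in for struct tipc_name_seq */
		uint32_t lower;
		uint32_t upper;
	};

	/* Mirrors tipc_sub_check_overlap(): returns 1 if [found_lower, found_upper]
	 * intersects the subscribed range, otherwise 0.
	 */
	static int check_overlap(const struct name_seq_range *seq,
				 uint32_t found_lower, uint32_t found_upper)
	{
		if (found_lower < seq->lower)
			found_lower = seq->lower;
		if (found_upper > seq->upper)
			found_upper = seq->upper;
		if (found_lower > found_upper)
			return 0;
		return 1;
	}

	int main(void)
	{
		struct name_seq_range sub = { .lower = 100, .upper = 199 };

		/* {150..300} overlaps {100..199}; {200..300} does not. */
		printf("%d %d\n", check_overlap(&sub, 150, 300),
		       check_overlap(&sub, 200, 300));
		return 0;
	}
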
diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
index f3edca775d9f..8b2d22b18f22 100644
--- a/net/tipc/subscr.h
+++ b/net/tipc/subscr.h
@@ -1,7 +1,7 @@
 /*
  * net/tipc/subscr.h: Include file for TIPC network topology service
  *
- * Copyright (c) 2003-2006, Ericsson AB
+ * Copyright (c) 2003-2017, Ericsson AB
  * Copyright (c) 2005-2007, 2012-2013, Wind River Systems
  * All rights reserved.
  *
@@ -37,48 +37,72 @@
 #ifndef _TIPC_SUBSCR_H
 #define _TIPC_SUBSCR_H
 
-#include "server.h"
+#include "topsrv.h"
 
-#define TIPC_MAX_SUBSCRIPTIONS	65535
+#define TIPC_MAX_SUBSCR		65535
 #define TIPC_MAX_PUBLICATIONS	65535
 
 struct tipc_subscription;
-struct tipc_subscriber;
+struct tipc_conn;
 
 /**
  * struct tipc_subscription - TIPC network topology subscription object
  * @subscriber: pointer to its subscriber
  * @seq: name sequence associated with subscription
- * @net: point to network namespace
  * @timer: timer governing subscription duration (optional)
  * @nameseq_list: adjacent subscriptions in name sequence's subscription list
- * @subscrp_list: adjacent subscriptions in subscriber's subscription list
- * @swap: indicates if subscriber uses opposite endianness in its messages
+ * @sub_list: adjacent subscriptions in subscriber's subscription list
  * @evt: template for events generated by subscription
  */
 struct tipc_subscription {
 	struct kref kref;
-	struct tipc_subscriber *subscriber;
 	struct net *net;
 	struct timer_list timer;
 	struct list_head nameseq_list;
-	struct list_head subscrp_list;
-	int swap;
+	struct list_head sub_list;
 	struct tipc_event evt;
+	int conid;
+	bool inactive;
+	spinlock_t lock; /* serialize up/down and timer events */
 };
 
-int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower,
-			       u32 found_upper);
-void tipc_subscrp_report_overlap(struct tipc_subscription *sub,
-				 u32 found_lower, u32 found_upper, u32 event,
-				 u32 port_ref, u32 node, u32 scope, int must);
-void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap,
-			      struct tipc_name_seq *out);
-u32 tipc_subscrp_convert_seq_type(u32 type, int swap);
+struct tipc_subscription *tipc_sub_subscribe(struct net *net,
+					     struct tipc_subscr *s,
+					     int conid);
+void tipc_sub_unsubscribe(struct tipc_subscription *sub);
+
+int tipc_sub_check_overlap(struct tipc_name_seq *seq, u32 found_lower,
+			   u32 found_upper);
+void tipc_sub_report_overlap(struct tipc_subscription *sub,
+			     u32 found_lower, u32 found_upper,
+			     u32 event, u32 port, u32 node,
+			     u32 scope, int must);
 int tipc_topsrv_start(struct net *net);
 void tipc_topsrv_stop(struct net *net);
 
-void tipc_subscrp_put(struct tipc_subscription *subscription);
-void tipc_subscrp_get(struct tipc_subscription *subscription);
+void tipc_sub_put(struct tipc_subscription *subscription);
+void tipc_sub_get(struct tipc_subscription *subscription);
+
+#define TIPC_FILTER_MASK (TIPC_SUB_PORTS | TIPC_SUB_SERVICE | TIPC_SUB_CANCEL)
+
+/* tipc_sub_read - return field_ of struct sub_ in host endian format
+ */
+#define tipc_sub_read(sub_, field_)					\
+	({								\
+		struct tipc_subscr *sub__ = sub_;			\
+		u32 val__ = (sub__)->field_;				\
+		int swap_ = !((sub__)->filter & TIPC_FILTER_MASK);	\
+		(swap_ ? swab32(val__) : val__);			\
+	})
+
+/* tipc_evt_write - write val_ to field_ of struct evt_ in user endian format
+ */
+#define tipc_evt_write(evt_, field_, val_)				\
+	({								\
+		struct tipc_event *evt__ = evt_;			\
+		u32 val__ = val_;					\
+		int swap_ = !((evt__)->s.filter & (TIPC_FILTER_MASK));	\
+		(evt__)->field_ = swap_ ? swab32(val__) : val__;	\
+	})
 
 #endif
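
The two macros added at the bottom of subscr.h are the whole replacement for the old per-subscription swap flag: every valid request has at least one of TIPC_SUB_PORTS, TIPC_SUB_SERVICE or TIPC_SUB_CANCEL set, so a filter word with none of them visible in native byte order must have been written by an opposite-endian peer, and every 32-bit field of that request is then swab32()'d on read (and mirrored back in the peer's order on write). A stand-alone sketch of that inference in user-space C; the TIPC_SUB_* values are assumed to match the TIPC UAPI header, and sub_read()/swab32() here are local stand-ins, not the kernel macros:

	#include <stdint.h>
	#include <stdio.h>

	#define TIPC_SUB_PORTS   0x01	/* assumed UAPI values */
	#define TIPC_SUB_SERVICE 0x02
	#define TIPC_SUB_CANCEL  0x04
	#define TIPC_FILTER_MASK (TIPC_SUB_PORTS | TIPC_SUB_SERVICE | TIPC_SUB_CANCEL)

	static uint32_t swab32(uint32_t x)
	{
		return (x >> 24) | ((x >> 8) & 0x0000ff00) |
		       ((x << 8) & 0x00ff0000) | (x << 24);
	}

	/* Same inference as tipc_sub_read(): no known filter bit in native
	 * byte order means the request came from an opposite-endian peer.
	 */
	static uint32_t sub_read(uint32_t filter, uint32_t field)
	{
		int swap = !(filter & TIPC_FILTER_MASK);

		return swap ? swab32(field) : field;
	}

	int main(void)
	{
		uint32_t same  = TIPC_SUB_PORTS;		/* same-endian peer */
		uint32_t other = swab32(TIPC_SUB_PORTS);	/* opposite-endian  */

		/* Both calls print 500. */
		printf("%u %u\n", sub_read(same, 500), sub_read(other, swab32(500)));
		return 0;
	}
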
diff --git a/net/tipc/topsrv.c b/net/tipc/topsrv.c
new file mode 100644
index 000000000000..02013e00f287
--- /dev/null
+++ b/net/tipc/topsrv.c
@@ -0,0 +1,702 @@
1/*
2 * net/tipc/server.c: TIPC server infrastructure
3 *
4 * Copyright (c) 2012-2013, Wind River Systems
5 * Copyright (c) 2017-2018, Ericsson AB
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are met:
10 *
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 * 3. Neither the names of the copyright holders nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * Alternatively, this software may be distributed under the terms of the
21 * GNU General Public License ("GPL") version 2 as published by the Free
22 * Software Foundation.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34 * POSSIBILITY OF SUCH DAMAGE.
35 */
36
37#include "subscr.h"
38#include "topsrv.h"
39#include "core.h"
40#include "socket.h"
41#include "addr.h"
42#include "msg.h"
43#include <net/sock.h>
44#include <linux/module.h>
45
46/* Number of messages to send before rescheduling */
47#define MAX_SEND_MSG_COUNT 25
48#define MAX_RECV_MSG_COUNT 25
49#define CF_CONNECTED 1
50#define CF_SERVER 2
51
52#define TIPC_SERVER_NAME_LEN 32
53
54/**
55 * struct tipc_topsrv - TIPC server structure
56 * @conn_idr: identifier set of connection
57 * @idr_lock: protect the connection identifier set
58 * @idr_in_use: amount of allocated identifier entry
59 * @net: network namespace instance
60 * @rcvbuf_cache: memory cache of server receive buffer
61 * @rcv_wq: receive workqueue
62 * @send_wq: send workqueue
63 * @max_rcvbuf_size: maximum permitted receive message length
64 * @tipc_conn_new: callback will be called when new connection is incoming
65 * @tipc_conn_release: callback will be called before releasing the connection
66 * @tipc_conn_recvmsg: callback will be called when message arrives
67 * @name: server name
68 * @imp: message importance
69 * @type: socket type
70 */
71struct tipc_topsrv {
72 struct idr conn_idr;
73 spinlock_t idr_lock; /* for idr list */
74 int idr_in_use;
75 struct net *net;
76 struct work_struct awork;
77 struct workqueue_struct *rcv_wq;
78 struct workqueue_struct *send_wq;
79 int max_rcvbuf_size;
80 struct socket *listener;
81 char name[TIPC_SERVER_NAME_LEN];
82};
83
84/**
85 * struct tipc_conn - TIPC connection structure
86 * @kref: reference counter to connection object
87 * @conid: connection identifier
88 * @sock: socket handler associated with connection
89 * @flags: indicates connection state
90 * @server: pointer to connected server
91 * @sub_list: list of all pertaining subscriptions
92 * @sub_lock: lock protecting the subscription list
93 * @outqueue_lock: control access to the outqueue
94 * @rwork: receive work item
95 * @rx_action: what to do when connection socket is active
96 * @outqueue: pointer to first outbound message in queue
97 * @outqueue_lock: control access to the outqueue
98 * @swork: send work item
99 */
100struct tipc_conn {
101 struct kref kref;
102 int conid;
103 struct socket *sock;
104 unsigned long flags;
105 struct tipc_topsrv *server;
106 struct list_head sub_list;
107 spinlock_t sub_lock; /* for subscription list */
108 struct work_struct rwork;
109 struct list_head outqueue;
110 spinlock_t outqueue_lock; /* for outqueue */
111 struct work_struct swork;
112};
113
114/* An entry waiting to be sent */
115struct outqueue_entry {
116 bool inactive;
117 struct tipc_event evt;
118 struct list_head list;
119};
120
121static void tipc_conn_recv_work(struct work_struct *work);
122static void tipc_conn_send_work(struct work_struct *work);
123static void tipc_topsrv_kern_evt(struct net *net, struct tipc_event *evt);
124static void tipc_conn_delete_sub(struct tipc_conn *con, struct tipc_subscr *s);
125
126static bool connected(struct tipc_conn *con)
127{
128 return con && test_bit(CF_CONNECTED, &con->flags);
129}
130
131static void tipc_conn_kref_release(struct kref *kref)
132{
133 struct tipc_conn *con = container_of(kref, struct tipc_conn, kref);
134 struct tipc_topsrv *s = con->server;
135 struct outqueue_entry *e, *safe;
136
137 spin_lock_bh(&s->idr_lock);
138 idr_remove(&s->conn_idr, con->conid);
139 s->idr_in_use--;
140 spin_unlock_bh(&s->idr_lock);
141 if (con->sock)
142 sock_release(con->sock);
143
144 spin_lock_bh(&con->outqueue_lock);
145 list_for_each_entry_safe(e, safe, &con->outqueue, list) {
146 list_del(&e->list);
147 kfree(e);
148 }
149 spin_unlock_bh(&con->outqueue_lock);
150 kfree(con);
151}
152
153static void conn_put(struct tipc_conn *con)
154{
155 kref_put(&con->kref, tipc_conn_kref_release);
156}
157
158static void conn_get(struct tipc_conn *con)
159{
160 kref_get(&con->kref);
161}
162
163static void tipc_conn_close(struct tipc_conn *con)
164{
165 struct sock *sk = con->sock->sk;
166 bool disconnect = false;
167
168 write_lock_bh(&sk->sk_callback_lock);
169 disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags);
170
171 if (disconnect) {
172 sk->sk_user_data = NULL;
173 tipc_conn_delete_sub(con, NULL);
174 }
175 write_unlock_bh(&sk->sk_callback_lock);
176
177 /* Handle concurrent calls from sending and receiving threads */
178 if (!disconnect)
179 return;
180
181 /* Don't flush pending works, -just let them expire */
182 kernel_sock_shutdown(con->sock, SHUT_RDWR);
183
184 conn_put(con);
185}
186
187static struct tipc_conn *tipc_conn_alloc(struct tipc_topsrv *s)
188{
189 struct tipc_conn *con;
190 int ret;
191
192 con = kzalloc(sizeof(*con), GFP_ATOMIC);
193 if (!con)
194 return ERR_PTR(-ENOMEM);
195
196 kref_init(&con->kref);
197 INIT_LIST_HEAD(&con->outqueue);
198 INIT_LIST_HEAD(&con->sub_list);
199 spin_lock_init(&con->outqueue_lock);
200 spin_lock_init(&con->sub_lock);
201 INIT_WORK(&con->swork, tipc_conn_send_work);
202 INIT_WORK(&con->rwork, tipc_conn_recv_work);
203
204 spin_lock_bh(&s->idr_lock);
205 ret = idr_alloc(&s->conn_idr, con, 0, 0, GFP_ATOMIC);
206 if (ret < 0) {
207 kfree(con);
208 spin_unlock_bh(&s->idr_lock);
209 return ERR_PTR(-ENOMEM);
210 }
211 con->conid = ret;
212 s->idr_in_use++;
213 spin_unlock_bh(&s->idr_lock);
214
215 set_bit(CF_CONNECTED, &con->flags);
216 con->server = s;
217
218 return con;
219}
220
221static struct tipc_conn *tipc_conn_lookup(struct tipc_topsrv *s, int conid)
222{
223 struct tipc_conn *con;
224
225 spin_lock_bh(&s->idr_lock);
226 con = idr_find(&s->conn_idr, conid);
227 if (!connected(con) || !kref_get_unless_zero(&con->kref))
228 con = NULL;
229 spin_unlock_bh(&s->idr_lock);
230 return con;
231}
232
233/* tipc_conn_delete_sub - delete a specific or all subscriptions
234 * for a given subscriber
235 */
236static void tipc_conn_delete_sub(struct tipc_conn *con, struct tipc_subscr *s)
237{
238 struct tipc_net *tn = tipc_net(con->server->net);
239 struct list_head *sub_list = &con->sub_list;
240 struct tipc_subscription *sub, *tmp;
241
242 spin_lock_bh(&con->sub_lock);
243 list_for_each_entry_safe(sub, tmp, sub_list, sub_list) {
244 if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) {
245 tipc_sub_unsubscribe(sub);
246 atomic_dec(&tn->subscription_count);
247 } else if (s) {
248 break;
249 }
250 }
251 spin_unlock_bh(&con->sub_lock);
252}
253
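/* tipc_conn_send_to_sock - drain the connection's outqueue
 * Events are sent non-blocking; -EWOULDBLOCK leaves the entry on the queue
 * until the write_space callback reschedules the send work, while any other
 * error closes the connection. Entries flagged inactive (subscription
 * timeout) have their subscription deleted before the event is sent, and
 * entries belonging to a socket-less kernel connection are handed to
 * tipc_topsrv_kern_evt() instead. cond_resched() is called after every
 * MAX_SEND_MSG_COUNT events so that other work is not starved.
 */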
254static void tipc_conn_send_to_sock(struct tipc_conn *con)
255{
256 struct list_head *queue = &con->outqueue;
257 struct tipc_topsrv *srv = con->server;
258 struct outqueue_entry *e;
259 struct tipc_event *evt;
260 struct msghdr msg;
261 struct kvec iov;
262 int count = 0;
263 int ret;
264
265 spin_lock_bh(&con->outqueue_lock);
266
267 while (!list_empty(queue)) {
268 e = list_first_entry(queue, struct outqueue_entry, list);
269 evt = &e->evt;
270 spin_unlock_bh(&con->outqueue_lock);
271
272 if (e->inactive)
273 tipc_conn_delete_sub(con, &evt->s);
274
275 memset(&msg, 0, sizeof(msg));
276 msg.msg_flags = MSG_DONTWAIT;
277 iov.iov_base = evt;
278 iov.iov_len = sizeof(*evt);
279 msg.msg_name = NULL;
280
281 if (con->sock) {
282 ret = kernel_sendmsg(con->sock, &msg, &iov,
283 1, sizeof(*evt));
284 if (ret == -EWOULDBLOCK || ret == 0) {
285 cond_resched();
286 return;
287 } else if (ret < 0) {
288 return tipc_conn_close(con);
289 }
290 } else {
291 tipc_topsrv_kern_evt(srv->net, evt);
292 }
293
294 /* Don't starve users filling buffers */
295 if (++count >= MAX_SEND_MSG_COUNT) {
296 cond_resched();
297 count = 0;
298 }
299 spin_lock_bh(&con->outqueue_lock);
300 list_del(&e->list);
301 kfree(e);
302 }
303 spin_unlock_bh(&con->outqueue_lock);
304}
305
306static void tipc_conn_send_work(struct work_struct *work)
307{
308 struct tipc_conn *con = container_of(work, struct tipc_conn, swork);
309
310 if (connected(con))
311 tipc_conn_send_to_sock(con);
312
313 conn_put(con);
314}
315
316/* tipc_topsrv_queue_evt() - interrupt level call from a subscription instance
317 * The queued work is launched into tipc_conn_send_work()->tipc_conn_send_to_sock()
318 */
319void tipc_topsrv_queue_evt(struct net *net, int conid,
320 u32 event, struct tipc_event *evt)
321{
322 struct tipc_topsrv *srv = tipc_topsrv(net);
323 struct outqueue_entry *e;
324 struct tipc_conn *con;
325
326 con = tipc_conn_lookup(srv, conid);
327 if (!con)
328 return;
329
330 if (!connected(con))
331 goto err;
332
333 e = kmalloc(sizeof(*e), GFP_ATOMIC);
334 if (!e)
335 goto err;
336 e->inactive = (event == TIPC_SUBSCR_TIMEOUT);
337 memcpy(&e->evt, evt, sizeof(*evt));
338 spin_lock_bh(&con->outqueue_lock);
339 list_add_tail(&e->list, &con->outqueue);
340 spin_unlock_bh(&con->outqueue_lock);
341
342 if (queue_work(srv->send_wq, &con->swork))
343 return;
344err:
345 conn_put(con);
346}
347
348/* tipc_conn_write_space - interrupt callback after a sendmsg EAGAIN
349 * Indicates that there is now more space in the send buffer
350 * The queued work is launched into tipc_conn_send_work()->tipc_conn_send_to_sock()
351 */
352static void tipc_conn_write_space(struct sock *sk)
353{
354 struct tipc_conn *con;
355
356 read_lock_bh(&sk->sk_callback_lock);
357 con = sk->sk_user_data;
358 if (connected(con)) {
359 conn_get(con);
360 if (!queue_work(con->server->send_wq, &con->swork))
361 conn_put(con);
362 }
363 read_unlock_bh(&sk->sk_callback_lock);
364}
365
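/* tipc_conn_rcv_sub - process one struct tipc_subscr received from a subscriber
 * A request with the TIPC_SUB_CANCEL filter bit set only removes the matching
 * subscription; otherwise a new subscription is created, subject to the
 * TIPC_MAX_SUBSCR limit, and added to the connection's subscription list
 */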
366static int tipc_conn_rcv_sub(struct tipc_topsrv *srv,
367 struct tipc_conn *con,
368 struct tipc_subscr *s)
369{
370 struct tipc_net *tn = tipc_net(srv->net);
371 struct tipc_subscription *sub;
372
373 if (tipc_sub_read(s, filter) & TIPC_SUB_CANCEL) {
374 tipc_conn_delete_sub(con, s);
375 return 0;
376 }
377 if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCR) {
378 pr_warn("Subscription rejected, max (%u)\n", TIPC_MAX_SUBSCR);
379 return -1;
380 }
381 sub = tipc_sub_subscribe(srv->net, s, con->conid);
382 if (!sub)
383 return -1;
384 atomic_inc(&tn->subscription_count);
385 spin_lock_bh(&con->sub_lock);
386 list_add(&sub->sub_list, &con->sub_list);
387 spin_unlock_bh(&con->sub_lock);
388 return 0;
389}
390
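/* tipc_conn_rcv_from_sock - read one subscription request from the socket
 * Exactly sizeof(struct tipc_subscr) bytes are expected per message;
 * -EWOULDBLOCK means there is nothing more to read, while any other failure,
 * including a rejected subscription, closes the connection
 */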
391static int tipc_conn_rcv_from_sock(struct tipc_conn *con)
392{
393 struct tipc_topsrv *srv = con->server;
394 struct sock *sk = con->sock->sk;
395 struct msghdr msg = {};
396 struct tipc_subscr s;
397 struct kvec iov;
398 int ret;
399
400 iov.iov_base = &s;
401 iov.iov_len = sizeof(s);
402 msg.msg_name = NULL;
403 iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, iov.iov_len);
404 ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT);
405 if (ret == -EWOULDBLOCK)
406 return -EWOULDBLOCK;
407 if (ret > 0) {
408 read_lock_bh(&sk->sk_callback_lock);
409 ret = tipc_conn_rcv_sub(srv, con, &s);
410 read_unlock_bh(&sk->sk_callback_lock);
411 }
412 if (ret < 0)
413 tipc_conn_close(con);
414
415 return ret;
416}
417
418static void tipc_conn_recv_work(struct work_struct *work)
419{
420 struct tipc_conn *con = container_of(work, struct tipc_conn, rwork);
421 int count = 0;
422
423 while (connected(con)) {
424 if (tipc_conn_rcv_from_sock(con))
425 break;
426
427 /* Don't flood Rx machine */
428 if (++count >= MAX_RECV_MSG_COUNT) {
429 cond_resched();
430 count = 0;
431 }
432 }
433 conn_put(con);
434}
435
436/* tipc_conn_data_ready - interrupt callback indicating the socket has data
437 * The queued work is launched into tipc_conn_recv_work()->tipc_conn_rcv_from_sock()
438 */
439static void tipc_conn_data_ready(struct sock *sk)
440{
441 struct tipc_conn *con;
442
443 read_lock_bh(&sk->sk_callback_lock);
444 con = sk->sk_user_data;
445 if (connected(con)) {
446 conn_get(con);
447 if (!queue_work(con->server->rcv_wq, &con->rwork))
448 conn_put(con);
449 }
450 read_unlock_bh(&sk->sk_callback_lock);
451}
452
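/* tipc_topsrv_accept - accept new subscriber connections on the listener
 * Runs from the receive workqueue; accepts connections until the listening
 * socket would block, allocates a tipc_conn for each new socket and installs
 * the data_ready/write_space callbacks before kicking off the first read
 */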
453static void tipc_topsrv_accept(struct work_struct *work)
454{
455 struct tipc_topsrv *srv = container_of(work, struct tipc_topsrv, awork);
456 struct socket *lsock = srv->listener;
457 struct socket *newsock;
458 struct tipc_conn *con;
459 struct sock *newsk;
460 int ret;
461
462 while (1) {
463 ret = kernel_accept(lsock, &newsock, O_NONBLOCK);
464 if (ret < 0)
465 return;
466 con = tipc_conn_alloc(srv);
467 if (IS_ERR(con)) {
468 ret = PTR_ERR(con);
469 sock_release(newsock);
470 return;
471 }
472 /* Register callbacks */
473 newsk = newsock->sk;
474 write_lock_bh(&newsk->sk_callback_lock);
475 newsk->sk_data_ready = tipc_conn_data_ready;
476 newsk->sk_write_space = tipc_conn_write_space;
477 newsk->sk_user_data = con;
478 con->sock = newsock;
479 write_unlock_bh(&newsk->sk_callback_lock);
480
481 /* Wake up receive process in case of 'SYN+' message */
482 newsk->sk_data_ready(newsk);
483 }
484}
485
486/* tipc_topsrv_listener_data_ready - interrupt callback with connection request
487 * The queued work is launched into tipc_topsrv_accept()
488 */
489static void tipc_topsrv_listener_data_ready(struct sock *sk)
490{
491 struct tipc_topsrv *srv;
492
493 read_lock_bh(&sk->sk_callback_lock);
494 srv = sk->sk_user_data;
495 if (srv->listener)
496 queue_work(srv->rcv_wq, &srv->awork);
497 read_unlock_bh(&sk->sk_callback_lock);
498}
499
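/* tipc_topsrv_create_listener - create the topology server's listening socket
 * A kernel-space SOCK_SEQPACKET socket bound to the well-known name sequence
 * {TIPC_TOP_SRV, TIPC_TOP_SRV} at node scope; its data_ready callback is
 * replaced so that incoming connection requests are handled by
 * tipc_topsrv_accept()
 */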
500static int tipc_topsrv_create_listener(struct tipc_topsrv *srv)
501{
502 int imp = TIPC_CRITICAL_IMPORTANCE;
503 struct socket *lsock = NULL;
504 struct sockaddr_tipc saddr;
505 struct sock *sk;
506 int rc;
507
508 rc = sock_create_kern(srv->net, AF_TIPC, SOCK_SEQPACKET, 0, &lsock);
509 if (rc < 0)
510 return rc;
511
512 srv->listener = lsock;
513 sk = lsock->sk;
514 write_lock_bh(&sk->sk_callback_lock);
515 sk->sk_data_ready = tipc_topsrv_listener_data_ready;
516 sk->sk_user_data = srv;
517 write_unlock_bh(&sk->sk_callback_lock);
518
519 rc = kernel_setsockopt(lsock, SOL_TIPC, TIPC_IMPORTANCE,
520 (char *)&imp, sizeof(imp));
521 if (rc < 0)
522 goto err;
523
524 saddr.family = AF_TIPC;
525 saddr.addrtype = TIPC_ADDR_NAMESEQ;
526 saddr.addr.nameseq.type = TIPC_TOP_SRV;
527 saddr.addr.nameseq.lower = TIPC_TOP_SRV;
528 saddr.addr.nameseq.upper = TIPC_TOP_SRV;
529 saddr.scope = TIPC_NODE_SCOPE;
530
531 rc = kernel_bind(lsock, (struct sockaddr *)&saddr, sizeof(saddr));
532 if (rc < 0)
533 goto err;
534 rc = kernel_listen(lsock, 0);
535 if (rc < 0)
536 goto err;
537
538	/* Because the owner and the creator of the listening socket are the
539	 * same module (TIPC itself), we have to decrease the TIPC module
540	 * reference count here to keep it at zero after the server socket
541	 * has been created; otherwise the "rmmod" command would be unable to
542	 * remove the TIPC module once it has been loaded.
543	 *
544	 * This is needed because sock_create_kern() increments the reference
545	 * count twice: once for the owner of the TIPC socket's proto_ops
546	 * struct, and once for the owner of the TIPC proto struct. We must
547	 * therefore decrement the module reference count twice to ensure
548	 * that it stays at zero after the listening socket has been created.
549	 * Correspondingly, the reference count must be bumped twice again
550	 * before the listening socket is finally closed, which is done in
551	 * tipc_topsrv_stop().
552	 */
553 module_put(lsock->ops->owner);
554 module_put(sk->sk_prot_creator->owner);
555
556 return 0;
557err:
558 sock_release(lsock);
559 return -EINVAL;
560}
561
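/* tipc_topsrv_kern_subscr - create a subscription on behalf of a kernel user
 * A connection without a backing socket is allocated, so matching events are
 * delivered by tipc_topsrv_kern_evt() directly to the socket identified by
 * @port, which is carried in the subscription's usr_handle field
 */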
562bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
563 u32 upper, u32 filter, int *conid)
564{
565 struct tipc_subscr sub;
566 struct tipc_conn *con;
567 int rc;
568
569 sub.seq.type = type;
570 sub.seq.lower = lower;
571 sub.seq.upper = upper;
572 sub.timeout = TIPC_WAIT_FOREVER;
573 sub.filter = filter;
574 *(u32 *)&sub.usr_handle = port;
575
576 con = tipc_conn_alloc(tipc_topsrv(net));
577 if (IS_ERR(con))
578 return false;
579
580 *conid = con->conid;
581 con->sock = NULL;
582 rc = tipc_conn_rcv_sub(tipc_topsrv(net), con, &sub);
583 if (rc < 0)
584 tipc_conn_close(con);
585 return !rc;
586}
587
588void tipc_topsrv_kern_unsubscr(struct net *net, int conid)
589{
590 struct tipc_conn *con;
591
592 con = tipc_conn_lookup(tipc_topsrv(net), conid);
593 if (!con)
594 return;
595
596 test_and_clear_bit(CF_CONNECTED, &con->flags);
597 tipc_conn_delete_sub(con, NULL);
598 conn_put(con);
599 conn_put(con);
600}
601
602static void tipc_topsrv_kern_evt(struct net *net, struct tipc_event *evt)
603{
604 u32 port = *(u32 *)&evt->s.usr_handle;
605 u32 self = tipc_own_addr(net);
606 struct sk_buff_head evtq;
607 struct sk_buff *skb;
608
609 skb = tipc_msg_create(TOP_SRV, 0, INT_H_SIZE, sizeof(*evt),
610 self, self, port, port, 0);
611 if (!skb)
612 return;
613 msg_set_dest_droppable(buf_msg(skb), true);
614 memcpy(msg_data(buf_msg(skb)), evt, sizeof(*evt));
615 skb_queue_head_init(&evtq);
616 __skb_queue_tail(&evtq, skb);
617 tipc_sk_rcv(net, &evtq);
618}
619
620static int tipc_topsrv_work_start(struct tipc_topsrv *s)
621{
622 s->rcv_wq = alloc_ordered_workqueue("tipc_rcv", 0);
623 if (!s->rcv_wq) {
624 pr_err("can't start tipc receive workqueue\n");
625 return -ENOMEM;
626 }
627
628 s->send_wq = alloc_ordered_workqueue("tipc_send", 0);
629 if (!s->send_wq) {
630 pr_err("can't start tipc send workqueue\n");
631 destroy_workqueue(s->rcv_wq);
632 return -ENOMEM;
633 }
634
635 return 0;
636}
637
638static void tipc_topsrv_work_stop(struct tipc_topsrv *s)
639{
640 destroy_workqueue(s->rcv_wq);
641 destroy_workqueue(s->send_wq);
642}
643
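/* tipc_topsrv_start - allocate and start the per-namespace topology server
 * Sets up the tipc_topsrv instance and its connection idr, starts the
 * receive and send workqueues, and finally creates the listening socket
 */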
644int tipc_topsrv_start(struct net *net)
645{
646 struct tipc_net *tn = tipc_net(net);
647 const char name[] = "topology_server";
648 struct tipc_topsrv *srv;
649 int ret;
650
651 srv = kzalloc(sizeof(*srv), GFP_ATOMIC);
652 if (!srv)
653 return -ENOMEM;
654
655 srv->net = net;
656 srv->max_rcvbuf_size = sizeof(struct tipc_subscr);
657 INIT_WORK(&srv->awork, tipc_topsrv_accept);
658
659 strncpy(srv->name, name, strlen(name) + 1);
660 tn->topsrv = srv;
661 atomic_set(&tn->subscription_count, 0);
662
663 spin_lock_init(&srv->idr_lock);
664 idr_init(&srv->conn_idr);
665 srv->idr_in_use = 0;
666
667 ret = tipc_topsrv_work_start(srv);
668 if (ret < 0)
669 return ret;
670
671 ret = tipc_topsrv_create_listener(srv);
672 if (ret < 0)
673 tipc_topsrv_work_stop(srv);
674
675 return ret;
676}
677
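/* tipc_topsrv_stop - shut down the topology server of a namespace
 * Closes all remaining connections, re-takes the module references dropped
 * in tipc_topsrv_create_listener() before releasing the listener, then stops
 * the workqueues and frees the server instance
 */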
678void tipc_topsrv_stop(struct net *net)
679{
680 struct tipc_topsrv *srv = tipc_topsrv(net);
681 struct socket *lsock = srv->listener;
682 struct tipc_conn *con;
683 int id;
684
685 spin_lock_bh(&srv->idr_lock);
686 for (id = 0; srv->idr_in_use; id++) {
687 con = idr_find(&srv->conn_idr, id);
688 if (con) {
689 spin_unlock_bh(&srv->idr_lock);
690 tipc_conn_close(con);
691 spin_lock_bh(&srv->idr_lock);
692 }
693 }
694 __module_get(lsock->ops->owner);
695 __module_get(lsock->sk->sk_prot_creator->owner);
696 sock_release(lsock);
697 srv->listener = NULL;
698 spin_unlock_bh(&srv->idr_lock);
699 tipc_topsrv_work_stop(srv);
700 idr_destroy(&srv->conn_idr);
701 kfree(srv);
702}
diff --git a/net/tipc/server.h b/net/tipc/topsrv.h
index 64df7513cd70..c7ea71293748 100644
--- a/net/tipc/server.h
+++ b/net/tipc/topsrv.h
@@ -2,6 +2,7 @@
2 * net/tipc/server.h: Include file for TIPC server code 2 * net/tipc/server.h: Include file for TIPC server code
3 * 3 *
4 * Copyright (c) 2012-2013, Wind River Systems 4 * Copyright (c) 2012-2013, Wind River Systems
5 * Copyright (c) 2017, Ericsson AB
5 * All rights reserved. 6 * All rights reserved.
6 * 7 *
7 * Redistribution and use in source and binary forms, with or without 8 * Redistribution and use in source and binary forms, with or without
@@ -36,68 +37,18 @@
36#ifndef _TIPC_SERVER_H 37#ifndef _TIPC_SERVER_H
37#define _TIPC_SERVER_H 38#define _TIPC_SERVER_H
38 39
39#include <linux/idr.h> 40#include "core.h"
40#include <linux/tipc.h>
41#include <net/net_namespace.h>
42 41
43#define TIPC_SERVER_NAME_LEN 32 42#define TIPC_SERVER_NAME_LEN 32
44#define TIPC_SUB_CLUSTER_SCOPE 0x20 43#define TIPC_SUB_CLUSTER_SCOPE 0x20
45#define TIPC_SUB_NODE_SCOPE 0x40 44#define TIPC_SUB_NODE_SCOPE 0x40
46#define TIPC_SUB_NO_STATUS 0x80 45#define TIPC_SUB_NO_STATUS 0x80
47 46
48/** 47void tipc_topsrv_queue_evt(struct net *net, int conid,
49 * struct tipc_server - TIPC server structure 48 u32 event, struct tipc_event *evt);
50 * @conn_idr: identifier set of connection
51 * @idr_lock: protect the connection identifier set
52 * @idr_in_use: amount of allocated identifier entry
53 * @net: network namspace instance
54 * @rcvbuf_cache: memory cache of server receive buffer
55 * @rcv_wq: receive workqueue
56 * @send_wq: send workqueue
57 * @max_rcvbuf_size: maximum permitted receive message length
58 * @tipc_conn_new: callback will be called when new connection is incoming
59 * @tipc_conn_release: callback will be called before releasing the connection
60 * @tipc_conn_recvmsg: callback will be called when message arrives
61 * @saddr: TIPC server address
62 * @name: server name
63 * @imp: message importance
64 * @type: socket type
65 */
66struct tipc_server {
67 struct idr conn_idr;
68 spinlock_t idr_lock;
69 int idr_in_use;
70 struct net *net;
71 struct kmem_cache *rcvbuf_cache;
72 struct workqueue_struct *rcv_wq;
73 struct workqueue_struct *send_wq;
74 int max_rcvbuf_size;
75 void *(*tipc_conn_new)(int conid);
76 void (*tipc_conn_release)(int conid, void *usr_data);
77 int (*tipc_conn_recvmsg)(struct net *net, int conid,
78 struct sockaddr_tipc *addr, void *usr_data,
79 void *buf, size_t len);
80 struct sockaddr_tipc *saddr;
81 char name[TIPC_SERVER_NAME_LEN];
82 int imp;
83 int type;
84};
85
86int tipc_conn_sendmsg(struct tipc_server *s, int conid,
87 struct sockaddr_tipc *addr, void *data, size_t len);
88 49
89bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, 50bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
90 u32 upper, u32 filter, int *conid); 51 u32 upper, u32 filter, int *conid);
91void tipc_topsrv_kern_unsubscr(struct net *net, int conid); 52void tipc_topsrv_kern_unsubscr(struct net *net, int conid);
92 53
93/**
94 * tipc_conn_terminate - terminate connection with server
95 *
96 * Note: Must call it in process context since it might sleep
97 */
98void tipc_conn_terminate(struct tipc_server *s, int conid);
99int tipc_server_start(struct tipc_server *s);
100
101void tipc_server_stop(struct tipc_server *s);
102
103#endif 54#endif