aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/subscr.c
diff options
context:
space:
mode:
authorYing Xue <ying.xue@windriver.com>2015-01-09 02:27:11 -0500
committerDavid S. Miller <davem@davemloft.net>2015-01-12 16:24:33 -0500
commita62fbccecd62bacb4416fc427239f5b43b25d05e (patch)
tree7f2646f08ce92d8d82cf4a6cbd2126e7f71555ec /net/tipc/subscr.c
parent347475395434abb2b61bf59c2952470f37072567 (diff)
tipc: make subscriber server support net namespace
TIPC establishes one subscriber server which allows users to subscribe their interesting name service status. After tipc supports namespace, one dedicated tipc stack instance is created for each namespace, and each instance can be deemed as one independent TIPC node. As a result, subscriber server must be built for each namespace. Signed-off-by: Ying Xue <ying.xue@windriver.com> Tested-by: Tero Aho <Tero.Aho@coriant.com> Reviewed-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/subscr.c')
-rw-r--r--net/tipc/subscr.c104
1 files changed, 61 insertions, 43 deletions
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index b71dbc0ae8f9..72c339e432aa 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -50,34 +50,6 @@ struct tipc_subscriber {
50 struct list_head subscription_list; 50 struct list_head subscription_list;
51}; 51};
52 52
53static void subscr_conn_msg_event(struct net *net, int conid,
54 struct sockaddr_tipc *addr, void *usr_data,
55 void *buf, size_t len);
56static void *subscr_named_msg_event(int conid);
57static void subscr_conn_shutdown_event(int conid, void *usr_data);
58
59static atomic_t subscription_count = ATOMIC_INIT(0);
60
61static struct sockaddr_tipc topsrv_addr __read_mostly = {
62 .family = AF_TIPC,
63 .addrtype = TIPC_ADDR_NAMESEQ,
64 .addr.nameseq.type = TIPC_TOP_SRV,
65 .addr.nameseq.lower = TIPC_TOP_SRV,
66 .addr.nameseq.upper = TIPC_TOP_SRV,
67 .scope = TIPC_NODE_SCOPE
68};
69
70static struct tipc_server topsrv __read_mostly = {
71 .saddr = &topsrv_addr,
72 .imp = TIPC_CRITICAL_IMPORTANCE,
73 .type = SOCK_SEQPACKET,
74 .max_rcvbuf_size = sizeof(struct tipc_subscr),
75 .name = "topology_server",
76 .tipc_conn_recvmsg = subscr_conn_msg_event,
77 .tipc_conn_new = subscr_named_msg_event,
78 .tipc_conn_shutdown = subscr_conn_shutdown_event,
79};
80
81/** 53/**
82 * htohl - convert value to endianness used by destination 54 * htohl - convert value to endianness used by destination
83 * @in: value to convert 55 * @in: value to convert
@@ -94,6 +66,7 @@ static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower,
94 u32 found_upper, u32 event, u32 port_ref, 66 u32 found_upper, u32 event, u32 port_ref,
95 u32 node) 67 u32 node)
96{ 68{
69 struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
97 struct tipc_subscriber *subscriber = sub->subscriber; 70 struct tipc_subscriber *subscriber = sub->subscriber;
98 struct kvec msg_sect; 71 struct kvec msg_sect;
99 72
@@ -104,8 +77,8 @@ static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower,
104 sub->evt.found_upper = htohl(found_upper, sub->swap); 77 sub->evt.found_upper = htohl(found_upper, sub->swap);
105 sub->evt.port.ref = htohl(port_ref, sub->swap); 78 sub->evt.port.ref = htohl(port_ref, sub->swap);
106 sub->evt.port.node = htohl(node, sub->swap); 79 sub->evt.port.node = htohl(node, sub->swap);
107 tipc_conn_sendmsg(&topsrv, subscriber->conid, NULL, msg_sect.iov_base, 80 tipc_conn_sendmsg(tn->topsrv, subscriber->conid, NULL,
108 msg_sect.iov_len); 81 msg_sect.iov_base, msg_sect.iov_len);
109} 82}
110 83
111/** 84/**
@@ -146,6 +119,7 @@ static void subscr_timeout(unsigned long data)
146{ 119{
147 struct tipc_subscription *sub = (struct tipc_subscription *)data; 120 struct tipc_subscription *sub = (struct tipc_subscription *)data;
148 struct tipc_subscriber *subscriber = sub->subscriber; 121 struct tipc_subscriber *subscriber = sub->subscriber;
122 struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
149 123
150 /* The spin lock per subscriber is used to protect its members */ 124 /* The spin lock per subscriber is used to protect its members */
151 spin_lock_bh(&subscriber->lock); 125 spin_lock_bh(&subscriber->lock);
@@ -170,7 +144,7 @@ static void subscr_timeout(unsigned long data)
170 144
171 /* Now destroy subscription */ 145 /* Now destroy subscription */
172 kfree(sub); 146 kfree(sub);
173 atomic_dec(&subscription_count); 147 atomic_dec(&tn->subscription_count);
174} 148}
175 149
176/** 150/**
@@ -180,10 +154,12 @@ static void subscr_timeout(unsigned long data)
180 */ 154 */
181static void subscr_del(struct tipc_subscription *sub) 155static void subscr_del(struct tipc_subscription *sub)
182{ 156{
157 struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
158
183 tipc_nametbl_unsubscribe(sub); 159 tipc_nametbl_unsubscribe(sub);
184 list_del(&sub->subscription_list); 160 list_del(&sub->subscription_list);
185 kfree(sub); 161 kfree(sub);
186 atomic_dec(&subscription_count); 162 atomic_dec(&tn->subscription_count);
187} 163}
188 164
189/** 165/**
@@ -191,9 +167,12 @@ static void subscr_del(struct tipc_subscription *sub)
191 * 167 *
192 * Note: Must call it in process context since it might sleep. 168 * Note: Must call it in process context since it might sleep.
193 */ 169 */
194static void subscr_terminate(struct tipc_subscriber *subscriber) 170static void subscr_terminate(struct tipc_subscription *sub)
195{ 171{
196 tipc_conn_terminate(&topsrv, subscriber->conid); 172 struct tipc_subscriber *subscriber = sub->subscriber;
173 struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
174
175 tipc_conn_terminate(tn->topsrv, subscriber->conid);
197} 176}
198 177
199static void subscr_release(struct tipc_subscriber *subscriber) 178static void subscr_release(struct tipc_subscriber *subscriber)
@@ -263,7 +242,9 @@ static void subscr_cancel(struct tipc_subscr *s,
263 */ 242 */
264static int subscr_subscribe(struct net *net, struct tipc_subscr *s, 243static int subscr_subscribe(struct net *net, struct tipc_subscr *s,
265 struct tipc_subscriber *subscriber, 244 struct tipc_subscriber *subscriber,
266 struct tipc_subscription **sub_p) { 245 struct tipc_subscription **sub_p)
246{
247 struct tipc_net *tn = net_generic(net, tipc_net_id);
267 struct tipc_subscription *sub; 248 struct tipc_subscription *sub;
268 int swap; 249 int swap;
269 250
@@ -278,7 +259,7 @@ static int subscr_subscribe(struct net *net, struct tipc_subscr *s,
278 } 259 }
279 260
280 /* Refuse subscription if global limit exceeded */ 261 /* Refuse subscription if global limit exceeded */
281 if (atomic_read(&subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) { 262 if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) {
282 pr_warn("Subscription rejected, limit reached (%u)\n", 263 pr_warn("Subscription rejected, limit reached (%u)\n",
283 TIPC_MAX_SUBSCRIPTIONS); 264 TIPC_MAX_SUBSCRIPTIONS);
284 return -EINVAL; 265 return -EINVAL;
@@ -309,7 +290,7 @@ static int subscr_subscribe(struct net *net, struct tipc_subscr *s,
309 sub->subscriber = subscriber; 290 sub->subscriber = subscriber;
310 sub->swap = swap; 291 sub->swap = swap;
311 memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr)); 292 memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr));
312 atomic_inc(&subscription_count); 293 atomic_inc(&tn->subscription_count);
313 if (sub->timeout != TIPC_WAIT_FOREVER) { 294 if (sub->timeout != TIPC_WAIT_FOREVER) {
314 setup_timer(&sub->timer, subscr_timeout, (unsigned long)sub); 295 setup_timer(&sub->timer, subscr_timeout, (unsigned long)sub);
315 mod_timer(&sub->timer, jiffies + sub->timeout); 296 mod_timer(&sub->timer, jiffies + sub->timeout);
@@ -336,7 +317,7 @@ static void subscr_conn_msg_event(struct net *net, int conid,
336 if (subscr_subscribe(net, (struct tipc_subscr *)buf, subscriber, 317 if (subscr_subscribe(net, (struct tipc_subscr *)buf, subscriber,
337 &sub) < 0) { 318 &sub) < 0) {
338 spin_unlock_bh(&subscriber->lock); 319 spin_unlock_bh(&subscriber->lock);
339 subscr_terminate(subscriber); 320 subscr_terminate(sub);
340 return; 321 return;
341 } 322 }
342 if (sub) 323 if (sub)
@@ -344,7 +325,6 @@ static void subscr_conn_msg_event(struct net *net, int conid,
344 spin_unlock_bh(&subscriber->lock); 325 spin_unlock_bh(&subscriber->lock);
345} 326}
346 327
347
348/* Handle one request to establish a new subscriber */ 328/* Handle one request to establish a new subscriber */
349static void *subscr_named_msg_event(int conid) 329static void *subscr_named_msg_event(int conid)
350{ 330{
@@ -363,12 +343,50 @@ static void *subscr_named_msg_event(int conid)
363 return (void *)subscriber; 343 return (void *)subscriber;
364} 344}
365 345
366int tipc_subscr_start(void) 346int tipc_subscr_start(struct net *net)
367{ 347{
368 return tipc_server_start(&topsrv); 348 struct tipc_net *tn = net_generic(net, tipc_net_id);
349 const char name[] = "topology_server";
350 struct tipc_server *topsrv;
351 struct sockaddr_tipc *saddr;
352
353 saddr = kzalloc(sizeof(*saddr), GFP_ATOMIC);
354 if (!saddr)
355 return -ENOMEM;
356 saddr->family = AF_TIPC;
357 saddr->addrtype = TIPC_ADDR_NAMESEQ;
358 saddr->addr.nameseq.type = TIPC_TOP_SRV;
359 saddr->addr.nameseq.lower = TIPC_TOP_SRV;
360 saddr->addr.nameseq.upper = TIPC_TOP_SRV;
361 saddr->scope = TIPC_NODE_SCOPE;
362
363 topsrv = kzalloc(sizeof(*topsrv), GFP_ATOMIC);
364 if (!topsrv) {
365 kfree(saddr);
366 return -ENOMEM;
367 }
368 topsrv->net = net;
369 topsrv->saddr = saddr;
370 topsrv->imp = TIPC_CRITICAL_IMPORTANCE;
371 topsrv->type = SOCK_SEQPACKET;
372 topsrv->max_rcvbuf_size = sizeof(struct tipc_subscr);
373 topsrv->tipc_conn_recvmsg = subscr_conn_msg_event;
374 topsrv->tipc_conn_new = subscr_named_msg_event;
375 topsrv->tipc_conn_shutdown = subscr_conn_shutdown_event;
376
377 strncpy(topsrv->name, name, strlen(name) + 1);
378 tn->topsrv = topsrv;
379 atomic_set(&tn->subscription_count, 0);
380
381 return tipc_server_start(topsrv);
369} 382}
370 383
371void tipc_subscr_stop(void) 384void tipc_subscr_stop(struct net *net)
372{ 385{
373 tipc_server_stop(&topsrv); 386 struct tipc_net *tn = net_generic(net, tipc_net_id);
387 struct tipc_server *topsrv = tn->topsrv;
388
389 tipc_server_stop(topsrv);
390 kfree(topsrv->saddr);
391 kfree(topsrv);
374} 392}