aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/core.c6
-rw-r--r--net/tipc/socket.c3
-rw-r--r--net/tipc/subscr.c334
-rw-r--r--net/tipc/subscr.h8
4 files changed, 104 insertions, 247 deletions
diff --git a/net/tipc/core.c b/net/tipc/core.c
index b0e42a087291..15bbe99b609d 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -2,7 +2,7 @@
2 * net/tipc/core.c: TIPC module code 2 * net/tipc/core.c: TIPC module code
3 * 3 *
4 * Copyright (c) 2003-2006, Ericsson AB 4 * Copyright (c) 2003-2006, Ericsson AB
5 * Copyright (c) 2005-2006, 2010-2011, Wind River Systems 5 * Copyright (c) 2005-2006, 2010-2013, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
8 * Redistribution and use in source and binary forms, with or without 8 * Redistribution and use in source and binary forms, with or without
@@ -137,8 +137,6 @@ static int tipc_core_start(void)
137 if (!res) 137 if (!res)
138 res = tipc_nametbl_init(); 138 res = tipc_nametbl_init();
139 if (!res) 139 if (!res)
140 res = tipc_subscr_start();
141 if (!res)
142 res = tipc_cfg_init(); 140 res = tipc_cfg_init();
143 if (!res) 141 if (!res)
144 res = tipc_netlink_start(); 142 res = tipc_netlink_start();
@@ -146,6 +144,8 @@ static int tipc_core_start(void)
146 res = tipc_socket_init(); 144 res = tipc_socket_init();
147 if (!res) 145 if (!res)
148 res = tipc_register_sysctl(); 146 res = tipc_register_sysctl();
147 if (!res)
148 res = tipc_subscr_start();
149 if (res) 149 if (res)
150 tipc_core_stop(); 150 tipc_core_stop();
151 151
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index bd8e2cdeceef..d0254157a30d 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -402,7 +402,8 @@ static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len)
402 else if (addr->addrtype != TIPC_ADDR_NAMESEQ) 402 else if (addr->addrtype != TIPC_ADDR_NAMESEQ)
403 return -EAFNOSUPPORT; 403 return -EAFNOSUPPORT;
404 404
405 if (addr->addr.nameseq.type < TIPC_RESERVED_TYPES) 405 if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
406 (addr->addr.nameseq.type != TIPC_TOP_SRV))
406 return -EACCES; 407 return -EACCES;
407 408
408 return (addr->scope > 0) ? 409 return (addr->scope > 0) ?
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index 6b42d47029af..f6be92a6973a 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -2,7 +2,7 @@
2 * net/tipc/subscr.c: TIPC network topology service 2 * net/tipc/subscr.c: TIPC network topology service
3 * 3 *
4 * Copyright (c) 2000-2006, Ericsson AB 4 * Copyright (c) 2000-2006, Ericsson AB
5 * Copyright (c) 2005-2007, 2010-2011, Wind River Systems 5 * Copyright (c) 2005-2007, 2010-2013, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
8 * Redistribution and use in source and binary forms, with or without 8 * Redistribution and use in source and binary forms, with or without
@@ -41,33 +41,42 @@
41 41
42/** 42/**
43 * struct tipc_subscriber - TIPC network topology subscriber 43 * struct tipc_subscriber - TIPC network topology subscriber
44 * @port_ref: object reference to server port connecting to subscriber 44 * @conid: connection identifier to server connecting to subscriber
45 * @lock: pointer to spinlock controlling access to subscriber's server port 45 * @lock: controll access to subscriber
46 * @subscriber_list: adjacent subscribers in top. server's list of subscribers
47 * @subscription_list: list of subscription objects for this subscriber 46 * @subscription_list: list of subscription objects for this subscriber
48 */ 47 */
49struct tipc_subscriber { 48struct tipc_subscriber {
50 u32 port_ref; 49 int conid;
51 spinlock_t *lock; 50 spinlock_t lock;
52 struct list_head subscriber_list;
53 struct list_head subscription_list; 51 struct list_head subscription_list;
54}; 52};
55 53
56/** 54static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr,
57 * struct top_srv - TIPC network topology subscription service 55 void *usr_data, void *buf, size_t len);
58 * @setup_port: reference to TIPC port that handles subscription requests 56static void *subscr_named_msg_event(int conid);
59 * @subscription_count: number of active subscriptions (not subscribers!) 57static void subscr_conn_shutdown_event(int conid, void *usr_data);
60 * @subscriber_list: list of ports subscribing to service 58
61 * @lock: spinlock govering access to subscriber list 59static atomic_t subscription_count = ATOMIC_INIT(0);
62 */ 60
63struct top_srv { 61static struct sockaddr_tipc topsrv_addr __read_mostly = {
64 u32 setup_port; 62 .family = AF_TIPC,
65 atomic_t subscription_count; 63 .addrtype = TIPC_ADDR_NAMESEQ,
66 struct list_head subscriber_list; 64 .addr.nameseq.type = TIPC_TOP_SRV,
67 spinlock_t lock; 65 .addr.nameseq.lower = TIPC_TOP_SRV,
66 .addr.nameseq.upper = TIPC_TOP_SRV,
67 .scope = TIPC_NODE_SCOPE
68}; 68};
69 69
70static struct top_srv topsrv; 70static struct tipc_server topsrv __read_mostly = {
71 .saddr = &topsrv_addr,
72 .imp = TIPC_CRITICAL_IMPORTANCE,
73 .type = SOCK_SEQPACKET,
74 .max_rcvbuf_size = sizeof(struct tipc_subscr),
75 .name = "topology_server",
76 .tipc_conn_recvmsg = subscr_conn_msg_event,
77 .tipc_conn_new = subscr_named_msg_event,
78 .tipc_conn_shutdown = subscr_conn_shutdown_event,
79};
71 80
72/** 81/**
73 * htohl - convert value to endianness used by destination 82 * htohl - convert value to endianness used by destination
@@ -81,20 +90,13 @@ static u32 htohl(u32 in, int swap)
81 return swap ? swab32(in) : in; 90 return swap ? swab32(in) : in;
82} 91}
83 92
84/** 93static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower,
85 * subscr_send_event - send a message containing a tipc_event to the subscriber 94 u32 found_upper, u32 event, u32 port_ref,
86 *
87 * Note: Must not hold subscriber's server port lock, since tipc_send() will
88 * try to take the lock if the message is rejected and returned!
89 */
90static void subscr_send_event(struct tipc_subscription *sub,
91 u32 found_lower,
92 u32 found_upper,
93 u32 event,
94 u32 port_ref,
95 u32 node) 95 u32 node)
96{ 96{
97 struct iovec msg_sect; 97 struct tipc_subscriber *subscriber = sub->subscriber;
98 struct kvec msg_sect;
99 int ret;
98 100
99 msg_sect.iov_base = (void *)&sub->evt; 101 msg_sect.iov_base = (void *)&sub->evt;
100 msg_sect.iov_len = sizeof(struct tipc_event); 102 msg_sect.iov_len = sizeof(struct tipc_event);
@@ -104,7 +106,10 @@ static void subscr_send_event(struct tipc_subscription *sub,
104 sub->evt.found_upper = htohl(found_upper, sub->swap); 106 sub->evt.found_upper = htohl(found_upper, sub->swap);
105 sub->evt.port.ref = htohl(port_ref, sub->swap); 107 sub->evt.port.ref = htohl(port_ref, sub->swap);
106 sub->evt.port.node = htohl(node, sub->swap); 108 sub->evt.port.node = htohl(node, sub->swap);
107 tipc_send(sub->server_ref, 1, &msg_sect, msg_sect.iov_len); 109 ret = tipc_conn_sendmsg(&topsrv, subscriber->conid, NULL,
110 msg_sect.iov_base, msg_sect.iov_len);
111 if (ret < 0)
112 pr_err("Sending subscription event failed, no memory\n");
108} 113}
109 114
110/** 115/**
@@ -147,21 +152,24 @@ void tipc_subscr_report_overlap(struct tipc_subscription *sub,
147 subscr_send_event(sub, found_lower, found_upper, event, port_ref, node); 152 subscr_send_event(sub, found_lower, found_upper, event, port_ref, node);
148} 153}
149 154
150/**
151 * subscr_timeout - subscription timeout has occurred
152 */
153static void subscr_timeout(struct tipc_subscription *sub) 155static void subscr_timeout(struct tipc_subscription *sub)
154{ 156{
155 struct tipc_port *server_port; 157 struct tipc_subscriber *subscriber = sub->subscriber;
156 158
157 /* Validate server port reference (in case subscriber is terminating) */ 159 /* The spin lock per subscriber is used to protect its members */
158 server_port = tipc_port_lock(sub->server_ref); 160 spin_lock_bh(&subscriber->lock);
159 if (server_port == NULL) 161
162 /* Validate if the connection related to the subscriber is
163 * closed (in case subscriber is terminating)
164 */
165 if (subscriber->conid == 0) {
166 spin_unlock_bh(&subscriber->lock);
160 return; 167 return;
168 }
161 169
162 /* Validate timeout (in case subscription is being cancelled) */ 170 /* Validate timeout (in case subscription is being cancelled) */
163 if (sub->timeout == TIPC_WAIT_FOREVER) { 171 if (sub->timeout == TIPC_WAIT_FOREVER) {
164 tipc_port_unlock(server_port); 172 spin_unlock_bh(&subscriber->lock);
165 return; 173 return;
166 } 174 }
167 175
@@ -171,8 +179,7 @@ static void subscr_timeout(struct tipc_subscription *sub)
171 /* Unlink subscription from subscriber */ 179 /* Unlink subscription from subscriber */
172 list_del(&sub->subscription_list); 180 list_del(&sub->subscription_list);
173 181
174 /* Release subscriber's server port */ 182 spin_unlock_bh(&subscriber->lock);
175 tipc_port_unlock(server_port);
176 183
177 /* Notify subscriber of timeout */ 184 /* Notify subscriber of timeout */
178 subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper, 185 subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
@@ -181,64 +188,54 @@ static void subscr_timeout(struct tipc_subscription *sub)
181 /* Now destroy subscription */ 188 /* Now destroy subscription */
182 k_term_timer(&sub->timer); 189 k_term_timer(&sub->timer);
183 kfree(sub); 190 kfree(sub);
184 atomic_dec(&topsrv.subscription_count); 191 atomic_dec(&subscription_count);
185} 192}
186 193
187/** 194/**
188 * subscr_del - delete a subscription within a subscription list 195 * subscr_del - delete a subscription within a subscription list
189 * 196 *
190 * Called with subscriber port locked. 197 * Called with subscriber lock held.
191 */ 198 */
192static void subscr_del(struct tipc_subscription *sub) 199static void subscr_del(struct tipc_subscription *sub)
193{ 200{
194 tipc_nametbl_unsubscribe(sub); 201 tipc_nametbl_unsubscribe(sub);
195 list_del(&sub->subscription_list); 202 list_del(&sub->subscription_list);
196 kfree(sub); 203 kfree(sub);
197 atomic_dec(&topsrv.subscription_count); 204 atomic_dec(&subscription_count);
198} 205}
199 206
200/** 207/**
201 * subscr_terminate - terminate communication with a subscriber 208 * subscr_terminate - terminate communication with a subscriber
202 * 209 *
203 * Called with subscriber port locked. Routine must temporarily release lock 210 * Note: Must call it in process context since it might sleep.
204 * to enable subscription timeout routine(s) to finish without deadlocking;
205 * the lock is then reclaimed to allow caller to release it upon return.
206 * (This should work even in the unlikely event some other thread creates
207 * a new object reference in the interim that uses this lock; this routine will
208 * simply wait for it to be released, then claim it.)
209 */ 211 */
210static void subscr_terminate(struct tipc_subscriber *subscriber) 212static void subscr_terminate(struct tipc_subscriber *subscriber)
211{ 213{
212 u32 port_ref; 214 tipc_conn_terminate(&topsrv, subscriber->conid);
215}
216
217static void subscr_release(struct tipc_subscriber *subscriber)
218{
213 struct tipc_subscription *sub; 219 struct tipc_subscription *sub;
214 struct tipc_subscription *sub_temp; 220 struct tipc_subscription *sub_temp;
215 221
216 /* Invalidate subscriber reference */ 222 spin_lock_bh(&subscriber->lock);
217 port_ref = subscriber->port_ref;
218 subscriber->port_ref = 0;
219 spin_unlock_bh(subscriber->lock);
220 223
221 /* Sever connection to subscriber */ 224 /* Invalidate subscriber reference */
222 tipc_shutdown(port_ref); 225 subscriber->conid = 0;
223 tipc_deleteport(port_ref);
224 226
225 /* Destroy any existing subscriptions for subscriber */ 227 /* Destroy any existing subscriptions for subscriber */
226 list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list, 228 list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
227 subscription_list) { 229 subscription_list) {
228 if (sub->timeout != TIPC_WAIT_FOREVER) { 230 if (sub->timeout != TIPC_WAIT_FOREVER) {
231 spin_unlock_bh(&subscriber->lock);
229 k_cancel_timer(&sub->timer); 232 k_cancel_timer(&sub->timer);
230 k_term_timer(&sub->timer); 233 k_term_timer(&sub->timer);
234 spin_lock_bh(&subscriber->lock);
231 } 235 }
232 subscr_del(sub); 236 subscr_del(sub);
233 } 237 }
234 238 spin_unlock_bh(&subscriber->lock);
235 /* Remove subscriber from topology server's subscriber list */
236 spin_lock_bh(&topsrv.lock);
237 list_del(&subscriber->subscriber_list);
238 spin_unlock_bh(&topsrv.lock);
239
240 /* Reclaim subscriber lock */
241 spin_lock_bh(subscriber->lock);
242 239
243 /* Now destroy subscriber */ 240 /* Now destroy subscriber */
244 kfree(subscriber); 241 kfree(subscriber);
@@ -247,7 +244,7 @@ static void subscr_terminate(struct tipc_subscriber *subscriber)
247/** 244/**
248 * subscr_cancel - handle subscription cancellation request 245 * subscr_cancel - handle subscription cancellation request
249 * 246 *
250 * Called with subscriber port locked. Routine must temporarily release lock 247 * Called with subscriber lock held. Routine must temporarily release lock
251 * to enable the subscription timeout routine to finish without deadlocking; 248 * to enable the subscription timeout routine to finish without deadlocking;
252 * the lock is then reclaimed to allow caller to release it upon return. 249 * the lock is then reclaimed to allow caller to release it upon return.
253 * 250 *
@@ -274,10 +271,10 @@ static void subscr_cancel(struct tipc_subscr *s,
274 /* Cancel subscription timer (if used), then delete subscription */ 271 /* Cancel subscription timer (if used), then delete subscription */
275 if (sub->timeout != TIPC_WAIT_FOREVER) { 272 if (sub->timeout != TIPC_WAIT_FOREVER) {
276 sub->timeout = TIPC_WAIT_FOREVER; 273 sub->timeout = TIPC_WAIT_FOREVER;
277 spin_unlock_bh(subscriber->lock); 274 spin_unlock_bh(&subscriber->lock);
278 k_cancel_timer(&sub->timer); 275 k_cancel_timer(&sub->timer);
279 k_term_timer(&sub->timer); 276 k_term_timer(&sub->timer);
280 spin_lock_bh(subscriber->lock); 277 spin_lock_bh(&subscriber->lock);
281 } 278 }
282 subscr_del(sub); 279 subscr_del(sub);
283} 280}
@@ -285,7 +282,7 @@ static void subscr_cancel(struct tipc_subscr *s,
285/** 282/**
286 * subscr_subscribe - create subscription for subscriber 283 * subscr_subscribe - create subscription for subscriber
287 * 284 *
288 * Called with subscriber port locked. 285 * Called with subscriber lock held.
289 */ 286 */
290static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s, 287static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
291 struct tipc_subscriber *subscriber) 288 struct tipc_subscriber *subscriber)
@@ -304,7 +301,7 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
304 } 301 }
305 302
306 /* Refuse subscription if global limit exceeded */ 303 /* Refuse subscription if global limit exceeded */
307 if (atomic_read(&topsrv.subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) { 304 if (atomic_read(&subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) {
308 pr_warn("Subscription rejected, limit reached (%u)\n", 305 pr_warn("Subscription rejected, limit reached (%u)\n",
309 TIPC_MAX_SUBSCRIPTIONS); 306 TIPC_MAX_SUBSCRIPTIONS);
310 subscr_terminate(subscriber); 307 subscr_terminate(subscriber);
@@ -335,10 +332,10 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
335 } 332 }
336 INIT_LIST_HEAD(&sub->nameseq_list); 333 INIT_LIST_HEAD(&sub->nameseq_list);
337 list_add(&sub->subscription_list, &subscriber->subscription_list); 334 list_add(&sub->subscription_list, &subscriber->subscription_list);
338 sub->server_ref = subscriber->port_ref; 335 sub->subscriber = subscriber;
339 sub->swap = swap; 336 sub->swap = swap;
340 memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr)); 337 memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr));
341 atomic_inc(&topsrv.subscription_count); 338 atomic_inc(&subscription_count);
342 if (sub->timeout != TIPC_WAIT_FOREVER) { 339 if (sub->timeout != TIPC_WAIT_FOREVER) {
343 k_init_timer(&sub->timer, 340 k_init_timer(&sub->timer,
344 (Handler)subscr_timeout, (unsigned long)sub); 341 (Handler)subscr_timeout, (unsigned long)sub);
@@ -348,196 +345,51 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
348 return sub; 345 return sub;
349} 346}
350 347
351/** 348/* Handle one termination request for the subscriber */
352 * subscr_conn_shutdown_event - handle termination request from subscriber 349static void subscr_conn_shutdown_event(int conid, void *usr_data)
353 *
354 * Called with subscriber's server port unlocked.
355 */
356static void subscr_conn_shutdown_event(void *usr_handle,
357 u32 port_ref,
358 struct sk_buff **buf,
359 unsigned char const *data,
360 unsigned int size,
361 int reason)
362{ 350{
363 struct tipc_subscriber *subscriber = usr_handle; 351 subscr_release((struct tipc_subscriber *)usr_data);
364 spinlock_t *subscriber_lock;
365
366 if (tipc_port_lock(port_ref) == NULL)
367 return;
368
369 subscriber_lock = subscriber->lock;
370 subscr_terminate(subscriber);
371 spin_unlock_bh(subscriber_lock);
372} 352}
373 353
374/** 354/* Handle one request to create a new subscription for the subscriber */
375 * subscr_conn_msg_event - handle new subscription request from subscriber 355static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr,
376 * 356 void *usr_data, void *buf, size_t len)
377 * Called with subscriber's server port unlocked.
378 */
379static void subscr_conn_msg_event(void *usr_handle,
380 u32 port_ref,
381 struct sk_buff **buf,
382 const unchar *data,
383 u32 size)
384{ 357{
385 struct tipc_subscriber *subscriber = usr_handle; 358 struct tipc_subscriber *subscriber = usr_data;
386 spinlock_t *subscriber_lock;
387 struct tipc_subscription *sub; 359 struct tipc_subscription *sub;
388 360
389 /* 361 spin_lock_bh(&subscriber->lock);
390 * Lock subscriber's server port (& make a local copy of lock pointer, 362 sub = subscr_subscribe((struct tipc_subscr *)buf, subscriber);
391 * in case subscriber is deleted while processing subscription request) 363 if (sub)
392 */ 364 tipc_nametbl_subscribe(sub);
393 if (tipc_port_lock(port_ref) == NULL) 365 spin_unlock_bh(&subscriber->lock);
394 return;
395
396 subscriber_lock = subscriber->lock;
397
398 if (size != sizeof(struct tipc_subscr)) {
399 subscr_terminate(subscriber);
400 spin_unlock_bh(subscriber_lock);
401 } else {
402 sub = subscr_subscribe((struct tipc_subscr *)data, subscriber);
403 spin_unlock_bh(subscriber_lock);
404 if (sub != NULL) {
405
406 /*
407 * We must release the server port lock before adding a
408 * subscription to the name table since TIPC needs to be
409 * able to (re)acquire the port lock if an event message
410 * issued by the subscription process is rejected and
411 * returned. The subscription cannot be deleted while
412 * it is being added to the name table because:
413 * a) the single-threading of the native API port code
414 * ensures the subscription cannot be cancelled and
415 * the subscriber connection cannot be broken, and
416 * b) the name table lock ensures the subscription
417 * timeout code cannot delete the subscription,
418 * so the subscription object is still protected.
419 */
420 tipc_nametbl_subscribe(sub);
421 }
422 }
423} 366}
424 367
425/** 368
426 * subscr_named_msg_event - handle request to establish a new subscriber 369/* Handle one request to establish a new subscriber */
427 */ 370static void *subscr_named_msg_event(int conid)
428static void subscr_named_msg_event(void *usr_handle,
429 u32 port_ref,
430 struct sk_buff **buf,
431 const unchar *data,
432 u32 size,
433 u32 importance,
434 struct tipc_portid const *orig,
435 struct tipc_name_seq const *dest)
436{ 371{
437 struct tipc_subscriber *subscriber; 372 struct tipc_subscriber *subscriber;
438 u32 server_port_ref;
439 373
440 /* Create subscriber object */ 374 /* Create subscriber object */
441 subscriber = kzalloc(sizeof(struct tipc_subscriber), GFP_ATOMIC); 375 subscriber = kzalloc(sizeof(struct tipc_subscriber), GFP_ATOMIC);
442 if (subscriber == NULL) { 376 if (subscriber == NULL) {
443 pr_warn("Subscriber rejected, no memory\n"); 377 pr_warn("Subscriber rejected, no memory\n");
444 return; 378 return NULL;
445 } 379 }
446 INIT_LIST_HEAD(&subscriber->subscription_list); 380 INIT_LIST_HEAD(&subscriber->subscription_list);
447 INIT_LIST_HEAD(&subscriber->subscriber_list); 381 subscriber->conid = conid;
448 382 spin_lock_init(&subscriber->lock);
449 /* Create server port & establish connection to subscriber */
450 tipc_createport(subscriber,
451 importance,
452 NULL,
453 NULL,
454 subscr_conn_shutdown_event,
455 NULL,
456 NULL,
457 subscr_conn_msg_event,
458 NULL,
459 &subscriber->port_ref);
460 if (subscriber->port_ref == 0) {
461 pr_warn("Subscriber rejected, unable to create port\n");
462 kfree(subscriber);
463 return;
464 }
465 tipc_connect(subscriber->port_ref, orig);
466
467 /* Lock server port (& save lock address for future use) */
468 subscriber->lock = tipc_port_lock(subscriber->port_ref)->lock;
469
470 /* Add subscriber to topology server's subscriber list */
471 spin_lock_bh(&topsrv.lock);
472 list_add(&subscriber->subscriber_list, &topsrv.subscriber_list);
473 spin_unlock_bh(&topsrv.lock);
474
475 /* Unlock server port */
476 server_port_ref = subscriber->port_ref;
477 spin_unlock_bh(subscriber->lock);
478
479 /* Send an ACK- to complete connection handshaking */
480 tipc_send(server_port_ref, 0, NULL, 0);
481 383
482 /* Handle optional subscription request */ 384 return (void *)subscriber;
483 if (size != 0) {
484 subscr_conn_msg_event(subscriber, server_port_ref,
485 buf, data, size);
486 }
487} 385}
488 386
489int tipc_subscr_start(void) 387int tipc_subscr_start(void)
490{ 388{
491 struct tipc_name_seq seq = {TIPC_TOP_SRV, TIPC_TOP_SRV, TIPC_TOP_SRV}; 389 return tipc_server_start(&topsrv);
492 int res;
493
494 spin_lock_init(&topsrv.lock);
495 INIT_LIST_HEAD(&topsrv.subscriber_list);
496
497 res = tipc_createport(NULL,
498 TIPC_CRITICAL_IMPORTANCE,
499 NULL,
500 NULL,
501 NULL,
502 NULL,
503 subscr_named_msg_event,
504 NULL,
505 NULL,
506 &topsrv.setup_port);
507 if (res)
508 goto failed;
509
510 res = tipc_publish(topsrv.setup_port, TIPC_NODE_SCOPE, &seq);
511 if (res) {
512 tipc_deleteport(topsrv.setup_port);
513 topsrv.setup_port = 0;
514 goto failed;
515 }
516
517 return 0;
518
519failed:
520 pr_err("Failed to create subscription service\n");
521 return res;
522} 390}
523 391
524void tipc_subscr_stop(void) 392void tipc_subscr_stop(void)
525{ 393{
526 struct tipc_subscriber *subscriber; 394 tipc_server_stop(&topsrv);
527 struct tipc_subscriber *subscriber_temp;
528 spinlock_t *subscriber_lock;
529
530 if (topsrv.setup_port) {
531 tipc_deleteport(topsrv.setup_port);
532 topsrv.setup_port = 0;
533
534 list_for_each_entry_safe(subscriber, subscriber_temp,
535 &topsrv.subscriber_list,
536 subscriber_list) {
537 subscriber_lock = subscriber->lock;
538 spin_lock_bh(subscriber_lock);
539 subscr_terminate(subscriber);
540 spin_unlock_bh(subscriber_lock);
541 }
542 }
543} 395}
diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
index 218d2e07f0cc..43e6d6332a02 100644
--- a/net/tipc/subscr.h
+++ b/net/tipc/subscr.h
@@ -2,7 +2,7 @@
2 * net/tipc/subscr.h: Include file for TIPC network topology service 2 * net/tipc/subscr.h: Include file for TIPC network topology service
3 * 3 *
4 * Copyright (c) 2003-2006, Ericsson AB 4 * Copyright (c) 2003-2006, Ericsson AB
5 * Copyright (c) 2005-2007, Wind River Systems 5 * Copyright (c) 2005-2007, 2012-2013, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
8 * Redistribution and use in source and binary forms, with or without 8 * Redistribution and use in source and binary forms, with or without
@@ -37,10 +37,14 @@
37#ifndef _TIPC_SUBSCR_H 37#ifndef _TIPC_SUBSCR_H
38#define _TIPC_SUBSCR_H 38#define _TIPC_SUBSCR_H
39 39
40#include "server.h"
41
40struct tipc_subscription; 42struct tipc_subscription;
43struct tipc_subscriber;
41 44
42/** 45/**
43 * struct tipc_subscription - TIPC network topology subscription object 46 * struct tipc_subscription - TIPC network topology subscription object
47 * @subscriber: pointer to its subscriber
44 * @seq: name sequence associated with subscription 48 * @seq: name sequence associated with subscription
45 * @timeout: duration of subscription (in ms) 49 * @timeout: duration of subscription (in ms)
46 * @filter: event filtering to be done for subscription 50 * @filter: event filtering to be done for subscription
@@ -52,13 +56,13 @@ struct tipc_subscription;
52 * @evt: template for events generated by subscription 56 * @evt: template for events generated by subscription
53 */ 57 */
54struct tipc_subscription { 58struct tipc_subscription {
59 struct tipc_subscriber *subscriber;
55 struct tipc_name_seq seq; 60 struct tipc_name_seq seq;
56 u32 timeout; 61 u32 timeout;
57 u32 filter; 62 u32 filter;
58 struct timer_list timer; 63 struct timer_list timer;
59 struct list_head nameseq_list; 64 struct list_head nameseq_list;
60 struct list_head subscription_list; 65 struct list_head subscription_list;
61 u32 server_ref;
62 int swap; 66 int swap;
63 struct tipc_event evt; 67 struct tipc_event evt;
64}; 68};