aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/subscr.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/subscr.c')
-rw-r--r--net/tipc/subscr.c348
1 files changed, 97 insertions, 251 deletions
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index 6b42d47029af..d38bb45d82e9 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -2,7 +2,7 @@
2 * net/tipc/subscr.c: TIPC network topology service 2 * net/tipc/subscr.c: TIPC network topology service
3 * 3 *
4 * Copyright (c) 2000-2006, Ericsson AB 4 * Copyright (c) 2000-2006, Ericsson AB
5 * Copyright (c) 2005-2007, 2010-2011, Wind River Systems 5 * Copyright (c) 2005-2007, 2010-2013, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
8 * Redistribution and use in source and binary forms, with or without 8 * Redistribution and use in source and binary forms, with or without
@@ -41,33 +41,42 @@
41 41
42/** 42/**
43 * struct tipc_subscriber - TIPC network topology subscriber 43 * struct tipc_subscriber - TIPC network topology subscriber
44 * @port_ref: object reference to server port connecting to subscriber 44 * @conid: connection identifier to server connecting to subscriber
45 * @lock: pointer to spinlock controlling access to subscriber's server port 45 * @lock: controll access to subscriber
46 * @subscriber_list: adjacent subscribers in top. server's list of subscribers
47 * @subscription_list: list of subscription objects for this subscriber 46 * @subscription_list: list of subscription objects for this subscriber
48 */ 47 */
49struct tipc_subscriber { 48struct tipc_subscriber {
50 u32 port_ref; 49 int conid;
51 spinlock_t *lock; 50 spinlock_t lock;
52 struct list_head subscriber_list;
53 struct list_head subscription_list; 51 struct list_head subscription_list;
54}; 52};
55 53
56/** 54static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr,
57 * struct top_srv - TIPC network topology subscription service 55 void *usr_data, void *buf, size_t len);
58 * @setup_port: reference to TIPC port that handles subscription requests 56static void *subscr_named_msg_event(int conid);
59 * @subscription_count: number of active subscriptions (not subscribers!) 57static void subscr_conn_shutdown_event(int conid, void *usr_data);
60 * @subscriber_list: list of ports subscribing to service 58
61 * @lock: spinlock govering access to subscriber list 59static atomic_t subscription_count = ATOMIC_INIT(0);
62 */ 60
63struct top_srv { 61static struct sockaddr_tipc topsrv_addr __read_mostly = {
64 u32 setup_port; 62 .family = AF_TIPC,
65 atomic_t subscription_count; 63 .addrtype = TIPC_ADDR_NAMESEQ,
66 struct list_head subscriber_list; 64 .addr.nameseq.type = TIPC_TOP_SRV,
67 spinlock_t lock; 65 .addr.nameseq.lower = TIPC_TOP_SRV,
66 .addr.nameseq.upper = TIPC_TOP_SRV,
67 .scope = TIPC_NODE_SCOPE
68}; 68};
69 69
70static struct top_srv topsrv; 70static struct tipc_server topsrv __read_mostly = {
71 .saddr = &topsrv_addr,
72 .imp = TIPC_CRITICAL_IMPORTANCE,
73 .type = SOCK_SEQPACKET,
74 .max_rcvbuf_size = sizeof(struct tipc_subscr),
75 .name = "topology_server",
76 .tipc_conn_recvmsg = subscr_conn_msg_event,
77 .tipc_conn_new = subscr_named_msg_event,
78 .tipc_conn_shutdown = subscr_conn_shutdown_event,
79};
71 80
72/** 81/**
73 * htohl - convert value to endianness used by destination 82 * htohl - convert value to endianness used by destination
@@ -81,20 +90,13 @@ static u32 htohl(u32 in, int swap)
81 return swap ? swab32(in) : in; 90 return swap ? swab32(in) : in;
82} 91}
83 92
84/** 93static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower,
85 * subscr_send_event - send a message containing a tipc_event to the subscriber 94 u32 found_upper, u32 event, u32 port_ref,
86 *
87 * Note: Must not hold subscriber's server port lock, since tipc_send() will
88 * try to take the lock if the message is rejected and returned!
89 */
90static void subscr_send_event(struct tipc_subscription *sub,
91 u32 found_lower,
92 u32 found_upper,
93 u32 event,
94 u32 port_ref,
95 u32 node) 95 u32 node)
96{ 96{
97 struct iovec msg_sect; 97 struct tipc_subscriber *subscriber = sub->subscriber;
98 struct kvec msg_sect;
99 int ret;
98 100
99 msg_sect.iov_base = (void *)&sub->evt; 101 msg_sect.iov_base = (void *)&sub->evt;
100 msg_sect.iov_len = sizeof(struct tipc_event); 102 msg_sect.iov_len = sizeof(struct tipc_event);
@@ -104,7 +106,10 @@ static void subscr_send_event(struct tipc_subscription *sub,
104 sub->evt.found_upper = htohl(found_upper, sub->swap); 106 sub->evt.found_upper = htohl(found_upper, sub->swap);
105 sub->evt.port.ref = htohl(port_ref, sub->swap); 107 sub->evt.port.ref = htohl(port_ref, sub->swap);
106 sub->evt.port.node = htohl(node, sub->swap); 108 sub->evt.port.node = htohl(node, sub->swap);
107 tipc_send(sub->server_ref, 1, &msg_sect, msg_sect.iov_len); 109 ret = tipc_conn_sendmsg(&topsrv, subscriber->conid, NULL,
110 msg_sect.iov_base, msg_sect.iov_len);
111 if (ret < 0)
112 pr_err("Sending subscription event failed, no memory\n");
108} 113}
109 114
110/** 115/**
@@ -112,10 +117,8 @@ static void subscr_send_event(struct tipc_subscription *sub,
112 * 117 *
113 * Returns 1 if there is overlap, otherwise 0. 118 * Returns 1 if there is overlap, otherwise 0.
114 */ 119 */
115int tipc_subscr_overlap(struct tipc_subscription *sub, 120int tipc_subscr_overlap(struct tipc_subscription *sub, u32 found_lower,
116 u32 found_lower,
117 u32 found_upper) 121 u32 found_upper)
118
119{ 122{
120 if (found_lower < sub->seq.lower) 123 if (found_lower < sub->seq.lower)
121 found_lower = sub->seq.lower; 124 found_lower = sub->seq.lower;
@@ -131,13 +134,9 @@ int tipc_subscr_overlap(struct tipc_subscription *sub,
131 * 134 *
132 * Protected by nameseq.lock in name_table.c 135 * Protected by nameseq.lock in name_table.c
133 */ 136 */
134void tipc_subscr_report_overlap(struct tipc_subscription *sub, 137void tipc_subscr_report_overlap(struct tipc_subscription *sub, u32 found_lower,
135 u32 found_lower, 138 u32 found_upper, u32 event, u32 port_ref,
136 u32 found_upper, 139 u32 node, int must)
137 u32 event,
138 u32 port_ref,
139 u32 node,
140 int must)
141{ 140{
142 if (!tipc_subscr_overlap(sub, found_lower, found_upper)) 141 if (!tipc_subscr_overlap(sub, found_lower, found_upper))
143 return; 142 return;
@@ -147,21 +146,24 @@ void tipc_subscr_report_overlap(struct tipc_subscription *sub,
147 subscr_send_event(sub, found_lower, found_upper, event, port_ref, node); 146 subscr_send_event(sub, found_lower, found_upper, event, port_ref, node);
148} 147}
149 148
150/**
151 * subscr_timeout - subscription timeout has occurred
152 */
153static void subscr_timeout(struct tipc_subscription *sub) 149static void subscr_timeout(struct tipc_subscription *sub)
154{ 150{
155 struct tipc_port *server_port; 151 struct tipc_subscriber *subscriber = sub->subscriber;
152
153 /* The spin lock per subscriber is used to protect its members */
154 spin_lock_bh(&subscriber->lock);
156 155
157 /* Validate server port reference (in case subscriber is terminating) */ 156 /* Validate if the connection related to the subscriber is
158 server_port = tipc_port_lock(sub->server_ref); 157 * closed (in case subscriber is terminating)
159 if (server_port == NULL) 158 */
159 if (subscriber->conid == 0) {
160 spin_unlock_bh(&subscriber->lock);
160 return; 161 return;
162 }
161 163
162 /* Validate timeout (in case subscription is being cancelled) */ 164 /* Validate timeout (in case subscription is being cancelled) */
163 if (sub->timeout == TIPC_WAIT_FOREVER) { 165 if (sub->timeout == TIPC_WAIT_FOREVER) {
164 tipc_port_unlock(server_port); 166 spin_unlock_bh(&subscriber->lock);
165 return; 167 return;
166 } 168 }
167 169
@@ -171,8 +173,7 @@ static void subscr_timeout(struct tipc_subscription *sub)
171 /* Unlink subscription from subscriber */ 173 /* Unlink subscription from subscriber */
172 list_del(&sub->subscription_list); 174 list_del(&sub->subscription_list);
173 175
174 /* Release subscriber's server port */ 176 spin_unlock_bh(&subscriber->lock);
175 tipc_port_unlock(server_port);
176 177
177 /* Notify subscriber of timeout */ 178 /* Notify subscriber of timeout */
178 subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper, 179 subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
@@ -181,64 +182,54 @@ static void subscr_timeout(struct tipc_subscription *sub)
181 /* Now destroy subscription */ 182 /* Now destroy subscription */
182 k_term_timer(&sub->timer); 183 k_term_timer(&sub->timer);
183 kfree(sub); 184 kfree(sub);
184 atomic_dec(&topsrv.subscription_count); 185 atomic_dec(&subscription_count);
185} 186}
186 187
187/** 188/**
188 * subscr_del - delete a subscription within a subscription list 189 * subscr_del - delete a subscription within a subscription list
189 * 190 *
190 * Called with subscriber port locked. 191 * Called with subscriber lock held.
191 */ 192 */
192static void subscr_del(struct tipc_subscription *sub) 193static void subscr_del(struct tipc_subscription *sub)
193{ 194{
194 tipc_nametbl_unsubscribe(sub); 195 tipc_nametbl_unsubscribe(sub);
195 list_del(&sub->subscription_list); 196 list_del(&sub->subscription_list);
196 kfree(sub); 197 kfree(sub);
197 atomic_dec(&topsrv.subscription_count); 198 atomic_dec(&subscription_count);
198} 199}
199 200
200/** 201/**
201 * subscr_terminate - terminate communication with a subscriber 202 * subscr_terminate - terminate communication with a subscriber
202 * 203 *
203 * Called with subscriber port locked. Routine must temporarily release lock 204 * Note: Must call it in process context since it might sleep.
204 * to enable subscription timeout routine(s) to finish without deadlocking;
205 * the lock is then reclaimed to allow caller to release it upon return.
206 * (This should work even in the unlikely event some other thread creates
207 * a new object reference in the interim that uses this lock; this routine will
208 * simply wait for it to be released, then claim it.)
209 */ 205 */
210static void subscr_terminate(struct tipc_subscriber *subscriber) 206static void subscr_terminate(struct tipc_subscriber *subscriber)
211{ 207{
212 u32 port_ref; 208 tipc_conn_terminate(&topsrv, subscriber->conid);
209}
210
211static void subscr_release(struct tipc_subscriber *subscriber)
212{
213 struct tipc_subscription *sub; 213 struct tipc_subscription *sub;
214 struct tipc_subscription *sub_temp; 214 struct tipc_subscription *sub_temp;
215 215
216 /* Invalidate subscriber reference */ 216 spin_lock_bh(&subscriber->lock);
217 port_ref = subscriber->port_ref;
218 subscriber->port_ref = 0;
219 spin_unlock_bh(subscriber->lock);
220 217
221 /* Sever connection to subscriber */ 218 /* Invalidate subscriber reference */
222 tipc_shutdown(port_ref); 219 subscriber->conid = 0;
223 tipc_deleteport(port_ref);
224 220
225 /* Destroy any existing subscriptions for subscriber */ 221 /* Destroy any existing subscriptions for subscriber */
226 list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list, 222 list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
227 subscription_list) { 223 subscription_list) {
228 if (sub->timeout != TIPC_WAIT_FOREVER) { 224 if (sub->timeout != TIPC_WAIT_FOREVER) {
225 spin_unlock_bh(&subscriber->lock);
229 k_cancel_timer(&sub->timer); 226 k_cancel_timer(&sub->timer);
230 k_term_timer(&sub->timer); 227 k_term_timer(&sub->timer);
228 spin_lock_bh(&subscriber->lock);
231 } 229 }
232 subscr_del(sub); 230 subscr_del(sub);
233 } 231 }
234 232 spin_unlock_bh(&subscriber->lock);
235 /* Remove subscriber from topology server's subscriber list */
236 spin_lock_bh(&topsrv.lock);
237 list_del(&subscriber->subscriber_list);
238 spin_unlock_bh(&topsrv.lock);
239
240 /* Reclaim subscriber lock */
241 spin_lock_bh(subscriber->lock);
242 233
243 /* Now destroy subscriber */ 234 /* Now destroy subscriber */
244 kfree(subscriber); 235 kfree(subscriber);
@@ -247,7 +238,7 @@ static void subscr_terminate(struct tipc_subscriber *subscriber)
247/** 238/**
248 * subscr_cancel - handle subscription cancellation request 239 * subscr_cancel - handle subscription cancellation request
249 * 240 *
250 * Called with subscriber port locked. Routine must temporarily release lock 241 * Called with subscriber lock held. Routine must temporarily release lock
251 * to enable the subscription timeout routine to finish without deadlocking; 242 * to enable the subscription timeout routine to finish without deadlocking;
252 * the lock is then reclaimed to allow caller to release it upon return. 243 * the lock is then reclaimed to allow caller to release it upon return.
253 * 244 *
@@ -274,10 +265,10 @@ static void subscr_cancel(struct tipc_subscr *s,
274 /* Cancel subscription timer (if used), then delete subscription */ 265 /* Cancel subscription timer (if used), then delete subscription */
275 if (sub->timeout != TIPC_WAIT_FOREVER) { 266 if (sub->timeout != TIPC_WAIT_FOREVER) {
276 sub->timeout = TIPC_WAIT_FOREVER; 267 sub->timeout = TIPC_WAIT_FOREVER;
277 spin_unlock_bh(subscriber->lock); 268 spin_unlock_bh(&subscriber->lock);
278 k_cancel_timer(&sub->timer); 269 k_cancel_timer(&sub->timer);
279 k_term_timer(&sub->timer); 270 k_term_timer(&sub->timer);
280 spin_lock_bh(subscriber->lock); 271 spin_lock_bh(&subscriber->lock);
281 } 272 }
282 subscr_del(sub); 273 subscr_del(sub);
283} 274}
@@ -285,7 +276,7 @@ static void subscr_cancel(struct tipc_subscr *s,
285/** 276/**
286 * subscr_subscribe - create subscription for subscriber 277 * subscr_subscribe - create subscription for subscriber
287 * 278 *
288 * Called with subscriber port locked. 279 * Called with subscriber lock held.
289 */ 280 */
290static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s, 281static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
291 struct tipc_subscriber *subscriber) 282 struct tipc_subscriber *subscriber)
@@ -304,7 +295,7 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
304 } 295 }
305 296
306 /* Refuse subscription if global limit exceeded */ 297 /* Refuse subscription if global limit exceeded */
307 if (atomic_read(&topsrv.subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) { 298 if (atomic_read(&subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) {
308 pr_warn("Subscription rejected, limit reached (%u)\n", 299 pr_warn("Subscription rejected, limit reached (%u)\n",
309 TIPC_MAX_SUBSCRIPTIONS); 300 TIPC_MAX_SUBSCRIPTIONS);
310 subscr_terminate(subscriber); 301 subscr_terminate(subscriber);
@@ -335,10 +326,10 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
335 } 326 }
336 INIT_LIST_HEAD(&sub->nameseq_list); 327 INIT_LIST_HEAD(&sub->nameseq_list);
337 list_add(&sub->subscription_list, &subscriber->subscription_list); 328 list_add(&sub->subscription_list, &subscriber->subscription_list);
338 sub->server_ref = subscriber->port_ref; 329 sub->subscriber = subscriber;
339 sub->swap = swap; 330 sub->swap = swap;
340 memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr)); 331 memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr));
341 atomic_inc(&topsrv.subscription_count); 332 atomic_inc(&subscription_count);
342 if (sub->timeout != TIPC_WAIT_FOREVER) { 333 if (sub->timeout != TIPC_WAIT_FOREVER) {
343 k_init_timer(&sub->timer, 334 k_init_timer(&sub->timer,
344 (Handler)subscr_timeout, (unsigned long)sub); 335 (Handler)subscr_timeout, (unsigned long)sub);
@@ -348,196 +339,51 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
348 return sub; 339 return sub;
349} 340}
350 341
351/** 342/* Handle one termination request for the subscriber */
352 * subscr_conn_shutdown_event - handle termination request from subscriber 343static void subscr_conn_shutdown_event(int conid, void *usr_data)
353 *
354 * Called with subscriber's server port unlocked.
355 */
356static void subscr_conn_shutdown_event(void *usr_handle,
357 u32 port_ref,
358 struct sk_buff **buf,
359 unsigned char const *data,
360 unsigned int size,
361 int reason)
362{ 344{
363 struct tipc_subscriber *subscriber = usr_handle; 345 subscr_release((struct tipc_subscriber *)usr_data);
364 spinlock_t *subscriber_lock;
365
366 if (tipc_port_lock(port_ref) == NULL)
367 return;
368
369 subscriber_lock = subscriber->lock;
370 subscr_terminate(subscriber);
371 spin_unlock_bh(subscriber_lock);
372} 346}
373 347
374/** 348/* Handle one request to create a new subscription for the subscriber */
375 * subscr_conn_msg_event - handle new subscription request from subscriber 349static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr,
376 * 350 void *usr_data, void *buf, size_t len)
377 * Called with subscriber's server port unlocked.
378 */
379static void subscr_conn_msg_event(void *usr_handle,
380 u32 port_ref,
381 struct sk_buff **buf,
382 const unchar *data,
383 u32 size)
384{ 351{
385 struct tipc_subscriber *subscriber = usr_handle; 352 struct tipc_subscriber *subscriber = usr_data;
386 spinlock_t *subscriber_lock;
387 struct tipc_subscription *sub; 353 struct tipc_subscription *sub;
388 354
389 /* 355 spin_lock_bh(&subscriber->lock);
390 * Lock subscriber's server port (& make a local copy of lock pointer, 356 sub = subscr_subscribe((struct tipc_subscr *)buf, subscriber);
391 * in case subscriber is deleted while processing subscription request) 357 if (sub)
392 */ 358 tipc_nametbl_subscribe(sub);
393 if (tipc_port_lock(port_ref) == NULL) 359 spin_unlock_bh(&subscriber->lock);
394 return;
395
396 subscriber_lock = subscriber->lock;
397
398 if (size != sizeof(struct tipc_subscr)) {
399 subscr_terminate(subscriber);
400 spin_unlock_bh(subscriber_lock);
401 } else {
402 sub = subscr_subscribe((struct tipc_subscr *)data, subscriber);
403 spin_unlock_bh(subscriber_lock);
404 if (sub != NULL) {
405
406 /*
407 * We must release the server port lock before adding a
408 * subscription to the name table since TIPC needs to be
409 * able to (re)acquire the port lock if an event message
410 * issued by the subscription process is rejected and
411 * returned. The subscription cannot be deleted while
412 * it is being added to the name table because:
413 * a) the single-threading of the native API port code
414 * ensures the subscription cannot be cancelled and
415 * the subscriber connection cannot be broken, and
416 * b) the name table lock ensures the subscription
417 * timeout code cannot delete the subscription,
418 * so the subscription object is still protected.
419 */
420 tipc_nametbl_subscribe(sub);
421 }
422 }
423} 360}
424 361
425/** 362
426 * subscr_named_msg_event - handle request to establish a new subscriber 363/* Handle one request to establish a new subscriber */
427 */ 364static void *subscr_named_msg_event(int conid)
428static void subscr_named_msg_event(void *usr_handle,
429 u32 port_ref,
430 struct sk_buff **buf,
431 const unchar *data,
432 u32 size,
433 u32 importance,
434 struct tipc_portid const *orig,
435 struct tipc_name_seq const *dest)
436{ 365{
437 struct tipc_subscriber *subscriber; 366 struct tipc_subscriber *subscriber;
438 u32 server_port_ref;
439 367
440 /* Create subscriber object */ 368 /* Create subscriber object */
441 subscriber = kzalloc(sizeof(struct tipc_subscriber), GFP_ATOMIC); 369 subscriber = kzalloc(sizeof(struct tipc_subscriber), GFP_ATOMIC);
442 if (subscriber == NULL) { 370 if (subscriber == NULL) {
443 pr_warn("Subscriber rejected, no memory\n"); 371 pr_warn("Subscriber rejected, no memory\n");
444 return; 372 return NULL;
445 } 373 }
446 INIT_LIST_HEAD(&subscriber->subscription_list); 374 INIT_LIST_HEAD(&subscriber->subscription_list);
447 INIT_LIST_HEAD(&subscriber->subscriber_list); 375 subscriber->conid = conid;
448 376 spin_lock_init(&subscriber->lock);
449 /* Create server port & establish connection to subscriber */
450 tipc_createport(subscriber,
451 importance,
452 NULL,
453 NULL,
454 subscr_conn_shutdown_event,
455 NULL,
456 NULL,
457 subscr_conn_msg_event,
458 NULL,
459 &subscriber->port_ref);
460 if (subscriber->port_ref == 0) {
461 pr_warn("Subscriber rejected, unable to create port\n");
462 kfree(subscriber);
463 return;
464 }
465 tipc_connect(subscriber->port_ref, orig);
466
467 /* Lock server port (& save lock address for future use) */
468 subscriber->lock = tipc_port_lock(subscriber->port_ref)->lock;
469
470 /* Add subscriber to topology server's subscriber list */
471 spin_lock_bh(&topsrv.lock);
472 list_add(&subscriber->subscriber_list, &topsrv.subscriber_list);
473 spin_unlock_bh(&topsrv.lock);
474
475 /* Unlock server port */
476 server_port_ref = subscriber->port_ref;
477 spin_unlock_bh(subscriber->lock);
478
479 /* Send an ACK- to complete connection handshaking */
480 tipc_send(server_port_ref, 0, NULL, 0);
481 377
482 /* Handle optional subscription request */ 378 return (void *)subscriber;
483 if (size != 0) {
484 subscr_conn_msg_event(subscriber, server_port_ref,
485 buf, data, size);
486 }
487} 379}
488 380
489int tipc_subscr_start(void) 381int tipc_subscr_start(void)
490{ 382{
491 struct tipc_name_seq seq = {TIPC_TOP_SRV, TIPC_TOP_SRV, TIPC_TOP_SRV}; 383 return tipc_server_start(&topsrv);
492 int res;
493
494 spin_lock_init(&topsrv.lock);
495 INIT_LIST_HEAD(&topsrv.subscriber_list);
496
497 res = tipc_createport(NULL,
498 TIPC_CRITICAL_IMPORTANCE,
499 NULL,
500 NULL,
501 NULL,
502 NULL,
503 subscr_named_msg_event,
504 NULL,
505 NULL,
506 &topsrv.setup_port);
507 if (res)
508 goto failed;
509
510 res = tipc_publish(topsrv.setup_port, TIPC_NODE_SCOPE, &seq);
511 if (res) {
512 tipc_deleteport(topsrv.setup_port);
513 topsrv.setup_port = 0;
514 goto failed;
515 }
516
517 return 0;
518
519failed:
520 pr_err("Failed to create subscription service\n");
521 return res;
522} 384}
523 385
524void tipc_subscr_stop(void) 386void tipc_subscr_stop(void)
525{ 387{
526 struct tipc_subscriber *subscriber; 388 tipc_server_stop(&topsrv);
527 struct tipc_subscriber *subscriber_temp;
528 spinlock_t *subscriber_lock;
529
530 if (topsrv.setup_port) {
531 tipc_deleteport(topsrv.setup_port);
532 topsrv.setup_port = 0;
533
534 list_for_each_entry_safe(subscriber, subscriber_temp,
535 &topsrv.subscriber_list,
536 subscriber_list) {
537 subscriber_lock = subscriber->lock;
538 spin_lock_bh(subscriber_lock);
539 subscr_terminate(subscriber);
540 spin_unlock_bh(subscriber_lock);
541 }
542 }
543} 389}