aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorYing Xue <ying.xue@windriver.com>2013-06-17 10:54:40 -0400
committerDavid S. Miller <davem@davemloft.net>2013-06-17 18:53:00 -0400
commit13a2e89873506d64d7e52f17b571da371a3e25a4 (patch)
tree8dc46d4d6eabe1789bf3c250d26e46a4c98a6377 /net
parentc5fa7b3cf3cb22e4ac60485fc2dc187fe012910f (diff)
tipc: convert topology server to use new server facility
As the new TIPC server infrastructure has been introduced, we can now convert the TIPC topology server to it. We get two benefits from doing this: 1) It simplifies the topology server locking policy. In the original locking policy, we placed one spin lock pointer in the tipc_subscriber structure to reuse the lock of the subscriber's server port, controlling access to members of tipc_subscriber instance. That is, we only used one lock to ensure both tipc_port and tipc_subscriber members were safely accessed. Now we introduce another spin lock for tipc_subscriber structure only protecting themselves, to get a finer granularity locking policy. Moreover, the change will allow us to make the topology server code more readable and maintainable. 2) It fixes a bug where sent subscription events may be lost when the topology port is congested. Using the new service, the topology server now queues sent events into an outgoing buffer, and then wakes up a sender process which has been blocked in workqueue context. The process will keep picking events from the buffer and send them to their respective subscribers, using the kernel socket interface, until the buffer is empty. Even if the socket is congested during transmission there is no risk that events may be dropped, since the sender process may block when needed. Some minor reordering of initialization is done, since we now have a scenario where the topology server must be started after socket initialization has taken place, as the former depends on the latter. And overall, we see a simplification of the TIPC subscriber code in making this changeover. Signed-off-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: Paul Gortmaker <paul.gortmaker@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net')
-rw-r--r--net/tipc/core.c6
-rw-r--r--net/tipc/socket.c3
-rw-r--r--net/tipc/subscr.c334
-rw-r--r--net/tipc/subscr.h8
4 files changed, 104 insertions, 247 deletions
diff --git a/net/tipc/core.c b/net/tipc/core.c
index b0e42a087291..15bbe99b609d 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -2,7 +2,7 @@
2 * net/tipc/core.c: TIPC module code 2 * net/tipc/core.c: TIPC module code
3 * 3 *
4 * Copyright (c) 2003-2006, Ericsson AB 4 * Copyright (c) 2003-2006, Ericsson AB
5 * Copyright (c) 2005-2006, 2010-2011, Wind River Systems 5 * Copyright (c) 2005-2006, 2010-2013, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
8 * Redistribution and use in source and binary forms, with or without 8 * Redistribution and use in source and binary forms, with or without
@@ -137,8 +137,6 @@ static int tipc_core_start(void)
137 if (!res) 137 if (!res)
138 res = tipc_nametbl_init(); 138 res = tipc_nametbl_init();
139 if (!res) 139 if (!res)
140 res = tipc_subscr_start();
141 if (!res)
142 res = tipc_cfg_init(); 140 res = tipc_cfg_init();
143 if (!res) 141 if (!res)
144 res = tipc_netlink_start(); 142 res = tipc_netlink_start();
@@ -146,6 +144,8 @@ static int tipc_core_start(void)
146 res = tipc_socket_init(); 144 res = tipc_socket_init();
147 if (!res) 145 if (!res)
148 res = tipc_register_sysctl(); 146 res = tipc_register_sysctl();
147 if (!res)
148 res = tipc_subscr_start();
149 if (res) 149 if (res)
150 tipc_core_stop(); 150 tipc_core_stop();
151 151
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index bd8e2cdeceef..d0254157a30d 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -402,7 +402,8 @@ static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len)
402 else if (addr->addrtype != TIPC_ADDR_NAMESEQ) 402 else if (addr->addrtype != TIPC_ADDR_NAMESEQ)
403 return -EAFNOSUPPORT; 403 return -EAFNOSUPPORT;
404 404
405 if (addr->addr.nameseq.type < TIPC_RESERVED_TYPES) 405 if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
406 (addr->addr.nameseq.type != TIPC_TOP_SRV))
406 return -EACCES; 407 return -EACCES;
407 408
408 return (addr->scope > 0) ? 409 return (addr->scope > 0) ?
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index 6b42d47029af..f6be92a6973a 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -2,7 +2,7 @@
2 * net/tipc/subscr.c: TIPC network topology service 2 * net/tipc/subscr.c: TIPC network topology service
3 * 3 *
4 * Copyright (c) 2000-2006, Ericsson AB 4 * Copyright (c) 2000-2006, Ericsson AB
5 * Copyright (c) 2005-2007, 2010-2011, Wind River Systems 5 * Copyright (c) 2005-2007, 2010-2013, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
8 * Redistribution and use in source and binary forms, with or without 8 * Redistribution and use in source and binary forms, with or without
@@ -41,33 +41,42 @@
41 41
42/** 42/**
43 * struct tipc_subscriber - TIPC network topology subscriber 43 * struct tipc_subscriber - TIPC network topology subscriber
44 * @port_ref: object reference to server port connecting to subscriber 44 * @conid: connection identifier to server connecting to subscriber
45 * @lock: pointer to spinlock controlling access to subscriber's server port 45 * @lock: controll access to subscriber
46 * @subscriber_list: adjacent subscribers in top. server's list of subscribers
47 * @subscription_list: list of subscription objects for this subscriber 46 * @subscription_list: list of subscription objects for this subscriber
48 */ 47 */
49struct tipc_subscriber { 48struct tipc_subscriber {
50 u32 port_ref; 49 int conid;
51 spinlock_t *lock; 50 spinlock_t lock;
52 struct list_head subscriber_list;
53 struct list_head subscription_list; 51 struct list_head subscription_list;
54}; 52};
55 53
56/** 54static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr,
57 * struct top_srv - TIPC network topology subscription service 55 void *usr_data, void *buf, size_t len);
58 * @setup_port: reference to TIPC port that handles subscription requests 56static void *subscr_named_msg_event(int conid);
59 * @subscription_count: number of active subscriptions (not subscribers!) 57static void subscr_conn_shutdown_event(int conid, void *usr_data);
60 * @subscriber_list: list of ports subscribing to service 58
61 * @lock: spinlock govering access to subscriber list 59static atomic_t subscription_count = ATOMIC_INIT(0);
62 */ 60
63struct top_srv { 61static struct sockaddr_tipc topsrv_addr __read_mostly = {
64 u32 setup_port; 62 .family = AF_TIPC,
65 atomic_t subscription_count; 63 .addrtype = TIPC_ADDR_NAMESEQ,
66 struct list_head subscriber_list; 64 .addr.nameseq.type = TIPC_TOP_SRV,
67 spinlock_t lock; 65 .addr.nameseq.lower = TIPC_TOP_SRV,
66 .addr.nameseq.upper = TIPC_TOP_SRV,
67 .scope = TIPC_NODE_SCOPE
68}; 68};
69 69
70static struct top_srv topsrv; 70static struct tipc_server topsrv __read_mostly = {
71 .saddr = &topsrv_addr,
72 .imp = TIPC_CRITICAL_IMPORTANCE,
73 .type = SOCK_SEQPACKET,
74 .max_rcvbuf_size = sizeof(struct tipc_subscr),
75 .name = "topology_server",
76 .tipc_conn_recvmsg = subscr_conn_msg_event,
77 .tipc_conn_new = subscr_named_msg_event,
78 .tipc_conn_shutdown = subscr_conn_shutdown_event,
79};
71 80
72/** 81/**
73 * htohl - convert value to endianness used by destination 82 * htohl - convert value to endianness used by destination
@@ -81,20 +90,13 @@ static u32 htohl(u32 in, int swap)
81 return swap ? swab32(in) : in; 90 return swap ? swab32(in) : in;
82} 91}
83 92
84/** 93static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower,
85 * subscr_send_event - send a message containing a tipc_event to the subscriber 94 u32 found_upper, u32 event, u32 port_ref,
86 *
87 * Note: Must not hold subscriber's server port lock, since tipc_send() will
88 * try to take the lock if the message is rejected and returned!
89 */
90static void subscr_send_event(struct tipc_subscription *sub,
91 u32 found_lower,
92 u32 found_upper,
93 u32 event,
94 u32 port_ref,
95 u32 node) 95 u32 node)
96{ 96{
97 struct iovec msg_sect; 97 struct tipc_subscriber *subscriber = sub->subscriber;
98 struct kvec msg_sect;
99 int ret;
98 100
99 msg_sect.iov_base = (void *)&sub->evt; 101 msg_sect.iov_base = (void *)&sub->evt;
100 msg_sect.iov_len = sizeof(struct tipc_event); 102 msg_sect.iov_len = sizeof(struct tipc_event);
@@ -104,7 +106,10 @@ static void subscr_send_event(struct tipc_subscription *sub,
104 sub->evt.found_upper = htohl(found_upper, sub->swap); 106 sub->evt.found_upper = htohl(found_upper, sub->swap);
105 sub->evt.port.ref = htohl(port_ref, sub->swap); 107 sub->evt.port.ref = htohl(port_ref, sub->swap);
106 sub->evt.port.node = htohl(node, sub->swap); 108 sub->evt.port.node = htohl(node, sub->swap);
107 tipc_send(sub->server_ref, 1, &msg_sect, msg_sect.iov_len); 109 ret = tipc_conn_sendmsg(&topsrv, subscriber->conid, NULL,
110 msg_sect.iov_base, msg_sect.iov_len);
111 if (ret < 0)
112 pr_err("Sending subscription event failed, no memory\n");
108} 113}
109 114
110/** 115/**
@@ -147,21 +152,24 @@ void tipc_subscr_report_overlap(struct tipc_subscription *sub,
147 subscr_send_event(sub, found_lower, found_upper, event, port_ref, node); 152 subscr_send_event(sub, found_lower, found_upper, event, port_ref, node);
148} 153}
149 154
150/**
151 * subscr_timeout - subscription timeout has occurred
152 */
153static void subscr_timeout(struct tipc_subscription *sub) 155static void subscr_timeout(struct tipc_subscription *sub)
154{ 156{
155 struct tipc_port *server_port; 157 struct tipc_subscriber *subscriber = sub->subscriber;
156 158
157 /* Validate server port reference (in case subscriber is terminating) */ 159 /* The spin lock per subscriber is used to protect its members */
158 server_port = tipc_port_lock(sub->server_ref); 160 spin_lock_bh(&subscriber->lock);
159 if (server_port == NULL) 161
162 /* Validate if the connection related to the subscriber is
163 * closed (in case subscriber is terminating)
164 */
165 if (subscriber->conid == 0) {
166 spin_unlock_bh(&subscriber->lock);
160 return; 167 return;
168 }
161 169
162 /* Validate timeout (in case subscription is being cancelled) */ 170 /* Validate timeout (in case subscription is being cancelled) */
163 if (sub->timeout == TIPC_WAIT_FOREVER) { 171 if (sub->timeout == TIPC_WAIT_FOREVER) {
164 tipc_port_unlock(server_port); 172 spin_unlock_bh(&subscriber->lock);
165 return; 173 return;
166 } 174 }
167 175
@@ -171,8 +179,7 @@ static void subscr_timeout(struct tipc_subscription *sub)
171 /* Unlink subscription from subscriber */ 179 /* Unlink subscription from subscriber */
172 list_del(&sub->subscription_list); 180 list_del(&sub->subscription_list);
173 181
174 /* Release subscriber's server port */ 182 spin_unlock_bh(&subscriber->lock);
175 tipc_port_unlock(server_port);
176 183
177 /* Notify subscriber of timeout */ 184 /* Notify subscriber of timeout */
178 subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper, 185 subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
@@ -181,64 +188,54 @@ static void subscr_timeout(struct tipc_subscription *sub)
181 /* Now destroy subscription */ 188 /* Now destroy subscription */
182 k_term_timer(&sub->timer); 189 k_term_timer(&sub->timer);
183 kfree(sub); 190 kfree(sub);
184 atomic_dec(&topsrv.subscription_count); 191 atomic_dec(&subscription_count);
185} 192}
186 193
187/** 194/**
188 * subscr_del - delete a subscription within a subscription list 195 * subscr_del - delete a subscription within a subscription list
189 * 196 *
190 * Called with subscriber port locked. 197 * Called with subscriber lock held.
191 */ 198 */
192static void subscr_del(struct tipc_subscription *sub) 199static void subscr_del(struct tipc_subscription *sub)
193{ 200{
194 tipc_nametbl_unsubscribe(sub); 201 tipc_nametbl_unsubscribe(sub);
195 list_del(&sub->subscription_list); 202 list_del(&sub->subscription_list);
196 kfree(sub); 203 kfree(sub);
197 atomic_dec(&topsrv.subscription_count); 204 atomic_dec(&subscription_count);
198} 205}
199 206
200/** 207/**
201 * subscr_terminate - terminate communication with a subscriber 208 * subscr_terminate - terminate communication with a subscriber
202 * 209 *
203 * Called with subscriber port locked. Routine must temporarily release lock 210 * Note: Must call it in process context since it might sleep.
204 * to enable subscription timeout routine(s) to finish without deadlocking;
205 * the lock is then reclaimed to allow caller to release it upon return.
206 * (This should work even in the unlikely event some other thread creates
207 * a new object reference in the interim that uses this lock; this routine will
208 * simply wait for it to be released, then claim it.)
209 */ 211 */
210static void subscr_terminate(struct tipc_subscriber *subscriber) 212static void subscr_terminate(struct tipc_subscriber *subscriber)
211{ 213{
212 u32 port_ref; 214 tipc_conn_terminate(&topsrv, subscriber->conid);
215}
216
217static void subscr_release(struct tipc_subscriber *subscriber)
218{
213 struct tipc_subscription *sub; 219 struct tipc_subscription *sub;
214 struct tipc_subscription *sub_temp; 220 struct tipc_subscription *sub_temp;
215 221
216 /* Invalidate subscriber reference */ 222 spin_lock_bh(&subscriber->lock);
217 port_ref = subscriber->port_ref;
218 subscriber->port_ref = 0;
219 spin_unlock_bh(subscriber->lock);
220 223
221 /* Sever connection to subscriber */ 224 /* Invalidate subscriber reference */
222 tipc_shutdown(port_ref); 225 subscriber->conid = 0;
223 tipc_deleteport(port_ref);
224 226
225 /* Destroy any existing subscriptions for subscriber */ 227 /* Destroy any existing subscriptions for subscriber */
226 list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list, 228 list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
227 subscription_list) { 229 subscription_list) {
228 if (sub->timeout != TIPC_WAIT_FOREVER) { 230 if (sub->timeout != TIPC_WAIT_FOREVER) {
231 spin_unlock_bh(&subscriber->lock);
229 k_cancel_timer(&sub->timer); 232 k_cancel_timer(&sub->timer);
230 k_term_timer(&sub->timer); 233 k_term_timer(&sub->timer);
234 spin_lock_bh(&subscriber->lock);
231 } 235 }
232 subscr_del(sub); 236 subscr_del(sub);
233 } 237 }
234 238 spin_unlock_bh(&subscriber->lock);
235 /* Remove subscriber from topology server's subscriber list */
236 spin_lock_bh(&topsrv.lock);
237 list_del(&subscriber->subscriber_list);
238 spin_unlock_bh(&topsrv.lock);
239
240 /* Reclaim subscriber lock */
241 spin_lock_bh(subscriber->lock);
242 239
243 /* Now destroy subscriber */ 240 /* Now destroy subscriber */
244 kfree(subscriber); 241 kfree(subscriber);
@@ -247,7 +244,7 @@ static void subscr_terminate(struct tipc_subscriber *subscriber)
247/** 244/**
248 * subscr_cancel - handle subscription cancellation request 245 * subscr_cancel - handle subscription cancellation request
249 * 246 *
250 * Called with subscriber port locked. Routine must temporarily release lock 247 * Called with subscriber lock held. Routine must temporarily release lock
251 * to enable the subscription timeout routine to finish without deadlocking; 248 * to enable the subscription timeout routine to finish without deadlocking;
252 * the lock is then reclaimed to allow caller to release it upon return. 249 * the lock is then reclaimed to allow caller to release it upon return.
253 * 250 *
@@ -274,10 +271,10 @@ static void subscr_cancel(struct tipc_subscr *s,
274 /* Cancel subscription timer (if used), then delete subscription */ 271 /* Cancel subscription timer (if used), then delete subscription */
275 if (sub->timeout != TIPC_WAIT_FOREVER) { 272 if (sub->timeout != TIPC_WAIT_FOREVER) {
276 sub->timeout = TIPC_WAIT_FOREVER; 273 sub->timeout = TIPC_WAIT_FOREVER;
277 spin_unlock_bh(subscriber->lock); 274 spin_unlock_bh(&subscriber->lock);
278 k_cancel_timer(&sub->timer); 275 k_cancel_timer(&sub->timer);
279 k_term_timer(&sub->timer); 276 k_term_timer(&sub->timer);
280 spin_lock_bh(subscriber->lock); 277 spin_lock_bh(&subscriber->lock);
281 } 278 }
282 subscr_del(sub); 279 subscr_del(sub);
283} 280}
@@ -285,7 +282,7 @@ static void subscr_cancel(struct tipc_subscr *s,
285/** 282/**
286 * subscr_subscribe - create subscription for subscriber 283 * subscr_subscribe - create subscription for subscriber
287 * 284 *
288 * Called with subscriber port locked. 285 * Called with subscriber lock held.
289 */ 286 */
290static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s, 287static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
291 struct tipc_subscriber *subscriber) 288 struct tipc_subscriber *subscriber)
@@ -304,7 +301,7 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
304 } 301 }
305 302
306 /* Refuse subscription if global limit exceeded */ 303 /* Refuse subscription if global limit exceeded */
307 if (atomic_read(&topsrv.subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) { 304 if (atomic_read(&subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) {
308 pr_warn("Subscription rejected, limit reached (%u)\n", 305 pr_warn("Subscription rejected, limit reached (%u)\n",
309 TIPC_MAX_SUBSCRIPTIONS); 306 TIPC_MAX_SUBSCRIPTIONS);
310 subscr_terminate(subscriber); 307 subscr_terminate(subscriber);
@@ -335,10 +332,10 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
335 } 332 }
336 INIT_LIST_HEAD(&sub->nameseq_list); 333 INIT_LIST_HEAD(&sub->nameseq_list);
337 list_add(&sub->subscription_list, &subscriber->subscription_list); 334 list_add(&sub->subscription_list, &subscriber->subscription_list);
338 sub->server_ref = subscriber->port_ref; 335 sub->subscriber = subscriber;
339 sub->swap = swap; 336 sub->swap = swap;
340 memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr)); 337 memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr));
341 atomic_inc(&topsrv.subscription_count); 338 atomic_inc(&subscription_count);
342 if (sub->timeout != TIPC_WAIT_FOREVER) { 339 if (sub->timeout != TIPC_WAIT_FOREVER) {
343 k_init_timer(&sub->timer, 340 k_init_timer(&sub->timer,
344 (Handler)subscr_timeout, (unsigned long)sub); 341 (Handler)subscr_timeout, (unsigned long)sub);
@@ -348,196 +345,51 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
348 return sub; 345 return sub;
349} 346}
350 347
351/** 348/* Handle one termination request for the subscriber */
352 * subscr_conn_shutdown_event - handle termination request from subscriber 349static void subscr_conn_shutdown_event(int conid, void *usr_data)
353 *
354 * Called with subscriber's server port unlocked.
355 */
356static void subscr_conn_shutdown_event(void *usr_handle,
357 u32 port_ref,
358 struct sk_buff **buf,
359 unsigned char const *data,
360 unsigned int size,
361 int reason)
362{ 350{
363 struct tipc_subscriber *subscriber = usr_handle; 351 subscr_release((struct tipc_subscriber *)usr_data);
364 spinlock_t *subscriber_lock;
365
366 if (tipc_port_lock(port_ref) == NULL)
367 return;
368
369 subscriber_lock = subscriber->lock;
370 subscr_terminate(subscriber);
371 spin_unlock_bh(subscriber_lock);
372} 352}
373 353
374/** 354/* Handle one request to create a new subscription for the subscriber */
375 * subscr_conn_msg_event - handle new subscription request from subscriber 355static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr,
376 * 356 void *usr_data, void *buf, size_t len)
377 * Called with subscriber's server port unlocked.
378 */
379static void subscr_conn_msg_event(void *usr_handle,
380 u32 port_ref,
381 struct sk_buff **buf,
382 const unchar *data,
383 u32 size)
384{ 357{
385 struct tipc_subscriber *subscriber = usr_handle; 358 struct tipc_subscriber *subscriber = usr_data;
386 spinlock_t *subscriber_lock;
387 struct tipc_subscription *sub; 359 struct tipc_subscription *sub;
388 360
389 /* 361 spin_lock_bh(&subscriber->lock);
390 * Lock subscriber's server port (& make a local copy of lock pointer, 362 sub = subscr_subscribe((struct tipc_subscr *)buf, subscriber);
391 * in case subscriber is deleted while processing subscription request) 363 if (sub)
392 */ 364 tipc_nametbl_subscribe(sub);
393 if (tipc_port_lock(port_ref) == NULL) 365 spin_unlock_bh(&subscriber->lock);
394 return;
395
396 subscriber_lock = subscriber->lock;
397
398 if (size != sizeof(struct tipc_subscr)) {
399 subscr_terminate(subscriber);
400 spin_unlock_bh(subscriber_lock);
401 } else {
402 sub = subscr_subscribe((struct tipc_subscr *)data, subscriber);
403 spin_unlock_bh(subscriber_lock);
404 if (sub != NULL) {
405
406 /*
407 * We must release the server port lock before adding a
408 * subscription to the name table since TIPC needs to be
409 * able to (re)acquire the port lock if an event message
410 * issued by the subscription process is rejected and
411 * returned. The subscription cannot be deleted while
412 * it is being added to the name table because:
413 * a) the single-threading of the native API port code
414 * ensures the subscription cannot be cancelled and
415 * the subscriber connection cannot be broken, and
416 * b) the name table lock ensures the subscription
417 * timeout code cannot delete the subscription,
418 * so the subscription object is still protected.
419 */
420 tipc_nametbl_subscribe(sub);
421 }
422 }
423} 366}
424 367
425/** 368
426 * subscr_named_msg_event - handle request to establish a new subscriber 369/* Handle one request to establish a new subscriber */
427 */ 370static void *subscr_named_msg_event(int conid)
428static void subscr_named_msg_event(void *usr_handle,
429 u32 port_ref,
430 struct sk_buff **buf,
431 const unchar *data,
432 u32 size,
433 u32 importance,
434 struct tipc_portid const *orig,
435 struct tipc_name_seq const *dest)
436{ 371{
437 struct tipc_subscriber *subscriber; 372 struct tipc_subscriber *subscriber;
438 u32 server_port_ref;
439 373
440 /* Create subscriber object */ 374 /* Create subscriber object */
441 subscriber = kzalloc(sizeof(struct tipc_subscriber), GFP_ATOMIC); 375 subscriber = kzalloc(sizeof(struct tipc_subscriber), GFP_ATOMIC);
442 if (subscriber == NULL) { 376 if (subscriber == NULL) {
443 pr_warn("Subscriber rejected, no memory\n"); 377 pr_warn("Subscriber rejected, no memory\n");
444 return; 378 return NULL;
445 } 379 }
446 INIT_LIST_HEAD(&subscriber->subscription_list); 380 INIT_LIST_HEAD(&subscriber->subscription_list);
447 INIT_LIST_HEAD(&subscriber->subscriber_list); 381 subscriber->conid = conid;
448 382 spin_lock_init(&subscriber->lock);
449 /* Create server port & establish connection to subscriber */
450 tipc_createport(subscriber,
451 importance,
452 NULL,
453 NULL,
454 subscr_conn_shutdown_event,
455 NULL,
456 NULL,
457 subscr_conn_msg_event,
458 NULL,
459 &subscriber->port_ref);
460 if (subscriber->port_ref == 0) {
461 pr_warn("Subscriber rejected, unable to create port\n");
462 kfree(subscriber);
463 return;
464 }
465 tipc_connect(subscriber->port_ref, orig);
466
467 /* Lock server port (& save lock address for future use) */
468 subscriber->lock = tipc_port_lock(subscriber->port_ref)->lock;
469
470 /* Add subscriber to topology server's subscriber list */
471 spin_lock_bh(&topsrv.lock);
472 list_add(&subscriber->subscriber_list, &topsrv.subscriber_list);
473 spin_unlock_bh(&topsrv.lock);
474
475 /* Unlock server port */
476 server_port_ref = subscriber->port_ref;
477 spin_unlock_bh(subscriber->lock);
478
479 /* Send an ACK- to complete connection handshaking */
480 tipc_send(server_port_ref, 0, NULL, 0);
481 383
482 /* Handle optional subscription request */ 384 return (void *)subscriber;
483 if (size != 0) {
484 subscr_conn_msg_event(subscriber, server_port_ref,
485 buf, data, size);
486 }
487} 385}
488 386
489int tipc_subscr_start(void) 387int tipc_subscr_start(void)
490{ 388{
491 struct tipc_name_seq seq = {TIPC_TOP_SRV, TIPC_TOP_SRV, TIPC_TOP_SRV}; 389 return tipc_server_start(&topsrv);
492 int res;
493
494 spin_lock_init(&topsrv.lock);
495 INIT_LIST_HEAD(&topsrv.subscriber_list);
496
497 res = tipc_createport(NULL,
498 TIPC_CRITICAL_IMPORTANCE,
499 NULL,
500 NULL,
501 NULL,
502 NULL,
503 subscr_named_msg_event,
504 NULL,
505 NULL,
506 &topsrv.setup_port);
507 if (res)
508 goto failed;
509
510 res = tipc_publish(topsrv.setup_port, TIPC_NODE_SCOPE, &seq);
511 if (res) {
512 tipc_deleteport(topsrv.setup_port);
513 topsrv.setup_port = 0;
514 goto failed;
515 }
516
517 return 0;
518
519failed:
520 pr_err("Failed to create subscription service\n");
521 return res;
522} 390}
523 391
524void tipc_subscr_stop(void) 392void tipc_subscr_stop(void)
525{ 393{
526 struct tipc_subscriber *subscriber; 394 tipc_server_stop(&topsrv);
527 struct tipc_subscriber *subscriber_temp;
528 spinlock_t *subscriber_lock;
529
530 if (topsrv.setup_port) {
531 tipc_deleteport(topsrv.setup_port);
532 topsrv.setup_port = 0;
533
534 list_for_each_entry_safe(subscriber, subscriber_temp,
535 &topsrv.subscriber_list,
536 subscriber_list) {
537 subscriber_lock = subscriber->lock;
538 spin_lock_bh(subscriber_lock);
539 subscr_terminate(subscriber);
540 spin_unlock_bh(subscriber_lock);
541 }
542 }
543} 395}
diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
index 218d2e07f0cc..43e6d6332a02 100644
--- a/net/tipc/subscr.h
+++ b/net/tipc/subscr.h
@@ -2,7 +2,7 @@
2 * net/tipc/subscr.h: Include file for TIPC network topology service 2 * net/tipc/subscr.h: Include file for TIPC network topology service
3 * 3 *
4 * Copyright (c) 2003-2006, Ericsson AB 4 * Copyright (c) 2003-2006, Ericsson AB
5 * Copyright (c) 2005-2007, Wind River Systems 5 * Copyright (c) 2005-2007, 2012-2013, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
8 * Redistribution and use in source and binary forms, with or without 8 * Redistribution and use in source and binary forms, with or without
@@ -37,10 +37,14 @@
37#ifndef _TIPC_SUBSCR_H 37#ifndef _TIPC_SUBSCR_H
38#define _TIPC_SUBSCR_H 38#define _TIPC_SUBSCR_H
39 39
40#include "server.h"
41
40struct tipc_subscription; 42struct tipc_subscription;
43struct tipc_subscriber;
41 44
42/** 45/**
43 * struct tipc_subscription - TIPC network topology subscription object 46 * struct tipc_subscription - TIPC network topology subscription object
47 * @subscriber: pointer to its subscriber
44 * @seq: name sequence associated with subscription 48 * @seq: name sequence associated with subscription
45 * @timeout: duration of subscription (in ms) 49 * @timeout: duration of subscription (in ms)
46 * @filter: event filtering to be done for subscription 50 * @filter: event filtering to be done for subscription
@@ -52,13 +56,13 @@ struct tipc_subscription;
52 * @evt: template for events generated by subscription 56 * @evt: template for events generated by subscription
53 */ 57 */
54struct tipc_subscription { 58struct tipc_subscription {
59 struct tipc_subscriber *subscriber;
55 struct tipc_name_seq seq; 60 struct tipc_name_seq seq;
56 u32 timeout; 61 u32 timeout;
57 u32 filter; 62 u32 filter;
58 struct timer_list timer; 63 struct timer_list timer;
59 struct list_head nameseq_list; 64 struct list_head nameseq_list;
60 struct list_head subscription_list; 65 struct list_head subscription_list;
61 u32 server_ref;
62 int swap; 66 int swap;
63 struct tipc_event evt; 67 struct tipc_event evt;
64}; 68};