aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/subscr.c
diff options
context:
space:
mode:
authorAllan Stephens <allan.stephens@windriver.com>2008-05-19 16:29:47 -0400
committerDavid S. Miller <davem@davemloft.net>2008-05-19 16:29:47 -0400
commit28353e7fad1d224687220a448950dc552645a50a (patch)
tree69f60055703f0213601d3a2d38c26886def6e52e /net/tipc/subscr.c
parentfc5ad582709ce9c7b9ab7b70c1e5b5e2cfc384db (diff)
tipc: Consolidate subscriber & subscriber port references
This patch modifies TIPC's network topology service so that it only requires a single reference table entry per subscriber connection, rather than two. This is achieved by letting the reference to the server port communicating with the subscriber act as the reference to the subscriber object itself. (Since the subscriber cannot exist without its port, and vice versa, this dual role for the reference is perfectly natural.) This consolidation reduces the size of the reference table by 50% in the default configuration. Signed-off-by: Allan Stephens <allan.stephens@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/subscr.c')
-rw-r--r--net/tipc/subscr.c199
1 files changed, 113 insertions, 86 deletions
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index dde23f1e7542..7c62791eb0cc 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -38,23 +38,22 @@
38#include "dbg.h" 38#include "dbg.h"
39#include "subscr.h" 39#include "subscr.h"
40#include "name_table.h" 40#include "name_table.h"
41#include "port.h"
41#include "ref.h" 42#include "ref.h"
42 43
43/** 44/**
44 * struct subscriber - TIPC network topology subscriber 45 * struct subscriber - TIPC network topology subscriber
45 * @ref: object reference to subscriber object itself 46 * @port_ref: object reference to server port connecting to subscriber
46 * @lock: pointer to spinlock controlling access to subscriber object 47 * @lock: pointer to spinlock controlling access to subscriber's server port
47 * @subscriber_list: adjacent subscribers in top. server's list of subscribers 48 * @subscriber_list: adjacent subscribers in top. server's list of subscribers
48 * @subscription_list: list of subscription objects for this subscriber 49 * @subscription_list: list of subscription objects for this subscriber
49 * @port_ref: object reference to port used to communicate with subscriber
50 */ 50 */
51 51
52struct subscriber { 52struct subscriber {
53 u32 ref; 53 u32 port_ref;
54 spinlock_t *lock; 54 spinlock_t *lock;
55 struct list_head subscriber_list; 55 struct list_head subscriber_list;
56 struct list_head subscription_list; 56 struct list_head subscription_list;
57 u32 port_ref;
58}; 57};
59 58
60/** 59/**
@@ -91,6 +90,9 @@ static u32 htohl(u32 in, int swap)
91 90
92/** 91/**
93 * subscr_send_event - send a message containing a tipc_event to the subscriber 92 * subscr_send_event - send a message containing a tipc_event to the subscriber
93 *
94 * Note: Must not hold subscriber's server port lock, since tipc_send() will
95 * try to take the lock if the message is rejected and returned!
94 */ 96 */
95 97
96static void subscr_send_event(struct subscription *sub, 98static void subscr_send_event(struct subscription *sub,
@@ -110,7 +112,7 @@ static void subscr_send_event(struct subscription *sub,
110 sub->evt.found_upper = htohl(found_upper, sub->swap); 112 sub->evt.found_upper = htohl(found_upper, sub->swap);
111 sub->evt.port.ref = htohl(port_ref, sub->swap); 113 sub->evt.port.ref = htohl(port_ref, sub->swap);
112 sub->evt.port.node = htohl(node, sub->swap); 114 sub->evt.port.node = htohl(node, sub->swap);
113 tipc_send(sub->owner->port_ref, 1, &msg_sect); 115 tipc_send(sub->server_ref, 1, &msg_sect);
114} 116}
115 117
116/** 118/**
@@ -163,20 +165,18 @@ void tipc_subscr_report_overlap(struct subscription *sub,
163 165
164static void subscr_timeout(struct subscription *sub) 166static void subscr_timeout(struct subscription *sub)
165{ 167{
166 struct subscriber *subscriber; 168 struct port *server_port;
167 u32 subscriber_ref;
168 169
169 /* Validate subscriber reference (in case subscriber is terminating) */ 170 /* Validate server port reference (in case subscriber is terminating) */
170 171
171 subscriber_ref = sub->owner->ref; 172 server_port = tipc_port_lock(sub->server_ref);
172 subscriber = (struct subscriber *)tipc_ref_lock(subscriber_ref); 173 if (server_port == NULL)
173 if (subscriber == NULL)
174 return; 174 return;
175 175
176 /* Validate timeout (in case subscription is being cancelled) */ 176 /* Validate timeout (in case subscription is being cancelled) */
177 177
178 if (sub->timeout == TIPC_WAIT_FOREVER) { 178 if (sub->timeout == TIPC_WAIT_FOREVER) {
179 tipc_ref_unlock(subscriber_ref); 179 tipc_port_unlock(server_port);
180 return; 180 return;
181 } 181 }
182 182
@@ -184,19 +184,21 @@ static void subscr_timeout(struct subscription *sub)
184 184
185 tipc_nametbl_unsubscribe(sub); 185 tipc_nametbl_unsubscribe(sub);
186 186
187 /* Notify subscriber of timeout, then unlink subscription */ 187 /* Unlink subscription from subscriber */
188 188
189 subscr_send_event(sub,
190 sub->evt.s.seq.lower,
191 sub->evt.s.seq.upper,
192 TIPC_SUBSCR_TIMEOUT,
193 0,
194 0);
195 list_del(&sub->subscription_list); 189 list_del(&sub->subscription_list);
196 190
191 /* Release subscriber's server port */
192
193 tipc_port_unlock(server_port);
194
195 /* Notify subscriber of timeout */
196
197 subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
198 TIPC_SUBSCR_TIMEOUT, 0, 0);
199
197 /* Now destroy subscription */ 200 /* Now destroy subscription */
198 201
199 tipc_ref_unlock(subscriber_ref);
200 k_term_timer(&sub->timer); 202 k_term_timer(&sub->timer);
201 kfree(sub); 203 kfree(sub);
202 atomic_dec(&topsrv.subscription_count); 204 atomic_dec(&topsrv.subscription_count);
@@ -205,7 +207,7 @@ static void subscr_timeout(struct subscription *sub)
205/** 207/**
206 * subscr_del - delete a subscription within a subscription list 208 * subscr_del - delete a subscription within a subscription list
207 * 209 *
208 * Called with subscriber locked. 210 * Called with subscriber port locked.
209 */ 211 */
210 212
211static void subscr_del(struct subscription *sub) 213static void subscr_del(struct subscription *sub)
@@ -219,7 +221,7 @@ static void subscr_del(struct subscription *sub)
219/** 221/**
220 * subscr_terminate - terminate communication with a subscriber 222 * subscr_terminate - terminate communication with a subscriber
221 * 223 *
222 * Called with subscriber locked. Routine must temporarily release this lock 224 * Called with subscriber port locked. Routine must temporarily release lock
223 * to enable subscription timeout routine(s) to finish without deadlocking; 225 * to enable subscription timeout routine(s) to finish without deadlocking;
224 * the lock is then reclaimed to allow caller to release it upon return. 226 * the lock is then reclaimed to allow caller to release it upon return.
225 * (This should work even in the unlikely event some other thread creates 227 * (This should work even in the unlikely event some other thread creates
@@ -229,14 +231,21 @@ static void subscr_del(struct subscription *sub)
229 231
230static void subscr_terminate(struct subscriber *subscriber) 232static void subscr_terminate(struct subscriber *subscriber)
231{ 233{
234 u32 port_ref;
232 struct subscription *sub; 235 struct subscription *sub;
233 struct subscription *sub_temp; 236 struct subscription *sub_temp;
234 237
235 /* Invalidate subscriber reference */ 238 /* Invalidate subscriber reference */
236 239
237 tipc_ref_discard(subscriber->ref); 240 port_ref = subscriber->port_ref;
241 subscriber->port_ref = 0;
238 spin_unlock_bh(subscriber->lock); 242 spin_unlock_bh(subscriber->lock);
239 243
244 /* Sever connection to subscriber */
245
246 tipc_shutdown(port_ref);
247 tipc_deleteport(port_ref);
248
240 /* Destroy any existing subscriptions for subscriber */ 249 /* Destroy any existing subscriptions for subscriber */
241 250
242 list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list, 251 list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
@@ -250,27 +259,25 @@ static void subscr_terminate(struct subscriber *subscriber)
250 subscr_del(sub); 259 subscr_del(sub);
251 } 260 }
252 261
253 /* Sever connection to subscriber */
254
255 tipc_shutdown(subscriber->port_ref);
256 tipc_deleteport(subscriber->port_ref);
257
258 /* Remove subscriber from topology server's subscriber list */ 262 /* Remove subscriber from topology server's subscriber list */
259 263
260 spin_lock_bh(&topsrv.lock); 264 spin_lock_bh(&topsrv.lock);
261 list_del(&subscriber->subscriber_list); 265 list_del(&subscriber->subscriber_list);
262 spin_unlock_bh(&topsrv.lock); 266 spin_unlock_bh(&topsrv.lock);
263 267
264 /* Now destroy subscriber */ 268 /* Reclaim subscriber lock */
265 269
266 spin_lock_bh(subscriber->lock); 270 spin_lock_bh(subscriber->lock);
271
272 /* Now destroy subscriber */
273
267 kfree(subscriber); 274 kfree(subscriber);
268} 275}
269 276
270/** 277/**
271 * subscr_cancel - handle subscription cancellation request 278 * subscr_cancel - handle subscription cancellation request
272 * 279 *
273 * Called with subscriber locked. Routine must temporarily release this lock 280 * Called with subscriber port locked. Routine must temporarily release lock
274 * to enable the subscription timeout routine to finish without deadlocking; 281 * to enable the subscription timeout routine to finish without deadlocking;
275 * the lock is then reclaimed to allow caller to release it upon return. 282 * the lock is then reclaimed to allow caller to release it upon return.
276 * 283 *
@@ -313,11 +320,11 @@ static void subscr_cancel(struct tipc_subscr *s,
313/** 320/**
314 * subscr_subscribe - create subscription for subscriber 321 * subscr_subscribe - create subscription for subscriber
315 * 322 *
316 * Called with subscriber locked 323 * Called with subscriber port locked.
317 */ 324 */
318 325
319static void subscr_subscribe(struct tipc_subscr *s, 326static struct subscription *subscr_subscribe(struct tipc_subscr *s,
320 struct subscriber *subscriber) 327 struct subscriber *subscriber)
321{ 328{
322 struct subscription *sub; 329 struct subscription *sub;
323 int swap; 330 int swap;
@@ -331,7 +338,7 @@ static void subscr_subscribe(struct tipc_subscr *s,
331 if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) { 338 if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
332 s->filter &= ~htohl(TIPC_SUB_CANCEL, swap); 339 s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
333 subscr_cancel(s, subscriber); 340 subscr_cancel(s, subscriber);
334 return; 341 return NULL;
335 } 342 }
336 343
337 /* Refuse subscription if global limit exceeded */ 344 /* Refuse subscription if global limit exceeded */
@@ -340,16 +347,16 @@ static void subscr_subscribe(struct tipc_subscr *s,
340 warn("Subscription rejected, subscription limit reached (%u)\n", 347 warn("Subscription rejected, subscription limit reached (%u)\n",
341 tipc_max_subscriptions); 348 tipc_max_subscriptions);
342 subscr_terminate(subscriber); 349 subscr_terminate(subscriber);
343 return; 350 return NULL;
344 } 351 }
345 352
346 /* Allocate subscription object */ 353 /* Allocate subscription object */
347 354
348 sub = kzalloc(sizeof(*sub), GFP_ATOMIC); 355 sub = kmalloc(sizeof(*sub), GFP_ATOMIC);
349 if (!sub) { 356 if (!sub) {
350 warn("Subscription rejected, no memory\n"); 357 warn("Subscription rejected, no memory\n");
351 subscr_terminate(subscriber); 358 subscr_terminate(subscriber);
352 return; 359 return NULL;
353 } 360 }
354 361
355 /* Initialize subscription object */ 362 /* Initialize subscription object */
@@ -365,40 +372,41 @@ static void subscr_subscribe(struct tipc_subscr *s,
365 warn("Subscription rejected, illegal request\n"); 372 warn("Subscription rejected, illegal request\n");
366 kfree(sub); 373 kfree(sub);
367 subscr_terminate(subscriber); 374 subscr_terminate(subscriber);
368 return; 375 return NULL;
369 } 376 }
370 sub->event_cb = subscr_send_event; 377 sub->event_cb = subscr_send_event;
371 memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr));
372 INIT_LIST_HEAD(&sub->subscription_list);
373 INIT_LIST_HEAD(&sub->nameseq_list); 378 INIT_LIST_HEAD(&sub->nameseq_list);
374 list_add(&sub->subscription_list, &subscriber->subscription_list); 379 list_add(&sub->subscription_list, &subscriber->subscription_list);
380 sub->server_ref = subscriber->port_ref;
375 sub->swap = swap; 381 sub->swap = swap;
382 memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr));
376 atomic_inc(&topsrv.subscription_count); 383 atomic_inc(&topsrv.subscription_count);
377 if (sub->timeout != TIPC_WAIT_FOREVER) { 384 if (sub->timeout != TIPC_WAIT_FOREVER) {
378 k_init_timer(&sub->timer, 385 k_init_timer(&sub->timer,
379 (Handler)subscr_timeout, (unsigned long)sub); 386 (Handler)subscr_timeout, (unsigned long)sub);
380 k_start_timer(&sub->timer, sub->timeout); 387 k_start_timer(&sub->timer, sub->timeout);
381 } 388 }
382 sub->owner = subscriber; 389
383 tipc_nametbl_subscribe(sub); 390 return sub;
384} 391}
385 392
386/** 393/**
387 * subscr_conn_shutdown_event - handle termination request from subscriber 394 * subscr_conn_shutdown_event - handle termination request from subscriber
395 *
396 * Called with subscriber's server port unlocked.
388 */ 397 */
389 398
390static void subscr_conn_shutdown_event(void *usr_handle, 399static void subscr_conn_shutdown_event(void *usr_handle,
391 u32 portref, 400 u32 port_ref,
392 struct sk_buff **buf, 401 struct sk_buff **buf,
393 unsigned char const *data, 402 unsigned char const *data,
394 unsigned int size, 403 unsigned int size,
395 int reason) 404 int reason)
396{ 405{
397 struct subscriber *subscriber; 406 struct subscriber *subscriber = usr_handle;
398 spinlock_t *subscriber_lock; 407 spinlock_t *subscriber_lock;
399 408
400 subscriber = tipc_ref_lock((u32)(unsigned long)usr_handle); 409 if (tipc_port_lock(port_ref) == NULL)
401 if (subscriber == NULL)
402 return; 410 return;
403 411
404 subscriber_lock = subscriber->lock; 412 subscriber_lock = subscriber->lock;
@@ -408,6 +416,8 @@ static void subscr_conn_shutdown_event(void *usr_handle,
408 416
409/** 417/**
410 * subscr_conn_msg_event - handle new subscription request from subscriber 418 * subscr_conn_msg_event - handle new subscription request from subscriber
419 *
420 * Called with subscriber's server port unlocked.
411 */ 421 */
412 422
413static void subscr_conn_msg_event(void *usr_handle, 423static void subscr_conn_msg_event(void *usr_handle,
@@ -416,20 +426,46 @@ static void subscr_conn_msg_event(void *usr_handle,
416 const unchar *data, 426 const unchar *data,
417 u32 size) 427 u32 size)
418{ 428{
419 struct subscriber *subscriber; 429 struct subscriber *subscriber = usr_handle;
420 spinlock_t *subscriber_lock; 430 spinlock_t *subscriber_lock;
431 struct subscription *sub;
432
433 /*
434 * Lock subscriber's server port (& make a local copy of lock pointer,
435 * in case subscriber is deleted while processing subscription request)
436 */
421 437
422 subscriber = tipc_ref_lock((u32)(unsigned long)usr_handle); 438 if (tipc_port_lock(port_ref) == NULL)
423 if (subscriber == NULL)
424 return; 439 return;
425 440
426 subscriber_lock = subscriber->lock; 441 subscriber_lock = subscriber->lock;
427 if (size != sizeof(struct tipc_subscr))
428 subscr_terminate(subscriber);
429 else
430 subscr_subscribe((struct tipc_subscr *)data, subscriber);
431 442
432 spin_unlock_bh(subscriber_lock); 443 if (size != sizeof(struct tipc_subscr)) {
444 subscr_terminate(subscriber);
445 spin_unlock_bh(subscriber_lock);
446 } else {
447 sub = subscr_subscribe((struct tipc_subscr *)data, subscriber);
448 spin_unlock_bh(subscriber_lock);
449 if (sub != NULL) {
450
451 /*
452 * We must release the server port lock before adding a
453 * subscription to the name table since TIPC needs to be
454 * able to (re)acquire the port lock if an event message
455 * issued by the subscription process is rejected and
456 * returned. The subscription cannot be deleted while
457 * it is being added to the name table because:
458 * a) the single-threading of the native API port code
459 * ensures the subscription cannot be cancelled and
460 * the subscriber connection cannot be broken, and
461 * b) the name table lock ensures the subscription
462 * timeout code cannot delete the subscription,
463 * so the subscription object is still protected.
464 */
465
466 tipc_nametbl_subscribe(sub);
467 }
468 }
433} 469}
434 470
435/** 471/**
@@ -445,16 +481,10 @@ static void subscr_named_msg_event(void *usr_handle,
445 struct tipc_portid const *orig, 481 struct tipc_portid const *orig,
446 struct tipc_name_seq const *dest) 482 struct tipc_name_seq const *dest)
447{ 483{
448 struct subscriber *subscriber; 484 static struct iovec msg_sect = {NULL, 0};
449 struct iovec msg_sect = {NULL, 0};
450 spinlock_t *subscriber_lock;
451 485
452 dbg("subscr_named_msg_event: orig = %x own = %x,\n", 486 struct subscriber *subscriber;
453 orig->node, tipc_own_addr); 487 u32 server_port_ref;
454 if (size && (size != sizeof(struct tipc_subscr))) {
455 warn("Subscriber rejected, invalid subscription size\n");
456 return;
457 }
458 488
459 /* Create subscriber object */ 489 /* Create subscriber object */
460 490
@@ -465,18 +495,11 @@ static void subscr_named_msg_event(void *usr_handle,
465 } 495 }
466 INIT_LIST_HEAD(&subscriber->subscription_list); 496 INIT_LIST_HEAD(&subscriber->subscription_list);
467 INIT_LIST_HEAD(&subscriber->subscriber_list); 497 INIT_LIST_HEAD(&subscriber->subscriber_list);
468 subscriber->ref = tipc_ref_acquire(subscriber, &subscriber->lock);
469 if (subscriber->ref == 0) {
470 warn("Subscriber rejected, reference table exhausted\n");
471 kfree(subscriber);
472 return;
473 }
474 spin_unlock_bh(subscriber->lock);
475 498
476 /* Establish a connection to subscriber */ 499 /* Create server port & establish connection to subscriber */
477 500
478 tipc_createport(topsrv.user_ref, 501 tipc_createport(topsrv.user_ref,
479 (void *)(unsigned long)subscriber->ref, 502 subscriber,
480 importance, 503 importance,
481 NULL, 504 NULL,
482 NULL, 505 NULL,
@@ -488,32 +511,36 @@ static void subscr_named_msg_event(void *usr_handle,
488 &subscriber->port_ref); 511 &subscriber->port_ref);
489 if (subscriber->port_ref == 0) { 512 if (subscriber->port_ref == 0) {
490 warn("Subscriber rejected, unable to create port\n"); 513 warn("Subscriber rejected, unable to create port\n");
491 tipc_ref_discard(subscriber->ref);
492 kfree(subscriber); 514 kfree(subscriber);
493 return; 515 return;
494 } 516 }
495 tipc_connect2port(subscriber->port_ref, orig); 517 tipc_connect2port(subscriber->port_ref, orig);
496 518
519 /* Lock server port (& save lock address for future use) */
520
521 subscriber->lock = tipc_port_lock(subscriber->port_ref)->publ.lock;
497 522
498 /* Add subscriber to topology server's subscriber list */ 523 /* Add subscriber to topology server's subscriber list */
499 524
500 tipc_ref_lock(subscriber->ref);
501 spin_lock_bh(&topsrv.lock); 525 spin_lock_bh(&topsrv.lock);
502 list_add(&subscriber->subscriber_list, &topsrv.subscriber_list); 526 list_add(&subscriber->subscriber_list, &topsrv.subscriber_list);
503 spin_unlock_bh(&topsrv.lock); 527 spin_unlock_bh(&topsrv.lock);
504 528
505 /* 529 /* Unlock server port */
506 * Subscribe now if message contains a subscription,
507 * otherwise send an empty response to complete connection handshaking
508 */
509 530
510 subscriber_lock = subscriber->lock; 531 server_port_ref = subscriber->port_ref;
511 if (size) 532 spin_unlock_bh(subscriber->lock);
512 subscr_subscribe((struct tipc_subscr *)data, subscriber);
513 else
514 tipc_send(subscriber->port_ref, 1, &msg_sect);
515 533
516 spin_unlock_bh(subscriber_lock); 534 /* Send an ACK- to complete connection handshaking */
535
536 tipc_send(server_port_ref, 1, &msg_sect);
537
538 /* Handle optional subscription request */
539
540 if (size != 0) {
541 subscr_conn_msg_event(subscriber, server_port_ref,
542 buf, data, size);
543 }
517} 544}
518 545
519int tipc_subscr_start(void) 546int tipc_subscr_start(void)
@@ -572,8 +599,8 @@ void tipc_subscr_stop(void)
572 list_for_each_entry_safe(subscriber, subscriber_temp, 599 list_for_each_entry_safe(subscriber, subscriber_temp,
573 &topsrv.subscriber_list, 600 &topsrv.subscriber_list,
574 subscriber_list) { 601 subscriber_list) {
575 tipc_ref_lock(subscriber->ref);
576 subscriber_lock = subscriber->lock; 602 subscriber_lock = subscriber->lock;
603 spin_lock_bh(subscriber_lock);
577 subscr_terminate(subscriber); 604 subscr_terminate(subscriber);
578 spin_unlock_bh(subscriber_lock); 605 spin_unlock_bh(subscriber_lock);
579 } 606 }