aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorJon Maloy <jon.maloy@ericsson.com>2018-01-08 15:03:29 -0500
committerDavid S. Miller <davem@davemloft.net>2018-01-09 12:35:58 -0500
commit8348500f80d5660af29c475e1f15d412d83564c9 (patch)
tree587b78587831673462e857101a1249fbf7516f15 /net/tipc
parentd12d2e12cec2d66eab6cd58f592dad9fd386b97d (diff)
tipc: add option to suppress PUBLISH events for pre-existing publications
Currently, when a user is subscribing for binding table publications, he will receive a PUBLISH event for all already existing matching items in the binding table. However, a group socket making a subscriptions doesn't need this initial status update from the binding table, because it has already scanned it during the join operation. Worse, the multiplicatory effect of issuing mutual events for dozens or hundreds group members within a short time frame put a heavy load on the topology server, with the end result that scale out operations on a big group tend to take much longer than needed. We now add a new filter option, TIPC_SUB_NO_STATUS, for topology server subscriptions, so that this initial avalanche of events is suppressed. This change, along with the previous commit, significantly improves the range and speed of group scale out operations. We keep the new option internal for the tipc driver, at least for now. Acked-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/group.c4
-rw-r--r--net/tipc/name_table.c13
-rw-r--r--net/tipc/name_table.h2
-rw-r--r--net/tipc/server.c4
-rw-r--r--net/tipc/server.h3
-rw-r--r--net/tipc/subscr.c10
6 files changed, 21 insertions, 15 deletions
diff --git a/net/tipc/group.c b/net/tipc/group.c
index 6ca07f0da60c..cf996bd6ec98 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -177,7 +177,9 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,
177 grp->scope = mreq->scope; 177 grp->scope = mreq->scope;
178 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; 178 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
179 grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS; 179 grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS;
180 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid)) 180 if (tipc_topsrv_kern_subscr(net, portid, type,
181 TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS,
182 0, ~0, &grp->subid))
181 return grp; 183 return grp;
182 kfree(grp); 184 kfree(grp);
183 return NULL; 185 return NULL;
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index e04ab72f313c..60af9885f160 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -405,12 +405,13 @@ found:
405} 405}
406 406
407/** 407/**
408 * tipc_nameseq_subscribe - attach a subscription, and issue 408 * tipc_nameseq_subscribe - attach a subscription, and optionally
409 * the prescribed number of events if there is any sub- 409 * issue the prescribed number of events if there is any sub-
410 * sequence overlapping with the requested sequence 410 * sequence overlapping with the requested sequence
411 */ 411 */
412static void tipc_nameseq_subscribe(struct name_seq *nseq, 412static void tipc_nameseq_subscribe(struct name_seq *nseq,
413 struct tipc_subscription *s) 413 struct tipc_subscription *s,
414 bool status)
414{ 415{
415 struct sub_seq *sseq = nseq->sseqs; 416 struct sub_seq *sseq = nseq->sseqs;
416 struct tipc_name_seq ns; 417 struct tipc_name_seq ns;
@@ -420,7 +421,7 @@ static void tipc_nameseq_subscribe(struct name_seq *nseq,
420 tipc_subscrp_get(s); 421 tipc_subscrp_get(s);
421 list_add(&s->nameseq_list, &nseq->subscriptions); 422 list_add(&s->nameseq_list, &nseq->subscriptions);
422 423
423 if (!sseq) 424 if (!status || !sseq)
424 return; 425 return;
425 426
426 while (sseq != &nseq->sseqs[nseq->first_free]) { 427 while (sseq != &nseq->sseqs[nseq->first_free]) {
@@ -811,7 +812,7 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref,
811/** 812/**
812 * tipc_nametbl_subscribe - add a subscription object to the name table 813 * tipc_nametbl_subscribe - add a subscription object to the name table
813 */ 814 */
814void tipc_nametbl_subscribe(struct tipc_subscription *s) 815void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status)
815{ 816{
816 struct tipc_net *tn = net_generic(s->net, tipc_net_id); 817 struct tipc_net *tn = net_generic(s->net, tipc_net_id);
817 u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap); 818 u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap);
@@ -825,7 +826,7 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s)
825 seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]); 826 seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]);
826 if (seq) { 827 if (seq) {
827 spin_lock_bh(&seq->lock); 828 spin_lock_bh(&seq->lock);
828 tipc_nameseq_subscribe(seq, s); 829 tipc_nameseq_subscribe(seq, s, status);
829 spin_unlock_bh(&seq->lock); 830 spin_unlock_bh(&seq->lock);
830 } else { 831 } else {
831 tipc_subscrp_convert_seq(&s->evt.s.seq, s->swap, &ns); 832 tipc_subscrp_convert_seq(&s->evt.s.seq, s->swap, &ns);
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index 71926e429446..73a148c85c15 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -121,7 +121,7 @@ struct publication *tipc_nametbl_insert_publ(struct net *net, u32 type,
121struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type, 121struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type,
122 u32 lower, u32 node, u32 ref, 122 u32 lower, u32 node, u32 ref,
123 u32 key); 123 u32 key);
124void tipc_nametbl_subscribe(struct tipc_subscription *s); 124void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status);
125void tipc_nametbl_unsubscribe(struct tipc_subscription *s); 125void tipc_nametbl_unsubscribe(struct tipc_subscription *s);
126int tipc_nametbl_init(struct net *net); 126int tipc_nametbl_init(struct net *net);
127void tipc_nametbl_stop(struct net *net); 127void tipc_nametbl_stop(struct net *net);
diff --git a/net/tipc/server.c b/net/tipc/server.c
index d60c30342327..950c54cbcf3a 100644
--- a/net/tipc/server.c
+++ b/net/tipc/server.c
@@ -490,7 +490,7 @@ void tipc_conn_terminate(struct tipc_server *s, int conid)
490} 490}
491 491
492bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, 492bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
493 u32 lower, u32 upper, int *conid) 493 u32 filter, u32 lower, u32 upper, int *conid)
494{ 494{
495 struct tipc_subscriber *scbr; 495 struct tipc_subscriber *scbr;
496 struct tipc_subscr sub; 496 struct tipc_subscr sub;
@@ -501,7 +501,7 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
501 sub.seq.lower = lower; 501 sub.seq.lower = lower;
502 sub.seq.upper = upper; 502 sub.seq.upper = upper;
503 sub.timeout = TIPC_WAIT_FOREVER; 503 sub.timeout = TIPC_WAIT_FOREVER;
504 sub.filter = TIPC_SUB_PORTS; 504 sub.filter = filter;
505 *(u32 *)&sub.usr_handle = port; 505 *(u32 *)&sub.usr_handle = port;
506 506
507 con = tipc_alloc_conn(tipc_topsrv(net)); 507 con = tipc_alloc_conn(tipc_topsrv(net));
diff --git a/net/tipc/server.h b/net/tipc/server.h
index 2113c9192633..ea1effbff23e 100644
--- a/net/tipc/server.h
+++ b/net/tipc/server.h
@@ -41,6 +41,7 @@
41#include <net/net_namespace.h> 41#include <net/net_namespace.h>
42 42
43#define TIPC_SERVER_NAME_LEN 32 43#define TIPC_SERVER_NAME_LEN 32
44#define TIPC_SUB_NO_STATUS 0x80
44 45
45/** 46/**
46 * struct tipc_server - TIPC server structure 47 * struct tipc_server - TIPC server structure
@@ -84,7 +85,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid,
84 struct sockaddr_tipc *addr, void *data, size_t len); 85 struct sockaddr_tipc *addr, void *data, size_t len);
85 86
86bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, 87bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
87 u32 lower, u32 upper, int *conid); 88 u32 filter, u32 lower, u32 upper, int *conid);
88void tipc_topsrv_kern_unsubscr(struct net *net, int conid); 89void tipc_topsrv_kern_unsubscr(struct net *net, int conid);
89 90
90/** 91/**
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index 251065dfd8df..1052341a0ea9 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -286,7 +286,8 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net,
286} 286}
287 287
288static void tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s, 288static void tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s,
289 struct tipc_subscriber *subscriber, int swap) 289 struct tipc_subscriber *subscriber, int swap,
290 bool status)
290{ 291{
291 struct tipc_net *tn = net_generic(net, tipc_net_id); 292 struct tipc_net *tn = net_generic(net, tipc_net_id);
292 struct tipc_subscription *sub = NULL; 293 struct tipc_subscription *sub = NULL;
@@ -299,7 +300,7 @@ static void tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s,
299 spin_lock_bh(&subscriber->lock); 300 spin_lock_bh(&subscriber->lock);
300 list_add(&sub->subscrp_list, &subscriber->subscrp_list); 301 list_add(&sub->subscrp_list, &subscriber->subscrp_list);
301 sub->subscriber = subscriber; 302 sub->subscriber = subscriber;
302 tipc_nametbl_subscribe(sub); 303 tipc_nametbl_subscribe(sub, status);
303 tipc_subscrb_get(subscriber); 304 tipc_subscrb_get(subscriber);
304 spin_unlock_bh(&subscriber->lock); 305 spin_unlock_bh(&subscriber->lock);
305 306
@@ -323,6 +324,7 @@ static void tipc_subscrb_rcv_cb(struct net *net, int conid,
323{ 324{
324 struct tipc_subscriber *subscriber = usr_data; 325 struct tipc_subscriber *subscriber = usr_data;
325 struct tipc_subscr *s = (struct tipc_subscr *)buf; 326 struct tipc_subscr *s = (struct tipc_subscr *)buf;
327 bool status;
326 int swap; 328 int swap;
327 329
328 /* Determine subscriber's endianness */ 330 /* Determine subscriber's endianness */
@@ -334,8 +336,8 @@ static void tipc_subscrb_rcv_cb(struct net *net, int conid,
334 s->filter &= ~htohl(TIPC_SUB_CANCEL, swap); 336 s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
335 return tipc_subscrp_cancel(s, subscriber); 337 return tipc_subscrp_cancel(s, subscriber);
336 } 338 }
337 339 status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
338 tipc_subscrp_subscribe(net, s, subscriber, swap); 340 tipc_subscrp_subscribe(net, s, subscriber, swap, status);
339} 341}
340 342
341/* Handle one request to establish a new subscriber */ 343/* Handle one request to establish a new subscriber */