aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Maloy <jon.maloy@ericsson.com>2018-03-29 17:20:41 -0400
committerDavid S. Miller <davem@davemloft.net>2018-03-31 22:19:52 -0400
commit218527fe27adaebeb81eb770459eb335517e90ee (patch)
tree42001e81e0214aedd8d42ebf810f4d7542ce6a8a
parent24197ee2102359b59044234347dd3504302fa97d (diff)
tipc: replace name table service range array with rb tree
The current design of the binding table has an unnecessary memory consuming and complex data structure. It aggregates the service range items into an array, which is expanded by a factor two every time it becomes too small to hold a new item. Furthermore, the arrays never shrink when the number of ranges diminishes. We now replace this array with an RB tree that is holding the range items as tree nodes, each range directly holding a list of bindings. This, along with a few name changes, improves both readability and volume of the code, as well as reducing memory consumption and hopefully improving cache hit rate. Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r--net/tipc/core.h1
-rw-r--r--net/tipc/link.c2
-rw-r--r--net/tipc/name_table.c1032
-rw-r--r--net/tipc/name_table.h2
-rw-r--r--net/tipc/node.c4
-rw-r--r--net/tipc/subscr.h4
6 files changed, 477 insertions, 568 deletions
diff --git a/net/tipc/core.h b/net/tipc/core.h
index d0f64ca62d02..8020a6c360ff 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -58,6 +58,7 @@
58#include <linux/etherdevice.h> 58#include <linux/etherdevice.h>
59#include <net/netns/generic.h> 59#include <net/netns/generic.h>
60#include <linux/rhashtable.h> 60#include <linux/rhashtable.h>
61#include <net/genetlink.h>
61 62
62struct tipc_node; 63struct tipc_node;
63struct tipc_bearer; 64struct tipc_bearer;
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 1289b4ba404f..8f2a9496439b 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1810,7 +1810,7 @@ int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
1810 1810
1811void tipc_link_set_queue_limits(struct tipc_link *l, u32 win) 1811void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
1812{ 1812{
1813 int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE); 1813 int max_bulk = TIPC_MAX_PUBL / (l->mtu / ITEM_SIZE);
1814 1814
1815 l->window = win; 1815 l->window = win;
1816 l->backlog[TIPC_LOW_IMPORTANCE].limit = max_t(u16, 50, win); 1816 l->backlog[TIPC_LOW_IMPORTANCE].limit = max_t(u16, 50, win);
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index 4359605b1bec..e06c7a8907aa 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -44,52 +44,40 @@
44#include "addr.h" 44#include "addr.h"
45#include "node.h" 45#include "node.h"
46#include "group.h" 46#include "group.h"
47#include <net/genetlink.h>
48
49#define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */
50 47
51/** 48/**
52 * struct name_info - name sequence publication info 49 * struct service_range - container for all bindings of a service range
53 * @node_list: list of publications on own node of this <type,lower,upper> 50 * @lower: service range lower bound
54 * @all_publ: list of all publications of this <type,lower,upper> 51 * @upper: service range upper bound
52 * @tree_node: member of service range RB tree
53 * @local_publ: list of identical publications made from this node
54 * Used by closest_first lookup and multicast lookup algorithm
55 * @all_publ: all publications identical to this one, whatever node and scope
56 * Used by round-robin lookup algorithm
55 */ 57 */
56struct name_info { 58struct service_range {
57 struct list_head local_publ;
58 struct list_head all_publ;
59};
60
61/**
62 * struct sub_seq - container for all published instances of a name sequence
63 * @lower: name sequence lower bound
64 * @upper: name sequence upper bound
65 * @info: pointer to name sequence publication info
66 */
67struct sub_seq {
68 u32 lower; 59 u32 lower;
69 u32 upper; 60 u32 upper;
70 struct name_info *info; 61 struct rb_node tree_node;
62 struct list_head local_publ;
63 struct list_head all_publ;
71}; 64};
72 65
73/** 66/**
74 * struct name_seq - container for all published instances of a name type 67 * struct tipc_service - container for all published instances of a service type
75 * @type: 32 bit 'type' value for name sequence 68 * @type: 32 bit 'type' value for service
76 * @sseq: pointer to dynamically-sized array of sub-sequences of this 'type'; 69 * @ranges: rb tree containing all service ranges for this service
77 * sub-sequences are sorted in ascending order 70 * @service_list: links to adjacent name ranges in hash chain
78 * @alloc: number of sub-sequences currently in array 71 * @subscriptions: list of subscriptions for this service type
79 * @first_free: array index of first unused sub-sequence entry 72 * @lock: spinlock controlling access to pertaining service ranges/publications
80 * @ns_list: links to adjacent name sequences in hash chain
81 * @subscriptions: list of subscriptions for this 'type'
82 * @lock: spinlock controlling access to publication lists of all sub-sequences
83 * @rcu: RCU callback head used for deferred freeing 73 * @rcu: RCU callback head used for deferred freeing
84 */ 74 */
85struct name_seq { 75struct tipc_service {
86 u32 type; 76 u32 type;
87 struct sub_seq *sseqs; 77 struct rb_root ranges;
88 u32 alloc; 78 struct hlist_node service_list;
89 u32 first_free;
90 struct hlist_node ns_list;
91 struct list_head subscriptions; 79 struct list_head subscriptions;
92 spinlock_t lock; 80 spinlock_t lock; /* Covers service range list */
93 struct rcu_head rcu; 81 struct rcu_head rcu;
94}; 82};
95 83
@@ -99,17 +87,16 @@ static int hash(int x)
99} 87}
100 88
101/** 89/**
102 * publ_create - create a publication structure 90 * tipc_publ_create - create a publication structure
103 */ 91 */
104static struct publication *publ_create(u32 type, u32 lower, u32 upper, 92static struct publication *tipc_publ_create(u32 type, u32 lower, u32 upper,
105 u32 scope, u32 node, u32 port, 93 u32 scope, u32 node, u32 port,
106 u32 key) 94 u32 key)
107{ 95{
108 struct publication *publ = kzalloc(sizeof(*publ), GFP_ATOMIC); 96 struct publication *publ = kzalloc(sizeof(*publ), GFP_ATOMIC);
109 if (publ == NULL) { 97
110 pr_warn("Publication creation failure, no memory\n"); 98 if (!publ)
111 return NULL; 99 return NULL;
112 }
113 100
114 publ->type = type; 101 publ->type = type;
115 publ->lower = lower; 102 publ->lower = lower;
@@ -119,372 +106,298 @@ static struct publication *publ_create(u32 type, u32 lower, u32 upper,
119 publ->port = port; 106 publ->port = port;
120 publ->key = key; 107 publ->key = key;
121 INIT_LIST_HEAD(&publ->binding_sock); 108 INIT_LIST_HEAD(&publ->binding_sock);
109 INIT_LIST_HEAD(&publ->binding_node);
110 INIT_LIST_HEAD(&publ->local_publ);
111 INIT_LIST_HEAD(&publ->all_publ);
122 return publ; 112 return publ;
123} 113}
124 114
125/** 115/**
126 * tipc_subseq_alloc - allocate a specified number of sub-sequence structures 116 * tipc_service_create - create a service structure for the specified 'type'
127 */
128static struct sub_seq *tipc_subseq_alloc(u32 cnt)
129{
130 return kcalloc(cnt, sizeof(struct sub_seq), GFP_ATOMIC);
131}
132
133/**
134 * tipc_nameseq_create - create a name sequence structure for the specified 'type'
135 * 117 *
136 * Allocates a single sub-sequence structure and sets it to all 0's. 118 * Allocates a single range structure and sets it to all 0's.
137 */ 119 */
138static struct name_seq *tipc_nameseq_create(u32 type, struct hlist_head *seq_head) 120static struct tipc_service *tipc_service_create(u32 type, struct hlist_head *hd)
139{ 121{
140 struct name_seq *nseq = kzalloc(sizeof(*nseq), GFP_ATOMIC); 122 struct tipc_service *service = kzalloc(sizeof(*service), GFP_ATOMIC);
141 struct sub_seq *sseq = tipc_subseq_alloc(1);
142 123
143 if (!nseq || !sseq) { 124 if (!service) {
144 pr_warn("Name sequence creation failed, no memory\n"); 125 pr_warn("Service creation failed, no memory\n");
145 kfree(nseq);
146 kfree(sseq);
147 return NULL; 126 return NULL;
148 } 127 }
149 128
150 spin_lock_init(&nseq->lock); 129 spin_lock_init(&service->lock);
151 nseq->type = type; 130 service->type = type;
152 nseq->sseqs = sseq; 131 service->ranges = RB_ROOT;
153 nseq->alloc = 1; 132 INIT_HLIST_NODE(&service->service_list);
154 INIT_HLIST_NODE(&nseq->ns_list); 133 INIT_LIST_HEAD(&service->subscriptions);
155 INIT_LIST_HEAD(&nseq->subscriptions); 134 hlist_add_head_rcu(&service->service_list, hd);
156 hlist_add_head_rcu(&nseq->ns_list, seq_head); 135 return service;
157 return nseq;
158} 136}
159 137
160/** 138/**
161 * nameseq_find_subseq - find sub-sequence (if any) matching a name instance 139 * tipc_service_find_range - find service range matching a service instance
162 * 140 *
163 * Very time-critical, so binary searches through sub-sequence array. 141 * Very time-critical, so binary search through range rb tree
164 */ 142 */
165static struct sub_seq *nameseq_find_subseq(struct name_seq *nseq, 143static struct service_range *tipc_service_find_range(struct tipc_service *sc,
166 u32 instance) 144 u32 instance)
167{ 145{
168 struct sub_seq *sseqs = nseq->sseqs; 146 struct rb_node *n = sc->ranges.rb_node;
169 int low = 0; 147 struct service_range *sr;
170 int high = nseq->first_free - 1; 148
171 int mid; 149 while (n) {
172 150 sr = container_of(n, struct service_range, tree_node);
173 while (low <= high) { 151 if (sr->lower > instance)
174 mid = (low + high) / 2; 152 n = n->rb_left;
175 if (instance < sseqs[mid].lower) 153 else if (sr->upper < instance)
176 high = mid - 1; 154 n = n->rb_right;
177 else if (instance > sseqs[mid].upper)
178 low = mid + 1;
179 else 155 else
180 return &sseqs[mid]; 156 return sr;
181 } 157 }
182 return NULL; 158 return NULL;
183} 159}
184 160
185/** 161static struct service_range *tipc_service_create_range(struct tipc_service *sc,
186 * nameseq_locate_subseq - determine position of name instance in sub-sequence 162 u32 lower, u32 upper)
187 *
188 * Returns index in sub-sequence array of the entry that contains the specified
189 * instance value; if no entry contains that value, returns the position
190 * where a new entry for it would be inserted in the array.
191 *
192 * Note: Similar to binary search code for locating a sub-sequence.
193 */
194static u32 nameseq_locate_subseq(struct name_seq *nseq, u32 instance)
195{ 163{
196 struct sub_seq *sseqs = nseq->sseqs; 164 struct rb_node **n, *parent = NULL;
197 int low = 0; 165 struct service_range *sr, *tmp;
198 int high = nseq->first_free - 1; 166
199 int mid; 167 n = &sc->ranges.rb_node;
200 168 while (*n) {
201 while (low <= high) { 169 tmp = container_of(*n, struct service_range, tree_node);
202 mid = (low + high) / 2; 170 parent = *n;
203 if (instance < sseqs[mid].lower) 171 tmp = container_of(parent, struct service_range, tree_node);
204 high = mid - 1; 172 if (lower < tmp->lower)
205 else if (instance > sseqs[mid].upper) 173 n = &(*n)->rb_left;
206 low = mid + 1; 174 else if (upper > tmp->upper)
175 n = &(*n)->rb_right;
207 else 176 else
208 return mid; 177 return NULL;
209 } 178 }
210 return low; 179 sr = kzalloc(sizeof(*sr), GFP_ATOMIC);
180 if (!sr)
181 return NULL;
182 sr->lower = lower;
183 sr->upper = upper;
184 INIT_LIST_HEAD(&sr->local_publ);
185 INIT_LIST_HEAD(&sr->all_publ);
186 rb_link_node(&sr->tree_node, parent, n);
187 rb_insert_color(&sr->tree_node, &sc->ranges);
188 return sr;
211} 189}
212 190
213/** 191static struct publication *tipc_service_insert_publ(struct net *net,
214 * tipc_nameseq_insert_publ 192 struct tipc_service *sc,
215 */
216static struct publication *tipc_nameseq_insert_publ(struct net *net,
217 struct name_seq *nseq,
218 u32 type, u32 lower, 193 u32 type, u32 lower,
219 u32 upper, u32 scope, 194 u32 upper, u32 scope,
220 u32 node, u32 port, u32 key) 195 u32 node, u32 port,
196 u32 key)
221{ 197{
222 struct tipc_subscription *s; 198 struct tipc_subscription *sub, *tmp;
223 struct tipc_subscription *st; 199 struct service_range *sr;
224 struct publication *publ; 200 struct publication *p;
225 struct sub_seq *sseq; 201 bool first = false;
226 struct name_info *info; 202
227 int created_subseq = 0; 203 sr = tipc_service_find_range(sc, lower);
228 204 if (!sr) {
229 sseq = nameseq_find_subseq(nseq, lower); 205 sr = tipc_service_create_range(sc, lower, upper);
230 if (sseq) { 206 if (!sr)
231 207 goto err;
232 /* Lower end overlaps existing entry => need an exact match */ 208 first = true;
233 if ((sseq->lower != lower) || (sseq->upper != upper)) {
234 return NULL;
235 }
236
237 info = sseq->info;
238
239 /* Check if an identical publication already exists */
240 list_for_each_entry(publ, &info->all_publ, all_publ) {
241 if (publ->port == port && publ->key == key &&
242 (!publ->node || publ->node == node))
243 return NULL;
244 }
245 } else {
246 u32 inspos;
247 struct sub_seq *freesseq;
248
249 /* Find where lower end should be inserted */
250 inspos = nameseq_locate_subseq(nseq, lower);
251
252 /* Fail if upper end overlaps into an existing entry */
253 if ((inspos < nseq->first_free) &&
254 (upper >= nseq->sseqs[inspos].lower)) {
255 return NULL;
256 }
257
258 /* Ensure there is space for new sub-sequence */
259 if (nseq->first_free == nseq->alloc) {
260 struct sub_seq *sseqs = tipc_subseq_alloc(nseq->alloc * 2);
261
262 if (!sseqs) {
263 pr_warn("Cannot publish {%u,%u,%u}, no memory\n",
264 type, lower, upper);
265 return NULL;
266 }
267 memcpy(sseqs, nseq->sseqs,
268 nseq->alloc * sizeof(struct sub_seq));
269 kfree(nseq->sseqs);
270 nseq->sseqs = sseqs;
271 nseq->alloc *= 2;
272 }
273
274 info = kzalloc(sizeof(*info), GFP_ATOMIC);
275 if (!info) {
276 pr_warn("Cannot publish {%u,%u,%u}, no memory\n",
277 type, lower, upper);
278 return NULL;
279 }
280
281 INIT_LIST_HEAD(&info->local_publ);
282 INIT_LIST_HEAD(&info->all_publ);
283
284 /* Insert new sub-sequence */
285 sseq = &nseq->sseqs[inspos];
286 freesseq = &nseq->sseqs[nseq->first_free];
287 memmove(sseq + 1, sseq, (freesseq - sseq) * sizeof(*sseq));
288 memset(sseq, 0, sizeof(*sseq));
289 nseq->first_free++;
290 sseq->lower = lower;
291 sseq->upper = upper;
292 sseq->info = info;
293 created_subseq = 1;
294 } 209 }
295 210
296 /* Insert a publication */ 211 /* Lower end overlaps existing entry, but we need an exact match */
297 publ = publ_create(type, lower, upper, scope, node, port, key); 212 if (sr->lower != lower || sr->upper != upper)
298 if (!publ)
299 return NULL; 213 return NULL;
300 214
301 list_add(&publ->all_publ, &info->all_publ); 215 /* Return if the publication already exists */
216 list_for_each_entry(p, &sr->all_publ, all_publ) {
217 if (p->key == key && (!p->node || p->node == node))
218 return NULL;
219 }
302 220
221 /* Create and insert publication */
222 p = tipc_publ_create(type, lower, upper, scope, node, port, key);
223 if (!p)
224 goto err;
303 if (in_own_node(net, node)) 225 if (in_own_node(net, node))
304 list_add(&publ->local_publ, &info->local_publ); 226 list_add(&p->local_publ, &sr->local_publ);
227 list_add(&p->all_publ, &sr->all_publ);
305 228
306 /* Any subscriptions waiting for notification? */ 229 /* Any subscriptions waiting for notification? */
307 list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) { 230 list_for_each_entry_safe(sub, tmp, &sc->subscriptions, service_list) {
308 tipc_sub_report_overlap(s, publ->lower, publ->upper, 231 tipc_sub_report_overlap(sub, p->lower, p->upper, TIPC_PUBLISHED,
309 TIPC_PUBLISHED, publ->port, 232 p->port, p->node, p->scope, first);
310 publ->node, publ->scope,
311 created_subseq);
312 } 233 }
313 return publ; 234 return p;
235err:
236 pr_warn("Failed to bind to %u,%u,%u, no memory\n", type, lower, upper);
237 return NULL;
314} 238}
315 239
316/** 240/**
317 * tipc_nameseq_remove_publ 241 * tipc_service_remove_publ - remove a publication from a service
318 * 242 *
319 * NOTE: There may be cases where TIPC is asked to remove a publication 243 * NOTE: There may be cases where TIPC is asked to remove a publication
320 * that is not in the name table. For example, if another node issues a 244 * that is not in the name table. For example, if another node issues a
321 * publication for a name sequence that overlaps an existing name sequence 245 * publication for a name range that overlaps an existing name range
322 * the publication will not be recorded, which means the publication won't 246 * the publication will not be recorded, which means the publication won't
323 * be found when the name sequence is later withdrawn by that node. 247 * be found when the name range is later withdrawn by that node.
324 * A failed withdraw request simply returns a failure indication and lets the 248 * A failed withdraw request simply returns a failure indication and lets the
325 * caller issue any error or warning messages associated with such a problem. 249 * caller issue any error or warning messages associated with such a problem.
326 */ 250 */
327static struct publication *tipc_nameseq_remove_publ(struct net *net, 251static struct publication *tipc_service_remove_publ(struct net *net,
328 struct name_seq *nseq, 252 struct tipc_service *sc,
329 u32 inst, u32 node, 253 u32 inst, u32 node,
330 u32 port, u32 key) 254 u32 port, u32 key)
331{ 255{
332 struct publication *publ; 256 struct tipc_subscription *sub, *tmp;
333 struct sub_seq *sseq = nameseq_find_subseq(nseq, inst); 257 struct service_range *sr;
334 struct name_info *info; 258 struct publication *p;
335 struct sub_seq *free; 259 bool found = false;
336 struct tipc_subscription *s, *st; 260 bool last = false;
337 int removed_subseq = 0;
338
339 if (!sseq)
340 return NULL;
341 261
342 info = sseq->info; 262 sr = tipc_service_find_range(sc, inst);
263 if (!sr)
264 return NULL;
343 265
344 /* Locate publication, if it exists */ 266 /* Find publication, if it exists */
345 list_for_each_entry(publ, &info->all_publ, all_publ) { 267 list_for_each_entry(p, &sr->all_publ, all_publ) {
346 if (publ->key == key && publ->port == port && 268 if (p->key != key || (node && node != p->node))
347 (!publ->node || publ->node == node)) 269 continue;
348 goto found; 270 found = true;
271 break;
349 } 272 }
350 return NULL; 273 if (!found)
274 return NULL;
351 275
352found: 276 list_del(&p->all_publ);
353 list_del(&publ->all_publ); 277 list_del(&p->local_publ);
354 if (in_own_node(net, node)) 278
355 list_del(&publ->local_publ); 279 /* Remove service range item if this was its last publication */
356 280 if (list_empty(&sr->all_publ)) {
357 /* Contract subseq list if no more publications for that subseq */ 281 last = true;
358 if (list_empty(&info->all_publ)) { 282 rb_erase(&sr->tree_node, &sc->ranges);
359 kfree(info); 283 kfree(sr);
360 free = &nseq->sseqs[nseq->first_free--];
361 memmove(sseq, sseq + 1, (free - (sseq + 1)) * sizeof(*sseq));
362 removed_subseq = 1;
363 } 284 }
364 285
365 /* Notify any waiting subscriptions */ 286 /* Notify any waiting subscriptions */
366 list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) { 287 list_for_each_entry_safe(sub, tmp, &sc->subscriptions, service_list) {
367 tipc_sub_report_overlap(s, publ->lower, publ->upper, 288 tipc_sub_report_overlap(sub, p->lower, p->upper, TIPC_WITHDRAWN,
368 TIPC_WITHDRAWN, publ->port, 289 p->port, p->node, p->scope, last);
369 publ->node, publ->scope,
370 removed_subseq);
371 } 290 }
372 291 return p;
373 return publ;
374} 292}
375 293
376/** 294/**
377 * tipc_nameseq_subscribe - attach a subscription, and optionally 295 * tipc_service_subscribe - attach a subscription, and optionally
378 * issue the prescribed number of events if there is any sub- 296 * issue the prescribed number of events if there is any service
379 * sequence overlapping with the requested sequence 297 * range overlapping with the requested range
380 */ 298 */
381static void tipc_nameseq_subscribe(struct name_seq *nseq, 299static void tipc_service_subscribe(struct tipc_service *service,
382 struct tipc_subscription *sub) 300 struct tipc_subscription *sub)
383{ 301{
384 struct sub_seq *sseq = nseq->sseqs; 302 struct tipc_subscr *sb = &sub->evt.s;
303 struct service_range *sr;
385 struct tipc_name_seq ns; 304 struct tipc_name_seq ns;
386 struct tipc_subscr *s = &sub->evt.s; 305 struct publication *p;
387 bool no_status; 306 struct rb_node *n;
307 bool first;
388 308
389 ns.type = tipc_sub_read(s, seq.type); 309 ns.type = tipc_sub_read(sb, seq.type);
390 ns.lower = tipc_sub_read(s, seq.lower); 310 ns.lower = tipc_sub_read(sb, seq.lower);
391 ns.upper = tipc_sub_read(s, seq.upper); 311 ns.upper = tipc_sub_read(sb, seq.upper);
392 no_status = tipc_sub_read(s, filter) & TIPC_SUB_NO_STATUS;
393 312
394 tipc_sub_get(sub); 313 tipc_sub_get(sub);
395 list_add(&sub->nameseq_list, &nseq->subscriptions); 314 list_add(&sub->service_list, &service->subscriptions);
396 315
397 if (no_status || !sseq) 316 if (tipc_sub_read(sb, filter) & TIPC_SUB_NO_STATUS)
398 return; 317 return;
399 318
400 while (sseq != &nseq->sseqs[nseq->first_free]) { 319 for (n = rb_first(&service->ranges); n; n = rb_next(n)) {
401 if (tipc_sub_check_overlap(&ns, sseq->lower, sseq->upper)) { 320 sr = container_of(n, struct service_range, tree_node);
402 struct publication *crs; 321 if (sr->lower > ns.upper)
403 struct name_info *info = sseq->info; 322 break;
404 int must_report = 1; 323 if (!tipc_sub_check_overlap(&ns, sr->lower, sr->upper))
405 324 continue;
406 list_for_each_entry(crs, &info->all_publ, all_publ) { 325 first = true;
407 tipc_sub_report_overlap(sub, sseq->lower, 326
408 sseq->upper, 327 list_for_each_entry(p, &sr->all_publ, all_publ) {
409 TIPC_PUBLISHED, 328 tipc_sub_report_overlap(sub, sr->lower, sr->upper,
410 crs->port, 329 TIPC_PUBLISHED, p->port,
411 crs->node, 330 p->node, p->scope, first);
412 crs->scope, 331 first = false;
413 must_report);
414 must_report = 0;
415 }
416 } 332 }
417 sseq++;
418 } 333 }
419} 334}
420 335
421static struct name_seq *nametbl_find_seq(struct net *net, u32 type) 336static struct tipc_service *tipc_service_find(struct net *net, u32 type)
422{ 337{
423 struct tipc_net *tn = net_generic(net, tipc_net_id); 338 struct name_table *nt = tipc_name_table(net);
424 struct hlist_head *seq_head; 339 struct hlist_head *service_head;
425 struct name_seq *ns; 340 struct tipc_service *service;
426 341
427 seq_head = &tn->nametbl->seq_hlist[hash(type)]; 342 service_head = &nt->services[hash(type)];
428 hlist_for_each_entry_rcu(ns, seq_head, ns_list) { 343 hlist_for_each_entry_rcu(service, service_head, service_list) {
429 if (ns->type == type) 344 if (service->type == type)
430 return ns; 345 return service;
431 } 346 }
432
433 return NULL; 347 return NULL;
434}; 348};
435 349
436struct publication *tipc_nametbl_insert_publ(struct net *net, u32 type, 350struct publication *tipc_nametbl_insert_publ(struct net *net, u32 type,
437 u32 lower, u32 upper, u32 scope, 351 u32 lower, u32 upper,
438 u32 node, u32 port, u32 key) 352 u32 scope, u32 node,
353 u32 port, u32 key)
439{ 354{
440 struct tipc_net *tn = net_generic(net, tipc_net_id); 355 struct name_table *nt = tipc_name_table(net);
441 struct publication *publ; 356 struct tipc_service *sc;
442 struct name_seq *seq = nametbl_find_seq(net, type); 357 struct publication *p;
443 int index = hash(type);
444 358
445 if (scope > TIPC_NODE_SCOPE || lower > upper) { 359 if (scope > TIPC_NODE_SCOPE || lower > upper) {
446 pr_debug("Failed to publish illegal {%u,%u,%u} with scope %u\n", 360 pr_debug("Failed to bind illegal {%u,%u,%u} with scope %u\n",
447 type, lower, upper, scope); 361 type, lower, upper, scope);
448 return NULL; 362 return NULL;
449 } 363 }
450 364 sc = tipc_service_find(net, type);
451 if (!seq) 365 if (!sc)
452 seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]); 366 sc = tipc_service_create(type, &nt->services[hash(type)]);
453 if (!seq) 367 if (!sc)
454 return NULL; 368 return NULL;
455 369
456 spin_lock_bh(&seq->lock); 370 spin_lock_bh(&sc->lock);
457 publ = tipc_nameseq_insert_publ(net, seq, type, lower, upper, 371 p = tipc_service_insert_publ(net, sc, type, lower, upper,
458 scope, node, port, key); 372 scope, node, port, key);
459 spin_unlock_bh(&seq->lock); 373 spin_unlock_bh(&sc->lock);
460 return publ; 374 return p;
461} 375}
462 376
463struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type, 377struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type,
464 u32 lower, u32 node, u32 port, 378 u32 lower, u32 node, u32 port,
465 u32 key) 379 u32 key)
466{ 380{
467 struct publication *publ; 381 struct tipc_service *sc = tipc_service_find(net, type);
468 struct name_seq *seq = nametbl_find_seq(net, type); 382 struct publication *p = NULL;
469 383
470 if (!seq) 384 if (!sc)
471 return NULL; 385 return NULL;
472 386
473 spin_lock_bh(&seq->lock); 387 spin_lock_bh(&sc->lock);
474 publ = tipc_nameseq_remove_publ(net, seq, lower, node, port, key); 388 p = tipc_service_remove_publ(net, sc, lower, node, port, key);
475 if (!seq->first_free && list_empty(&seq->subscriptions)) { 389
476 hlist_del_init_rcu(&seq->ns_list); 390 /* Delete service item if this no more publications and subscriptions */
477 kfree(seq->sseqs); 391 if (RB_EMPTY_ROOT(&sc->ranges) && list_empty(&sc->subscriptions)) {
478 spin_unlock_bh(&seq->lock); 392 hlist_del_init_rcu(&sc->service_list);
479 kfree_rcu(seq, rcu); 393 kfree_rcu(sc, rcu);
480 return publ;
481 } 394 }
482 spin_unlock_bh(&seq->lock); 395 spin_unlock_bh(&sc->lock);
483 return publ; 396 return p;
484} 397}
485 398
486/** 399/**
487 * tipc_nametbl_translate - perform name translation 400 * tipc_nametbl_translate - perform service instance to socket translation
488 * 401 *
489 * On entry, 'destnode' is the search domain used during translation. 402 * On entry, 'destnode' is the search domain used during translation.
490 * 403 *
@@ -492,7 +405,7 @@ struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type,
492 * - if name translation is deferred to another node/cluster/zone, 405 * - if name translation is deferred to another node/cluster/zone,
493 * leaves 'destnode' unchanged (will be non-zero) and returns 0 406 * leaves 'destnode' unchanged (will be non-zero) and returns 0
494 * - if name translation is attempted and succeeds, sets 'destnode' 407 * - if name translation is attempted and succeeds, sets 'destnode'
495 * to publishing node and returns port reference (will be non-zero) 408 * to publication node and returns port reference (will be non-zero)
496 * - if name translation is attempted and fails, sets 'destnode' to 0 409 * - if name translation is attempted and fails, sets 'destnode' to 0
497 * and returns 0 410 * and returns 0
498 */ 411 */
@@ -502,10 +415,9 @@ u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance,
502 struct tipc_net *tn = tipc_net(net); 415 struct tipc_net *tn = tipc_net(net);
503 bool legacy = tn->legacy_addr_format; 416 bool legacy = tn->legacy_addr_format;
504 u32 self = tipc_own_addr(net); 417 u32 self = tipc_own_addr(net);
505 struct sub_seq *sseq; 418 struct service_range *sr;
506 struct name_info *info; 419 struct tipc_service *sc;
507 struct publication *publ; 420 struct publication *p;
508 struct name_seq *seq;
509 u32 port = 0; 421 u32 port = 0;
510 u32 node = 0; 422 u32 node = 0;
511 423
@@ -513,49 +425,49 @@ u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance,
513 return 0; 425 return 0;
514 426
515 rcu_read_lock(); 427 rcu_read_lock();
516 seq = nametbl_find_seq(net, type); 428 sc = tipc_service_find(net, type);
517 if (unlikely(!seq)) 429 if (unlikely(!sc))
518 goto not_found; 430 goto not_found;
519 spin_lock_bh(&seq->lock); 431
520 sseq = nameseq_find_subseq(seq, instance); 432 spin_lock_bh(&sc->lock);
521 if (unlikely(!sseq)) 433 sr = tipc_service_find_range(sc, instance);
434 if (unlikely(!sr))
522 goto no_match; 435 goto no_match;
523 info = sseq->info;
524 436
525 /* Closest-First Algorithm */ 437 /* Closest-First Algorithm */
526 if (legacy && !*destnode) { 438 if (legacy && !*destnode) {
527 if (!list_empty(&info->local_publ)) { 439 if (!list_empty(&sr->local_publ)) {
528 publ = list_first_entry(&info->local_publ, 440 p = list_first_entry(&sr->local_publ,
529 struct publication, 441 struct publication,
530 local_publ); 442 local_publ);
531 list_move_tail(&publ->local_publ, 443 list_move_tail(&p->local_publ,
532 &info->local_publ); 444 &sr->local_publ);
533 } else { 445 } else {
534 publ = list_first_entry(&info->all_publ, 446 p = list_first_entry(&sr->all_publ,
535 struct publication, 447 struct publication,
536 all_publ); 448 all_publ);
537 list_move_tail(&publ->all_publ, 449 list_move_tail(&p->all_publ,
538 &info->all_publ); 450 &sr->all_publ);
539 } 451 }
540 } 452 }
541 453
542 /* Round-Robin Algorithm */ 454 /* Round-Robin Algorithm */
543 else if (*destnode == tipc_own_addr(net)) { 455 else if (*destnode == self) {
544 if (list_empty(&info->local_publ)) 456 if (list_empty(&sr->local_publ))
545 goto no_match; 457 goto no_match;
546 publ = list_first_entry(&info->local_publ, struct publication, 458 p = list_first_entry(&sr->local_publ, struct publication,
547 local_publ); 459 local_publ);
548 list_move_tail(&publ->local_publ, &info->local_publ); 460 list_move_tail(&p->local_publ, &sr->local_publ);
549 } else { 461 } else {
550 publ = list_first_entry(&info->all_publ, struct publication, 462 p = list_first_entry(&sr->all_publ, struct publication,
551 all_publ); 463 all_publ);
552 list_move_tail(&publ->all_publ, &info->all_publ); 464 list_move_tail(&p->all_publ, &sr->all_publ);
553 } 465 }
554 466
555 port = publ->port; 467 port = p->port;
556 node = publ->node; 468 node = p->node;
557no_match: 469no_match:
558 spin_unlock_bh(&seq->lock); 470 spin_unlock_bh(&sc->lock);
559not_found: 471not_found:
560 rcu_read_unlock(); 472 rcu_read_unlock();
561 *destnode = node; 473 *destnode = node;
@@ -567,34 +479,36 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 scope,
567 bool all) 479 bool all)
568{ 480{
569 u32 self = tipc_own_addr(net); 481 u32 self = tipc_own_addr(net);
570 struct publication *publ; 482 struct service_range *sr;
571 struct name_info *info; 483 struct tipc_service *sc;
572 struct name_seq *seq; 484 struct publication *p;
573 struct sub_seq *sseq;
574 485
575 *dstcnt = 0; 486 *dstcnt = 0;
576 rcu_read_lock(); 487 rcu_read_lock();
577 seq = nametbl_find_seq(net, type); 488 sc = tipc_service_find(net, type);
578 if (unlikely(!seq)) 489 if (unlikely(!sc))
579 goto exit; 490 goto exit;
580 spin_lock_bh(&seq->lock); 491
581 sseq = nameseq_find_subseq(seq, instance); 492 spin_lock_bh(&sc->lock);
582 if (likely(sseq)) { 493
583 info = sseq->info; 494 sr = tipc_service_find_range(sc, instance);
584 list_for_each_entry(publ, &info->all_publ, all_publ) { 495 if (!sr)
585 if (publ->scope != scope) 496 goto no_match;
586 continue; 497
587 if (publ->port == exclude && publ->node == self) 498 list_for_each_entry(p, &sr->all_publ, all_publ) {
588 continue; 499 if (p->scope != scope)
589 tipc_dest_push(dsts, publ->node, publ->port); 500 continue;
590 (*dstcnt)++; 501 if (p->port == exclude && p->node == self)
591 if (all) 502 continue;
592 continue; 503 tipc_dest_push(dsts, p->node, p->port);
593 list_move_tail(&publ->all_publ, &info->all_publ); 504 (*dstcnt)++;
594 break; 505 if (all)
595 } 506 continue;
507 list_move_tail(&p->all_publ, &sr->all_publ);
508 break;
596 } 509 }
597 spin_unlock_bh(&seq->lock); 510no_match:
511 spin_unlock_bh(&sc->lock);
598exit: 512exit:
599 rcu_read_unlock(); 513 rcu_read_unlock();
600 return !list_empty(dsts); 514 return !list_empty(dsts);
@@ -603,61 +517,64 @@ exit:
603void tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper, 517void tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper,
604 u32 scope, bool exact, struct list_head *dports) 518 u32 scope, bool exact, struct list_head *dports)
605{ 519{
606 struct sub_seq *sseq_stop; 520 struct service_range *sr;
607 struct name_info *info; 521 struct tipc_service *sc;
608 struct publication *p; 522 struct publication *p;
609 struct name_seq *seq; 523 struct rb_node *n;
610 struct sub_seq *sseq;
611 524
612 rcu_read_lock(); 525 rcu_read_lock();
613 seq = nametbl_find_seq(net, type); 526 sc = tipc_service_find(net, type);
614 if (!seq) 527 if (!sc)
615 goto exit; 528 goto exit;
616 529
617 spin_lock_bh(&seq->lock); 530 spin_lock_bh(&sc->lock);
618 sseq = seq->sseqs + nameseq_locate_subseq(seq, lower); 531
619 sseq_stop = seq->sseqs + seq->first_free; 532 for (n = rb_first(&sc->ranges); n; n = rb_next(n)) {
620 for (; sseq != sseq_stop; sseq++) { 533 sr = container_of(n, struct service_range, tree_node);
621 if (sseq->lower > upper) 534 if (sr->upper < lower)
535 continue;
536 if (sr->lower > upper)
622 break; 537 break;
623 info = sseq->info; 538 list_for_each_entry(p, &sr->local_publ, local_publ) {
624 list_for_each_entry(p, &info->local_publ, local_publ) {
625 if (p->scope == scope || (!exact && p->scope < scope)) 539 if (p->scope == scope || (!exact && p->scope < scope))
626 tipc_dest_push(dports, 0, p->port); 540 tipc_dest_push(dports, 0, p->port);
627 } 541 }
628 } 542 }
629 spin_unlock_bh(&seq->lock); 543 spin_unlock_bh(&sc->lock);
630exit: 544exit:
631 rcu_read_unlock(); 545 rcu_read_unlock();
632} 546}
633 547
634/* tipc_nametbl_lookup_dst_nodes - find broadcast destination nodes 548/* tipc_nametbl_lookup_dst_nodes - find broadcast destination nodes
635 * - Creates list of nodes that overlap the given multicast address 549 * - Creates list of nodes that overlap the given multicast address
636 * - Determines if any node local ports overlap 550 * - Determines if any node local destinations overlap
637 */ 551 */
638void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, 552void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
639 u32 upper, struct tipc_nlist *nodes) 553 u32 upper, struct tipc_nlist *nodes)
640{ 554{
641 struct sub_seq *sseq, *stop; 555 struct service_range *sr;
642 struct publication *publ; 556 struct tipc_service *sc;
643 struct name_info *info; 557 struct publication *p;
644 struct name_seq *seq; 558 struct rb_node *n;
645 559
646 rcu_read_lock(); 560 rcu_read_lock();
647 seq = nametbl_find_seq(net, type); 561 sc = tipc_service_find(net, type);
648 if (!seq) 562 if (!sc)
649 goto exit; 563 goto exit;
650 564
651 spin_lock_bh(&seq->lock); 565 spin_lock_bh(&sc->lock);
652 sseq = seq->sseqs + nameseq_locate_subseq(seq, lower); 566
653 stop = seq->sseqs + seq->first_free; 567 for (n = rb_first(&sc->ranges); n; n = rb_next(n)) {
654 for (; sseq != stop && sseq->lower <= upper; sseq++) { 568 sr = container_of(n, struct service_range, tree_node);
655 info = sseq->info; 569 if (sr->upper < lower)
656 list_for_each_entry(publ, &info->all_publ, all_publ) { 570 continue;
657 tipc_nlist_add(nodes, publ->node); 571 if (sr->lower > upper)
572 break;
573 list_for_each_entry(p, &sr->all_publ, all_publ) {
574 tipc_nlist_add(nodes, p->node);
658 } 575 }
659 } 576 }
660 spin_unlock_bh(&seq->lock); 577 spin_unlock_bh(&sc->lock);
661exit: 578exit:
662 rcu_read_unlock(); 579 rcu_read_unlock();
663} 580}
@@ -667,89 +584,88 @@ exit:
667void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp, 584void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
668 u32 type, u32 scope) 585 u32 type, u32 scope)
669{ 586{
670 struct sub_seq *sseq, *stop; 587 struct service_range *sr;
671 struct name_info *info; 588 struct tipc_service *sc;
672 struct publication *p; 589 struct publication *p;
673 struct name_seq *seq; 590 struct rb_node *n;
674 591
675 rcu_read_lock(); 592 rcu_read_lock();
676 seq = nametbl_find_seq(net, type); 593 sc = tipc_service_find(net, type);
677 if (!seq) 594 if (!sc)
678 goto exit; 595 goto exit;
679 596
680 spin_lock_bh(&seq->lock); 597 spin_lock_bh(&sc->lock);
681 sseq = seq->sseqs; 598 for (n = rb_first(&sc->ranges); n; n = rb_next(n)) {
682 stop = seq->sseqs + seq->first_free; 599 sr = container_of(n, struct service_range, tree_node);
683 for (; sseq != stop; sseq++) { 600 list_for_each_entry(p, &sr->all_publ, all_publ) {
684 info = sseq->info;
685 list_for_each_entry(p, &info->all_publ, all_publ) {
686 if (p->scope != scope) 601 if (p->scope != scope)
687 continue; 602 continue;
688 tipc_group_add_member(grp, p->node, p->port, p->lower); 603 tipc_group_add_member(grp, p->node, p->port, p->lower);
689 } 604 }
690 } 605 }
691 spin_unlock_bh(&seq->lock); 606 spin_unlock_bh(&sc->lock);
692exit: 607exit:
693 rcu_read_unlock(); 608 rcu_read_unlock();
694} 609}
695 610
696/* 611/* tipc_nametbl_publish - add service binding to name table
697 * tipc_nametbl_publish - add name publication to network name tables
698 */ 612 */
699struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower, 613struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
700 u32 upper, u32 scope, u32 port_ref, 614 u32 upper, u32 scope, u32 port,
701 u32 key) 615 u32 key)
702{ 616{
703 struct publication *publ; 617 struct name_table *nt = tipc_name_table(net);
704 struct sk_buff *buf = NULL; 618 struct tipc_net *tn = tipc_net(net);
705 struct tipc_net *tn = net_generic(net, tipc_net_id); 619 struct publication *p = NULL;
620 struct sk_buff *skb = NULL;
706 621
707 spin_lock_bh(&tn->nametbl_lock); 622 spin_lock_bh(&tn->nametbl_lock);
708 if (tn->nametbl->local_publ_count >= TIPC_MAX_PUBLICATIONS) { 623
709 pr_warn("Publication failed, local publication limit reached (%u)\n", 624 if (nt->local_publ_count >= TIPC_MAX_PUBL) {
710 TIPC_MAX_PUBLICATIONS); 625 pr_warn("Bind failed, max limit %u reached\n", TIPC_MAX_PUBL);
711 spin_unlock_bh(&tn->nametbl_lock); 626 goto exit;
712 return NULL;
713 } 627 }
714 628
715 publ = tipc_nametbl_insert_publ(net, type, lower, upper, scope, 629 p = tipc_nametbl_insert_publ(net, type, lower, upper, scope,
716 tipc_own_addr(net), port_ref, key); 630 tipc_own_addr(net), port, key);
717 if (likely(publ)) { 631 if (p) {
718 tn->nametbl->local_publ_count++; 632 nt->local_publ_count++;
719 buf = tipc_named_publish(net, publ); 633 skb = tipc_named_publish(net, p);
720 /* Any pending external events? */ 634 /* Any pending external events? */
721 tipc_named_process_backlog(net); 635 tipc_named_process_backlog(net);
722 } 636 }
637exit:
723 spin_unlock_bh(&tn->nametbl_lock); 638 spin_unlock_bh(&tn->nametbl_lock);
724 639
725 if (buf) 640 if (skb)
726 tipc_node_broadcast(net, buf); 641 tipc_node_broadcast(net, skb);
727 return publ; 642 return p;
728} 643}
729 644
730/** 645/**
731 * tipc_nametbl_withdraw - withdraw name publication from network name tables 646 * tipc_nametbl_withdraw - withdraw a service binding
732 */ 647 */
733int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 port, 648int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower,
734 u32 key) 649 u32 port, u32 key)
735{ 650{
736 struct publication *publ; 651 struct name_table *nt = tipc_name_table(net);
652 struct tipc_net *tn = tipc_net(net);
653 u32 self = tipc_own_addr(net);
737 struct sk_buff *skb = NULL; 654 struct sk_buff *skb = NULL;
738 struct tipc_net *tn = net_generic(net, tipc_net_id); 655 struct publication *p;
739 656
740 spin_lock_bh(&tn->nametbl_lock); 657 spin_lock_bh(&tn->nametbl_lock);
741 publ = tipc_nametbl_remove_publ(net, type, lower, tipc_own_addr(net), 658
742 port, key); 659 p = tipc_nametbl_remove_publ(net, type, lower, self, port, key);
743 if (likely(publ)) { 660 if (p) {
744 tn->nametbl->local_publ_count--; 661 nt->local_publ_count--;
745 skb = tipc_named_withdraw(net, publ); 662 skb = tipc_named_withdraw(net, p);
746 /* Any pending external events? */ 663 /* Any pending external events? */
747 tipc_named_process_backlog(net); 664 tipc_named_process_backlog(net);
748 list_del_init(&publ->binding_sock); 665 list_del_init(&p->binding_sock);
749 kfree_rcu(publ, rcu); 666 kfree_rcu(p, rcu);
750 } else { 667 } else {
751 pr_err("Unable to remove local publication\n" 668 pr_err("Failed to remove local publication {%u,%u,%u}/%u\n",
752 "(type=%u, lower=%u, port=%u, key=%u)\n",
753 type, lower, port, key); 669 type, lower, port, key);
754 } 670 }
755 spin_unlock_bh(&tn->nametbl_lock); 671 spin_unlock_bh(&tn->nametbl_lock);
@@ -766,27 +682,24 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 port,
766 */ 682 */
767void tipc_nametbl_subscribe(struct tipc_subscription *sub) 683void tipc_nametbl_subscribe(struct tipc_subscription *sub)
768{ 684{
685 struct name_table *nt = tipc_name_table(sub->net);
769 struct tipc_net *tn = tipc_net(sub->net); 686 struct tipc_net *tn = tipc_net(sub->net);
770 struct tipc_subscr *s = &sub->evt.s; 687 struct tipc_subscr *s = &sub->evt.s;
771 u32 type = tipc_sub_read(s, seq.type); 688 u32 type = tipc_sub_read(s, seq.type);
772 int index = hash(type); 689 struct tipc_service *sc;
773 struct name_seq *seq;
774 struct tipc_name_seq ns;
775 690
776 spin_lock_bh(&tn->nametbl_lock); 691 spin_lock_bh(&tn->nametbl_lock);
777 seq = nametbl_find_seq(sub->net, type); 692 sc = tipc_service_find(sub->net, type);
778 if (!seq) 693 if (!sc)
779 seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]); 694 sc = tipc_service_create(type, &nt->services[hash(type)]);
780 if (seq) { 695 if (sc) {
781 spin_lock_bh(&seq->lock); 696 spin_lock_bh(&sc->lock);
782 tipc_nameseq_subscribe(seq, sub); 697 tipc_service_subscribe(sc, sub);
783 spin_unlock_bh(&seq->lock); 698 spin_unlock_bh(&sc->lock);
784 } else { 699 } else {
785 ns.type = tipc_sub_read(s, seq.type); 700 pr_warn("Failed to subscribe for {%u,%u,%u}\n", type,
786 ns.lower = tipc_sub_read(s, seq.lower); 701 tipc_sub_read(s, seq.lower),
787 ns.upper = tipc_sub_read(s, seq.upper); 702 tipc_sub_read(s, seq.upper));
788 pr_warn("Failed to create subscription for {%u,%u,%u}\n",
789 ns.type, ns.lower, ns.upper);
790 } 703 }
791 spin_unlock_bh(&tn->nametbl_lock); 704 spin_unlock_bh(&tn->nametbl_lock);
792} 705}
@@ -796,124 +709,122 @@ void tipc_nametbl_subscribe(struct tipc_subscription *sub)
796 */ 709 */
797void tipc_nametbl_unsubscribe(struct tipc_subscription *sub) 710void tipc_nametbl_unsubscribe(struct tipc_subscription *sub)
798{ 711{
799 struct tipc_subscr *s = &sub->evt.s;
800 struct tipc_net *tn = tipc_net(sub->net); 712 struct tipc_net *tn = tipc_net(sub->net);
801 struct name_seq *seq; 713 struct tipc_subscr *s = &sub->evt.s;
802 u32 type = tipc_sub_read(s, seq.type); 714 u32 type = tipc_sub_read(s, seq.type);
715 struct tipc_service *sc;
803 716
804 spin_lock_bh(&tn->nametbl_lock); 717 spin_lock_bh(&tn->nametbl_lock);
805 seq = nametbl_find_seq(sub->net, type); 718 sc = tipc_service_find(sub->net, type);
806 if (seq != NULL) { 719 if (!sc)
807 spin_lock_bh(&seq->lock); 720 goto exit;
808 list_del_init(&sub->nameseq_list); 721
809 tipc_sub_put(sub); 722 spin_lock_bh(&sc->lock);
810 if (!seq->first_free && list_empty(&seq->subscriptions)) { 723 list_del_init(&sub->service_list);
811 hlist_del_init_rcu(&seq->ns_list); 724 tipc_sub_put(sub);
812 kfree(seq->sseqs); 725
813 spin_unlock_bh(&seq->lock); 726 /* Delete service item if no more publications and subscriptions */
814 kfree_rcu(seq, rcu); 727 if (RB_EMPTY_ROOT(&sc->ranges) && list_empty(&sc->subscriptions)) {
815 } else { 728 hlist_del_init_rcu(&sc->service_list);
816 spin_unlock_bh(&seq->lock); 729 kfree_rcu(sc, rcu);
817 }
818 } 730 }
731 spin_unlock_bh(&sc->lock);
732exit:
819 spin_unlock_bh(&tn->nametbl_lock); 733 spin_unlock_bh(&tn->nametbl_lock);
820} 734}
821 735
822int tipc_nametbl_init(struct net *net) 736int tipc_nametbl_init(struct net *net)
823{ 737{
824 struct tipc_net *tn = net_generic(net, tipc_net_id); 738 struct tipc_net *tn = tipc_net(net);
825 struct name_table *tipc_nametbl; 739 struct name_table *nt;
826 int i; 740 int i;
827 741
828 tipc_nametbl = kzalloc(sizeof(*tipc_nametbl), GFP_ATOMIC); 742 nt = kzalloc(sizeof(*nt), GFP_ATOMIC);
829 if (!tipc_nametbl) 743 if (!nt)
830 return -ENOMEM; 744 return -ENOMEM;
831 745
832 for (i = 0; i < TIPC_NAMETBL_SIZE; i++) 746 for (i = 0; i < TIPC_NAMETBL_SIZE; i++)
833 INIT_HLIST_HEAD(&tipc_nametbl->seq_hlist[i]); 747 INIT_HLIST_HEAD(&nt->services[i]);
834 748
835 INIT_LIST_HEAD(&tipc_nametbl->node_scope); 749 INIT_LIST_HEAD(&nt->node_scope);
836 INIT_LIST_HEAD(&tipc_nametbl->cluster_scope); 750 INIT_LIST_HEAD(&nt->cluster_scope);
837 tn->nametbl = tipc_nametbl; 751 tn->nametbl = nt;
838 spin_lock_init(&tn->nametbl_lock); 752 spin_lock_init(&tn->nametbl_lock);
839 return 0; 753 return 0;
840} 754}
841 755
842/** 756/**
843 * tipc_purge_publications - remove all publications for a given type 757 * tipc_service_delete - purge all publications for a service and delete it
844 *
845 * tipc_nametbl_lock must be held when calling this function
846 */ 758 */
847static void tipc_purge_publications(struct net *net, struct name_seq *seq) 759static void tipc_service_delete(struct net *net, struct tipc_service *sc)
848{ 760{
849 struct publication *publ, *safe; 761 struct service_range *sr, *tmpr;
850 struct sub_seq *sseq; 762 struct publication *p, *tmpb;
851 struct name_info *info; 763
852 764 spin_lock_bh(&sc->lock);
853 spin_lock_bh(&seq->lock); 765 rbtree_postorder_for_each_entry_safe(sr, tmpr, &sc->ranges, tree_node) {
854 sseq = seq->sseqs; 766 list_for_each_entry_safe(p, tmpb,
855 info = sseq->info; 767 &sr->all_publ, all_publ) {
856 list_for_each_entry_safe(publ, safe, &info->all_publ, all_publ) { 768 tipc_service_remove_publ(net, sc, p->lower, p->node,
857 tipc_nameseq_remove_publ(net, seq, publ->lower, publ->node, 769 p->port, p->key);
858 publ->port, publ->key); 770 kfree_rcu(p, rcu);
859 kfree_rcu(publ, rcu); 771 }
860 } 772 }
861 hlist_del_init_rcu(&seq->ns_list); 773 hlist_del_init_rcu(&sc->service_list);
862 kfree(seq->sseqs); 774 spin_unlock_bh(&sc->lock);
863 spin_unlock_bh(&seq->lock); 775 kfree_rcu(sc, rcu);
864
865 kfree_rcu(seq, rcu);
866} 776}
867 777
868void tipc_nametbl_stop(struct net *net) 778void tipc_nametbl_stop(struct net *net)
869{ 779{
780 struct name_table *nt = tipc_name_table(net);
781 struct tipc_net *tn = tipc_net(net);
782 struct hlist_head *service_head;
783 struct tipc_service *service;
870 u32 i; 784 u32 i;
871 struct name_seq *seq;
872 struct hlist_head *seq_head;
873 struct tipc_net *tn = net_generic(net, tipc_net_id);
874 struct name_table *tipc_nametbl = tn->nametbl;
875 785
876 /* Verify name table is empty and purge any lingering 786 /* Verify name table is empty and purge any lingering
877 * publications, then release the name table 787 * publications, then release the name table
878 */ 788 */
879 spin_lock_bh(&tn->nametbl_lock); 789 spin_lock_bh(&tn->nametbl_lock);
880 for (i = 0; i < TIPC_NAMETBL_SIZE; i++) { 790 for (i = 0; i < TIPC_NAMETBL_SIZE; i++) {
881 if (hlist_empty(&tipc_nametbl->seq_hlist[i])) 791 if (hlist_empty(&nt->services[i]))
882 continue; 792 continue;
883 seq_head = &tipc_nametbl->seq_hlist[i]; 793 service_head = &nt->services[i];
884 hlist_for_each_entry_rcu(seq, seq_head, ns_list) { 794 hlist_for_each_entry_rcu(service, service_head, service_list) {
885 tipc_purge_publications(net, seq); 795 tipc_service_delete(net, service);
886 } 796 }
887 } 797 }
888 spin_unlock_bh(&tn->nametbl_lock); 798 spin_unlock_bh(&tn->nametbl_lock);
889 799
890 synchronize_net(); 800 synchronize_net();
891 kfree(tipc_nametbl); 801 kfree(nt);
892
893} 802}
894 803
895static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg, 804static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg,
896 struct name_seq *seq, 805 struct tipc_service *service,
897 struct sub_seq *sseq, u32 *last_publ) 806 struct service_range *sr,
807 u32 *last_key)
898{ 808{
899 void *hdr;
900 struct nlattr *attrs;
901 struct nlattr *publ;
902 struct publication *p; 809 struct publication *p;
810 struct nlattr *attrs;
811 struct nlattr *b;
812 void *hdr;
903 813
904 if (*last_publ) { 814 if (*last_key) {
905 list_for_each_entry(p, &sseq->info->all_publ, all_publ) 815 list_for_each_entry(p, &sr->all_publ, all_publ)
906 if (p->key == *last_publ) 816 if (p->key == *last_key)
907 break; 817 break;
908 if (p->key != *last_publ) 818 if (p->key != *last_key)
909 return -EPIPE; 819 return -EPIPE;
910 } else { 820 } else {
911 p = list_first_entry(&sseq->info->all_publ, struct publication, 821 p = list_first_entry(&sr->all_publ,
822 struct publication,
912 all_publ); 823 all_publ);
913 } 824 }
914 825
915 list_for_each_entry_from(p, &sseq->info->all_publ, all_publ) { 826 list_for_each_entry_from(p, &sr->all_publ, all_publ) {
916 *last_publ = p->key; 827 *last_key = p->key;
917 828
918 hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, 829 hdr = genlmsg_put(msg->skb, msg->portid, msg->seq,
919 &tipc_genl_family, NLM_F_MULTI, 830 &tipc_genl_family, NLM_F_MULTI,
@@ -925,15 +836,15 @@ static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg,
925 if (!attrs) 836 if (!attrs)
926 goto msg_full; 837 goto msg_full;
927 838
928 publ = nla_nest_start(msg->skb, TIPC_NLA_NAME_TABLE_PUBL); 839 b = nla_nest_start(msg->skb, TIPC_NLA_NAME_TABLE_PUBL);
929 if (!publ) 840 if (!b)
930 goto attr_msg_full; 841 goto attr_msg_full;
931 842
932 if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_TYPE, seq->type)) 843 if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_TYPE, service->type))
933 goto publ_msg_full; 844 goto publ_msg_full;
934 if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_LOWER, sseq->lower)) 845 if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_LOWER, sr->lower))
935 goto publ_msg_full; 846 goto publ_msg_full;
936 if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_UPPER, sseq->upper)) 847 if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_UPPER, sr->upper))
937 goto publ_msg_full; 848 goto publ_msg_full;
938 if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_SCOPE, p->scope)) 849 if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_SCOPE, p->scope))
939 goto publ_msg_full; 850 goto publ_msg_full;
@@ -944,16 +855,16 @@ static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg,
944 if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_KEY, p->key)) 855 if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_KEY, p->key))
945 goto publ_msg_full; 856 goto publ_msg_full;
946 857
947 nla_nest_end(msg->skb, publ); 858 nla_nest_end(msg->skb, b);
948 nla_nest_end(msg->skb, attrs); 859 nla_nest_end(msg->skb, attrs);
949 genlmsg_end(msg->skb, hdr); 860 genlmsg_end(msg->skb, hdr);
950 } 861 }
951 *last_publ = 0; 862 *last_key = 0;
952 863
953 return 0; 864 return 0;
954 865
955publ_msg_full: 866publ_msg_full:
956 nla_nest_cancel(msg->skb, publ); 867 nla_nest_cancel(msg->skb, b);
957attr_msg_full: 868attr_msg_full:
958 nla_nest_cancel(msg->skb, attrs); 869 nla_nest_cancel(msg->skb, attrs);
959msg_full: 870msg_full:
@@ -962,39 +873,34 @@ msg_full:
962 return -EMSGSIZE; 873 return -EMSGSIZE;
963} 874}
964 875
965static int __tipc_nl_subseq_list(struct tipc_nl_msg *msg, struct name_seq *seq, 876static int __tipc_nl_service_range_list(struct tipc_nl_msg *msg,
966 u32 *last_lower, u32 *last_publ) 877 struct tipc_service *sc,
878 u32 *last_lower, u32 *last_key)
967{ 879{
968 struct sub_seq *sseq; 880 struct service_range *sr;
969 struct sub_seq *sseq_start; 881 struct rb_node *n;
970 int err; 882 int err;
971 883
972 if (*last_lower) { 884 for (n = rb_first(&sc->ranges); n; n = rb_next(n)) {
973 sseq_start = nameseq_find_subseq(seq, *last_lower); 885 sr = container_of(n, struct service_range, tree_node);
974 if (!sseq_start) 886 if (sr->lower < *last_lower)
975 return -EPIPE; 887 continue;
976 } else { 888 err = __tipc_nl_add_nametable_publ(msg, sc, sr, last_key);
977 sseq_start = seq->sseqs;
978 }
979
980 for (sseq = sseq_start; sseq != &seq->sseqs[seq->first_free]; sseq++) {
981 err = __tipc_nl_add_nametable_publ(msg, seq, sseq, last_publ);
982 if (err) { 889 if (err) {
983 *last_lower = sseq->lower; 890 *last_lower = sr->lower;
984 return err; 891 return err;
985 } 892 }
986 } 893 }
987 *last_lower = 0; 894 *last_lower = 0;
988
989 return 0; 895 return 0;
990} 896}
991 897
992static int tipc_nl_seq_list(struct net *net, struct tipc_nl_msg *msg, 898static int tipc_nl_service_list(struct net *net, struct tipc_nl_msg *msg,
993 u32 *last_type, u32 *last_lower, u32 *last_publ) 899 u32 *last_type, u32 *last_lower, u32 *last_key)
994{ 900{
995 struct tipc_net *tn = net_generic(net, tipc_net_id); 901 struct tipc_net *tn = tipc_net(net);
996 struct hlist_head *seq_head; 902 struct tipc_service *service = NULL;
997 struct name_seq *seq = NULL; 903 struct hlist_head *head;
998 int err; 904 int err;
999 int i; 905 int i;
1000 906
@@ -1004,30 +910,31 @@ static int tipc_nl_seq_list(struct net *net, struct tipc_nl_msg *msg,
1004 i = 0; 910 i = 0;
1005 911
1006 for (; i < TIPC_NAMETBL_SIZE; i++) { 912 for (; i < TIPC_NAMETBL_SIZE; i++) {
1007 seq_head = &tn->nametbl->seq_hlist[i]; 913 head = &tn->nametbl->services[i];
1008 914
1009 if (*last_type) { 915 if (*last_type) {
1010 seq = nametbl_find_seq(net, *last_type); 916 service = tipc_service_find(net, *last_type);
1011 if (!seq) 917 if (!service)
1012 return -EPIPE; 918 return -EPIPE;
1013 } else { 919 } else {
1014 hlist_for_each_entry_rcu(seq, seq_head, ns_list) 920 hlist_for_each_entry_rcu(service, head, service_list)
1015 break; 921 break;
1016 if (!seq) 922 if (!service)
1017 continue; 923 continue;
1018 } 924 }
1019 925
1020 hlist_for_each_entry_from_rcu(seq, ns_list) { 926 hlist_for_each_entry_from_rcu(service, service_list) {
1021 spin_lock_bh(&seq->lock); 927 spin_lock_bh(&service->lock);
1022 err = __tipc_nl_subseq_list(msg, seq, last_lower, 928 err = __tipc_nl_service_range_list(msg, service,
1023 last_publ); 929 last_lower,
930 last_key);
1024 931
1025 if (err) { 932 if (err) {
1026 *last_type = seq->type; 933 *last_type = service->type;
1027 spin_unlock_bh(&seq->lock); 934 spin_unlock_bh(&service->lock);
1028 return err; 935 return err;
1029 } 936 }
1030 spin_unlock_bh(&seq->lock); 937 spin_unlock_bh(&service->lock);
1031 } 938 }
1032 *last_type = 0; 939 *last_type = 0;
1033 } 940 }
@@ -1036,13 +943,13 @@ static int tipc_nl_seq_list(struct net *net, struct tipc_nl_msg *msg,
1036 943
1037int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb) 944int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb)
1038{ 945{
1039 int err; 946 struct net *net = sock_net(skb->sk);
1040 int done = cb->args[3];
1041 u32 last_type = cb->args[0]; 947 u32 last_type = cb->args[0];
1042 u32 last_lower = cb->args[1]; 948 u32 last_lower = cb->args[1];
1043 u32 last_publ = cb->args[2]; 949 u32 last_key = cb->args[2];
1044 struct net *net = sock_net(skb->sk); 950 int done = cb->args[3];
1045 struct tipc_nl_msg msg; 951 struct tipc_nl_msg msg;
952 int err;
1046 953
1047 if (done) 954 if (done)
1048 return 0; 955 return 0;
@@ -1052,7 +959,8 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb)
1052 msg.seq = cb->nlh->nlmsg_seq; 959 msg.seq = cb->nlh->nlmsg_seq;
1053 960
1054 rcu_read_lock(); 961 rcu_read_lock();
1055 err = tipc_nl_seq_list(net, &msg, &last_type, &last_lower, &last_publ); 962 err = tipc_nl_service_list(net, &msg, &last_type,
963 &last_lower, &last_key);
1056 if (!err) { 964 if (!err) {
1057 done = 1; 965 done = 1;
1058 } else if (err != -EMSGSIZE) { 966 } else if (err != -EMSGSIZE) {
@@ -1068,7 +976,7 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb)
1068 976
1069 cb->args[0] = last_type; 977 cb->args[0] = last_type;
1070 cb->args[1] = last_lower; 978 cb->args[1] = last_lower;
1071 cb->args[2] = last_publ; 979 cb->args[2] = last_key;
1072 cb->args[3] = done; 980 cb->args[3] = done;
1073 981
1074 return skb->len; 982 return skb->len;
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index 34a4ccb907aa..1b03b8751707 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -97,7 +97,7 @@ struct publication {
97 * @local_publ_count: number of publications issued by this node 97 * @local_publ_count: number of publications issued by this node
98 */ 98 */
99struct name_table { 99struct name_table {
100 struct hlist_head seq_hlist[TIPC_NAMETBL_SIZE]; 100 struct hlist_head services[TIPC_NAMETBL_SIZE];
101 struct list_head node_scope; 101 struct list_head node_scope;
102 struct list_head cluster_scope; 102 struct list_head cluster_scope;
103 u32 local_publ_count; 103 u32 local_publ_count;
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 4fb4327311bb..85e777e00ec9 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -324,12 +324,12 @@ static void tipc_node_write_unlock(struct tipc_node *n)
324 if (flags & TIPC_NOTIFY_LINK_UP) { 324 if (flags & TIPC_NOTIFY_LINK_UP) {
325 tipc_mon_peer_up(net, addr, bearer_id); 325 tipc_mon_peer_up(net, addr, bearer_id);
326 tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr, 326 tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr,
327 TIPC_NODE_SCOPE, link_id, addr); 327 TIPC_NODE_SCOPE, link_id, link_id);
328 } 328 }
329 if (flags & TIPC_NOTIFY_LINK_DOWN) { 329 if (flags & TIPC_NOTIFY_LINK_DOWN) {
330 tipc_mon_peer_down(net, addr, bearer_id); 330 tipc_mon_peer_down(net, addr, bearer_id);
331 tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr, 331 tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
332 link_id, addr); 332 link_id, link_id);
333 } 333 }
334} 334}
335 335
diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
index 8b2d22b18f22..d793b4343885 100644
--- a/net/tipc/subscr.h
+++ b/net/tipc/subscr.h
@@ -40,7 +40,7 @@
40#include "topsrv.h" 40#include "topsrv.h"
41 41
42#define TIPC_MAX_SUBSCR 65535 42#define TIPC_MAX_SUBSCR 65535
43#define TIPC_MAX_PUBLICATIONS 65535 43#define TIPC_MAX_PUBL 65535
44 44
45struct tipc_subscription; 45struct tipc_subscription;
46struct tipc_conn; 46struct tipc_conn;
@@ -58,7 +58,7 @@ struct tipc_subscription {
58 struct kref kref; 58 struct kref kref;
59 struct net *net; 59 struct net *net;
60 struct timer_list timer; 60 struct timer_list timer;
61 struct list_head nameseq_list; 61 struct list_head service_list;
62 struct list_head sub_list; 62 struct list_head sub_list;
63 struct tipc_event evt; 63 struct tipc_event evt;
64 int conid; 64 int conid;