aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorJon Maloy <jon.maloy@ericsson.com>2018-02-15 04:40:46 -0500
committerDavid S. Miller <davem@davemloft.net>2018-02-16 15:26:33 -0500
commit8985ecc7c1e07c42acc1e44ac56fa224f8a5c62f (patch)
treeed564ee8b615b55e4569b38ea039e3eb89dccbe0 /net/tipc
parent414574a0af36d329f560f542e650cc4a81cc1d69 (diff)
tipc: simplify endianness handling in topology subscriber
Because of the requirement for total distribution transparency, users send subscriptions and receive topology events in their own host format. It is up to the topology server to determine this format and do the correct conversions to and from its own host format when needed. Until now, this has been handled in a rather non-transparent way inside the topology server and subscriber code, leading to unnecessary complexity when creating subscriptions and issuing events. We now improve this situation by adding two new macros, tipc_sub_read() and tipc_evt_write(). Both those functions calculate the need for conversion internally before performing their respective operations. Hence, all handling of such conversions become transparent to the rest of the code. Acked-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/name_table.c42
-rw-r--r--net/tipc/name_table.h2
-rw-r--r--net/tipc/server.c25
-rw-r--r--net/tipc/subscr.c83
-rw-r--r--net/tipc/subscr.h36
5 files changed, 86 insertions, 102 deletions
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index c0ca7be46f7a..2fbd0a20a958 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -412,18 +412,22 @@ found:
412 * sequence overlapping with the requested sequence 412 * sequence overlapping with the requested sequence
413 */ 413 */
414static void tipc_nameseq_subscribe(struct name_seq *nseq, 414static void tipc_nameseq_subscribe(struct name_seq *nseq,
415 struct tipc_subscription *s, 415 struct tipc_subscription *sub)
416 bool status)
417{ 416{
418 struct sub_seq *sseq = nseq->sseqs; 417 struct sub_seq *sseq = nseq->sseqs;
419 struct tipc_name_seq ns; 418 struct tipc_name_seq ns;
419 struct tipc_subscr *s = &sub->evt.s;
420 bool no_status;
420 421
421 tipc_subscrp_convert_seq(&s->evt.s.seq, s->swap, &ns); 422 ns.type = tipc_sub_read(s, seq.type);
423 ns.lower = tipc_sub_read(s, seq.lower);
424 ns.upper = tipc_sub_read(s, seq.upper);
425 no_status = tipc_sub_read(s, filter) & TIPC_SUB_NO_STATUS;
422 426
423 tipc_subscrp_get(s); 427 tipc_subscrp_get(sub);
424 list_add(&s->nameseq_list, &nseq->subscriptions); 428 list_add(&sub->nameseq_list, &nseq->subscriptions);
425 429
426 if (!status || !sseq) 430 if (no_status || !sseq)
427 return; 431 return;
428 432
429 while (sseq != &nseq->sseqs[nseq->first_free]) { 433 while (sseq != &nseq->sseqs[nseq->first_free]) {
@@ -433,7 +437,7 @@ static void tipc_nameseq_subscribe(struct name_seq *nseq,
433 int must_report = 1; 437 int must_report = 1;
434 438
435 list_for_each_entry(crs, &info->zone_list, zone_list) { 439 list_for_each_entry(crs, &info->zone_list, zone_list) {
436 tipc_subscrp_report_overlap(s, sseq->lower, 440 tipc_subscrp_report_overlap(sub, sseq->lower,
437 sseq->upper, 441 sseq->upper,
438 TIPC_PUBLISHED, 442 TIPC_PUBLISHED,
439 crs->ref, crs->node, 443 crs->ref, crs->node,
@@ -808,11 +812,12 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref,
808/** 812/**
809 * tipc_nametbl_subscribe - add a subscription object to the name table 813 * tipc_nametbl_subscribe - add a subscription object to the name table
810 */ 814 */
811void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status) 815void tipc_nametbl_subscribe(struct tipc_subscription *sub)
812{ 816{
813 struct tipc_server *srv = s->server; 817 struct tipc_server *srv = sub->server;
814 struct tipc_net *tn = tipc_net(srv->net); 818 struct tipc_net *tn = tipc_net(srv->net);
815 u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap); 819 struct tipc_subscr *s = &sub->evt.s;
820 u32 type = tipc_sub_read(s, seq.type);
816 int index = hash(type); 821 int index = hash(type);
817 struct name_seq *seq; 822 struct name_seq *seq;
818 struct tipc_name_seq ns; 823 struct tipc_name_seq ns;
@@ -823,10 +828,12 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status)
823 seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]); 828 seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]);
824 if (seq) { 829 if (seq) {
825 spin_lock_bh(&seq->lock); 830 spin_lock_bh(&seq->lock);
826 tipc_nameseq_subscribe(seq, s, status); 831 tipc_nameseq_subscribe(seq, sub);
827 spin_unlock_bh(&seq->lock); 832 spin_unlock_bh(&seq->lock);
828 } else { 833 } else {
829 tipc_subscrp_convert_seq(&s->evt.s.seq, s->swap, &ns); 834 ns.type = tipc_sub_read(s, seq.type);
835 ns.lower = tipc_sub_read(s, seq.lower);
836 ns.upper = tipc_sub_read(s, seq.upper);
830 pr_warn("Failed to create subscription for {%u,%u,%u}\n", 837 pr_warn("Failed to create subscription for {%u,%u,%u}\n",
831 ns.type, ns.lower, ns.upper); 838 ns.type, ns.lower, ns.upper);
832 } 839 }
@@ -836,19 +843,20 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status)
836/** 843/**
837 * tipc_nametbl_unsubscribe - remove a subscription object from name table 844 * tipc_nametbl_unsubscribe - remove a subscription object from name table
838 */ 845 */
839void tipc_nametbl_unsubscribe(struct tipc_subscription *s) 846void tipc_nametbl_unsubscribe(struct tipc_subscription *sub)
840{ 847{
841 struct tipc_server *srv = s->server; 848 struct tipc_server *srv = sub->server;
849 struct tipc_subscr *s = &sub->evt.s;
842 struct tipc_net *tn = tipc_net(srv->net); 850 struct tipc_net *tn = tipc_net(srv->net);
843 struct name_seq *seq; 851 struct name_seq *seq;
844 u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap); 852 u32 type = tipc_sub_read(s, seq.type);
845 853
846 spin_lock_bh(&tn->nametbl_lock); 854 spin_lock_bh(&tn->nametbl_lock);
847 seq = nametbl_find_seq(srv->net, type); 855 seq = nametbl_find_seq(srv->net, type);
848 if (seq != NULL) { 856 if (seq != NULL) {
849 spin_lock_bh(&seq->lock); 857 spin_lock_bh(&seq->lock);
850 list_del_init(&s->nameseq_list); 858 list_del_init(&sub->nameseq_list);
851 tipc_subscrp_put(s); 859 tipc_subscrp_put(sub);
852 if (!seq->first_free && list_empty(&seq->subscriptions)) { 860 if (!seq->first_free && list_empty(&seq->subscriptions)) {
853 hlist_del_init_rcu(&seq->ns_list); 861 hlist_del_init_rcu(&seq->ns_list);
854 kfree(seq->sseqs); 862 kfree(seq->sseqs);
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index f56e7cb3d436..17652602d5e2 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -120,7 +120,7 @@ struct publication *tipc_nametbl_insert_publ(struct net *net, u32 type,
120struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type, 120struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type,
121 u32 lower, u32 node, u32 ref, 121 u32 lower, u32 node, u32 ref,
122 u32 key); 122 u32 key);
123void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status); 123void tipc_nametbl_subscribe(struct tipc_subscription *s);
124void tipc_nametbl_unsubscribe(struct tipc_subscription *s); 124void tipc_nametbl_unsubscribe(struct tipc_subscription *s);
125int tipc_nametbl_init(struct net *net); 125int tipc_nametbl_init(struct net *net);
126void tipc_nametbl_stop(struct net *net); 126void tipc_nametbl_stop(struct net *net);
diff --git a/net/tipc/server.c b/net/tipc/server.c
index 7933fb92d852..5d231fa8e4b5 100644
--- a/net/tipc/server.c
+++ b/net/tipc/server.c
@@ -98,18 +98,6 @@ static bool connected(struct tipc_conn *con)
98 return con && test_bit(CF_CONNECTED, &con->flags); 98 return con && test_bit(CF_CONNECTED, &con->flags);
99} 99}
100 100
101/**
102 * htohl - convert value to endianness used by destination
103 * @in: value to convert
104 * @swap: non-zero if endianness must be reversed
105 *
106 * Returns converted value
107 */
108static u32 htohl(u32 in, int swap)
109{
110 return swap ? swab32(in) : in;
111}
112
113static void tipc_conn_kref_release(struct kref *kref) 101static void tipc_conn_kref_release(struct kref *kref)
114{ 102{
115 struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); 103 struct tipc_conn *con = container_of(kref, struct tipc_conn, kref);
@@ -285,21 +273,12 @@ static int tipc_con_rcv_sub(struct tipc_server *srv,
285 struct tipc_subscr *s) 273 struct tipc_subscr *s)
286{ 274{
287 struct tipc_subscription *sub; 275 struct tipc_subscription *sub;
288 bool status;
289 int swap;
290
291 /* Determine subscriber's endianness */
292 swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE |
293 TIPC_SUB_CANCEL));
294 276
295 /* Detect & process a subscription cancellation request */ 277 if (tipc_sub_read(s, filter) & TIPC_SUB_CANCEL) {
296 if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
297 s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
298 tipc_con_delete_sub(con, s); 278 tipc_con_delete_sub(con, s);
299 return 0; 279 return 0;
300 } 280 }
301 status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap)); 281 sub = tipc_subscrp_subscribe(srv, s, con->conid);
302 sub = tipc_subscrp_subscribe(srv, s, con->conid, swap, status);
303 if (!sub) 282 if (!sub)
304 return -1; 283 return -1;
305 284
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index c3e0b924e8c2..406b09fc227b 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -38,32 +38,19 @@
38#include "name_table.h" 38#include "name_table.h"
39#include "subscr.h" 39#include "subscr.h"
40 40
41/**
42 * htohl - convert value to endianness used by destination
43 * @in: value to convert
44 * @swap: non-zero if endianness must be reversed
45 *
46 * Returns converted value
47 */
48static u32 htohl(u32 in, int swap)
49{
50 return swap ? swab32(in) : in;
51}
52
53static void tipc_subscrp_send_event(struct tipc_subscription *sub, 41static void tipc_subscrp_send_event(struct tipc_subscription *sub,
54 u32 found_lower, u32 found_upper, 42 u32 found_lower, u32 found_upper,
55 u32 event, u32 port, u32 node) 43 u32 event, u32 port, u32 node)
56{ 44{
57 struct tipc_event *evt = &sub->evt; 45 struct tipc_event *evt = &sub->evt;
58 bool swap = sub->swap;
59 46
60 if (sub->inactive) 47 if (sub->inactive)
61 return; 48 return;
62 evt->event = htohl(event, swap); 49 tipc_evt_write(evt, event, event);
63 evt->found_lower = htohl(found_lower, swap); 50 tipc_evt_write(evt, found_lower, found_lower);
64 evt->found_upper = htohl(found_upper, swap); 51 tipc_evt_write(evt, found_upper, found_upper);
65 evt->port.ref = htohl(port, swap); 52 tipc_evt_write(evt, port.ref, port);
66 evt->port.node = htohl(node, swap); 53 tipc_evt_write(evt, port.node, node);
67 tipc_conn_queue_evt(sub->server, sub->conid, event, evt); 54 tipc_conn_queue_evt(sub->server, sub->conid, event, evt);
68} 55}
69 56
@@ -85,29 +72,22 @@ int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower,
85 return 1; 72 return 1;
86} 73}
87 74
88u32 tipc_subscrp_convert_seq_type(u32 type, int swap) 75void tipc_subscrp_report_overlap(struct tipc_subscription *sub,
76 u32 found_lower, u32 found_upper,
77 u32 event, u32 port, u32 node,
78 u32 scope, int must)
89{ 79{
90 return htohl(type, swap);
91}
92
93void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap,
94 struct tipc_name_seq *out)
95{
96 out->type = htohl(in->type, swap);
97 out->lower = htohl(in->lower, swap);
98 out->upper = htohl(in->upper, swap);
99}
100
101void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower,
102 u32 found_upper, u32 event, u32 port_ref,
103 u32 node, u32 scope, int must)
104{
105 u32 filter = htohl(sub->evt.s.filter, sub->swap);
106 struct tipc_name_seq seq; 80 struct tipc_name_seq seq;
81 struct tipc_subscr *s = &sub->evt.s;
82 u32 filter = tipc_sub_read(s, filter);
83
84 seq.type = tipc_sub_read(s, seq.type);
85 seq.lower = tipc_sub_read(s, seq.lower);
86 seq.upper = tipc_sub_read(s, seq.upper);
107 87
108 tipc_subscrp_convert_seq(&sub->evt.s.seq, sub->swap, &seq);
109 if (!tipc_subscrp_check_overlap(&seq, found_lower, found_upper)) 88 if (!tipc_subscrp_check_overlap(&seq, found_lower, found_upper))
110 return; 89 return;
90
111 if (!must && !(filter & TIPC_SUB_PORTS)) 91 if (!must && !(filter & TIPC_SUB_PORTS))
112 return; 92 return;
113 if (filter & TIPC_SUB_CLUSTER_SCOPE && scope == TIPC_NODE_SCOPE) 93 if (filter & TIPC_SUB_CLUSTER_SCOPE && scope == TIPC_NODE_SCOPE)
@@ -116,7 +96,7 @@ void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower,
116 return; 96 return;
117 spin_lock(&sub->lock); 97 spin_lock(&sub->lock);
118 tipc_subscrp_send_event(sub, found_lower, found_upper, 98 tipc_subscrp_send_event(sub, found_lower, found_upper,
119 event, port_ref, node); 99 event, port, node);
120 spin_unlock(&sub->lock); 100 spin_unlock(&sub->lock);
121} 101}
122 102
@@ -156,11 +136,11 @@ void tipc_subscrp_get(struct tipc_subscription *subscription)
156 136
157static struct tipc_subscription *tipc_subscrp_create(struct tipc_server *srv, 137static struct tipc_subscription *tipc_subscrp_create(struct tipc_server *srv,
158 struct tipc_subscr *s, 138 struct tipc_subscr *s,
159 int conid, bool swap) 139 int conid)
160{ 140{
161 struct tipc_net *tn = tipc_net(srv->net); 141 struct tipc_net *tn = tipc_net(srv->net);
162 struct tipc_subscription *sub; 142 struct tipc_subscription *sub;
163 u32 filter = htohl(s->filter, swap); 143 u32 filter = tipc_sub_read(s, filter);
164 144
165 /* Refuse subscription if global limit exceeded */ 145 /* Refuse subscription if global limit exceeded */
166 if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) { 146 if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) {
@@ -177,39 +157,38 @@ static struct tipc_subscription *tipc_subscrp_create(struct tipc_server *srv,
177 } 157 }
178 158
179 /* Initialize subscription object */ 159 /* Initialize subscription object */
160 if (filter & TIPC_SUB_PORTS && filter & TIPC_SUB_SERVICE)
161 goto err;
162 if (tipc_sub_read(s, seq.lower) > tipc_sub_read(s, seq.upper))
163 goto err;
180 sub->server = srv; 164 sub->server = srv;
181 sub->conid = conid; 165 sub->conid = conid;
182 sub->inactive = false; 166 sub->inactive = false;
183 if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) ||
184 (htohl(s->seq.lower, swap) > htohl(s->seq.upper, swap))) {
185 pr_warn("Subscription rejected, illegal request\n");
186 kfree(sub);
187 return NULL;
188 }
189
190 sub->swap = swap;
191 memcpy(&sub->evt.s, s, sizeof(*s)); 167 memcpy(&sub->evt.s, s, sizeof(*s));
192 spin_lock_init(&sub->lock); 168 spin_lock_init(&sub->lock);
193 atomic_inc(&tn->subscription_count); 169 atomic_inc(&tn->subscription_count);
194 kref_init(&sub->kref); 170 kref_init(&sub->kref);
195 return sub; 171 return sub;
172err:
173 pr_warn("Subscription rejected, illegal request\n");
174 kfree(sub);
175 return NULL;
196} 176}
197 177
198struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv, 178struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv,
199 struct tipc_subscr *s, 179 struct tipc_subscr *s,
200 int conid, bool swap, 180 int conid)
201 bool status)
202{ 181{
203 struct tipc_subscription *sub = NULL; 182 struct tipc_subscription *sub = NULL;
204 u32 timeout; 183 u32 timeout;
205 184
206 sub = tipc_subscrp_create(srv, s, conid, swap); 185 sub = tipc_subscrp_create(srv, s, conid);
207 if (!sub) 186 if (!sub)
208 return NULL; 187 return NULL;
209 188
210 tipc_nametbl_subscribe(sub, status); 189 tipc_nametbl_subscribe(sub);
211 timer_setup(&sub->timer, tipc_subscrp_timeout, 0); 190 timer_setup(&sub->timer, tipc_subscrp_timeout, 0);
212 timeout = htohl(sub->evt.s.timeout, swap); 191 timeout = tipc_sub_read(&sub->evt.s, timeout);
213 if (timeout != TIPC_WAIT_FOREVER) 192 if (timeout != TIPC_WAIT_FOREVER)
214 mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout)); 193 mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout));
215 return sub; 194 return sub;
diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
index 64ffd32d15e6..db80e41bba08 100644
--- a/net/tipc/subscr.h
+++ b/net/tipc/subscr.h
@@ -52,7 +52,6 @@ struct tipc_conn;
52 * @timer: timer governing subscription duration (optional) 52 * @timer: timer governing subscription duration (optional)
53 * @nameseq_list: adjacent subscriptions in name sequence's subscription list 53 * @nameseq_list: adjacent subscriptions in name sequence's subscription list
54 * @subscrp_list: adjacent subscriptions in subscriber's subscription list 54 * @subscrp_list: adjacent subscriptions in subscriber's subscription list
55 * @swap: indicates if subscriber uses opposite endianness in its messages
56 * @evt: template for events generated by subscription 55 * @evt: template for events generated by subscription
57 */ 56 */
58struct tipc_subscription { 57struct tipc_subscription {
@@ -63,28 +62,47 @@ struct tipc_subscription {
63 struct list_head subscrp_list; 62 struct list_head subscrp_list;
64 struct tipc_event evt; 63 struct tipc_event evt;
65 int conid; 64 int conid;
66 bool swap;
67 bool inactive; 65 bool inactive;
68 spinlock_t lock; /* serialize up/down and timer events */ 66 spinlock_t lock; /* serialize up/down and timer events */
69}; 67};
70 68
71struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv, 69struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv,
72 struct tipc_subscr *s, 70 struct tipc_subscr *s,
73 int conid, bool swap, 71 int conid);
74 bool status);
75void tipc_sub_delete(struct tipc_subscription *sub); 72void tipc_sub_delete(struct tipc_subscription *sub);
73
76int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower, 74int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower,
77 u32 found_upper); 75 u32 found_upper);
78void tipc_subscrp_report_overlap(struct tipc_subscription *sub, 76void tipc_subscrp_report_overlap(struct tipc_subscription *sub,
79 u32 found_lower, u32 found_upper, u32 event, 77 u32 found_lower, u32 found_upper,
80 u32 port_ref, u32 node, u32 scope, int must); 78 u32 event, u32 port, u32 node,
81void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap, 79 u32 scope, int must);
82 struct tipc_name_seq *out);
83u32 tipc_subscrp_convert_seq_type(u32 type, int swap);
84int tipc_topsrv_start(struct net *net); 80int tipc_topsrv_start(struct net *net);
85void tipc_topsrv_stop(struct net *net); 81void tipc_topsrv_stop(struct net *net);
86 82
87void tipc_subscrp_put(struct tipc_subscription *subscription); 83void tipc_subscrp_put(struct tipc_subscription *subscription);
88void tipc_subscrp_get(struct tipc_subscription *subscription); 84void tipc_subscrp_get(struct tipc_subscription *subscription);
89 85
86#define TIPC_FILTER_MASK (TIPC_SUB_PORTS | TIPC_SUB_SERVICE | TIPC_SUB_CANCEL)
87
88/* tipc_sub_read - return field_ of struct sub_ in host endian format
89 */
90#define tipc_sub_read(sub_, field_) \
91 ({ \
92 struct tipc_subscr *sub__ = sub_; \
93 u32 val__ = (sub__)->field_; \
94 int swap_ = !((sub__)->filter & TIPC_FILTER_MASK); \
95 (swap_ ? swab32(val__) : val__); \
96 })
97
98/* tipc_evt_write - write val_ to field_ of struct evt_ in user endian format
99 */
100#define tipc_evt_write(evt_, field_, val_) \
101 ({ \
102 struct tipc_event *evt__ = evt_; \
103 u32 val__ = val_; \
104 int swap_ = !((evt__)->s.filter & (TIPC_FILTER_MASK)); \
105 (evt__)->field_ = swap_ ? swab32(val__) : val__; \
106 })
107
90#endif 108#endif