aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/subscr.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/subscr.c')
-rw-r--r--net/tipc/subscr.c138
1 files changed, 71 insertions, 67 deletions
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index 0344206b984f..1c147c869c2e 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -50,33 +50,6 @@ struct tipc_subscriber {
50 struct list_head subscription_list; 50 struct list_head subscription_list;
51}; 51};
52 52
53static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr,
54 void *usr_data, void *buf, size_t len);
55static void *subscr_named_msg_event(int conid);
56static void subscr_conn_shutdown_event(int conid, void *usr_data);
57
58static atomic_t subscription_count = ATOMIC_INIT(0);
59
60static struct sockaddr_tipc topsrv_addr __read_mostly = {
61 .family = AF_TIPC,
62 .addrtype = TIPC_ADDR_NAMESEQ,
63 .addr.nameseq.type = TIPC_TOP_SRV,
64 .addr.nameseq.lower = TIPC_TOP_SRV,
65 .addr.nameseq.upper = TIPC_TOP_SRV,
66 .scope = TIPC_NODE_SCOPE
67};
68
69static struct tipc_server topsrv __read_mostly = {
70 .saddr = &topsrv_addr,
71 .imp = TIPC_CRITICAL_IMPORTANCE,
72 .type = SOCK_SEQPACKET,
73 .max_rcvbuf_size = sizeof(struct tipc_subscr),
74 .name = "topology_server",
75 .tipc_conn_recvmsg = subscr_conn_msg_event,
76 .tipc_conn_new = subscr_named_msg_event,
77 .tipc_conn_shutdown = subscr_conn_shutdown_event,
78};
79
80/** 53/**
81 * htohl - convert value to endianness used by destination 54 * htohl - convert value to endianness used by destination
82 * @in: value to convert 55 * @in: value to convert
@@ -93,6 +66,7 @@ static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower,
93 u32 found_upper, u32 event, u32 port_ref, 66 u32 found_upper, u32 event, u32 port_ref,
94 u32 node) 67 u32 node)
95{ 68{
69 struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
96 struct tipc_subscriber *subscriber = sub->subscriber; 70 struct tipc_subscriber *subscriber = sub->subscriber;
97 struct kvec msg_sect; 71 struct kvec msg_sect;
98 72
@@ -103,8 +77,8 @@ static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower,
103 sub->evt.found_upper = htohl(found_upper, sub->swap); 77 sub->evt.found_upper = htohl(found_upper, sub->swap);
104 sub->evt.port.ref = htohl(port_ref, sub->swap); 78 sub->evt.port.ref = htohl(port_ref, sub->swap);
105 sub->evt.port.node = htohl(node, sub->swap); 79 sub->evt.port.node = htohl(node, sub->swap);
106 tipc_conn_sendmsg(&topsrv, subscriber->conid, NULL, msg_sect.iov_base, 80 tipc_conn_sendmsg(tn->topsrv, subscriber->conid, NULL,
107 msg_sect.iov_len); 81 msg_sect.iov_base, msg_sect.iov_len);
108} 82}
109 83
110/** 84/**
@@ -141,9 +115,11 @@ void tipc_subscr_report_overlap(struct tipc_subscription *sub, u32 found_lower,
141 subscr_send_event(sub, found_lower, found_upper, event, port_ref, node); 115 subscr_send_event(sub, found_lower, found_upper, event, port_ref, node);
142} 116}
143 117
144static void subscr_timeout(struct tipc_subscription *sub) 118static void subscr_timeout(unsigned long data)
145{ 119{
120 struct tipc_subscription *sub = (struct tipc_subscription *)data;
146 struct tipc_subscriber *subscriber = sub->subscriber; 121 struct tipc_subscriber *subscriber = sub->subscriber;
122 struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
147 123
148 /* The spin lock per subscriber is used to protect its members */ 124 /* The spin lock per subscriber is used to protect its members */
149 spin_lock_bh(&subscriber->lock); 125 spin_lock_bh(&subscriber->lock);
@@ -167,9 +143,8 @@ static void subscr_timeout(struct tipc_subscription *sub)
167 TIPC_SUBSCR_TIMEOUT, 0, 0); 143 TIPC_SUBSCR_TIMEOUT, 0, 0);
168 144
169 /* Now destroy subscription */ 145 /* Now destroy subscription */
170 k_term_timer(&sub->timer);
171 kfree(sub); 146 kfree(sub);
172 atomic_dec(&subscription_count); 147 atomic_dec(&tn->subscription_count);
173} 148}
174 149
175/** 150/**
@@ -179,20 +154,12 @@ static void subscr_timeout(struct tipc_subscription *sub)
179 */ 154 */
180static void subscr_del(struct tipc_subscription *sub) 155static void subscr_del(struct tipc_subscription *sub)
181{ 156{
157 struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
158
182 tipc_nametbl_unsubscribe(sub); 159 tipc_nametbl_unsubscribe(sub);
183 list_del(&sub->subscription_list); 160 list_del(&sub->subscription_list);
184 kfree(sub); 161 kfree(sub);
185 atomic_dec(&subscription_count); 162 atomic_dec(&tn->subscription_count);
186}
187
188/**
189 * subscr_terminate - terminate communication with a subscriber
190 *
191 * Note: Must call it in process context since it might sleep.
192 */
193static void subscr_terminate(struct tipc_subscriber *subscriber)
194{
195 tipc_conn_terminate(&topsrv, subscriber->conid);
196} 163}
197 164
198static void subscr_release(struct tipc_subscriber *subscriber) 165static void subscr_release(struct tipc_subscriber *subscriber)
@@ -207,8 +174,7 @@ static void subscr_release(struct tipc_subscriber *subscriber)
207 subscription_list) { 174 subscription_list) {
208 if (sub->timeout != TIPC_WAIT_FOREVER) { 175 if (sub->timeout != TIPC_WAIT_FOREVER) {
209 spin_unlock_bh(&subscriber->lock); 176 spin_unlock_bh(&subscriber->lock);
210 k_cancel_timer(&sub->timer); 177 del_timer_sync(&sub->timer);
211 k_term_timer(&sub->timer);
212 spin_lock_bh(&subscriber->lock); 178 spin_lock_bh(&subscriber->lock);
213 } 179 }
214 subscr_del(sub); 180 subscr_del(sub);
@@ -250,8 +216,7 @@ static void subscr_cancel(struct tipc_subscr *s,
250 if (sub->timeout != TIPC_WAIT_FOREVER) { 216 if (sub->timeout != TIPC_WAIT_FOREVER) {
251 sub->timeout = TIPC_WAIT_FOREVER; 217 sub->timeout = TIPC_WAIT_FOREVER;
252 spin_unlock_bh(&subscriber->lock); 218 spin_unlock_bh(&subscriber->lock);
253 k_cancel_timer(&sub->timer); 219 del_timer_sync(&sub->timer);
254 k_term_timer(&sub->timer);
255 spin_lock_bh(&subscriber->lock); 220 spin_lock_bh(&subscriber->lock);
256 } 221 }
257 subscr_del(sub); 222 subscr_del(sub);
@@ -262,9 +227,11 @@ static void subscr_cancel(struct tipc_subscr *s,
262 * 227 *
263 * Called with subscriber lock held. 228 * Called with subscriber lock held.
264 */ 229 */
265static int subscr_subscribe(struct tipc_subscr *s, 230static int subscr_subscribe(struct net *net, struct tipc_subscr *s,
266 struct tipc_subscriber *subscriber, 231 struct tipc_subscriber *subscriber,
267 struct tipc_subscription **sub_p) { 232 struct tipc_subscription **sub_p)
233{
234 struct tipc_net *tn = net_generic(net, tipc_net_id);
268 struct tipc_subscription *sub; 235 struct tipc_subscription *sub;
269 int swap; 236 int swap;
270 237
@@ -279,7 +246,7 @@ static int subscr_subscribe(struct tipc_subscr *s,
279 } 246 }
280 247
281 /* Refuse subscription if global limit exceeded */ 248 /* Refuse subscription if global limit exceeded */
282 if (atomic_read(&subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) { 249 if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) {
283 pr_warn("Subscription rejected, limit reached (%u)\n", 250 pr_warn("Subscription rejected, limit reached (%u)\n",
284 TIPC_MAX_SUBSCRIPTIONS); 251 TIPC_MAX_SUBSCRIPTIONS);
285 return -EINVAL; 252 return -EINVAL;
@@ -293,10 +260,11 @@ static int subscr_subscribe(struct tipc_subscr *s,
293 } 260 }
294 261
295 /* Initialize subscription object */ 262 /* Initialize subscription object */
263 sub->net = net;
296 sub->seq.type = htohl(s->seq.type, swap); 264 sub->seq.type = htohl(s->seq.type, swap);
297 sub->seq.lower = htohl(s->seq.lower, swap); 265 sub->seq.lower = htohl(s->seq.lower, swap);
298 sub->seq.upper = htohl(s->seq.upper, swap); 266 sub->seq.upper = htohl(s->seq.upper, swap);
299 sub->timeout = htohl(s->timeout, swap); 267 sub->timeout = msecs_to_jiffies(htohl(s->timeout, swap));
300 sub->filter = htohl(s->filter, swap); 268 sub->filter = htohl(s->filter, swap);
301 if ((!(sub->filter & TIPC_SUB_PORTS) == 269 if ((!(sub->filter & TIPC_SUB_PORTS) ==
302 !(sub->filter & TIPC_SUB_SERVICE)) || 270 !(sub->filter & TIPC_SUB_SERVICE)) ||
@@ -309,11 +277,10 @@ static int subscr_subscribe(struct tipc_subscr *s,
309 sub->subscriber = subscriber; 277 sub->subscriber = subscriber;
310 sub->swap = swap; 278 sub->swap = swap;
311 memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr)); 279 memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr));
312 atomic_inc(&subscription_count); 280 atomic_inc(&tn->subscription_count);
313 if (sub->timeout != TIPC_WAIT_FOREVER) { 281 if (sub->timeout != TIPC_WAIT_FOREVER) {
314 k_init_timer(&sub->timer, 282 setup_timer(&sub->timer, subscr_timeout, (unsigned long)sub);
315 (Handler)subscr_timeout, (unsigned long)sub); 283 mod_timer(&sub->timer, jiffies + sub->timeout);
316 k_start_timer(&sub->timer, sub->timeout);
317 } 284 }
318 *sub_p = sub; 285 *sub_p = sub;
319 return 0; 286 return 0;
@@ -326,24 +293,23 @@ static void subscr_conn_shutdown_event(int conid, void *usr_data)
326} 293}
327 294
328/* Handle one request to create a new subscription for the subscriber */ 295/* Handle one request to create a new subscription for the subscriber */
329static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr, 296static void subscr_conn_msg_event(struct net *net, int conid,
330 void *usr_data, void *buf, size_t len) 297 struct sockaddr_tipc *addr, void *usr_data,
298 void *buf, size_t len)
331{ 299{
332 struct tipc_subscriber *subscriber = usr_data; 300 struct tipc_subscriber *subscriber = usr_data;
333 struct tipc_subscription *sub = NULL; 301 struct tipc_subscription *sub = NULL;
302 struct tipc_net *tn = net_generic(net, tipc_net_id);
334 303
335 spin_lock_bh(&subscriber->lock); 304 spin_lock_bh(&subscriber->lock);
336 if (subscr_subscribe((struct tipc_subscr *)buf, subscriber, &sub) < 0) { 305 subscr_subscribe(net, (struct tipc_subscr *)buf, subscriber, &sub);
337 spin_unlock_bh(&subscriber->lock);
338 subscr_terminate(subscriber);
339 return;
340 }
341 if (sub) 306 if (sub)
342 tipc_nametbl_subscribe(sub); 307 tipc_nametbl_subscribe(sub);
308 else
309 tipc_conn_terminate(tn->topsrv, subscriber->conid);
343 spin_unlock_bh(&subscriber->lock); 310 spin_unlock_bh(&subscriber->lock);
344} 311}
345 312
346
347/* Handle one request to establish a new subscriber */ 313/* Handle one request to establish a new subscriber */
348static void *subscr_named_msg_event(int conid) 314static void *subscr_named_msg_event(int conid)
349{ 315{
@@ -362,12 +328,50 @@ static void *subscr_named_msg_event(int conid)
362 return (void *)subscriber; 328 return (void *)subscriber;
363} 329}
364 330
365int tipc_subscr_start(void) 331int tipc_subscr_start(struct net *net)
366{ 332{
367 return tipc_server_start(&topsrv); 333 struct tipc_net *tn = net_generic(net, tipc_net_id);
334 const char name[] = "topology_server";
335 struct tipc_server *topsrv;
336 struct sockaddr_tipc *saddr;
337
338 saddr = kzalloc(sizeof(*saddr), GFP_ATOMIC);
339 if (!saddr)
340 return -ENOMEM;
341 saddr->family = AF_TIPC;
342 saddr->addrtype = TIPC_ADDR_NAMESEQ;
343 saddr->addr.nameseq.type = TIPC_TOP_SRV;
344 saddr->addr.nameseq.lower = TIPC_TOP_SRV;
345 saddr->addr.nameseq.upper = TIPC_TOP_SRV;
346 saddr->scope = TIPC_NODE_SCOPE;
347
348 topsrv = kzalloc(sizeof(*topsrv), GFP_ATOMIC);
349 if (!topsrv) {
350 kfree(saddr);
351 return -ENOMEM;
352 }
353 topsrv->net = net;
354 topsrv->saddr = saddr;
355 topsrv->imp = TIPC_CRITICAL_IMPORTANCE;
356 topsrv->type = SOCK_SEQPACKET;
357 topsrv->max_rcvbuf_size = sizeof(struct tipc_subscr);
358 topsrv->tipc_conn_recvmsg = subscr_conn_msg_event;
359 topsrv->tipc_conn_new = subscr_named_msg_event;
360 topsrv->tipc_conn_shutdown = subscr_conn_shutdown_event;
361
362 strncpy(topsrv->name, name, strlen(name) + 1);
363 tn->topsrv = topsrv;
364 atomic_set(&tn->subscription_count, 0);
365
366 return tipc_server_start(topsrv);
368} 367}
369 368
370void tipc_subscr_stop(void) 369void tipc_subscr_stop(struct net *net)
371{ 370{
372 tipc_server_stop(&topsrv); 371 struct tipc_net *tn = net_generic(net, tipc_net_id);
372 struct tipc_server *topsrv = tn->topsrv;
373
374 tipc_server_stop(topsrv);
375 kfree(topsrv->saddr);
376 kfree(topsrv);
373} 377}