aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--include/linux/tipc.h1
-rw-r--r--net/tipc/subscr.c99
2 files changed, 85 insertions, 15 deletions
diff --git a/include/linux/tipc.h b/include/linux/tipc.h
index 243a15f54002..bea469455a0c 100644
--- a/include/linux/tipc.h
+++ b/include/linux/tipc.h
@@ -129,6 +129,7 @@ static inline unsigned int tipc_node(__u32 addr)
129 129
130#define TIPC_SUB_PORTS 0x01 /* filter for port availability */ 130#define TIPC_SUB_PORTS 0x01 /* filter for port availability */
131#define TIPC_SUB_SERVICE 0x02 /* filter for service availability */ 131#define TIPC_SUB_SERVICE 0x02 /* filter for service availability */
132#define TIPC_SUB_CANCEL 0x04 /* cancel a subscription */
132#if 0 133#if 0
133/* The following filter options are not currently implemented */ 134/* The following filter options are not currently implemented */
134#define TIPC_SUB_NO_BIND_EVTS 0x04 /* filter out "publish" events */ 135#define TIPC_SUB_NO_BIND_EVTS 0x04 /* filter out "publish" events */
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index c51600ba5f4a..7a918f12a5df 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -155,7 +155,7 @@ void tipc_subscr_report_overlap(struct subscription *sub,
155 sub->seq.upper, found_lower, found_upper); 155 sub->seq.upper, found_lower, found_upper);
156 if (!tipc_subscr_overlap(sub, found_lower, found_upper)) 156 if (!tipc_subscr_overlap(sub, found_lower, found_upper))
157 return; 157 return;
158 if (!must && (sub->filter != TIPC_SUB_PORTS)) 158 if (!must && !(sub->filter & TIPC_SUB_PORTS))
159 return; 159 return;
160 subscr_send_event(sub, found_lower, found_upper, event, port_ref, node); 160 subscr_send_event(sub, found_lower, found_upper, event, port_ref, node);
161} 161}
@@ -176,6 +176,13 @@ static void subscr_timeout(struct subscription *sub)
176 if (subscriber == NULL) 176 if (subscriber == NULL)
177 return; 177 return;
178 178
179 /* Validate timeout (in case subscription is being cancelled) */
180
181 if (sub->timeout == TIPC_WAIT_FOREVER) {
182 tipc_ref_unlock(subscriber_ref);
183 return;
184 }
185
179 /* Unlink subscription from name table */ 186 /* Unlink subscription from name table */
180 187
181 tipc_nametbl_unsubscribe(sub); 188 tipc_nametbl_unsubscribe(sub);
@@ -199,6 +206,20 @@ static void subscr_timeout(struct subscription *sub)
199} 206}
200 207
201/** 208/**
209 * subscr_del - delete a subscription within a subscription list
210 *
211 * Called with subscriber locked.
212 */
213
214static void subscr_del(struct subscription *sub)
215{
216 tipc_nametbl_unsubscribe(sub);
217 list_del(&sub->subscription_list);
218 kfree(sub);
219 atomic_dec(&topsrv.subscription_count);
220}
221
222/**
202 * subscr_terminate - terminate communication with a subscriber 223 * subscr_terminate - terminate communication with a subscriber
203 * 224 *
204 * Called with subscriber locked. Routine must temporarily release this lock 225 * Called with subscriber locked. Routine must temporarily release this lock
@@ -227,12 +248,9 @@ static void subscr_terminate(struct subscriber *subscriber)
227 k_cancel_timer(&sub->timer); 248 k_cancel_timer(&sub->timer);
228 k_term_timer(&sub->timer); 249 k_term_timer(&sub->timer);
229 } 250 }
230 tipc_nametbl_unsubscribe(sub); 251 dbg("Term: Removing sub %u,%u,%u from subscriber %x list\n",
231 list_del(&sub->subscription_list);
232 dbg("Term: Removed sub %u,%u,%u from subscriber %x list\n",
233 sub->seq.type, sub->seq.lower, sub->seq.upper, subscriber); 252 sub->seq.type, sub->seq.lower, sub->seq.upper, subscriber);
234 kfree(sub); 253 subscr_del(sub);
235 atomic_dec(&topsrv.subscription_count);
236 } 254 }
237 255
238 /* Sever connection to subscriber */ 256 /* Sever connection to subscriber */
@@ -253,6 +271,49 @@ static void subscr_terminate(struct subscriber *subscriber)
253} 271}
254 272
255/** 273/**
274 * subscr_cancel - handle subscription cancellation request
275 *
276 * Called with subscriber locked. Routine must temporarily release this lock
277 * to enable the subscription timeout routine to finish without deadlocking;
278 * the lock is then reclaimed to allow caller to release it upon return.
279 *
280 * Note that fields of 's' use subscriber's endianness!
281 */
282
283static void subscr_cancel(struct tipc_subscr *s,
284 struct subscriber *subscriber)
285{
286 struct subscription *sub;
287 struct subscription *sub_temp;
288 int found = 0;
289
290 /* Find first matching subscription, exit if not found */
291
292 list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
293 subscription_list) {
294 if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
295 found = 1;
296 break;
297 }
298 }
299 if (!found)
300 return;
301
302 /* Cancel subscription timer (if used), then delete subscription */
303
304 if (sub->timeout != TIPC_WAIT_FOREVER) {
305 sub->timeout = TIPC_WAIT_FOREVER;
306 spin_unlock_bh(subscriber->lock);
307 k_cancel_timer(&sub->timer);
308 k_term_timer(&sub->timer);
309 spin_lock_bh(subscriber->lock);
310 }
311 dbg("Cancel: removing sub %u,%u,%u from subscriber %x list\n",
312 sub->seq.type, sub->seq.lower, sub->seq.upper, subscriber);
313 subscr_del(sub);
314}
315
316/**
256 * subscr_subscribe - create subscription for subscriber 317 * subscr_subscribe - create subscription for subscriber
257 * 318 *
258 * Called with subscriber locked 319 * Called with subscriber locked
@@ -263,6 +324,21 @@ static void subscr_subscribe(struct tipc_subscr *s,
263{ 324{
264 struct subscription *sub; 325 struct subscription *sub;
265 326
327 /* Determine/update subscriber's endianness */
328
329 if (s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE))
330 subscriber->swap = 0;
331 else
332 subscriber->swap = 1;
333
334 /* Detect & process a subscription cancellation request */
335
336 if (s->filter & htohl(TIPC_SUB_CANCEL, subscriber->swap)) {
337 s->filter &= ~htohl(TIPC_SUB_CANCEL, subscriber->swap);
338 subscr_cancel(s, subscriber);
339 return;
340 }
341
266 /* Refuse subscription if global limit exceeded */ 342 /* Refuse subscription if global limit exceeded */
267 343
268 if (atomic_read(&topsrv.subscription_count) >= tipc_max_subscriptions) { 344 if (atomic_read(&topsrv.subscription_count) >= tipc_max_subscriptions) {
@@ -281,13 +357,6 @@ static void subscr_subscribe(struct tipc_subscr *s,
281 return; 357 return;
282 } 358 }
283 359
284 /* Determine/update subscriber's endianness */
285
286 if ((s->filter == TIPC_SUB_PORTS) || (s->filter == TIPC_SUB_SERVICE))
287 subscriber->swap = 0;
288 else
289 subscriber->swap = 1;
290
291 /* Initialize subscription object */ 360 /* Initialize subscription object */
292 361
293 memset(sub, 0, sizeof(*sub)); 362 memset(sub, 0, sizeof(*sub));
@@ -296,8 +365,8 @@ static void subscr_subscribe(struct tipc_subscr *s,
296 sub->seq.upper = htohl(s->seq.upper, subscriber->swap); 365 sub->seq.upper = htohl(s->seq.upper, subscriber->swap);
297 sub->timeout = htohl(s->timeout, subscriber->swap); 366 sub->timeout = htohl(s->timeout, subscriber->swap);
298 sub->filter = htohl(s->filter, subscriber->swap); 367 sub->filter = htohl(s->filter, subscriber->swap);
299 if ((((sub->filter != TIPC_SUB_PORTS) 368 if ((!(sub->filter & TIPC_SUB_PORTS)
300 && (sub->filter != TIPC_SUB_SERVICE))) 369 == !(sub->filter & TIPC_SUB_SERVICE))
301 || (sub->seq.lower > sub->seq.upper)) { 370 || (sub->seq.lower > sub->seq.upper)) {
302 warn("Subscription rejected, illegal request\n"); 371 warn("Subscription rejected, illegal request\n");
303 kfree(sub); 372 kfree(sub);