aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/subscr.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/subscr.c')
-rw-r--r--net/tipc/subscr.c242
1 files changed, 110 insertions, 132 deletions
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index 1c147c869c2e..350cca33ee0a 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -40,16 +40,21 @@
40 40
41/** 41/**
42 * struct tipc_subscriber - TIPC network topology subscriber 42 * struct tipc_subscriber - TIPC network topology subscriber
43 * @kref: reference counter to tipc_subscription object
43 * @conid: connection identifier to server connecting to subscriber 44 * @conid: connection identifier to server connecting to subscriber
44 * @lock: control access to subscriber 45 * @lock: control access to subscriber
45 * @subscription_list: list of subscription objects for this subscriber 46 * @subscrp_list: list of subscription objects for this subscriber
46 */ 47 */
47struct tipc_subscriber { 48struct tipc_subscriber {
49 struct kref kref;
48 int conid; 50 int conid;
49 spinlock_t lock; 51 spinlock_t lock;
50 struct list_head subscription_list; 52 struct list_head subscrp_list;
51}; 53};
52 54
55static void tipc_subscrp_delete(struct tipc_subscription *sub);
56static void tipc_subscrb_put(struct tipc_subscriber *subscriber);
57
53/** 58/**
54 * htohl - convert value to endianness used by destination 59 * htohl - convert value to endianness used by destination
55 * @in: value to convert 60 * @in: value to convert
@@ -62,9 +67,9 @@ static u32 htohl(u32 in, int swap)
62 return swap ? swab32(in) : in; 67 return swap ? swab32(in) : in;
63} 68}
64 69
65static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower, 70static void tipc_subscrp_send_event(struct tipc_subscription *sub,
66 u32 found_upper, u32 event, u32 port_ref, 71 u32 found_lower, u32 found_upper,
67 u32 node) 72 u32 event, u32 port_ref, u32 node)
68{ 73{
69 struct tipc_net *tn = net_generic(sub->net, tipc_net_id); 74 struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
70 struct tipc_subscriber *subscriber = sub->subscriber; 75 struct tipc_subscriber *subscriber = sub->subscriber;
@@ -82,12 +87,13 @@ static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower,
82} 87}
83 88
84/** 89/**
85 * tipc_subscr_overlap - test for subscription overlap with the given values 90 * tipc_subscrp_check_overlap - test for subscription overlap with the
91 * given values
86 * 92 *
87 * Returns 1 if there is overlap, otherwise 0. 93 * Returns 1 if there is overlap, otherwise 0.
88 */ 94 */
89int tipc_subscr_overlap(struct tipc_subscription *sub, u32 found_lower, 95int tipc_subscrp_check_overlap(struct tipc_subscription *sub, u32 found_lower,
90 u32 found_upper) 96 u32 found_upper)
91{ 97{
92 if (found_lower < sub->seq.lower) 98 if (found_lower < sub->seq.lower)
93 found_lower = sub->seq.lower; 99 found_lower = sub->seq.lower;
@@ -98,138 +104,121 @@ int tipc_subscr_overlap(struct tipc_subscription *sub, u32 found_lower,
98 return 1; 104 return 1;
99} 105}
100 106
101/** 107void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower,
102 * tipc_subscr_report_overlap - issue event if there is subscription overlap 108 u32 found_upper, u32 event, u32 port_ref,
103 * 109 u32 node, int must)
104 * Protected by nameseq.lock in name_table.c
105 */
106void tipc_subscr_report_overlap(struct tipc_subscription *sub, u32 found_lower,
107 u32 found_upper, u32 event, u32 port_ref,
108 u32 node, int must)
109{ 110{
110 if (!tipc_subscr_overlap(sub, found_lower, found_upper)) 111 if (!tipc_subscrp_check_overlap(sub, found_lower, found_upper))
111 return; 112 return;
112 if (!must && !(sub->filter & TIPC_SUB_PORTS)) 113 if (!must && !(sub->filter & TIPC_SUB_PORTS))
113 return; 114 return;
114 115
115 subscr_send_event(sub, found_lower, found_upper, event, port_ref, node); 116 tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref,
117 node);
116} 118}
117 119
118static void subscr_timeout(unsigned long data) 120static void tipc_subscrp_timeout(unsigned long data)
119{ 121{
120 struct tipc_subscription *sub = (struct tipc_subscription *)data; 122 struct tipc_subscription *sub = (struct tipc_subscription *)data;
121 struct tipc_subscriber *subscriber = sub->subscriber; 123 struct tipc_subscriber *subscriber = sub->subscriber;
122 struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
123 124
124 /* The spin lock per subscriber is used to protect its members */ 125 /* Notify subscriber of timeout */
125 spin_lock_bh(&subscriber->lock); 126 tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
127 TIPC_SUBSCR_TIMEOUT, 0, 0);
126 128
127 /* Validate timeout (in case subscription is being cancelled) */ 129 spin_lock_bh(&subscriber->lock);
128 if (sub->timeout == TIPC_WAIT_FOREVER) { 130 tipc_subscrp_delete(sub);
129 spin_unlock_bh(&subscriber->lock); 131 spin_unlock_bh(&subscriber->lock);
130 return;
131 }
132 132
133 /* Unlink subscription from name table */ 133 tipc_subscrb_put(subscriber);
134 tipc_nametbl_unsubscribe(sub); 134}
135 135
136 /* Unlink subscription from subscriber */ 136static void tipc_subscrb_kref_release(struct kref *kref)
137 list_del(&sub->subscription_list); 137{
138 struct tipc_subscriber *subcriber = container_of(kref,
139 struct tipc_subscriber, kref);
138 140
139 spin_unlock_bh(&subscriber->lock); 141 kfree(subcriber);
142}
140 143
141 /* Notify subscriber of timeout */ 144static void tipc_subscrb_put(struct tipc_subscriber *subscriber)
142 subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper, 145{
143 TIPC_SUBSCR_TIMEOUT, 0, 0); 146 kref_put(&subscriber->kref, tipc_subscrb_kref_release);
147}
144 148
145 /* Now destroy subscription */ 149static void tipc_subscrb_get(struct tipc_subscriber *subscriber)
146 kfree(sub); 150{
147 atomic_dec(&tn->subscription_count); 151 kref_get(&subscriber->kref);
148} 152}
149 153
150/** 154static struct tipc_subscriber *tipc_subscrb_create(int conid)
151 * subscr_del - delete a subscription within a subscription list
152 *
153 * Called with subscriber lock held.
154 */
155static void subscr_del(struct tipc_subscription *sub)
156{ 155{
157 struct tipc_net *tn = net_generic(sub->net, tipc_net_id); 156 struct tipc_subscriber *subscriber;
158 157
159 tipc_nametbl_unsubscribe(sub); 158 subscriber = kzalloc(sizeof(*subscriber), GFP_ATOMIC);
160 list_del(&sub->subscription_list); 159 if (!subscriber) {
161 kfree(sub); 160 pr_warn("Subscriber rejected, no memory\n");
162 atomic_dec(&tn->subscription_count); 161 return NULL;
162 }
163 kref_init(&subscriber->kref);
164 INIT_LIST_HEAD(&subscriber->subscrp_list);
165 subscriber->conid = conid;
166 spin_lock_init(&subscriber->lock);
167
168 return subscriber;
163} 169}
164 170
165static void subscr_release(struct tipc_subscriber *subscriber) 171static void tipc_subscrb_delete(struct tipc_subscriber *subscriber)
166{ 172{
167 struct tipc_subscription *sub; 173 struct tipc_subscription *sub, *temp;
168 struct tipc_subscription *sub_temp;
169 174
170 spin_lock_bh(&subscriber->lock); 175 spin_lock_bh(&subscriber->lock);
171
172 /* Destroy any existing subscriptions for subscriber */ 176 /* Destroy any existing subscriptions for subscriber */
173 list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list, 177 list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
174 subscription_list) { 178 subscrp_list) {
175 if (sub->timeout != TIPC_WAIT_FOREVER) { 179 if (del_timer(&sub->timer)) {
176 spin_unlock_bh(&subscriber->lock); 180 tipc_subscrp_delete(sub);
177 del_timer_sync(&sub->timer); 181 tipc_subscrb_put(subscriber);
178 spin_lock_bh(&subscriber->lock);
179 } 182 }
180 subscr_del(sub);
181 } 183 }
182 spin_unlock_bh(&subscriber->lock); 184 spin_unlock_bh(&subscriber->lock);
183 185
184 /* Now destroy subscriber */ 186 tipc_subscrb_put(subscriber);
185 kfree(subscriber);
186} 187}
187 188
188/** 189static void tipc_subscrp_delete(struct tipc_subscription *sub)
189 * subscr_cancel - handle subscription cancellation request
190 *
191 * Called with subscriber lock held. Routine must temporarily release lock
192 * to enable the subscription timeout routine to finish without deadlocking;
193 * the lock is then reclaimed to allow caller to release it upon return.
194 *
195 * Note that fields of 's' use subscriber's endianness!
196 */
197static void subscr_cancel(struct tipc_subscr *s,
198 struct tipc_subscriber *subscriber)
199{ 190{
200 struct tipc_subscription *sub; 191 struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
201 struct tipc_subscription *sub_temp; 192
202 int found = 0; 193 tipc_nametbl_unsubscribe(sub);
194 list_del(&sub->subscrp_list);
195 kfree(sub);
196 atomic_dec(&tn->subscription_count);
197}
203 198
199static void tipc_subscrp_cancel(struct tipc_subscr *s,
200 struct tipc_subscriber *subscriber)
201{
202 struct tipc_subscription *sub, *temp;
203
204 spin_lock_bh(&subscriber->lock);
204 /* Find first matching subscription, exit if not found */ 205 /* Find first matching subscription, exit if not found */
205 list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list, 206 list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
206 subscription_list) { 207 subscrp_list) {
207 if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) { 208 if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
208 found = 1; 209 if (del_timer(&sub->timer)) {
210 tipc_subscrp_delete(sub);
211 tipc_subscrb_put(subscriber);
212 }
209 break; 213 break;
210 } 214 }
211 } 215 }
212 if (!found) 216 spin_unlock_bh(&subscriber->lock);
213 return;
214
215 /* Cancel subscription timer (if used), then delete subscription */
216 if (sub->timeout != TIPC_WAIT_FOREVER) {
217 sub->timeout = TIPC_WAIT_FOREVER;
218 spin_unlock_bh(&subscriber->lock);
219 del_timer_sync(&sub->timer);
220 spin_lock_bh(&subscriber->lock);
221 }
222 subscr_del(sub);
223} 217}
224 218
225/** 219static int tipc_subscrp_create(struct net *net, struct tipc_subscr *s,
226 * subscr_subscribe - create subscription for subscriber 220 struct tipc_subscriber *subscriber,
227 * 221 struct tipc_subscription **sub_p)
228 * Called with subscriber lock held.
229 */
230static int subscr_subscribe(struct net *net, struct tipc_subscr *s,
231 struct tipc_subscriber *subscriber,
232 struct tipc_subscription **sub_p)
233{ 222{
234 struct tipc_net *tn = net_generic(net, tipc_net_id); 223 struct tipc_net *tn = net_generic(net, tipc_net_id);
235 struct tipc_subscription *sub; 224 struct tipc_subscription *sub;
@@ -241,7 +230,7 @@ static int subscr_subscribe(struct net *net, struct tipc_subscr *s,
241 /* Detect & process a subscription cancellation request */ 230 /* Detect & process a subscription cancellation request */
242 if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) { 231 if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
243 s->filter &= ~htohl(TIPC_SUB_CANCEL, swap); 232 s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
244 subscr_cancel(s, subscriber); 233 tipc_subscrp_cancel(s, subscriber);
245 return 0; 234 return 0;
246 } 235 }
247 236
@@ -273,62 +262,51 @@ static int subscr_subscribe(struct net *net, struct tipc_subscr *s,
273 kfree(sub); 262 kfree(sub);
274 return -EINVAL; 263 return -EINVAL;
275 } 264 }
276 list_add(&sub->subscription_list, &subscriber->subscription_list); 265 spin_lock_bh(&subscriber->lock);
266 list_add(&sub->subscrp_list, &subscriber->subscrp_list);
267 spin_unlock_bh(&subscriber->lock);
277 sub->subscriber = subscriber; 268 sub->subscriber = subscriber;
278 sub->swap = swap; 269 sub->swap = swap;
279 memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr)); 270 memcpy(&sub->evt.s, s, sizeof(*s));
280 atomic_inc(&tn->subscription_count); 271 atomic_inc(&tn->subscription_count);
281 if (sub->timeout != TIPC_WAIT_FOREVER) { 272 setup_timer(&sub->timer, tipc_subscrp_timeout, (unsigned long)sub);
282 setup_timer(&sub->timer, subscr_timeout, (unsigned long)sub); 273 if (sub->timeout != TIPC_WAIT_FOREVER)
283 mod_timer(&sub->timer, jiffies + sub->timeout); 274 sub->timeout += jiffies;
284 } 275 if (!mod_timer(&sub->timer, sub->timeout))
276 tipc_subscrb_get(subscriber);
285 *sub_p = sub; 277 *sub_p = sub;
286 return 0; 278 return 0;
287} 279}
288 280
289/* Handle one termination request for the subscriber */ 281/* Handle one termination request for the subscriber */
290static void subscr_conn_shutdown_event(int conid, void *usr_data) 282static void tipc_subscrb_shutdown_cb(int conid, void *usr_data)
291{ 283{
292 subscr_release((struct tipc_subscriber *)usr_data); 284 tipc_subscrb_delete((struct tipc_subscriber *)usr_data);
293} 285}
294 286
295/* Handle one request to create a new subscription for the subscriber */ 287/* Handle one request to create a new subscription for the subscriber */
296static void subscr_conn_msg_event(struct net *net, int conid, 288static void tipc_subscrb_rcv_cb(struct net *net, int conid,
297 struct sockaddr_tipc *addr, void *usr_data, 289 struct sockaddr_tipc *addr, void *usr_data,
298 void *buf, size_t len) 290 void *buf, size_t len)
299{ 291{
300 struct tipc_subscriber *subscriber = usr_data; 292 struct tipc_subscriber *subscriber = usr_data;
301 struct tipc_subscription *sub = NULL; 293 struct tipc_subscription *sub = NULL;
302 struct tipc_net *tn = net_generic(net, tipc_net_id); 294 struct tipc_net *tn = net_generic(net, tipc_net_id);
303 295
304 spin_lock_bh(&subscriber->lock); 296 tipc_subscrp_create(net, (struct tipc_subscr *)buf, subscriber, &sub);
305 subscr_subscribe(net, (struct tipc_subscr *)buf, subscriber, &sub);
306 if (sub) 297 if (sub)
307 tipc_nametbl_subscribe(sub); 298 tipc_nametbl_subscribe(sub);
308 else 299 else
309 tipc_conn_terminate(tn->topsrv, subscriber->conid); 300 tipc_conn_terminate(tn->topsrv, subscriber->conid);
310 spin_unlock_bh(&subscriber->lock);
311} 301}
312 302
313/* Handle one request to establish a new subscriber */ 303/* Handle one request to establish a new subscriber */
314static void *subscr_named_msg_event(int conid) 304static void *tipc_subscrb_connect_cb(int conid)
315{ 305{
316 struct tipc_subscriber *subscriber; 306 return (void *)tipc_subscrb_create(conid);
317
318 /* Create subscriber object */
319 subscriber = kzalloc(sizeof(struct tipc_subscriber), GFP_ATOMIC);
320 if (subscriber == NULL) {
321 pr_warn("Subscriber rejected, no memory\n");
322 return NULL;
323 }
324 INIT_LIST_HEAD(&subscriber->subscription_list);
325 subscriber->conid = conid;
326 spin_lock_init(&subscriber->lock);
327
328 return (void *)subscriber;
329} 307}
330 308
331int tipc_subscr_start(struct net *net) 309int tipc_topsrv_start(struct net *net)
332{ 310{
333 struct tipc_net *tn = net_generic(net, tipc_net_id); 311 struct tipc_net *tn = net_generic(net, tipc_net_id);
334 const char name[] = "topology_server"; 312 const char name[] = "topology_server";
@@ -355,9 +333,9 @@ int tipc_subscr_start(struct net *net)
355 topsrv->imp = TIPC_CRITICAL_IMPORTANCE; 333 topsrv->imp = TIPC_CRITICAL_IMPORTANCE;
356 topsrv->type = SOCK_SEQPACKET; 334 topsrv->type = SOCK_SEQPACKET;
357 topsrv->max_rcvbuf_size = sizeof(struct tipc_subscr); 335 topsrv->max_rcvbuf_size = sizeof(struct tipc_subscr);
358 topsrv->tipc_conn_recvmsg = subscr_conn_msg_event; 336 topsrv->tipc_conn_recvmsg = tipc_subscrb_rcv_cb;
359 topsrv->tipc_conn_new = subscr_named_msg_event; 337 topsrv->tipc_conn_new = tipc_subscrb_connect_cb;
360 topsrv->tipc_conn_shutdown = subscr_conn_shutdown_event; 338 topsrv->tipc_conn_shutdown = tipc_subscrb_shutdown_cb;
361 339
362 strncpy(topsrv->name, name, strlen(name) + 1); 340 strncpy(topsrv->name, name, strlen(name) + 1);
363 tn->topsrv = topsrv; 341 tn->topsrv = topsrv;
@@ -366,7 +344,7 @@ int tipc_subscr_start(struct net *net)
366 return tipc_server_start(topsrv); 344 return tipc_server_start(topsrv);
367} 345}
368 346
369void tipc_subscr_stop(struct net *net) 347void tipc_topsrv_stop(struct net *net)
370{ 348{
371 struct tipc_net *tn = net_generic(net, tipc_net_id); 349 struct tipc_net *tn = net_generic(net, tipc_net_id);
372 struct tipc_server *topsrv = tn->topsrv; 350 struct tipc_server *topsrv = tn->topsrv;