summaryrefslogtreecommitdiffstats
path: root/net/strparser/strparser.c
diff options
context:
space:
mode:
authorTom Herbert <tom@quantonium.net>2017-07-28 19:22:43 -0400
committerDavid S. Miller <davem@davemloft.net>2017-08-01 18:26:19 -0400
commitbbb03029a899679d73e62d7e6ae80348cc5d0054 (patch)
tree33ac457d4f2fe6464309310015674afb6d553724 /net/strparser/strparser.c
parent20bf50de3028cb15fa81e1d1e63ab6e0c85257fc (diff)
strparser: Generalize strparser
Generalize strparser from more than just being used in conjunction with read_sock. strparser will also be used in the send path with zero proxy. The primary change is to create strp_process function that performs the critical processing on skbs. The documentation is also updated to reflect the new uses. Signed-off-by: Tom Herbert <tom@quantonium.net> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/strparser/strparser.c')
-rw-r--r--net/strparser/strparser.c313
1 files changed, 187 insertions, 126 deletions
diff --git a/net/strparser/strparser.c b/net/strparser/strparser.c
index b5c279b22680..0d18fbc6f870 100644
--- a/net/strparser/strparser.c
+++ b/net/strparser/strparser.c
@@ -29,44 +29,46 @@
29 29
30static struct workqueue_struct *strp_wq; 30static struct workqueue_struct *strp_wq;
31 31
32struct _strp_rx_msg { 32struct _strp_msg {
33 /* Internal cb structure. struct strp_rx_msg must be first for passing 33 /* Internal cb structure. struct strp_msg must be first for passing
34 * to upper layer. 34 * to upper layer.
35 */ 35 */
36 struct strp_rx_msg strp; 36 struct strp_msg strp;
37 int accum_len; 37 int accum_len;
38 int early_eaten; 38 int early_eaten;
39}; 39};
40 40
41static inline struct _strp_rx_msg *_strp_rx_msg(struct sk_buff *skb) 41static inline struct _strp_msg *_strp_msg(struct sk_buff *skb)
42{ 42{
43 return (struct _strp_rx_msg *)((void *)skb->cb + 43 return (struct _strp_msg *)((void *)skb->cb +
44 offsetof(struct qdisc_skb_cb, data)); 44 offsetof(struct qdisc_skb_cb, data));
45} 45}
46 46
47/* Lower lock held */ 47/* Lower lock held */
48static void strp_abort_rx_strp(struct strparser *strp, int err) 48static void strp_abort_strp(struct strparser *strp, int err)
49{ 49{
50 struct sock *csk = strp->sk;
51
52 /* Unrecoverable error in receive */ 50 /* Unrecoverable error in receive */
53 51
54 del_timer(&strp->rx_msg_timer); 52 del_timer(&strp->msg_timer);
55 53
56 if (strp->rx_stopped) 54 if (strp->stopped)
57 return; 55 return;
58 56
59 strp->rx_stopped = 1; 57 strp->stopped = 1;
58
59 if (strp->sk) {
60 struct sock *sk = strp->sk;
60 61
61 /* Report an error on the lower socket */ 62 /* Report an error on the lower socket */
62 csk->sk_err = err; 63 sk->sk_err = err;
63 csk->sk_error_report(csk); 64 sk->sk_error_report(sk);
65 }
64} 66}
65 67
66static void strp_start_rx_timer(struct strparser *strp) 68static void strp_start_timer(struct strparser *strp, long timeo)
67{ 69{
68 if (strp->sk->sk_rcvtimeo) 70 if (timeo)
69 mod_timer(&strp->rx_msg_timer, strp->sk->sk_rcvtimeo); 71 mod_timer(&strp->msg_timer, timeo);
70} 72}
71 73
72/* Lower lock held */ 74/* Lower lock held */
@@ -74,46 +76,55 @@ static void strp_parser_err(struct strparser *strp, int err,
74 read_descriptor_t *desc) 76 read_descriptor_t *desc)
75{ 77{
76 desc->error = err; 78 desc->error = err;
77 kfree_skb(strp->rx_skb_head); 79 kfree_skb(strp->skb_head);
78 strp->rx_skb_head = NULL; 80 strp->skb_head = NULL;
79 strp->cb.abort_parser(strp, err); 81 strp->cb.abort_parser(strp, err);
80} 82}
81 83
82static inline int strp_peek_len(struct strparser *strp) 84static inline int strp_peek_len(struct strparser *strp)
83{ 85{
84 struct socket *sock = strp->sk->sk_socket; 86 if (strp->sk) {
87 struct socket *sock = strp->sk->sk_socket;
88
89 return sock->ops->peek_len(sock);
90 }
91
92 /* If we don't have an associated socket there's nothing to peek.
93 * Return int max to avoid stopping the strparser.
94 */
85 95
86 return sock->ops->peek_len(sock); 96 return INT_MAX;
87} 97}
88 98
89/* Lower socket lock held */ 99/* Lower socket lock held */
90static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb, 100static int __strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
91 unsigned int orig_offset, size_t orig_len) 101 unsigned int orig_offset, size_t orig_len,
102 size_t max_msg_size, long timeo)
92{ 103{
93 struct strparser *strp = (struct strparser *)desc->arg.data; 104 struct strparser *strp = (struct strparser *)desc->arg.data;
94 struct _strp_rx_msg *rxm; 105 struct _strp_msg *stm;
95 struct sk_buff *head, *skb; 106 struct sk_buff *head, *skb;
96 size_t eaten = 0, cand_len; 107 size_t eaten = 0, cand_len;
97 ssize_t extra; 108 ssize_t extra;
98 int err; 109 int err;
99 bool cloned_orig = false; 110 bool cloned_orig = false;
100 111
101 if (strp->rx_paused) 112 if (strp->paused)
102 return 0; 113 return 0;
103 114
104 head = strp->rx_skb_head; 115 head = strp->skb_head;
105 if (head) { 116 if (head) {
106 /* Message already in progress */ 117 /* Message already in progress */
107 118
108 rxm = _strp_rx_msg(head); 119 stm = _strp_msg(head);
109 if (unlikely(rxm->early_eaten)) { 120 if (unlikely(stm->early_eaten)) {
110 /* Already some number of bytes on the receive sock 121 /* Already some number of bytes on the receive sock
111 * data saved in rx_skb_head, just indicate they 122 * data saved in skb_head, just indicate they
112 * are consumed. 123 * are consumed.
113 */ 124 */
114 eaten = orig_len <= rxm->early_eaten ? 125 eaten = orig_len <= stm->early_eaten ?
115 orig_len : rxm->early_eaten; 126 orig_len : stm->early_eaten;
116 rxm->early_eaten -= eaten; 127 stm->early_eaten -= eaten;
117 128
118 return eaten; 129 return eaten;
119 } 130 }
@@ -126,12 +137,12 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
126 */ 137 */
127 orig_skb = skb_clone(orig_skb, GFP_ATOMIC); 138 orig_skb = skb_clone(orig_skb, GFP_ATOMIC);
128 if (!orig_skb) { 139 if (!orig_skb) {
129 STRP_STATS_INCR(strp->stats.rx_mem_fail); 140 STRP_STATS_INCR(strp->stats.mem_fail);
130 desc->error = -ENOMEM; 141 desc->error = -ENOMEM;
131 return 0; 142 return 0;
132 } 143 }
133 if (!pskb_pull(orig_skb, orig_offset)) { 144 if (!pskb_pull(orig_skb, orig_offset)) {
134 STRP_STATS_INCR(strp->stats.rx_mem_fail); 145 STRP_STATS_INCR(strp->stats.mem_fail);
135 kfree_skb(orig_skb); 146 kfree_skb(orig_skb);
136 desc->error = -ENOMEM; 147 desc->error = -ENOMEM;
137 return 0; 148 return 0;
@@ -140,13 +151,13 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
140 orig_offset = 0; 151 orig_offset = 0;
141 } 152 }
142 153
143 if (!strp->rx_skb_nextp) { 154 if (!strp->skb_nextp) {
144 /* We are going to append to the frags_list of head. 155 /* We are going to append to the frags_list of head.
145 * Need to unshare the frag_list. 156 * Need to unshare the frag_list.
146 */ 157 */
147 err = skb_unclone(head, GFP_ATOMIC); 158 err = skb_unclone(head, GFP_ATOMIC);
148 if (err) { 159 if (err) {
149 STRP_STATS_INCR(strp->stats.rx_mem_fail); 160 STRP_STATS_INCR(strp->stats.mem_fail);
150 desc->error = err; 161 desc->error = err;
151 return 0; 162 return 0;
152 } 163 }
@@ -165,20 +176,20 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
165 176
166 skb = alloc_skb(0, GFP_ATOMIC); 177 skb = alloc_skb(0, GFP_ATOMIC);
167 if (!skb) { 178 if (!skb) {
168 STRP_STATS_INCR(strp->stats.rx_mem_fail); 179 STRP_STATS_INCR(strp->stats.mem_fail);
169 desc->error = -ENOMEM; 180 desc->error = -ENOMEM;
170 return 0; 181 return 0;
171 } 182 }
172 skb->len = head->len; 183 skb->len = head->len;
173 skb->data_len = head->len; 184 skb->data_len = head->len;
174 skb->truesize = head->truesize; 185 skb->truesize = head->truesize;
175 *_strp_rx_msg(skb) = *_strp_rx_msg(head); 186 *_strp_msg(skb) = *_strp_msg(head);
176 strp->rx_skb_nextp = &head->next; 187 strp->skb_nextp = &head->next;
177 skb_shinfo(skb)->frag_list = head; 188 skb_shinfo(skb)->frag_list = head;
178 strp->rx_skb_head = skb; 189 strp->skb_head = skb;
179 head = skb; 190 head = skb;
180 } else { 191 } else {
181 strp->rx_skb_nextp = 192 strp->skb_nextp =
182 &skb_shinfo(head)->frag_list; 193 &skb_shinfo(head)->frag_list;
183 } 194 }
184 } 195 }
@@ -188,112 +199,112 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
188 /* Always clone since we will consume something */ 199 /* Always clone since we will consume something */
189 skb = skb_clone(orig_skb, GFP_ATOMIC); 200 skb = skb_clone(orig_skb, GFP_ATOMIC);
190 if (!skb) { 201 if (!skb) {
191 STRP_STATS_INCR(strp->stats.rx_mem_fail); 202 STRP_STATS_INCR(strp->stats.mem_fail);
192 desc->error = -ENOMEM; 203 desc->error = -ENOMEM;
193 break; 204 break;
194 } 205 }
195 206
196 cand_len = orig_len - eaten; 207 cand_len = orig_len - eaten;
197 208
198 head = strp->rx_skb_head; 209 head = strp->skb_head;
199 if (!head) { 210 if (!head) {
200 head = skb; 211 head = skb;
201 strp->rx_skb_head = head; 212 strp->skb_head = head;
202 /* Will set rx_skb_nextp on next packet if needed */ 213 /* Will set skb_nextp on next packet if needed */
203 strp->rx_skb_nextp = NULL; 214 strp->skb_nextp = NULL;
204 rxm = _strp_rx_msg(head); 215 stm = _strp_msg(head);
205 memset(rxm, 0, sizeof(*rxm)); 216 memset(stm, 0, sizeof(*stm));
206 rxm->strp.offset = orig_offset + eaten; 217 stm->strp.offset = orig_offset + eaten;
207 } else { 218 } else {
208 /* Unclone since we may be appending to an skb that we 219 /* Unclone since we may be appending to an skb that we
209 * already share a frag_list with. 220 * already share a frag_list with.
210 */ 221 */
211 err = skb_unclone(skb, GFP_ATOMIC); 222 err = skb_unclone(skb, GFP_ATOMIC);
212 if (err) { 223 if (err) {
213 STRP_STATS_INCR(strp->stats.rx_mem_fail); 224 STRP_STATS_INCR(strp->stats.mem_fail);
214 desc->error = err; 225 desc->error = err;
215 break; 226 break;
216 } 227 }
217 228
218 rxm = _strp_rx_msg(head); 229 stm = _strp_msg(head);
219 *strp->rx_skb_nextp = skb; 230 *strp->skb_nextp = skb;
220 strp->rx_skb_nextp = &skb->next; 231 strp->skb_nextp = &skb->next;
221 head->data_len += skb->len; 232 head->data_len += skb->len;
222 head->len += skb->len; 233 head->len += skb->len;
223 head->truesize += skb->truesize; 234 head->truesize += skb->truesize;
224 } 235 }
225 236
226 if (!rxm->strp.full_len) { 237 if (!stm->strp.full_len) {
227 ssize_t len; 238 ssize_t len;
228 239
229 len = (*strp->cb.parse_msg)(strp, head); 240 len = (*strp->cb.parse_msg)(strp, head);
230 241
231 if (!len) { 242 if (!len) {
232 /* Need more header to determine length */ 243 /* Need more header to determine length */
233 if (!rxm->accum_len) { 244 if (!stm->accum_len) {
234 /* Start RX timer for new message */ 245 /* Start RX timer for new message */
235 strp_start_rx_timer(strp); 246 strp_start_timer(strp, timeo);
236 } 247 }
237 rxm->accum_len += cand_len; 248 stm->accum_len += cand_len;
238 eaten += cand_len; 249 eaten += cand_len;
239 STRP_STATS_INCR(strp->stats.rx_need_more_hdr); 250 STRP_STATS_INCR(strp->stats.need_more_hdr);
240 WARN_ON(eaten != orig_len); 251 WARN_ON(eaten != orig_len);
241 break; 252 break;
242 } else if (len < 0) { 253 } else if (len < 0) {
243 if (len == -ESTRPIPE && rxm->accum_len) { 254 if (len == -ESTRPIPE && stm->accum_len) {
244 len = -ENODATA; 255 len = -ENODATA;
245 strp->rx_unrecov_intr = 1; 256 strp->unrecov_intr = 1;
246 } else { 257 } else {
247 strp->rx_interrupted = 1; 258 strp->interrupted = 1;
248 } 259 }
249 strp_parser_err(strp, len, desc); 260 strp_parser_err(strp, len, desc);
250 break; 261 break;
251 } else if (len > strp->sk->sk_rcvbuf) { 262 } else if (len > max_msg_size) {
252 /* Message length exceeds maximum allowed */ 263 /* Message length exceeds maximum allowed */
253 STRP_STATS_INCR(strp->stats.rx_msg_too_big); 264 STRP_STATS_INCR(strp->stats.msg_too_big);
254 strp_parser_err(strp, -EMSGSIZE, desc); 265 strp_parser_err(strp, -EMSGSIZE, desc);
255 break; 266 break;
256 } else if (len <= (ssize_t)head->len - 267 } else if (len <= (ssize_t)head->len -
257 skb->len - rxm->strp.offset) { 268 skb->len - stm->strp.offset) {
258 /* Length must be into new skb (and also 269 /* Length must be into new skb (and also
259 * greater than zero) 270 * greater than zero)
260 */ 271 */
261 STRP_STATS_INCR(strp->stats.rx_bad_hdr_len); 272 STRP_STATS_INCR(strp->stats.bad_hdr_len);
262 strp_parser_err(strp, -EPROTO, desc); 273 strp_parser_err(strp, -EPROTO, desc);
263 break; 274 break;
264 } 275 }
265 276
266 rxm->strp.full_len = len; 277 stm->strp.full_len = len;
267 } 278 }
268 279
269 extra = (ssize_t)(rxm->accum_len + cand_len) - 280 extra = (ssize_t)(stm->accum_len + cand_len) -
270 rxm->strp.full_len; 281 stm->strp.full_len;
271 282
272 if (extra < 0) { 283 if (extra < 0) {
273 /* Message not complete yet. */ 284 /* Message not complete yet. */
274 if (rxm->strp.full_len - rxm->accum_len > 285 if (stm->strp.full_len - stm->accum_len >
275 strp_peek_len(strp)) { 286 strp_peek_len(strp)) {
276 /* Don't have the whole messages in the socket 287 /* Don't have the whole message in the socket
277 * buffer. Set strp->rx_need_bytes to wait for 288 * buffer. Set strp->need_bytes to wait for
278 * the rest of the message. Also, set "early 289 * the rest of the message. Also, set "early
279 * eaten" since we've already buffered the skb 290 * eaten" since we've already buffered the skb
280 * but don't consume yet per strp_read_sock. 291 * but don't consume yet per strp_read_sock.
281 */ 292 */
282 293
283 if (!rxm->accum_len) { 294 if (!stm->accum_len) {
284 /* Start RX timer for new message */ 295 /* Start RX timer for new message */
285 strp_start_rx_timer(strp); 296 strp_start_timer(strp, timeo);
286 } 297 }
287 298
288 strp->rx_need_bytes = rxm->strp.full_len - 299 strp->need_bytes = stm->strp.full_len -
289 rxm->accum_len; 300 stm->accum_len;
290 rxm->accum_len += cand_len; 301 stm->accum_len += cand_len;
291 rxm->early_eaten = cand_len; 302 stm->early_eaten = cand_len;
292 STRP_STATS_ADD(strp->stats.rx_bytes, cand_len); 303 STRP_STATS_ADD(strp->stats.bytes, cand_len);
293 desc->count = 0; /* Stop reading socket */ 304 desc->count = 0; /* Stop reading socket */
294 break; 305 break;
295 } 306 }
296 rxm->accum_len += cand_len; 307 stm->accum_len += cand_len;
297 eaten += cand_len; 308 eaten += cand_len;
298 WARN_ON(eaten != orig_len); 309 WARN_ON(eaten != orig_len);
299 break; 310 break;
@@ -308,14 +319,14 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
308 eaten += (cand_len - extra); 319 eaten += (cand_len - extra);
309 320
310 /* Hurray, we have a new message! */ 321 /* Hurray, we have a new message! */
311 del_timer(&strp->rx_msg_timer); 322 del_timer(&strp->msg_timer);
312 strp->rx_skb_head = NULL; 323 strp->skb_head = NULL;
313 STRP_STATS_INCR(strp->stats.rx_msgs); 324 STRP_STATS_INCR(strp->stats.msgs);
314 325
315 /* Give skb to upper layer */ 326 /* Give skb to upper layer */
316 strp->cb.rcv_msg(strp, head); 327 strp->cb.rcv_msg(strp, head);
317 328
318 if (unlikely(strp->rx_paused)) { 329 if (unlikely(strp->paused)) {
319 /* Upper layer paused strp */ 330 /* Upper layer paused strp */
320 break; 331 break;
321 } 332 }
@@ -324,11 +335,33 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
324 if (cloned_orig) 335 if (cloned_orig)
325 kfree_skb(orig_skb); 336 kfree_skb(orig_skb);
326 337
327 STRP_STATS_ADD(strp->stats.rx_bytes, eaten); 338 STRP_STATS_ADD(strp->stats.bytes, eaten);
328 339
329 return eaten; 340 return eaten;
330} 341}
331 342
343int strp_process(struct strparser *strp, struct sk_buff *orig_skb,
344 unsigned int orig_offset, size_t orig_len,
345 size_t max_msg_size, long timeo)
346{
347 read_descriptor_t desc; /* Dummy arg to strp_recv */
348
349 desc.arg.data = strp;
350
351 return __strp_recv(&desc, orig_skb, orig_offset, orig_len,
352 max_msg_size, timeo);
353}
354EXPORT_SYMBOL_GPL(strp_process);
355
356static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
357 unsigned int orig_offset, size_t orig_len)
358{
359 struct strparser *strp = (struct strparser *)desc->arg.data;
360
361 return __strp_recv(desc, orig_skb, orig_offset, orig_len,
362 strp->sk->sk_rcvbuf, strp->sk->sk_rcvtimeo);
363}
364
332static int default_read_sock_done(struct strparser *strp, int err) 365static int default_read_sock_done(struct strparser *strp, int err)
333{ 366{
334 return err; 367 return err;
@@ -355,101 +388,129 @@ static int strp_read_sock(struct strparser *strp)
355/* Lower sock lock held */ 388/* Lower sock lock held */
356void strp_data_ready(struct strparser *strp) 389void strp_data_ready(struct strparser *strp)
357{ 390{
358 if (unlikely(strp->rx_stopped)) 391 if (unlikely(strp->stopped))
359 return; 392 return;
360 393
361 /* This check is needed to synchronize with do_strp_rx_work. 394 /* This check is needed to synchronize with do_strp_work.
362 * do_strp_rx_work acquires a process lock (lock_sock) whereas 395 * do_strp_work acquires a process lock (lock_sock) whereas
363 * the lock held here is bh_lock_sock. The two locks can be 396 * the lock held here is bh_lock_sock. The two locks can be
364 * held by different threads at the same time, but bh_lock_sock 397 * held by different threads at the same time, but bh_lock_sock
365 * allows a thread in BH context to safely check if the process 398 * allows a thread in BH context to safely check if the process
366 * lock is held. In this case, if the lock is held, queue work. 399 * lock is held. In this case, if the lock is held, queue work.
367 */ 400 */
368 if (sock_owned_by_user(strp->sk)) { 401 if (sock_owned_by_user(strp->sk)) {
369 queue_work(strp_wq, &strp->rx_work); 402 queue_work(strp_wq, &strp->work);
370 return; 403 return;
371 } 404 }
372 405
373 if (strp->rx_paused) 406 if (strp->paused)
374 return; 407 return;
375 408
376 if (strp->rx_need_bytes) { 409 if (strp->need_bytes) {
377 if (strp_peek_len(strp) >= strp->rx_need_bytes) 410 if (strp_peek_len(strp) >= strp->need_bytes)
378 strp->rx_need_bytes = 0; 411 strp->need_bytes = 0;
379 else 412 else
380 return; 413 return;
381 } 414 }
382 415
383 if (strp_read_sock(strp) == -ENOMEM) 416 if (strp_read_sock(strp) == -ENOMEM)
384 queue_work(strp_wq, &strp->rx_work); 417 queue_work(strp_wq, &strp->work);
385} 418}
386EXPORT_SYMBOL_GPL(strp_data_ready); 419EXPORT_SYMBOL_GPL(strp_data_ready);
387 420
388static void do_strp_rx_work(struct strparser *strp) 421static void do_strp_work(struct strparser *strp)
389{ 422{
390 read_descriptor_t rd_desc; 423 read_descriptor_t rd_desc;
391 struct sock *csk = strp->sk;
392 424
393 /* We need the read lock to synchronize with strp_data_ready. We 425 /* We need the read lock to synchronize with strp_data_ready. We
394 * need the socket lock for calling strp_read_sock. 426 * need the socket lock for calling strp_read_sock.
395 */ 427 */
396 lock_sock(csk); 428 strp->cb.lock(strp);
397 429
398 if (unlikely(strp->rx_stopped)) 430 if (unlikely(strp->stopped))
399 goto out; 431 goto out;
400 432
401 if (strp->rx_paused) 433 if (strp->paused)
402 goto out; 434 goto out;
403 435
404 rd_desc.arg.data = strp; 436 rd_desc.arg.data = strp;
405 437
406 if (strp_read_sock(strp) == -ENOMEM) 438 if (strp_read_sock(strp) == -ENOMEM)
407 queue_work(strp_wq, &strp->rx_work); 439 queue_work(strp_wq, &strp->work);
408 440
409out: 441out:
410 release_sock(csk); 442 strp->cb.unlock(strp);
411} 443}
412 444
413static void strp_rx_work(struct work_struct *w) 445static void strp_work(struct work_struct *w)
414{ 446{
415 do_strp_rx_work(container_of(w, struct strparser, rx_work)); 447 do_strp_work(container_of(w, struct strparser, work));
416} 448}
417 449
418static void strp_rx_msg_timeout(unsigned long arg) 450static void strp_msg_timeout(unsigned long arg)
419{ 451{
420 struct strparser *strp = (struct strparser *)arg; 452 struct strparser *strp = (struct strparser *)arg;
421 453
422 /* Message assembly timed out */ 454 /* Message assembly timed out */
423 STRP_STATS_INCR(strp->stats.rx_msg_timeouts); 455 STRP_STATS_INCR(strp->stats.msg_timeouts);
424 lock_sock(strp->sk); 456 strp->cb.lock(strp);
425 strp->cb.abort_parser(strp, ETIMEDOUT); 457 strp->cb.abort_parser(strp, ETIMEDOUT);
458 strp->cb.unlock(strp);
459}
460
461static void strp_sock_lock(struct strparser *strp)
462{
463 lock_sock(strp->sk);
464}
465
466static void strp_sock_unlock(struct strparser *strp)
467{
426 release_sock(strp->sk); 468 release_sock(strp->sk);
427} 469}
428 470
429int strp_init(struct strparser *strp, struct sock *csk, 471int strp_init(struct strparser *strp, struct sock *sk,
430 struct strp_callbacks *cb) 472 struct strp_callbacks *cb)
431{ 473{
432 struct socket *sock = csk->sk_socket;
433 474
434 if (!cb || !cb->rcv_msg || !cb->parse_msg) 475 if (!cb || !cb->rcv_msg || !cb->parse_msg)
435 return -EINVAL; 476 return -EINVAL;
436 477
437 if (!sock->ops->read_sock || !sock->ops->peek_len) 478 /* The sk (sock) arg determines the mode of the stream parser.
438 return -EAFNOSUPPORT; 479 *
480 * If the sock is set then the strparser is in receive callback mode.
481 * The upper layer calls strp_data_ready to kick receive processing
482 * and strparser calls the read_sock function on the socket to
483 * get packets.
484 *
485 * If the sock is not set then the strparser is in general mode.
486 * The upper layer calls strp_process for each skb to be parsed.
487 */
439 488
440 memset(strp, 0, sizeof(*strp)); 489 if (sk) {
490 struct socket *sock = sk->sk_socket;
441 491
442 strp->sk = csk; 492 if (!sock->ops->read_sock || !sock->ops->peek_len)
493 return -EAFNOSUPPORT;
494 } else {
495 if (!cb->lock || !cb->unlock)
496 return -EINVAL;
497 }
443 498
444 setup_timer(&strp->rx_msg_timer, strp_rx_msg_timeout, 499 memset(strp, 0, sizeof(*strp));
445 (unsigned long)strp);
446 500
447 INIT_WORK(&strp->rx_work, strp_rx_work); 501 strp->sk = sk;
448 502
503 strp->cb.lock = cb->lock ? : strp_sock_lock;
504 strp->cb.unlock = cb->unlock ? : strp_sock_unlock;
449 strp->cb.rcv_msg = cb->rcv_msg; 505 strp->cb.rcv_msg = cb->rcv_msg;
450 strp->cb.parse_msg = cb->parse_msg; 506 strp->cb.parse_msg = cb->parse_msg;
451 strp->cb.read_sock_done = cb->read_sock_done ? : default_read_sock_done; 507 strp->cb.read_sock_done = cb->read_sock_done ? : default_read_sock_done;
452 strp->cb.abort_parser = cb->abort_parser ? : strp_abort_rx_strp; 508 strp->cb.abort_parser = cb->abort_parser ? : strp_abort_strp;
509
510 setup_timer(&strp->msg_timer, strp_msg_timeout,
511 (unsigned long)strp);
512
513 INIT_WORK(&strp->work, strp_work);
453 514
454 return 0; 515 return 0;
455} 516}
@@ -457,12 +518,12 @@ EXPORT_SYMBOL_GPL(strp_init);
457 518
458void strp_unpause(struct strparser *strp) 519void strp_unpause(struct strparser *strp)
459{ 520{
460 strp->rx_paused = 0; 521 strp->paused = 0;
461 522
462 /* Sync setting rx_paused with RX work */ 523 /* Sync setting paused with RX work */
463 smp_mb(); 524 smp_mb();
464 525
465 queue_work(strp_wq, &strp->rx_work); 526 queue_work(strp_wq, &strp->work);
466} 527}
467EXPORT_SYMBOL_GPL(strp_unpause); 528EXPORT_SYMBOL_GPL(strp_unpause);
468 529
@@ -471,27 +532,27 @@ EXPORT_SYMBOL_GPL(strp_unpause);
471 */ 532 */
472void strp_done(struct strparser *strp) 533void strp_done(struct strparser *strp)
473{ 534{
474 WARN_ON(!strp->rx_stopped); 535 WARN_ON(!strp->stopped);
475 536
476 del_timer_sync(&strp->rx_msg_timer); 537 del_timer_sync(&strp->msg_timer);
477 cancel_work_sync(&strp->rx_work); 538 cancel_work_sync(&strp->work);
478 539
479 if (strp->rx_skb_head) { 540 if (strp->skb_head) {
480 kfree_skb(strp->rx_skb_head); 541 kfree_skb(strp->skb_head);
481 strp->rx_skb_head = NULL; 542 strp->skb_head = NULL;
482 } 543 }
483} 544}
484EXPORT_SYMBOL_GPL(strp_done); 545EXPORT_SYMBOL_GPL(strp_done);
485 546
486void strp_stop(struct strparser *strp) 547void strp_stop(struct strparser *strp)
487{ 548{
488 strp->rx_stopped = 1; 549 strp->stopped = 1;
489} 550}
490EXPORT_SYMBOL_GPL(strp_stop); 551EXPORT_SYMBOL_GPL(strp_stop);
491 552
492void strp_check_rcv(struct strparser *strp) 553void strp_check_rcv(struct strparser *strp)
493{ 554{
494 queue_work(strp_wq, &strp->rx_work); 555 queue_work(strp_wq, &strp->work);
495} 556}
496EXPORT_SYMBOL_GPL(strp_check_rcv); 557EXPORT_SYMBOL_GPL(strp_check_rcv);
497 558