aboutsummaryrefslogtreecommitdiffstats
path: root/net/strparser/strparser.c
diff options
context:
space:
mode:
authorTom Herbert <tom@herbertland.com>2016-08-15 17:51:01 -0400
committerDavid S. Miller <davem@davemloft.net>2016-08-17 19:36:23 -0400
commit43a0c6751a322847cb6fa0ab8cbf77a1d08bfc0a (patch)
tree008963333fdd337b5d57c05ffa2ec1d120c93b7f /net/strparser/strparser.c
parentd2d371ae5dd6af9a6a3d7f50b753627c42868409 (diff)
strparser: Stream parser for messages
This patch introduces a utility for parsing application layer protocol messages in a TCP stream. This is a generalization of the mechanism implemented in the Kernel Connection Multiplexor. The API includes a context structure, a set of callbacks, utility functions, and a data ready function. A stream parser instance is defined by a strparser structure that is bound to a TCP socket. The function to initialize the structure is: int strp_init(struct strparser *strp, struct sock *csk, struct strp_callbacks *cb); csk is the TCP socket being bound to and cb are the parser callbacks. The upper layer calls strp_tcp_data_ready when data is ready on the lower socket for strparser to process. This should be called from a data_ready callback that is set on the socket: void strp_tcp_data_ready(struct strparser *strp); A parser is bound to a TCP socket by setting the data_ready function to strp_tcp_data_ready so that all receive indications on the socket go through the parser. This assumes that sk_user_data is set to the strparser structure. There are four callbacks. - parse_msg is called to parse the message (returns length or error). - rcv_msg is called when a complete message has been received - read_sock_done is called when data_ready function exits - abort_parser is called to abort the parser The input to parse_msg is an skbuff which contains the next message under construction. The backend processing of parse_msg will parse the application layer protocol headers to determine the length of the message in the stream. 
The possible return values are: >0 : indicates length of successfully parsed message 0 : indicates more data must be received to parse the message -ESTRPIPE : current message should not be processed by the kernel, return control of the socket to userspace which can proceed to read the messages itself other < 0 : Error in parsing, give control back to userspace assuming that synchronization is lost and the stream is unrecoverable (application expected to close TCP socket) In the case of error return (< 0) strparser will stop the parser and report an error to userspace. The application must deal with the error. To handle the error the strparser is unbound from the TCP socket. If the error indicates that the stream TCP socket is at a recoverable point (ESTRPIPE) then the application can read the TCP socket to process the stream. Once the application has dealt with the exceptions in the stream, it may again bind the socket to a strparser to continue data operations. Note that ENODATA may be returned to the application. In this case parse_msg returned -ESTRPIPE, however strparser was unable to maintain synchronization of the stream (i.e. some of the message in question was already read by the parser). strp_pause and strp_unpause are used to provide flow control. For instance, if rcv_msg is called but the upper layer can't immediately consume the message it can hold the message and pause strparser. Signed-off-by: Tom Herbert <tom@herbertland.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/strparser/strparser.c')
-rw-r--r--net/strparser/strparser.c492
1 files changed, 492 insertions, 0 deletions
diff --git a/net/strparser/strparser.c b/net/strparser/strparser.c
new file mode 100644
index 000000000000..fd688c0a7744
--- /dev/null
+++ b/net/strparser/strparser.c
@@ -0,0 +1,492 @@
1/*
2 * Stream Parser
3 *
4 * Copyright (c) 2016 Tom Herbert <tom@herbertland.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2
8 * as published by the Free Software Foundation.
9 */
10
11#include <linux/bpf.h>
12#include <linux/errno.h>
13#include <linux/errqueue.h>
14#include <linux/file.h>
15#include <linux/in.h>
16#include <linux/kernel.h>
17#include <linux/module.h>
18#include <linux/net.h>
19#include <linux/netdevice.h>
20#include <linux/poll.h>
21#include <linux/rculist.h>
22#include <linux/skbuff.h>
23#include <linux/socket.h>
24#include <linux/uaccess.h>
25#include <linux/workqueue.h>
26#include <net/strparser.h>
27#include <net/netns/generic.h>
28#include <net/sock.h>
29#include <net/tcp.h>
30
31static struct workqueue_struct *strp_wq;
32
33struct _strp_rx_msg {
34 /* Internal cb structure. struct strp_rx_msg must be first for passing
35 * to upper layer.
36 */
37 struct strp_rx_msg strp;
38 int accum_len;
39 int early_eaten;
40};
41
42static inline struct _strp_rx_msg *_strp_rx_msg(struct sk_buff *skb)
43{
44 return (struct _strp_rx_msg *)((void *)skb->cb +
45 offsetof(struct qdisc_skb_cb, data));
46}
47
48/* Lower lock held */
49static void strp_abort_rx_strp(struct strparser *strp, int err)
50{
51 struct sock *csk = strp->sk;
52
53 /* Unrecoverable error in receive */
54
55 del_timer(&strp->rx_msg_timer);
56
57 if (strp->rx_stopped)
58 return;
59
60 strp->rx_stopped = 1;
61
62 /* Report an error on the lower socket */
63 csk->sk_err = err;
64 csk->sk_error_report(csk);
65}
66
67static void strp_start_rx_timer(struct strparser *strp)
68{
69 if (strp->sk->sk_rcvtimeo)
70 mod_timer(&strp->rx_msg_timer, strp->sk->sk_rcvtimeo);
71}
72
73/* Lower lock held */
74static void strp_parser_err(struct strparser *strp, int err,
75 read_descriptor_t *desc)
76{
77 desc->error = err;
78 kfree_skb(strp->rx_skb_head);
79 strp->rx_skb_head = NULL;
80 strp->cb.abort_parser(strp, err);
81}
82
83/* Lower socket lock held */
84static int strp_tcp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
85 unsigned int orig_offset, size_t orig_len)
86{
87 struct strparser *strp = (struct strparser *)desc->arg.data;
88 struct _strp_rx_msg *rxm;
89 struct sk_buff *head, *skb;
90 size_t eaten = 0, cand_len;
91 ssize_t extra;
92 int err;
93 bool cloned_orig = false;
94
95 if (strp->rx_paused)
96 return 0;
97
98 head = strp->rx_skb_head;
99 if (head) {
100 /* Message already in progress */
101
102 rxm = _strp_rx_msg(head);
103 if (unlikely(rxm->early_eaten)) {
104 /* Already some number of bytes on the receive sock
105 * data saved in rx_skb_head, just indicate they
106 * are consumed.
107 */
108 eaten = orig_len <= rxm->early_eaten ?
109 orig_len : rxm->early_eaten;
110 rxm->early_eaten -= eaten;
111
112 return eaten;
113 }
114
115 if (unlikely(orig_offset)) {
116 /* Getting data with a non-zero offset when a message is
117 * in progress is not expected. If it does happen, we
118 * need to clone and pull since we can't deal with
119 * offsets in the skbs for a message expect in the head.
120 */
121 orig_skb = skb_clone(orig_skb, GFP_ATOMIC);
122 if (!orig_skb) {
123 STRP_STATS_INCR(strp->stats.rx_mem_fail);
124 desc->error = -ENOMEM;
125 return 0;
126 }
127 if (!pskb_pull(orig_skb, orig_offset)) {
128 STRP_STATS_INCR(strp->stats.rx_mem_fail);
129 kfree_skb(orig_skb);
130 desc->error = -ENOMEM;
131 return 0;
132 }
133 cloned_orig = true;
134 orig_offset = 0;
135 }
136
137 if (!strp->rx_skb_nextp) {
138 /* We are going to append to the frags_list of head.
139 * Need to unshare the frag_list.
140 */
141 err = skb_unclone(head, GFP_ATOMIC);
142 if (err) {
143 STRP_STATS_INCR(strp->stats.rx_mem_fail);
144 desc->error = err;
145 return 0;
146 }
147
148 if (unlikely(skb_shinfo(head)->frag_list)) {
149 /* We can't append to an sk_buff that already
150 * has a frag_list. We create a new head, point
151 * the frag_list of that to the old head, and
152 * then are able to use the old head->next for
153 * appending to the message.
154 */
155 if (WARN_ON(head->next)) {
156 desc->error = -EINVAL;
157 return 0;
158 }
159
160 skb = alloc_skb(0, GFP_ATOMIC);
161 if (!skb) {
162 STRP_STATS_INCR(strp->stats.rx_mem_fail);
163 desc->error = -ENOMEM;
164 return 0;
165 }
166 skb->len = head->len;
167 skb->data_len = head->len;
168 skb->truesize = head->truesize;
169 *_strp_rx_msg(skb) = *_strp_rx_msg(head);
170 strp->rx_skb_nextp = &head->next;
171 skb_shinfo(skb)->frag_list = head;
172 strp->rx_skb_head = skb;
173 head = skb;
174 } else {
175 strp->rx_skb_nextp =
176 &skb_shinfo(head)->frag_list;
177 }
178 }
179 }
180
181 while (eaten < orig_len) {
182 /* Always clone since we will consume something */
183 skb = skb_clone(orig_skb, GFP_ATOMIC);
184 if (!skb) {
185 STRP_STATS_INCR(strp->stats.rx_mem_fail);
186 desc->error = -ENOMEM;
187 break;
188 }
189
190 cand_len = orig_len - eaten;
191
192 head = strp->rx_skb_head;
193 if (!head) {
194 head = skb;
195 strp->rx_skb_head = head;
196 /* Will set rx_skb_nextp on next packet if needed */
197 strp->rx_skb_nextp = NULL;
198 rxm = _strp_rx_msg(head);
199 memset(rxm, 0, sizeof(*rxm));
200 rxm->strp.offset = orig_offset + eaten;
201 } else {
202 /* Unclone since we may be appending to an skb that we
203 * already share a frag_list with.
204 */
205 err = skb_unclone(skb, GFP_ATOMIC);
206 if (err) {
207 STRP_STATS_INCR(strp->stats.rx_mem_fail);
208 desc->error = err;
209 break;
210 }
211
212 rxm = _strp_rx_msg(head);
213 *strp->rx_skb_nextp = skb;
214 strp->rx_skb_nextp = &skb->next;
215 head->data_len += skb->len;
216 head->len += skb->len;
217 head->truesize += skb->truesize;
218 }
219
220 if (!rxm->strp.full_len) {
221 ssize_t len;
222
223 len = (*strp->cb.parse_msg)(strp, head);
224
225 if (!len) {
226 /* Need more header to determine length */
227 if (!rxm->accum_len) {
228 /* Start RX timer for new message */
229 strp_start_rx_timer(strp);
230 }
231 rxm->accum_len += cand_len;
232 eaten += cand_len;
233 STRP_STATS_INCR(strp->stats.rx_need_more_hdr);
234 WARN_ON(eaten != orig_len);
235 break;
236 } else if (len < 0) {
237 if (len == -ESTRPIPE && rxm->accum_len) {
238 len = -ENODATA;
239 strp->rx_unrecov_intr = 1;
240 } else {
241 strp->rx_interrupted = 1;
242 }
243 strp_parser_err(strp, err, desc);
244 break;
245 } else if (len > strp->sk->sk_rcvbuf) {
246 /* Message length exceeds maximum allowed */
247 STRP_STATS_INCR(strp->stats.rx_msg_too_big);
248 strp_parser_err(strp, -EMSGSIZE, desc);
249 break;
250 } else if (len <= (ssize_t)head->len -
251 skb->len - rxm->strp.offset) {
252 /* Length must be into new skb (and also
253 * greater than zero)
254 */
255 STRP_STATS_INCR(strp->stats.rx_bad_hdr_len);
256 strp_parser_err(strp, -EPROTO, desc);
257 break;
258 }
259
260 rxm->strp.full_len = len;
261 }
262
263 extra = (ssize_t)(rxm->accum_len + cand_len) -
264 rxm->strp.full_len;
265
266 if (extra < 0) {
267 /* Message not complete yet. */
268 if (rxm->strp.full_len - rxm->accum_len >
269 tcp_inq(strp->sk)) {
270 /* Don't have the whole messages in the socket
271 * buffer. Set strp->rx_need_bytes to wait for
272 * the rest of the message. Also, set "early
273 * eaten" since we've already buffered the skb
274 * but don't consume yet per tcp_read_sock.
275 */
276
277 if (!rxm->accum_len) {
278 /* Start RX timer for new message */
279 strp_start_rx_timer(strp);
280 }
281
282 strp->rx_need_bytes = rxm->strp.full_len -
283 rxm->accum_len;
284 rxm->accum_len += cand_len;
285 rxm->early_eaten = cand_len;
286 STRP_STATS_ADD(strp->stats.rx_bytes, cand_len);
287 desc->count = 0; /* Stop reading socket */
288 break;
289 }
290 rxm->accum_len += cand_len;
291 eaten += cand_len;
292 WARN_ON(eaten != orig_len);
293 break;
294 }
295
296 /* Positive extra indicates ore bytes than needed for the
297 * message
298 */
299
300 WARN_ON(extra > cand_len);
301
302 eaten += (cand_len - extra);
303
304 /* Hurray, we have a new message! */
305 del_timer(&strp->rx_msg_timer);
306 strp->rx_skb_head = NULL;
307 STRP_STATS_INCR(strp->stats.rx_msgs);
308
309 /* Give skb to upper layer */
310 strp->cb.rcv_msg(strp, head);
311
312 if (unlikely(strp->rx_paused)) {
313 /* Upper layer paused strp */
314 break;
315 }
316 }
317
318 if (cloned_orig)
319 kfree_skb(orig_skb);
320
321 STRP_STATS_ADD(strp->stats.rx_bytes, eaten);
322
323 return eaten;
324}
325
/* Default read_sock_done callback, used when the upper layer supplies none:
 * the error code from the socket read is passed through unchanged.
 */
static int default_read_sock_done(struct strparser *strp, int err)
{
	(void)strp;	/* the parser instance is not needed here */

	return err;
}
330
331/* Called with lock held on lower socket */
332static int strp_tcp_read_sock(struct strparser *strp)
333{
334 read_descriptor_t desc;
335
336 desc.arg.data = strp;
337 desc.error = 0;
338 desc.count = 1; /* give more than one skb per call */
339
340 /* sk should be locked here, so okay to do tcp_read_sock */
341 tcp_read_sock(strp->sk, &desc, strp_tcp_recv);
342
343 desc.error = strp->cb.read_sock_done(strp, desc.error);
344
345 return desc.error;
346}
347
348/* Lower sock lock held */
349void strp_tcp_data_ready(struct strparser *strp)
350{
351 struct sock *csk = strp->sk;
352
353 if (unlikely(strp->rx_stopped))
354 return;
355
356 /* This check is needed to synchronize with do_strp_rx_work.
357 * do_strp_rx_work acquires a process lock (lock_sock) whereas
358 * the lock held here is bh_lock_sock. The two locks can be
359 * held by different threads at the same time, but bh_lock_sock
360 * allows a thread in BH context to safely check if the process
361 * lock is held. In this case, if the lock is held, queue work.
362 */
363 if (sock_owned_by_user(csk)) {
364 queue_work(strp_wq, &strp->rx_work);
365 return;
366 }
367
368 if (strp->rx_paused)
369 return;
370
371 if (strp->rx_need_bytes) {
372 if (tcp_inq(csk) >= strp->rx_need_bytes)
373 strp->rx_need_bytes = 0;
374 else
375 return;
376 }
377
378 if (strp_tcp_read_sock(strp) == -ENOMEM)
379 queue_work(strp_wq, &strp->rx_work);
380}
381EXPORT_SYMBOL_GPL(strp_tcp_data_ready);
382
383static void do_strp_rx_work(struct strparser *strp)
384{
385 read_descriptor_t rd_desc;
386 struct sock *csk = strp->sk;
387
388 /* We need the read lock to synchronize with strp_tcp_data_ready. We
389 * need the socket lock for calling tcp_read_sock.
390 */
391 lock_sock(csk);
392
393 if (unlikely(csk->sk_user_data != strp))
394 goto out;
395
396 if (unlikely(strp->rx_stopped))
397 goto out;
398
399 if (strp->rx_paused)
400 goto out;
401
402 rd_desc.arg.data = strp;
403
404 if (strp_tcp_read_sock(strp) == -ENOMEM)
405 queue_work(strp_wq, &strp->rx_work);
406
407out:
408 release_sock(csk);
409}
410
411static void strp_rx_work(struct work_struct *w)
412{
413 do_strp_rx_work(container_of(w, struct strparser, rx_work));
414}
415
416static void strp_rx_msg_timeout(unsigned long arg)
417{
418 struct strparser *strp = (struct strparser *)arg;
419
420 /* Message assembly timed out */
421 STRP_STATS_INCR(strp->stats.rx_msg_timeouts);
422 lock_sock(strp->sk);
423 strp->cb.abort_parser(strp, ETIMEDOUT);
424 release_sock(strp->sk);
425}
426
427int strp_init(struct strparser *strp, struct sock *csk,
428 struct strp_callbacks *cb)
429{
430 if (!cb || !cb->rcv_msg || !cb->parse_msg)
431 return -EINVAL;
432
433 memset(strp, 0, sizeof(*strp));
434
435 strp->sk = csk;
436
437 setup_timer(&strp->rx_msg_timer, strp_rx_msg_timeout,
438 (unsigned long)strp);
439
440 INIT_WORK(&strp->rx_work, strp_rx_work);
441
442 strp->cb.rcv_msg = cb->rcv_msg;
443 strp->cb.parse_msg = cb->parse_msg;
444 strp->cb.read_sock_done = cb->read_sock_done ? : default_read_sock_done;
445 strp->cb.abort_parser = cb->abort_parser ? : strp_abort_rx_strp;
446
447 return 0;
448}
449EXPORT_SYMBOL_GPL(strp_init);
450
451/* strp must already be stopped so that strp_tcp_recv will no longer be called.
452 * Note that strp_done is not called with the lower socket held.
453 */
454void strp_done(struct strparser *strp)
455{
456 WARN_ON(!strp->rx_stopped);
457
458 del_timer_sync(&strp->rx_msg_timer);
459 cancel_work_sync(&strp->rx_work);
460
461 if (strp->rx_skb_head) {
462 kfree_skb(strp->rx_skb_head);
463 strp->rx_skb_head = NULL;
464 }
465}
466EXPORT_SYMBOL_GPL(strp_done);
467
468void strp_stop(struct strparser *strp)
469{
470 strp->rx_stopped = 1;
471}
472EXPORT_SYMBOL_GPL(strp_stop);
473
474void strp_check_rcv(struct strparser *strp)
475{
476 queue_work(strp_wq, &strp->rx_work);
477}
478EXPORT_SYMBOL_GPL(strp_check_rcv);
479
480static int __init strp_mod_init(void)
481{
482 strp_wq = create_singlethread_workqueue("kstrp");
483
484 return 0;
485}
486
487static void __exit strp_mod_exit(void)
488{
489}
490module_init(strp_mod_init);
491module_exit(strp_mod_exit);
492MODULE_LICENSE("GPL");