aboutsummaryrefslogtreecommitdiffstats
path: root/net/rxrpc/ar-output.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/rxrpc/ar-output.c')
-rw-r--r--net/rxrpc/ar-output.c658
1 files changed, 658 insertions, 0 deletions
diff --git a/net/rxrpc/ar-output.c b/net/rxrpc/ar-output.c
new file mode 100644
index 000000000000..67aa9510f09b
--- /dev/null
+++ b/net/rxrpc/ar-output.c
@@ -0,0 +1,658 @@
1/* RxRPC packet transmission
2 *
3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
4 * Written by David Howells (dhowells@redhat.com)
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 */
11
12#include <linux/net.h>
13#include <linux/skbuff.h>
14#include <linux/circ_buf.h>
15#include <net/sock.h>
16#include <net/af_rxrpc.h>
17#include "ar-internal.h"
18
19int rxrpc_resend_timeout = 4;
20
21static int rxrpc_send_data(struct kiocb *iocb,
22 struct rxrpc_sock *rx,
23 struct rxrpc_call *call,
24 struct msghdr *msg, size_t len);
25
26/*
27 * extract control messages from the sendmsg() control buffer
28 */
29static int rxrpc_sendmsg_cmsg(struct rxrpc_sock *rx, struct msghdr *msg,
30 unsigned long *user_call_ID,
31 enum rxrpc_command *command,
32 u32 *abort_code,
33 bool server)
34{
35 struct cmsghdr *cmsg;
36 int len;
37
38 *command = RXRPC_CMD_SEND_DATA;
39
40 if (msg->msg_controllen == 0)
41 return -EINVAL;
42
43 for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
44 if (!CMSG_OK(msg, cmsg))
45 return -EINVAL;
46
47 len = cmsg->cmsg_len - CMSG_ALIGN(sizeof(struct cmsghdr));
48 _debug("CMSG %d, %d, %d",
49 cmsg->cmsg_level, cmsg->cmsg_type, len);
50
51 if (cmsg->cmsg_level != SOL_RXRPC)
52 continue;
53
54 switch (cmsg->cmsg_type) {
55 case RXRPC_USER_CALL_ID:
56 if (msg->msg_flags & MSG_CMSG_COMPAT) {
57 if (len != sizeof(u32))
58 return -EINVAL;
59 *user_call_ID = *(u32 *) CMSG_DATA(cmsg);
60 } else {
61 if (len != sizeof(unsigned long))
62 return -EINVAL;
63 *user_call_ID = *(unsigned long *)
64 CMSG_DATA(cmsg);
65 }
66 _debug("User Call ID %lx", *user_call_ID);
67 break;
68
69 case RXRPC_ABORT:
70 if (*command != RXRPC_CMD_SEND_DATA)
71 return -EINVAL;
72 *command = RXRPC_CMD_SEND_ABORT;
73 if (len != sizeof(*abort_code))
74 return -EINVAL;
75 *abort_code = *(unsigned int *) CMSG_DATA(cmsg);
76 _debug("Abort %x", *abort_code);
77 if (*abort_code == 0)
78 return -EINVAL;
79 break;
80
81 case RXRPC_ACCEPT:
82 if (*command != RXRPC_CMD_SEND_DATA)
83 return -EINVAL;
84 *command = RXRPC_CMD_ACCEPT;
85 if (len != 0)
86 return -EINVAL;
87 if (!server)
88 return -EISCONN;
89 break;
90
91 default:
92 return -EINVAL;
93 }
94 }
95
96 _leave(" = 0");
97 return 0;
98}
99
100/*
101 * abort a call, sending an ABORT packet to the peer
102 */
103static void rxrpc_send_abort(struct rxrpc_call *call, u32 abort_code)
104{
105 write_lock_bh(&call->state_lock);
106
107 if (call->state <= RXRPC_CALL_COMPLETE) {
108 call->state = RXRPC_CALL_LOCALLY_ABORTED;
109 call->abort_code = abort_code;
110 set_bit(RXRPC_CALL_ABORT, &call->events);
111 del_timer_sync(&call->resend_timer);
112 del_timer_sync(&call->ack_timer);
113 clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);
114 clear_bit(RXRPC_CALL_ACK, &call->events);
115 clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
116 schedule_work(&call->processor);
117 }
118
119 write_unlock_bh(&call->state_lock);
120}
121
122/*
123 * send a message forming part of a client call through an RxRPC socket
124 * - caller holds the socket locked
125 * - the socket may be either a client socket or a server socket
126 */
127int rxrpc_client_sendmsg(struct kiocb *iocb, struct rxrpc_sock *rx,
128 struct rxrpc_transport *trans, struct msghdr *msg,
129 size_t len)
130{
131 struct rxrpc_conn_bundle *bundle;
132 enum rxrpc_command cmd;
133 struct rxrpc_call *call;
134 unsigned long user_call_ID = 0;
135 struct key *key;
136 __be16 service_id;
137 u32 abort_code = 0;
138 int ret;
139
140 _enter("");
141
142 ASSERT(trans != NULL);
143
144 ret = rxrpc_sendmsg_cmsg(rx, msg, &user_call_ID, &cmd, &abort_code,
145 false);
146 if (ret < 0)
147 return ret;
148
149 bundle = NULL;
150 if (trans) {
151 service_id = rx->service_id;
152 if (msg->msg_name) {
153 struct sockaddr_rxrpc *srx =
154 (struct sockaddr_rxrpc *) msg->msg_name;
155 service_id = htons(srx->srx_service);
156 }
157 key = rx->key;
158 if (key && !rx->key->payload.data)
159 key = NULL;
160 bundle = rxrpc_get_bundle(rx, trans, key, service_id,
161 GFP_KERNEL);
162 if (IS_ERR(bundle))
163 return PTR_ERR(bundle);
164 }
165
166 call = rxrpc_get_client_call(rx, trans, bundle, user_call_ID,
167 abort_code == 0, GFP_KERNEL);
168 if (trans)
169 rxrpc_put_bundle(trans, bundle);
170 if (IS_ERR(call)) {
171 _leave(" = %ld", PTR_ERR(call));
172 return PTR_ERR(call);
173 }
174
175 _debug("CALL %d USR %lx ST %d on CONN %p",
176 call->debug_id, call->user_call_ID, call->state, call->conn);
177
178 if (call->state >= RXRPC_CALL_COMPLETE) {
179 /* it's too late for this call */
180 ret = -ESHUTDOWN;
181 } else if (cmd == RXRPC_CMD_SEND_ABORT) {
182 rxrpc_send_abort(call, abort_code);
183 } else if (cmd != RXRPC_CMD_SEND_DATA) {
184 ret = -EINVAL;
185 } else if (call->state != RXRPC_CALL_CLIENT_SEND_REQUEST) {
186 /* request phase complete for this client call */
187 ret = -EPROTO;
188 } else {
189 ret = rxrpc_send_data(iocb, rx, call, msg, len);
190 }
191
192 rxrpc_put_call(call);
193 _leave(" = %d", ret);
194 return ret;
195}
196
197/*
198 * send a message through a server socket
199 * - caller holds the socket locked
200 */
201int rxrpc_server_sendmsg(struct kiocb *iocb, struct rxrpc_sock *rx,
202 struct msghdr *msg, size_t len)
203{
204 enum rxrpc_command cmd;
205 struct rxrpc_call *call;
206 unsigned long user_call_ID = 0;
207 u32 abort_code = 0;
208 int ret;
209
210 _enter("");
211
212 ret = rxrpc_sendmsg_cmsg(rx, msg, &user_call_ID, &cmd, &abort_code,
213 true);
214 if (ret < 0)
215 return ret;
216
217 if (cmd == RXRPC_CMD_ACCEPT)
218 return rxrpc_accept_call(rx, user_call_ID);
219
220 call = rxrpc_find_server_call(rx, user_call_ID);
221 if (!call)
222 return -EBADSLT;
223 if (call->state >= RXRPC_CALL_COMPLETE) {
224 ret = -ESHUTDOWN;
225 goto out;
226 }
227
228 switch (cmd) {
229 case RXRPC_CMD_SEND_DATA:
230 if (call->state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
231 call->state != RXRPC_CALL_SERVER_ACK_REQUEST &&
232 call->state != RXRPC_CALL_SERVER_SEND_REPLY) {
233 /* Tx phase not yet begun for this call */
234 ret = -EPROTO;
235 break;
236 }
237
238 ret = rxrpc_send_data(iocb, rx, call, msg, len);
239 break;
240
241 case RXRPC_CMD_SEND_ABORT:
242 rxrpc_send_abort(call, abort_code);
243 break;
244 default:
245 BUG();
246 }
247
248 out:
249 rxrpc_put_call(call);
250 _leave(" = %d", ret);
251 return ret;
252}
253
254/*
255 * send a packet through the transport endpoint
256 */
257int rxrpc_send_packet(struct rxrpc_transport *trans, struct sk_buff *skb)
258{
259 struct kvec iov[1];
260 struct msghdr msg;
261 int ret, opt;
262
263 _enter(",{%d}", skb->len);
264
265 iov[0].iov_base = skb->head;
266 iov[0].iov_len = skb->len;
267
268 msg.msg_name = &trans->peer->srx.transport.sin;
269 msg.msg_namelen = sizeof(trans->peer->srx.transport.sin);
270 msg.msg_control = NULL;
271 msg.msg_controllen = 0;
272 msg.msg_flags = 0;
273
274 /* send the packet with the don't fragment bit set if we currently
275 * think it's small enough */
276 if (skb->len - sizeof(struct rxrpc_header) < trans->peer->maxdata) {
277 down_read(&trans->local->defrag_sem);
278 /* send the packet by UDP
279 * - returns -EMSGSIZE if UDP would have to fragment the packet
280 * to go out of the interface
281 * - in which case, we'll have processed the ICMP error
282 * message and update the peer record
283 */
284 ret = kernel_sendmsg(trans->local->socket, &msg, iov, 1,
285 iov[0].iov_len);
286
287 up_read(&trans->local->defrag_sem);
288 if (ret == -EMSGSIZE)
289 goto send_fragmentable;
290
291 _leave(" = %d [%u]", ret, trans->peer->maxdata);
292 return ret;
293 }
294
295send_fragmentable:
296 /* attempt to send this message with fragmentation enabled */
297 _debug("send fragment");
298
299 down_write(&trans->local->defrag_sem);
300 opt = IP_PMTUDISC_DONT;
301 ret = kernel_setsockopt(trans->local->socket, SOL_IP, IP_MTU_DISCOVER,
302 (char *) &opt, sizeof(opt));
303 if (ret == 0) {
304 ret = kernel_sendmsg(trans->local->socket, &msg, iov, 1,
305 iov[0].iov_len);
306
307 opt = IP_PMTUDISC_DO;
308 kernel_setsockopt(trans->local->socket, SOL_IP,
309 IP_MTU_DISCOVER, (char *) &opt, sizeof(opt));
310 }
311
312 up_write(&trans->local->defrag_sem);
313 _leave(" = %d [frag %u]", ret, trans->peer->maxdata);
314 return ret;
315}
316
317/*
318 * wait for space to appear in the transmit/ACK window
319 * - caller holds the socket locked
320 */
321static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx,
322 struct rxrpc_call *call,
323 long *timeo)
324{
325 DECLARE_WAITQUEUE(myself, current);
326 int ret;
327
328 _enter(",{%d},%ld",
329 CIRC_SPACE(call->acks_head, call->acks_tail, call->acks_winsz),
330 *timeo);
331
332 add_wait_queue(&call->tx_waitq, &myself);
333
334 for (;;) {
335 set_current_state(TASK_INTERRUPTIBLE);
336 ret = 0;
337 if (CIRC_SPACE(call->acks_head, call->acks_tail,
338 call->acks_winsz) > 0)
339 break;
340 if (signal_pending(current)) {
341 ret = sock_intr_errno(*timeo);
342 break;
343 }
344
345 release_sock(&rx->sk);
346 *timeo = schedule_timeout(*timeo);
347 lock_sock(&rx->sk);
348 }
349
350 remove_wait_queue(&call->tx_waitq, &myself);
351 set_current_state(TASK_RUNNING);
352 _leave(" = %d", ret);
353 return ret;
354}
355
356/*
357 * attempt to schedule an instant Tx resend
358 */
359static inline void rxrpc_instant_resend(struct rxrpc_call *call)
360{
361 read_lock_bh(&call->state_lock);
362 if (try_to_del_timer_sync(&call->resend_timer) >= 0) {
363 clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
364 if (call->state < RXRPC_CALL_COMPLETE &&
365 !test_and_set_bit(RXRPC_CALL_RESEND_TIMER, &call->events))
366 schedule_work(&call->processor);
367 }
368 read_unlock_bh(&call->state_lock);
369}
370
371/*
372 * queue a packet for transmission, set the resend timer and attempt
373 * to send the packet immediately
374 */
375static void rxrpc_queue_packet(struct rxrpc_call *call, struct sk_buff *skb,
376 bool last)
377{
378 struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
379 int ret;
380
381 _net("queue skb %p [%d]", skb, call->acks_head);
382
383 ASSERT(call->acks_window != NULL);
384 call->acks_window[call->acks_head] = (unsigned long) skb;
385 smp_wmb();
386 call->acks_head = (call->acks_head + 1) & (call->acks_winsz - 1);
387
388 if (last || call->state == RXRPC_CALL_SERVER_ACK_REQUEST) {
389 _debug("________awaiting reply/ACK__________");
390 write_lock_bh(&call->state_lock);
391 switch (call->state) {
392 case RXRPC_CALL_CLIENT_SEND_REQUEST:
393 call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
394 break;
395 case RXRPC_CALL_SERVER_ACK_REQUEST:
396 call->state = RXRPC_CALL_SERVER_SEND_REPLY;
397 if (!last)
398 break;
399 case RXRPC_CALL_SERVER_SEND_REPLY:
400 call->state = RXRPC_CALL_SERVER_AWAIT_ACK;
401 break;
402 default:
403 break;
404 }
405 write_unlock_bh(&call->state_lock);
406 }
407
408 _proto("Tx DATA %%%u { #%u }",
409 ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));
410
411 sp->need_resend = 0;
412 sp->resend_at = jiffies + rxrpc_resend_timeout * HZ;
413 if (!test_and_set_bit(RXRPC_CALL_RUN_RTIMER, &call->flags)) {
414 _debug("run timer");
415 call->resend_timer.expires = sp->resend_at;
416 add_timer(&call->resend_timer);
417 }
418
419 /* attempt to cancel the rx-ACK timer, deferring reply transmission if
420 * we're ACK'ing the request phase of an incoming call */
421 ret = -EAGAIN;
422 if (try_to_del_timer_sync(&call->ack_timer) >= 0) {
423 /* the packet may be freed by rxrpc_process_call() before this
424 * returns */
425 ret = rxrpc_send_packet(call->conn->trans, skb);
426 _net("sent skb %p", skb);
427 } else {
428 _debug("failed to delete ACK timer");
429 }
430
431 if (ret < 0) {
432 _debug("need instant resend %d", ret);
433 sp->need_resend = 1;
434 rxrpc_instant_resend(call);
435 }
436
437 _leave("");
438}
439
440/*
441 * send data through a socket
442 * - must be called in process context
443 * - caller holds the socket locked
444 */
445static int rxrpc_send_data(struct kiocb *iocb,
446 struct rxrpc_sock *rx,
447 struct rxrpc_call *call,
448 struct msghdr *msg, size_t len)
449{
450 struct rxrpc_skb_priv *sp;
451 unsigned char __user *from;
452 struct sk_buff *skb;
453 struct iovec *iov;
454 struct sock *sk = &rx->sk;
455 long timeo;
456 bool more;
457 int ret, ioc, segment, copied;
458
459 _enter(",,,{%zu},%zu", msg->msg_iovlen, len);
460
461 timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);
462
463 /* this should be in poll */
464 clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
465
466 if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN))
467 return -EPIPE;
468
469 iov = msg->msg_iov;
470 ioc = msg->msg_iovlen - 1;
471 from = iov->iov_base;
472 segment = iov->iov_len;
473 iov++;
474 more = msg->msg_flags & MSG_MORE;
475
476 skb = call->tx_pending;
477 call->tx_pending = NULL;
478
479 copied = 0;
480 do {
481 int copy;
482
483 if (segment > len)
484 segment = len;
485
486 _debug("SEGMENT %d @%p", segment, from);
487
488 if (!skb) {
489 size_t size, chunk, max, space;
490
491 _debug("alloc");
492
493 if (CIRC_SPACE(call->acks_head, call->acks_tail,
494 call->acks_winsz) <= 0) {
495 ret = -EAGAIN;
496 if (msg->msg_flags & MSG_DONTWAIT)
497 goto maybe_error;
498 ret = rxrpc_wait_for_tx_window(rx, call,
499 &timeo);
500 if (ret < 0)
501 goto maybe_error;
502 }
503
504 max = call->conn->trans->peer->maxdata;
505 max -= call->conn->security_size;
506 max &= ~(call->conn->size_align - 1UL);
507
508 chunk = max;
509 if (chunk > len)
510 chunk = len;
511
512 space = chunk + call->conn->size_align;
513 space &= ~(call->conn->size_align - 1UL);
514
515 size = space + call->conn->header_size;
516
517 _debug("SIZE: %zu/%zu/%zu", chunk, space, size);
518
519 /* create a buffer that we can retain until it's ACK'd */
520 skb = sock_alloc_send_skb(
521 sk, size, msg->msg_flags & MSG_DONTWAIT, &ret);
522 if (!skb)
523 goto maybe_error;
524
525 rxrpc_new_skb(skb);
526
527 _debug("ALLOC SEND %p", skb);
528
529 ASSERTCMP(skb->mark, ==, 0);
530
531 _debug("HS: %u", call->conn->header_size);
532 skb_reserve(skb, call->conn->header_size);
533 skb->len += call->conn->header_size;
534
535 sp = rxrpc_skb(skb);
536 sp->remain = chunk;
537 if (sp->remain > skb_tailroom(skb))
538 sp->remain = skb_tailroom(skb);
539
540 _net("skb: hr %d, tr %d, hl %d, rm %d",
541 skb_headroom(skb),
542 skb_tailroom(skb),
543 skb_headlen(skb),
544 sp->remain);
545
546 skb->ip_summed = CHECKSUM_UNNECESSARY;
547 }
548
549 _debug("append");
550 sp = rxrpc_skb(skb);
551
552 /* append next segment of data to the current buffer */
553 copy = skb_tailroom(skb);
554 ASSERTCMP(copy, >, 0);
555 if (copy > segment)
556 copy = segment;
557 if (copy > sp->remain)
558 copy = sp->remain;
559
560 _debug("add");
561 ret = skb_add_data(skb, from, copy);
562 _debug("added");
563 if (ret < 0)
564 goto efault;
565 sp->remain -= copy;
566 skb->mark += copy;
567
568 len -= copy;
569 segment -= copy;
570 from += copy;
571 while (segment == 0 && ioc > 0) {
572 from = iov->iov_base;
573 segment = iov->iov_len;
574 iov++;
575 ioc--;
576 }
577 if (len == 0) {
578 segment = 0;
579 ioc = 0;
580 }
581
582 /* check for the far side aborting the call or a network error
583 * occurring */
584 if (call->state > RXRPC_CALL_COMPLETE)
585 goto call_aborted;
586
587 /* add the packet to the send queue if it's now full */
588 if (sp->remain <= 0 || (segment == 0 && !more)) {
589 struct rxrpc_connection *conn = call->conn;
590 size_t pad;
591
592 /* pad out if we're using security */
593 if (conn->security) {
594 pad = conn->security_size + skb->mark;
595 pad = conn->size_align - pad;
596 pad &= conn->size_align - 1;
597 _debug("pad %zu", pad);
598 if (pad)
599 memset(skb_put(skb, pad), 0, pad);
600 }
601
602 sp->hdr.epoch = conn->epoch;
603 sp->hdr.cid = call->cid;
604 sp->hdr.callNumber = call->call_id;
605 sp->hdr.seq =
606 htonl(atomic_inc_return(&call->sequence));
607 sp->hdr.serial =
608 htonl(atomic_inc_return(&conn->serial));
609 sp->hdr.type = RXRPC_PACKET_TYPE_DATA;
610 sp->hdr.userStatus = 0;
611 sp->hdr.securityIndex = conn->security_ix;
612 sp->hdr._rsvd = 0;
613 sp->hdr.serviceId = conn->service_id;
614
615 sp->hdr.flags = conn->out_clientflag;
616 if (len == 0 && !more)
617 sp->hdr.flags |= RXRPC_LAST_PACKET;
618 else if (CIRC_SPACE(call->acks_head, call->acks_tail,
619 call->acks_winsz) > 1)
620 sp->hdr.flags |= RXRPC_MORE_PACKETS;
621
622 ret = rxrpc_secure_packet(
623 call, skb, skb->mark,
624 skb->head + sizeof(struct rxrpc_header));
625 if (ret < 0)
626 goto out;
627
628 memcpy(skb->head, &sp->hdr,
629 sizeof(struct rxrpc_header));
630 rxrpc_queue_packet(call, skb, segment == 0 && !more);
631 skb = NULL;
632 }
633
634 } while (segment > 0);
635
636out:
637 call->tx_pending = skb;
638 _leave(" = %d", ret);
639 return ret;
640
641call_aborted:
642 rxrpc_free_skb(skb);
643 if (call->state == RXRPC_CALL_NETWORK_ERROR)
644 ret = call->conn->trans->peer->net_error;
645 else
646 ret = -ECONNABORTED;
647 _leave(" = %d", ret);
648 return ret;
649
650maybe_error:
651 if (copied)
652 ret = copied;
653 goto out;
654
655efault:
656 ret = -EFAULT;
657 goto out;
658}