diff options
author | Chuck Lever <cel@citi.umich.edu> | 2005-08-11 16:25:23 -0400 |
---|---|---|
committer | Trond Myklebust <Trond.Myklebust@netapp.com> | 2005-09-23 12:38:12 -0400 |
commit | a246b0105bbd9a70a698f69baae2042996f2a0e9 (patch) | |
tree | 6c8831d8579a7fdc5201d3e9c20270cb1420eeda /net/sunrpc/xprt.c | |
parent | 094bb20b9fcab3a1652a77741caba6b78097d622 (diff) |
[PATCH] RPC: introduce client-side transport switch
Move the bulk of client-side socket-specific code into a separate source
file, net/sunrpc/xprtsock.c.
Test-plan:
Millions of fsx operations. Performance characterization such as "sio" or
"iozone". Destructive testing (unplugging the network temporarily, server
reboots). Connectathon with v2, v3, and v4.
Version: Thu, 11 Aug 2005 16:03:38 -0400
Signed-off-by: Chuck Lever <cel@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
Diffstat (limited to 'net/sunrpc/xprt.c')
-rw-r--r-- | net/sunrpc/xprt.c | 916 |
1 files changed, 43 insertions, 873 deletions
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 67444f494fea..4342acf4d1cd 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c | |||
@@ -32,37 +32,16 @@ | |||
32 | * tasks that rely on callbacks. | 32 | * tasks that rely on callbacks. |
33 | * | 33 | * |
34 | * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> | 34 | * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> |
35 | * | ||
36 | * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com> | ||
37 | * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com> | ||
38 | * TCP NFS related read + write fixes | ||
39 | * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie> | ||
40 | * | ||
41 | * Rewrite of larges part of the code in order to stabilize TCP stuff. | ||
42 | * Fix behaviour when socket buffer is full. | ||
43 | * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no> | ||
44 | */ | 35 | */ |
45 | 36 | ||
37 | #include <linux/module.h> | ||
38 | |||
46 | #include <linux/types.h> | 39 | #include <linux/types.h> |
47 | #include <linux/slab.h> | 40 | #include <linux/interrupt.h> |
48 | #include <linux/capability.h> | ||
49 | #include <linux/sched.h> | ||
50 | #include <linux/errno.h> | ||
51 | #include <linux/socket.h> | ||
52 | #include <linux/in.h> | ||
53 | #include <linux/net.h> | ||
54 | #include <linux/mm.h> | ||
55 | #include <linux/udp.h> | ||
56 | #include <linux/tcp.h> | ||
57 | #include <linux/sunrpc/clnt.h> | ||
58 | #include <linux/file.h> | ||
59 | #include <linux/workqueue.h> | 41 | #include <linux/workqueue.h> |
60 | #include <linux/random.h> | 42 | #include <linux/random.h> |
61 | 43 | ||
62 | #include <net/sock.h> | 44 | #include <linux/sunrpc/clnt.h> |
63 | #include <net/checksum.h> | ||
64 | #include <net/udp.h> | ||
65 | #include <net/tcp.h> | ||
66 | 45 | ||
67 | /* | 46 | /* |
68 | * Local variables | 47 | * Local variables |
@@ -74,64 +53,17 @@ | |||
74 | #endif | 53 | #endif |
75 | 54 | ||
76 | #define XPRT_MAX_BACKOFF (8) | 55 | #define XPRT_MAX_BACKOFF (8) |
77 | #define XPRT_IDLE_TIMEOUT (5*60*HZ) | ||
78 | #define XPRT_MAX_RESVPORT (800) | ||
79 | 56 | ||
80 | /* | 57 | /* |
81 | * Local functions | 58 | * Local functions |
82 | */ | 59 | */ |
83 | static void xprt_request_init(struct rpc_task *, struct rpc_xprt *); | 60 | static void xprt_request_init(struct rpc_task *, struct rpc_xprt *); |
84 | static inline void do_xprt_reserve(struct rpc_task *); | 61 | static inline void do_xprt_reserve(struct rpc_task *); |
85 | static void xprt_disconnect(struct rpc_xprt *); | ||
86 | static void xprt_connect_status(struct rpc_task *task); | 62 | static void xprt_connect_status(struct rpc_task *task); |
87 | static struct rpc_xprt * xprt_setup(int proto, struct sockaddr_in *ap, | ||
88 | struct rpc_timeout *to); | ||
89 | static struct socket *xprt_create_socket(struct rpc_xprt *, int, int); | ||
90 | static void xprt_bind_socket(struct rpc_xprt *, struct socket *); | ||
91 | static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *); | 63 | static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *); |
92 | 64 | ||
93 | static int xprt_clear_backlog(struct rpc_xprt *xprt); | 65 | static int xprt_clear_backlog(struct rpc_xprt *xprt); |
94 | 66 | ||
95 | #ifdef RPC_DEBUG_DATA | ||
96 | /* | ||
97 | * Print the buffer contents (first 128 bytes only--just enough for | ||
98 | * diropres return). | ||
99 | */ | ||
100 | static void | ||
101 | xprt_pktdump(char *msg, u32 *packet, unsigned int count) | ||
102 | { | ||
103 | u8 *buf = (u8 *) packet; | ||
104 | int j; | ||
105 | |||
106 | dprintk("RPC: %s\n", msg); | ||
107 | for (j = 0; j < count && j < 128; j += 4) { | ||
108 | if (!(j & 31)) { | ||
109 | if (j) | ||
110 | dprintk("\n"); | ||
111 | dprintk("0x%04x ", j); | ||
112 | } | ||
113 | dprintk("%02x%02x%02x%02x ", | ||
114 | buf[j], buf[j+1], buf[j+2], buf[j+3]); | ||
115 | } | ||
116 | dprintk("\n"); | ||
117 | } | ||
118 | #else | ||
119 | static inline void | ||
120 | xprt_pktdump(char *msg, u32 *packet, unsigned int count) | ||
121 | { | ||
122 | /* NOP */ | ||
123 | } | ||
124 | #endif | ||
125 | |||
126 | /* | ||
127 | * Look up RPC transport given an INET socket | ||
128 | */ | ||
129 | static inline struct rpc_xprt * | ||
130 | xprt_from_sock(struct sock *sk) | ||
131 | { | ||
132 | return (struct rpc_xprt *) sk->sk_user_data; | ||
133 | } | ||
134 | |||
135 | /* | 67 | /* |
136 | * Serialize write access to sockets, in order to prevent different | 68 | * Serialize write access to sockets, in order to prevent different |
137 | * requests from interfering with each other. | 69 | * requests from interfering with each other. |
@@ -235,62 +167,6 @@ xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) | |||
235 | } | 167 | } |
236 | 168 | ||
237 | /* | 169 | /* |
238 | * Write data to socket. | ||
239 | */ | ||
240 | static inline int | ||
241 | xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req) | ||
242 | { | ||
243 | struct socket *sock = xprt->sock; | ||
244 | struct xdr_buf *xdr = &req->rq_snd_buf; | ||
245 | struct sockaddr *addr = NULL; | ||
246 | int addrlen = 0; | ||
247 | unsigned int skip; | ||
248 | int result; | ||
249 | |||
250 | if (!sock) | ||
251 | return -ENOTCONN; | ||
252 | |||
253 | xprt_pktdump("packet data:", | ||
254 | req->rq_svec->iov_base, | ||
255 | req->rq_svec->iov_len); | ||
256 | |||
257 | /* For UDP, we need to provide an address */ | ||
258 | if (!xprt->stream) { | ||
259 | addr = (struct sockaddr *) &xprt->addr; | ||
260 | addrlen = sizeof(xprt->addr); | ||
261 | } | ||
262 | /* Dont repeat bytes */ | ||
263 | skip = req->rq_bytes_sent; | ||
264 | |||
265 | clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags); | ||
266 | result = xdr_sendpages(sock, addr, addrlen, xdr, skip, MSG_DONTWAIT); | ||
267 | |||
268 | dprintk("RPC: xprt_sendmsg(%d) = %d\n", xdr->len - skip, result); | ||
269 | |||
270 | if (result >= 0) | ||
271 | return result; | ||
272 | |||
273 | switch (result) { | ||
274 | case -ECONNREFUSED: | ||
275 | /* When the server has died, an ICMP port unreachable message | ||
276 | * prompts ECONNREFUSED. | ||
277 | */ | ||
278 | case -EAGAIN: | ||
279 | break; | ||
280 | case -ECONNRESET: | ||
281 | case -ENOTCONN: | ||
282 | case -EPIPE: | ||
283 | /* connection broken */ | ||
284 | if (xprt->stream) | ||
285 | result = -ENOTCONN; | ||
286 | break; | ||
287 | default: | ||
288 | printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result); | ||
289 | } | ||
290 | return result; | ||
291 | } | ||
292 | |||
293 | /* | ||
294 | * Van Jacobson congestion avoidance. Check if the congestion window | 170 | * Van Jacobson congestion avoidance. Check if the congestion window |
295 | * overflowed. Put the task to sleep if this is the case. | 171 | * overflowed. Put the task to sleep if this is the case. |
296 | */ | 172 | */ |
@@ -405,48 +281,20 @@ int xprt_adjust_timeout(struct rpc_rqst *req) | |||
405 | return status; | 281 | return status; |
406 | } | 282 | } |
407 | 283 | ||
408 | /* | ||
409 | * Close down a transport socket | ||
410 | */ | ||
411 | static void | ||
412 | xprt_close(struct rpc_xprt *xprt) | ||
413 | { | ||
414 | struct socket *sock = xprt->sock; | ||
415 | struct sock *sk = xprt->inet; | ||
416 | |||
417 | if (!sk) | ||
418 | return; | ||
419 | |||
420 | write_lock_bh(&sk->sk_callback_lock); | ||
421 | xprt->inet = NULL; | ||
422 | xprt->sock = NULL; | ||
423 | |||
424 | sk->sk_user_data = NULL; | ||
425 | sk->sk_data_ready = xprt->old_data_ready; | ||
426 | sk->sk_state_change = xprt->old_state_change; | ||
427 | sk->sk_write_space = xprt->old_write_space; | ||
428 | write_unlock_bh(&sk->sk_callback_lock); | ||
429 | |||
430 | sk->sk_no_check = 0; | ||
431 | |||
432 | sock_release(sock); | ||
433 | } | ||
434 | |||
435 | static void | 284 | static void |
436 | xprt_socket_autoclose(void *args) | 285 | xprt_socket_autoclose(void *args) |
437 | { | 286 | { |
438 | struct rpc_xprt *xprt = (struct rpc_xprt *)args; | 287 | struct rpc_xprt *xprt = (struct rpc_xprt *)args; |
439 | 288 | ||
440 | xprt_disconnect(xprt); | 289 | xprt_disconnect(xprt); |
441 | xprt_close(xprt); | 290 | xprt->ops->close(xprt); |
442 | xprt_release_write(xprt, NULL); | 291 | xprt_release_write(xprt, NULL); |
443 | } | 292 | } |
444 | 293 | ||
445 | /* | 294 | /* |
446 | * Mark a transport as disconnected | 295 | * Mark a transport as disconnected |
447 | */ | 296 | */ |
448 | static void | 297 | void xprt_disconnect(struct rpc_xprt *xprt) |
449 | xprt_disconnect(struct rpc_xprt *xprt) | ||
450 | { | 298 | { |
451 | dprintk("RPC: disconnected transport %p\n", xprt); | 299 | dprintk("RPC: disconnected transport %p\n", xprt); |
452 | spin_lock_bh(&xprt->sock_lock); | 300 | spin_lock_bh(&xprt->sock_lock); |
@@ -479,57 +327,6 @@ out_abort: | |||
479 | spin_unlock(&xprt->sock_lock); | 327 | spin_unlock(&xprt->sock_lock); |
480 | } | 328 | } |
481 | 329 | ||
482 | static void xprt_socket_connect(void *args) | ||
483 | { | ||
484 | struct rpc_xprt *xprt = (struct rpc_xprt *)args; | ||
485 | struct socket *sock = xprt->sock; | ||
486 | int status = -EIO; | ||
487 | |||
488 | if (xprt->shutdown || xprt->addr.sin_port == 0) | ||
489 | goto out; | ||
490 | |||
491 | /* | ||
492 | * Start by resetting any existing state | ||
493 | */ | ||
494 | xprt_close(xprt); | ||
495 | sock = xprt_create_socket(xprt, xprt->prot, xprt->resvport); | ||
496 | if (sock == NULL) { | ||
497 | /* couldn't create socket or bind to reserved port; | ||
498 | * this is likely a permanent error, so cause an abort */ | ||
499 | goto out; | ||
500 | } | ||
501 | xprt_bind_socket(xprt, sock); | ||
502 | xprt_sock_setbufsize(xprt); | ||
503 | |||
504 | status = 0; | ||
505 | if (!xprt->stream) | ||
506 | goto out; | ||
507 | |||
508 | /* | ||
509 | * Tell the socket layer to start connecting... | ||
510 | */ | ||
511 | status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr, | ||
512 | sizeof(xprt->addr), O_NONBLOCK); | ||
513 | dprintk("RPC: %p connect status %d connected %d sock state %d\n", | ||
514 | xprt, -status, xprt_connected(xprt), sock->sk->sk_state); | ||
515 | if (status < 0) { | ||
516 | switch (status) { | ||
517 | case -EINPROGRESS: | ||
518 | case -EALREADY: | ||
519 | goto out_clear; | ||
520 | } | ||
521 | } | ||
522 | out: | ||
523 | if (status < 0) | ||
524 | rpc_wake_up_status(&xprt->pending, status); | ||
525 | else | ||
526 | rpc_wake_up(&xprt->pending); | ||
527 | out_clear: | ||
528 | smp_mb__before_clear_bit(); | ||
529 | clear_bit(XPRT_CONNECTING, &xprt->sockstate); | ||
530 | smp_mb__after_clear_bit(); | ||
531 | } | ||
532 | |||
533 | /* | 330 | /* |
534 | * Attempt to connect a TCP socket. | 331 | * Attempt to connect a TCP socket. |
535 | * | 332 | * |
@@ -552,30 +349,16 @@ void xprt_connect(struct rpc_task *task) | |||
552 | if (!xprt_lock_write(xprt, task)) | 349 | if (!xprt_lock_write(xprt, task)) |
553 | return; | 350 | return; |
554 | if (xprt_connected(xprt)) | 351 | if (xprt_connected(xprt)) |
555 | goto out_write; | 352 | xprt_release_write(xprt, task); |
353 | else { | ||
354 | if (task->tk_rqstp) | ||
355 | task->tk_rqstp->rq_bytes_sent = 0; | ||
556 | 356 | ||
557 | if (task->tk_rqstp) | 357 | task->tk_timeout = RPC_CONNECT_TIMEOUT; |
558 | task->tk_rqstp->rq_bytes_sent = 0; | 358 | rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL); |
559 | 359 | xprt->ops->connect(task); | |
560 | task->tk_timeout = RPC_CONNECT_TIMEOUT; | ||
561 | rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL); | ||
562 | if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate)) { | ||
563 | /* Note: if we are here due to a dropped connection | ||
564 | * we delay reconnecting by RPC_REESTABLISH_TIMEOUT/HZ | ||
565 | * seconds | ||
566 | */ | ||
567 | if (xprt->sock != NULL) | ||
568 | schedule_delayed_work(&xprt->sock_connect, | ||
569 | RPC_REESTABLISH_TIMEOUT); | ||
570 | else { | ||
571 | schedule_work(&xprt->sock_connect); | ||
572 | if (!RPC_IS_ASYNC(task)) | ||
573 | flush_scheduled_work(); | ||
574 | } | ||
575 | } | 360 | } |
576 | return; | 361 | return; |
577 | out_write: | ||
578 | xprt_release_write(xprt, task); | ||
579 | } | 362 | } |
580 | 363 | ||
581 | /* | 364 | /* |
@@ -624,8 +407,7 @@ xprt_connect_status(struct rpc_task *task) | |||
624 | /* | 407 | /* |
625 | * Look up the RPC request corresponding to a reply, and then lock it. | 408 | * Look up the RPC request corresponding to a reply, and then lock it. |
626 | */ | 409 | */ |
627 | static inline struct rpc_rqst * | 410 | struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid) |
628 | xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid) | ||
629 | { | 411 | { |
630 | struct list_head *pos; | 412 | struct list_head *pos; |
631 | struct rpc_rqst *req = NULL; | 413 | struct rpc_rqst *req = NULL; |
@@ -644,8 +426,7 @@ xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid) | |||
644 | * Complete reply received. | 426 | * Complete reply received. |
645 | * The TCP code relies on us to remove the request from xprt->pending. | 427 | * The TCP code relies on us to remove the request from xprt->pending. |
646 | */ | 428 | */ |
647 | static void | 429 | void xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied) |
648 | xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied) | ||
649 | { | 430 | { |
650 | struct rpc_task *task = req->rq_task; | 431 | struct rpc_task *task = req->rq_task; |
651 | struct rpc_clnt *clnt = task->tk_client; | 432 | struct rpc_clnt *clnt = task->tk_client; |
@@ -692,409 +473,6 @@ xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied) | |||
692 | } | 473 | } |
693 | 474 | ||
694 | /* | 475 | /* |
695 | * Input handler for RPC replies. Called from a bottom half and hence | ||
696 | * atomic. | ||
697 | */ | ||
698 | static void | ||
699 | udp_data_ready(struct sock *sk, int len) | ||
700 | { | ||
701 | struct rpc_task *task; | ||
702 | struct rpc_xprt *xprt; | ||
703 | struct rpc_rqst *rovr; | ||
704 | struct sk_buff *skb; | ||
705 | int err, repsize, copied; | ||
706 | u32 _xid, *xp; | ||
707 | |||
708 | read_lock(&sk->sk_callback_lock); | ||
709 | dprintk("RPC: udp_data_ready...\n"); | ||
710 | if (!(xprt = xprt_from_sock(sk))) { | ||
711 | printk("RPC: udp_data_ready request not found!\n"); | ||
712 | goto out; | ||
713 | } | ||
714 | |||
715 | dprintk("RPC: udp_data_ready client %p\n", xprt); | ||
716 | |||
717 | if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL) | ||
718 | goto out; | ||
719 | |||
720 | if (xprt->shutdown) | ||
721 | goto dropit; | ||
722 | |||
723 | repsize = skb->len - sizeof(struct udphdr); | ||
724 | if (repsize < 4) { | ||
725 | printk("RPC: impossible RPC reply size %d!\n", repsize); | ||
726 | goto dropit; | ||
727 | } | ||
728 | |||
729 | /* Copy the XID from the skb... */ | ||
730 | xp = skb_header_pointer(skb, sizeof(struct udphdr), | ||
731 | sizeof(_xid), &_xid); | ||
732 | if (xp == NULL) | ||
733 | goto dropit; | ||
734 | |||
735 | /* Look up and lock the request corresponding to the given XID */ | ||
736 | spin_lock(&xprt->sock_lock); | ||
737 | rovr = xprt_lookup_rqst(xprt, *xp); | ||
738 | if (!rovr) | ||
739 | goto out_unlock; | ||
740 | task = rovr->rq_task; | ||
741 | |||
742 | dprintk("RPC: %4d received reply\n", task->tk_pid); | ||
743 | |||
744 | if ((copied = rovr->rq_private_buf.buflen) > repsize) | ||
745 | copied = repsize; | ||
746 | |||
747 | /* Suck it into the iovec, verify checksum if not done by hw. */ | ||
748 | if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) | ||
749 | goto out_unlock; | ||
750 | |||
751 | /* Something worked... */ | ||
752 | dst_confirm(skb->dst); | ||
753 | |||
754 | xprt_complete_rqst(xprt, rovr, copied); | ||
755 | |||
756 | out_unlock: | ||
757 | spin_unlock(&xprt->sock_lock); | ||
758 | dropit: | ||
759 | skb_free_datagram(sk, skb); | ||
760 | out: | ||
761 | read_unlock(&sk->sk_callback_lock); | ||
762 | } | ||
763 | |||
764 | /* | ||
765 | * Copy from an skb into memory and shrink the skb. | ||
766 | */ | ||
767 | static inline size_t | ||
768 | tcp_copy_data(skb_reader_t *desc, void *p, size_t len) | ||
769 | { | ||
770 | if (len > desc->count) | ||
771 | len = desc->count; | ||
772 | if (skb_copy_bits(desc->skb, desc->offset, p, len)) { | ||
773 | dprintk("RPC: failed to copy %zu bytes from skb. %zu bytes remain\n", | ||
774 | len, desc->count); | ||
775 | return 0; | ||
776 | } | ||
777 | desc->offset += len; | ||
778 | desc->count -= len; | ||
779 | dprintk("RPC: copied %zu bytes from skb. %zu bytes remain\n", | ||
780 | len, desc->count); | ||
781 | return len; | ||
782 | } | ||
783 | |||
784 | /* | ||
785 | * TCP read fragment marker | ||
786 | */ | ||
787 | static inline void | ||
788 | tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc) | ||
789 | { | ||
790 | size_t len, used; | ||
791 | char *p; | ||
792 | |||
793 | p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset; | ||
794 | len = sizeof(xprt->tcp_recm) - xprt->tcp_offset; | ||
795 | used = tcp_copy_data(desc, p, len); | ||
796 | xprt->tcp_offset += used; | ||
797 | if (used != len) | ||
798 | return; | ||
799 | xprt->tcp_reclen = ntohl(xprt->tcp_recm); | ||
800 | if (xprt->tcp_reclen & 0x80000000) | ||
801 | xprt->tcp_flags |= XPRT_LAST_FRAG; | ||
802 | else | ||
803 | xprt->tcp_flags &= ~XPRT_LAST_FRAG; | ||
804 | xprt->tcp_reclen &= 0x7fffffff; | ||
805 | xprt->tcp_flags &= ~XPRT_COPY_RECM; | ||
806 | xprt->tcp_offset = 0; | ||
807 | /* Sanity check of the record length */ | ||
808 | if (xprt->tcp_reclen < 4) { | ||
809 | printk(KERN_ERR "RPC: Invalid TCP record fragment length\n"); | ||
810 | xprt_disconnect(xprt); | ||
811 | } | ||
812 | dprintk("RPC: reading TCP record fragment of length %d\n", | ||
813 | xprt->tcp_reclen); | ||
814 | } | ||
815 | |||
816 | static void | ||
817 | tcp_check_recm(struct rpc_xprt *xprt) | ||
818 | { | ||
819 | dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n", | ||
820 | xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_flags); | ||
821 | if (xprt->tcp_offset == xprt->tcp_reclen) { | ||
822 | xprt->tcp_flags |= XPRT_COPY_RECM; | ||
823 | xprt->tcp_offset = 0; | ||
824 | if (xprt->tcp_flags & XPRT_LAST_FRAG) { | ||
825 | xprt->tcp_flags &= ~XPRT_COPY_DATA; | ||
826 | xprt->tcp_flags |= XPRT_COPY_XID; | ||
827 | xprt->tcp_copied = 0; | ||
828 | } | ||
829 | } | ||
830 | } | ||
831 | |||
832 | /* | ||
833 | * TCP read xid | ||
834 | */ | ||
835 | static inline void | ||
836 | tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc) | ||
837 | { | ||
838 | size_t len, used; | ||
839 | char *p; | ||
840 | |||
841 | len = sizeof(xprt->tcp_xid) - xprt->tcp_offset; | ||
842 | dprintk("RPC: reading XID (%Zu bytes)\n", len); | ||
843 | p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset; | ||
844 | used = tcp_copy_data(desc, p, len); | ||
845 | xprt->tcp_offset += used; | ||
846 | if (used != len) | ||
847 | return; | ||
848 | xprt->tcp_flags &= ~XPRT_COPY_XID; | ||
849 | xprt->tcp_flags |= XPRT_COPY_DATA; | ||
850 | xprt->tcp_copied = 4; | ||
851 | dprintk("RPC: reading reply for XID %08x\n", | ||
852 | ntohl(xprt->tcp_xid)); | ||
853 | tcp_check_recm(xprt); | ||
854 | } | ||
855 | |||
856 | /* | ||
857 | * TCP read and complete request | ||
858 | */ | ||
859 | static inline void | ||
860 | tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc) | ||
861 | { | ||
862 | struct rpc_rqst *req; | ||
863 | struct xdr_buf *rcvbuf; | ||
864 | size_t len; | ||
865 | ssize_t r; | ||
866 | |||
867 | /* Find and lock the request corresponding to this xid */ | ||
868 | spin_lock(&xprt->sock_lock); | ||
869 | req = xprt_lookup_rqst(xprt, xprt->tcp_xid); | ||
870 | if (!req) { | ||
871 | xprt->tcp_flags &= ~XPRT_COPY_DATA; | ||
872 | dprintk("RPC: XID %08x request not found!\n", | ||
873 | ntohl(xprt->tcp_xid)); | ||
874 | spin_unlock(&xprt->sock_lock); | ||
875 | return; | ||
876 | } | ||
877 | |||
878 | rcvbuf = &req->rq_private_buf; | ||
879 | len = desc->count; | ||
880 | if (len > xprt->tcp_reclen - xprt->tcp_offset) { | ||
881 | skb_reader_t my_desc; | ||
882 | |||
883 | len = xprt->tcp_reclen - xprt->tcp_offset; | ||
884 | memcpy(&my_desc, desc, sizeof(my_desc)); | ||
885 | my_desc.count = len; | ||
886 | r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied, | ||
887 | &my_desc, tcp_copy_data); | ||
888 | desc->count -= r; | ||
889 | desc->offset += r; | ||
890 | } else | ||
891 | r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied, | ||
892 | desc, tcp_copy_data); | ||
893 | |||
894 | if (r > 0) { | ||
895 | xprt->tcp_copied += r; | ||
896 | xprt->tcp_offset += r; | ||
897 | } | ||
898 | if (r != len) { | ||
899 | /* Error when copying to the receive buffer, | ||
900 | * usually because we weren't able to allocate | ||
901 | * additional buffer pages. All we can do now | ||
902 | * is turn off XPRT_COPY_DATA, so the request | ||
903 | * will not receive any additional updates, | ||
904 | * and time out. | ||
905 | * Any remaining data from this record will | ||
906 | * be discarded. | ||
907 | */ | ||
908 | xprt->tcp_flags &= ~XPRT_COPY_DATA; | ||
909 | dprintk("RPC: XID %08x truncated request\n", | ||
910 | ntohl(xprt->tcp_xid)); | ||
911 | dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n", | ||
912 | xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen); | ||
913 | goto out; | ||
914 | } | ||
915 | |||
916 | dprintk("RPC: XID %08x read %Zd bytes\n", | ||
917 | ntohl(xprt->tcp_xid), r); | ||
918 | dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n", | ||
919 | xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen); | ||
920 | |||
921 | if (xprt->tcp_copied == req->rq_private_buf.buflen) | ||
922 | xprt->tcp_flags &= ~XPRT_COPY_DATA; | ||
923 | else if (xprt->tcp_offset == xprt->tcp_reclen) { | ||
924 | if (xprt->tcp_flags & XPRT_LAST_FRAG) | ||
925 | xprt->tcp_flags &= ~XPRT_COPY_DATA; | ||
926 | } | ||
927 | |||
928 | out: | ||
929 | if (!(xprt->tcp_flags & XPRT_COPY_DATA)) { | ||
930 | dprintk("RPC: %4d received reply complete\n", | ||
931 | req->rq_task->tk_pid); | ||
932 | xprt_complete_rqst(xprt, req, xprt->tcp_copied); | ||
933 | } | ||
934 | spin_unlock(&xprt->sock_lock); | ||
935 | tcp_check_recm(xprt); | ||
936 | } | ||
937 | |||
938 | /* | ||
939 | * TCP discard extra bytes from a short read | ||
940 | */ | ||
941 | static inline void | ||
942 | tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc) | ||
943 | { | ||
944 | size_t len; | ||
945 | |||
946 | len = xprt->tcp_reclen - xprt->tcp_offset; | ||
947 | if (len > desc->count) | ||
948 | len = desc->count; | ||
949 | desc->count -= len; | ||
950 | desc->offset += len; | ||
951 | xprt->tcp_offset += len; | ||
952 | dprintk("RPC: discarded %Zu bytes\n", len); | ||
953 | tcp_check_recm(xprt); | ||
954 | } | ||
955 | |||
956 | /* | ||
957 | * TCP record receive routine | ||
958 | * We first have to grab the record marker, then the XID, then the data. | ||
959 | */ | ||
960 | static int | ||
961 | tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, | ||
962 | unsigned int offset, size_t len) | ||
963 | { | ||
964 | struct rpc_xprt *xprt = rd_desc->arg.data; | ||
965 | skb_reader_t desc = { | ||
966 | .skb = skb, | ||
967 | .offset = offset, | ||
968 | .count = len, | ||
969 | .csum = 0 | ||
970 | }; | ||
971 | |||
972 | dprintk("RPC: tcp_data_recv\n"); | ||
973 | do { | ||
974 | /* Read in a new fragment marker if necessary */ | ||
975 | /* Can we ever really expect to get completely empty fragments? */ | ||
976 | if (xprt->tcp_flags & XPRT_COPY_RECM) { | ||
977 | tcp_read_fraghdr(xprt, &desc); | ||
978 | continue; | ||
979 | } | ||
980 | /* Read in the xid if necessary */ | ||
981 | if (xprt->tcp_flags & XPRT_COPY_XID) { | ||
982 | tcp_read_xid(xprt, &desc); | ||
983 | continue; | ||
984 | } | ||
985 | /* Read in the request data */ | ||
986 | if (xprt->tcp_flags & XPRT_COPY_DATA) { | ||
987 | tcp_read_request(xprt, &desc); | ||
988 | continue; | ||
989 | } | ||
990 | /* Skip over any trailing bytes on short reads */ | ||
991 | tcp_read_discard(xprt, &desc); | ||
992 | } while (desc.count); | ||
993 | dprintk("RPC: tcp_data_recv done\n"); | ||
994 | return len - desc.count; | ||
995 | } | ||
996 | |||
997 | static void tcp_data_ready(struct sock *sk, int bytes) | ||
998 | { | ||
999 | struct rpc_xprt *xprt; | ||
1000 | read_descriptor_t rd_desc; | ||
1001 | |||
1002 | read_lock(&sk->sk_callback_lock); | ||
1003 | dprintk("RPC: tcp_data_ready...\n"); | ||
1004 | if (!(xprt = xprt_from_sock(sk))) { | ||
1005 | printk("RPC: tcp_data_ready socket info not found!\n"); | ||
1006 | goto out; | ||
1007 | } | ||
1008 | if (xprt->shutdown) | ||
1009 | goto out; | ||
1010 | |||
1011 | /* We use rd_desc to pass struct xprt to tcp_data_recv */ | ||
1012 | rd_desc.arg.data = xprt; | ||
1013 | rd_desc.count = 65536; | ||
1014 | tcp_read_sock(sk, &rd_desc, tcp_data_recv); | ||
1015 | out: | ||
1016 | read_unlock(&sk->sk_callback_lock); | ||
1017 | } | ||
1018 | |||
1019 | static void | ||
1020 | tcp_state_change(struct sock *sk) | ||
1021 | { | ||
1022 | struct rpc_xprt *xprt; | ||
1023 | |||
1024 | read_lock(&sk->sk_callback_lock); | ||
1025 | if (!(xprt = xprt_from_sock(sk))) | ||
1026 | goto out; | ||
1027 | dprintk("RPC: tcp_state_change client %p...\n", xprt); | ||
1028 | dprintk("RPC: state %x conn %d dead %d zapped %d\n", | ||
1029 | sk->sk_state, xprt_connected(xprt), | ||
1030 | sock_flag(sk, SOCK_DEAD), | ||
1031 | sock_flag(sk, SOCK_ZAPPED)); | ||
1032 | |||
1033 | switch (sk->sk_state) { | ||
1034 | case TCP_ESTABLISHED: | ||
1035 | spin_lock_bh(&xprt->sock_lock); | ||
1036 | if (!xprt_test_and_set_connected(xprt)) { | ||
1037 | /* Reset TCP record info */ | ||
1038 | xprt->tcp_offset = 0; | ||
1039 | xprt->tcp_reclen = 0; | ||
1040 | xprt->tcp_copied = 0; | ||
1041 | xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID; | ||
1042 | rpc_wake_up(&xprt->pending); | ||
1043 | } | ||
1044 | spin_unlock_bh(&xprt->sock_lock); | ||
1045 | break; | ||
1046 | case TCP_SYN_SENT: | ||
1047 | case TCP_SYN_RECV: | ||
1048 | break; | ||
1049 | default: | ||
1050 | xprt_disconnect(xprt); | ||
1051 | break; | ||
1052 | } | ||
1053 | out: | ||
1054 | read_unlock(&sk->sk_callback_lock); | ||
1055 | } | ||
1056 | |||
1057 | /* | ||
1058 | * Called when more output buffer space is available for this socket. | ||
1059 | * We try not to wake our writers until they can make "significant" | ||
1060 | * progress, otherwise we'll waste resources thrashing sock_sendmsg | ||
1061 | * with a bunch of small requests. | ||
1062 | */ | ||
1063 | static void | ||
1064 | xprt_write_space(struct sock *sk) | ||
1065 | { | ||
1066 | struct rpc_xprt *xprt; | ||
1067 | struct socket *sock; | ||
1068 | |||
1069 | read_lock(&sk->sk_callback_lock); | ||
1070 | if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->sk_socket)) | ||
1071 | goto out; | ||
1072 | if (xprt->shutdown) | ||
1073 | goto out; | ||
1074 | |||
1075 | /* Wait until we have enough socket memory */ | ||
1076 | if (xprt->stream) { | ||
1077 | /* from net/core/stream.c:sk_stream_write_space */ | ||
1078 | if (sk_stream_wspace(sk) < sk_stream_min_wspace(sk)) | ||
1079 | goto out; | ||
1080 | } else { | ||
1081 | /* from net/core/sock.c:sock_def_write_space */ | ||
1082 | if (!sock_writeable(sk)) | ||
1083 | goto out; | ||
1084 | } | ||
1085 | |||
1086 | if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)) | ||
1087 | goto out; | ||
1088 | |||
1089 | spin_lock_bh(&xprt->sock_lock); | ||
1090 | if (xprt->snd_task) | ||
1091 | rpc_wake_up_task(xprt->snd_task); | ||
1092 | spin_unlock_bh(&xprt->sock_lock); | ||
1093 | out: | ||
1094 | read_unlock(&sk->sk_callback_lock); | ||
1095 | } | ||
1096 | |||
1097 | /* | ||
1098 | * RPC receive timeout handler. | 476 | * RPC receive timeout handler. |
1099 | */ | 477 | */ |
1100 | static void | 478 | static void |
@@ -1161,19 +539,10 @@ xprt_transmit(struct rpc_task *task) | |||
1161 | struct rpc_clnt *clnt = task->tk_client; | 539 | struct rpc_clnt *clnt = task->tk_client; |
1162 | struct rpc_rqst *req = task->tk_rqstp; | 540 | struct rpc_rqst *req = task->tk_rqstp; |
1163 | struct rpc_xprt *xprt = req->rq_xprt; | 541 | struct rpc_xprt *xprt = req->rq_xprt; |
1164 | int status, retry = 0; | 542 | int status; |
1165 | |||
1166 | 543 | ||
1167 | dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); | 544 | dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); |
1168 | 545 | ||
1169 | /* set up everything as needed. */ | ||
1170 | /* Write the record marker */ | ||
1171 | if (xprt->stream) { | ||
1172 | u32 *marker = req->rq_svec[0].iov_base; | ||
1173 | |||
1174 | *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker))); | ||
1175 | } | ||
1176 | |||
1177 | smp_rmb(); | 546 | smp_rmb(); |
1178 | if (!req->rq_received) { | 547 | if (!req->rq_received) { |
1179 | if (list_empty(&req->rq_list)) { | 548 | if (list_empty(&req->rq_list)) { |
@@ -1191,41 +560,9 @@ xprt_transmit(struct rpc_task *task) | |||
1191 | } else if (!req->rq_bytes_sent) | 560 | } else if (!req->rq_bytes_sent) |
1192 | return; | 561 | return; |
1193 | 562 | ||
1194 | /* Continue transmitting the packet/record. We must be careful | 563 | status = xprt->ops->send_request(task); |
1195 | * to cope with writespace callbacks arriving _after_ we have | 564 | if (!status) |
1196 | * called xprt_sendmsg(). | 565 | goto out_receive; |
1197 | */ | ||
1198 | while (1) { | ||
1199 | req->rq_xtime = jiffies; | ||
1200 | status = xprt_sendmsg(xprt, req); | ||
1201 | |||
1202 | if (status < 0) | ||
1203 | break; | ||
1204 | |||
1205 | if (xprt->stream) { | ||
1206 | req->rq_bytes_sent += status; | ||
1207 | |||
1208 | /* If we've sent the entire packet, immediately | ||
1209 | * reset the count of bytes sent. */ | ||
1210 | if (req->rq_bytes_sent >= req->rq_slen) { | ||
1211 | req->rq_bytes_sent = 0; | ||
1212 | goto out_receive; | ||
1213 | } | ||
1214 | } else { | ||
1215 | if (status >= req->rq_slen) | ||
1216 | goto out_receive; | ||
1217 | status = -EAGAIN; | ||
1218 | break; | ||
1219 | } | ||
1220 | |||
1221 | dprintk("RPC: %4d xmit incomplete (%d left of %d)\n", | ||
1222 | task->tk_pid, req->rq_slen - req->rq_bytes_sent, | ||
1223 | req->rq_slen); | ||
1224 | |||
1225 | status = -EAGAIN; | ||
1226 | if (retry++ > 50) | ||
1227 | break; | ||
1228 | } | ||
1229 | 566 | ||
1230 | /* Note: at this point, task->tk_sleeping has not yet been set, | 567 | /* Note: at this point, task->tk_sleeping has not yet been set, |
1231 | * hence there is no danger of the waking up task being put on | 568 | * hence there is no danger of the waking up task being put on |
@@ -1234,26 +571,10 @@ xprt_transmit(struct rpc_task *task) | |||
1234 | task->tk_status = status; | 571 | task->tk_status = status; |
1235 | 572 | ||
1236 | switch (status) { | 573 | switch (status) { |
1237 | case -EAGAIN: | ||
1238 | if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) { | ||
1239 | /* Protect against races with xprt_write_space */ | ||
1240 | spin_lock_bh(&xprt->sock_lock); | ||
1241 | /* Don't race with disconnect */ | ||
1242 | if (!xprt_connected(xprt)) | ||
1243 | task->tk_status = -ENOTCONN; | ||
1244 | else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) { | ||
1245 | task->tk_timeout = req->rq_timeout; | ||
1246 | rpc_sleep_on(&xprt->pending, task, NULL, NULL); | ||
1247 | } | ||
1248 | spin_unlock_bh(&xprt->sock_lock); | ||
1249 | return; | ||
1250 | } | ||
1251 | /* Keep holding the socket if it is blocked */ | ||
1252 | rpc_delay(task, HZ>>4); | ||
1253 | return; | ||
1254 | case -ECONNREFUSED: | 574 | case -ECONNREFUSED: |
1255 | task->tk_timeout = RPC_REESTABLISH_TIMEOUT; | 575 | task->tk_timeout = RPC_REESTABLISH_TIMEOUT; |
1256 | rpc_sleep_on(&xprt->sending, task, NULL, NULL); | 576 | rpc_sleep_on(&xprt->sending, task, NULL, NULL); |
577 | case -EAGAIN: | ||
1257 | case -ENOTCONN: | 578 | case -ENOTCONN: |
1258 | return; | 579 | return; |
1259 | default: | 580 | default: |
@@ -1367,7 +688,8 @@ xprt_release(struct rpc_task *task) | |||
1367 | list_del(&req->rq_list); | 688 | list_del(&req->rq_list); |
1368 | xprt->last_used = jiffies; | 689 | xprt->last_used = jiffies; |
1369 | if (list_empty(&xprt->recv) && !xprt->shutdown) | 690 | if (list_empty(&xprt->recv) && !xprt->shutdown) |
1370 | mod_timer(&xprt->timer, xprt->last_used + XPRT_IDLE_TIMEOUT); | 691 | mod_timer(&xprt->timer, |
692 | xprt->last_used + RPC_IDLE_DISCONNECT_TIMEOUT); | ||
1371 | spin_unlock_bh(&xprt->sock_lock); | 693 | spin_unlock_bh(&xprt->sock_lock); |
1372 | task->tk_rqstp = NULL; | 694 | task->tk_rqstp = NULL; |
1373 | memset(req, 0, sizeof(*req)); /* mark unused */ | 695 | memset(req, 0, sizeof(*req)); /* mark unused */ |
@@ -1381,18 +703,6 @@ xprt_release(struct rpc_task *task) | |||
1381 | } | 703 | } |
1382 | 704 | ||
1383 | /* | 705 | /* |
1384 | * Set default timeout parameters | ||
1385 | */ | ||
1386 | static void | ||
1387 | xprt_default_timeout(struct rpc_timeout *to, int proto) | ||
1388 | { | ||
1389 | if (proto == IPPROTO_UDP) | ||
1390 | xprt_set_timeout(to, 5, 5 * HZ); | ||
1391 | else | ||
1392 | xprt_set_timeout(to, 2, 60 * HZ); | ||
1393 | } | ||
1394 | |||
1395 | /* | ||
1396 | * Set constant timeout | 706 | * Set constant timeout |
1397 | */ | 707 | */ |
1398 | void | 708 | void |
@@ -1405,68 +715,51 @@ xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr) | |||
1405 | to->to_exponential = 0; | 715 | to->to_exponential = 0; |
1406 | } | 716 | } |
1407 | 717 | ||
1408 | unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE; | ||
1409 | unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE; | ||
1410 | |||
1411 | /* | 718 | /* |
1412 | * Initialize an RPC client | 719 | * Initialize an RPC client |
1413 | */ | 720 | */ |
1414 | static struct rpc_xprt * | 721 | static struct rpc_xprt * |
1415 | xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) | 722 | xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) |
1416 | { | 723 | { |
724 | int result; | ||
1417 | struct rpc_xprt *xprt; | 725 | struct rpc_xprt *xprt; |
1418 | unsigned int entries; | ||
1419 | size_t slot_table_size; | ||
1420 | struct rpc_rqst *req; | 726 | struct rpc_rqst *req; |
1421 | 727 | ||
1422 | dprintk("RPC: setting up %s transport...\n", | ||
1423 | proto == IPPROTO_UDP? "UDP" : "TCP"); | ||
1424 | |||
1425 | entries = (proto == IPPROTO_TCP)? | ||
1426 | xprt_tcp_slot_table_entries : xprt_udp_slot_table_entries; | ||
1427 | |||
1428 | if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL) | 728 | if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL) |
1429 | return ERR_PTR(-ENOMEM); | 729 | return ERR_PTR(-ENOMEM); |
1430 | memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */ | 730 | memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */ |
1431 | xprt->max_reqs = entries; | ||
1432 | slot_table_size = entries * sizeof(xprt->slot[0]); | ||
1433 | xprt->slot = kmalloc(slot_table_size, GFP_KERNEL); | ||
1434 | if (xprt->slot == NULL) { | ||
1435 | kfree(xprt); | ||
1436 | return ERR_PTR(-ENOMEM); | ||
1437 | } | ||
1438 | memset(xprt->slot, 0, slot_table_size); | ||
1439 | 731 | ||
1440 | xprt->addr = *ap; | 732 | xprt->addr = *ap; |
1441 | xprt->prot = proto; | 733 | |
1442 | xprt->stream = (proto == IPPROTO_TCP)? 1 : 0; | 734 | switch (proto) { |
1443 | if (xprt->stream) { | 735 | case IPPROTO_UDP: |
1444 | xprt->cwnd = RPC_MAXCWND(xprt); | 736 | result = xs_setup_udp(xprt, to); |
1445 | xprt->nocong = 1; | 737 | break; |
1446 | xprt->max_payload = (1U << 31) - 1; | 738 | case IPPROTO_TCP: |
1447 | } else { | 739 | result = xs_setup_tcp(xprt, to); |
1448 | xprt->cwnd = RPC_INITCWND; | 740 | break; |
1449 | xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); | 741 | default: |
742 | printk(KERN_ERR "RPC: unrecognized transport protocol: %d\n", | ||
743 | proto); | ||
744 | result = -EIO; | ||
745 | break; | ||
746 | } | ||
747 | if (result) { | ||
748 | kfree(xprt); | ||
749 | return ERR_PTR(result); | ||
1450 | } | 750 | } |
751 | |||
1451 | spin_lock_init(&xprt->sock_lock); | 752 | spin_lock_init(&xprt->sock_lock); |
1452 | spin_lock_init(&xprt->xprt_lock); | 753 | spin_lock_init(&xprt->xprt_lock); |
1453 | init_waitqueue_head(&xprt->cong_wait); | 754 | init_waitqueue_head(&xprt->cong_wait); |
1454 | 755 | ||
1455 | INIT_LIST_HEAD(&xprt->free); | 756 | INIT_LIST_HEAD(&xprt->free); |
1456 | INIT_LIST_HEAD(&xprt->recv); | 757 | INIT_LIST_HEAD(&xprt->recv); |
1457 | INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt); | ||
1458 | INIT_WORK(&xprt->task_cleanup, xprt_socket_autoclose, xprt); | 758 | INIT_WORK(&xprt->task_cleanup, xprt_socket_autoclose, xprt); |
1459 | init_timer(&xprt->timer); | 759 | init_timer(&xprt->timer); |
1460 | xprt->timer.function = xprt_init_autodisconnect; | 760 | xprt->timer.function = xprt_init_autodisconnect; |
1461 | xprt->timer.data = (unsigned long) xprt; | 761 | xprt->timer.data = (unsigned long) xprt; |
1462 | xprt->last_used = jiffies; | 762 | xprt->last_used = jiffies; |
1463 | xprt->port = XPRT_MAX_RESVPORT; | ||
1464 | |||
1465 | /* Set timeout parameters */ | ||
1466 | if (to) { | ||
1467 | xprt->timeout = *to; | ||
1468 | } else | ||
1469 | xprt_default_timeout(&xprt->timeout, xprt->prot); | ||
1470 | 763 | ||
1471 | rpc_init_wait_queue(&xprt->pending, "xprt_pending"); | 764 | rpc_init_wait_queue(&xprt->pending, "xprt_pending"); |
1472 | rpc_init_wait_queue(&xprt->sending, "xprt_sending"); | 765 | rpc_init_wait_queue(&xprt->sending, "xprt_sending"); |
@@ -1474,14 +767,11 @@ xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) | |||
1474 | rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); | 767 | rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); |
1475 | 768 | ||
1476 | /* initialize free list */ | 769 | /* initialize free list */ |
1477 | for (req = &xprt->slot[entries-1]; req >= &xprt->slot[0]; req--) | 770 | for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--) |
1478 | list_add(&req->rq_list, &xprt->free); | 771 | list_add(&req->rq_list, &xprt->free); |
1479 | 772 | ||
1480 | xprt_init_xid(xprt); | 773 | xprt_init_xid(xprt); |
1481 | 774 | ||
1482 | /* Check whether we want to use a reserved port */ | ||
1483 | xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0; | ||
1484 | |||
1485 | dprintk("RPC: created transport %p with %u slots\n", xprt, | 775 | dprintk("RPC: created transport %p with %u slots\n", xprt, |
1486 | xprt->max_reqs); | 776 | xprt->max_reqs); |
1487 | 777 | ||
@@ -1489,120 +779,6 @@ xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) | |||
1489 | } | 779 | } |
1490 | 780 | ||
1491 | /* | 781 | /* |
1492 | * Bind to a reserved port | ||
1493 | */ | ||
1494 | static inline int xprt_bindresvport(struct rpc_xprt *xprt, struct socket *sock) | ||
1495 | { | ||
1496 | struct sockaddr_in myaddr = { | ||
1497 | .sin_family = AF_INET, | ||
1498 | }; | ||
1499 | int err, port; | ||
1500 | |||
1501 | /* Were we already bound to a given port? Try to reuse it */ | ||
1502 | port = xprt->port; | ||
1503 | do { | ||
1504 | myaddr.sin_port = htons(port); | ||
1505 | err = sock->ops->bind(sock, (struct sockaddr *) &myaddr, | ||
1506 | sizeof(myaddr)); | ||
1507 | if (err == 0) { | ||
1508 | xprt->port = port; | ||
1509 | return 0; | ||
1510 | } | ||
1511 | if (--port == 0) | ||
1512 | port = XPRT_MAX_RESVPORT; | ||
1513 | } while (err == -EADDRINUSE && port != xprt->port); | ||
1514 | |||
1515 | printk("RPC: Can't bind to reserved port (%d).\n", -err); | ||
1516 | return err; | ||
1517 | } | ||
1518 | |||
1519 | static void | ||
1520 | xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock) | ||
1521 | { | ||
1522 | struct sock *sk = sock->sk; | ||
1523 | |||
1524 | if (xprt->inet) | ||
1525 | return; | ||
1526 | |||
1527 | write_lock_bh(&sk->sk_callback_lock); | ||
1528 | sk->sk_user_data = xprt; | ||
1529 | xprt->old_data_ready = sk->sk_data_ready; | ||
1530 | xprt->old_state_change = sk->sk_state_change; | ||
1531 | xprt->old_write_space = sk->sk_write_space; | ||
1532 | if (xprt->prot == IPPROTO_UDP) { | ||
1533 | sk->sk_data_ready = udp_data_ready; | ||
1534 | sk->sk_no_check = UDP_CSUM_NORCV; | ||
1535 | xprt_set_connected(xprt); | ||
1536 | } else { | ||
1537 | tcp_sk(sk)->nonagle = 1; /* disable Nagle's algorithm */ | ||
1538 | sk->sk_data_ready = tcp_data_ready; | ||
1539 | sk->sk_state_change = tcp_state_change; | ||
1540 | xprt_clear_connected(xprt); | ||
1541 | } | ||
1542 | sk->sk_write_space = xprt_write_space; | ||
1543 | |||
1544 | /* Reset to new socket */ | ||
1545 | xprt->sock = sock; | ||
1546 | xprt->inet = sk; | ||
1547 | write_unlock_bh(&sk->sk_callback_lock); | ||
1548 | |||
1549 | return; | ||
1550 | } | ||
1551 | |||
1552 | /* | ||
1553 | * Set socket buffer length | ||
1554 | */ | ||
1555 | void | ||
1556 | xprt_sock_setbufsize(struct rpc_xprt *xprt) | ||
1557 | { | ||
1558 | struct sock *sk = xprt->inet; | ||
1559 | |||
1560 | if (xprt->stream) | ||
1561 | return; | ||
1562 | if (xprt->rcvsize) { | ||
1563 | sk->sk_userlocks |= SOCK_RCVBUF_LOCK; | ||
1564 | sk->sk_rcvbuf = xprt->rcvsize * xprt->max_reqs * 2; | ||
1565 | } | ||
1566 | if (xprt->sndsize) { | ||
1567 | sk->sk_userlocks |= SOCK_SNDBUF_LOCK; | ||
1568 | sk->sk_sndbuf = xprt->sndsize * xprt->max_reqs * 2; | ||
1569 | sk->sk_write_space(sk); | ||
1570 | } | ||
1571 | } | ||
1572 | |||
1573 | /* | ||
1574 | * Datastream sockets are created here, but xprt_connect will create | ||
1575 | * and connect stream sockets. | ||
1576 | */ | ||
1577 | static struct socket * xprt_create_socket(struct rpc_xprt *xprt, int proto, int resvport) | ||
1578 | { | ||
1579 | struct socket *sock; | ||
1580 | int type, err; | ||
1581 | |||
1582 | dprintk("RPC: xprt_create_socket(%s %d)\n", | ||
1583 | (proto == IPPROTO_UDP)? "udp" : "tcp", proto); | ||
1584 | |||
1585 | type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM; | ||
1586 | |||
1587 | if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) { | ||
1588 | printk("RPC: can't create socket (%d).\n", -err); | ||
1589 | return NULL; | ||
1590 | } | ||
1591 | |||
1592 | /* If the caller has the capability, bind to a reserved port */ | ||
1593 | if (resvport && xprt_bindresvport(xprt, sock) < 0) { | ||
1594 | printk("RPC: can't bind to reserved port.\n"); | ||
1595 | goto failed; | ||
1596 | } | ||
1597 | |||
1598 | return sock; | ||
1599 | |||
1600 | failed: | ||
1601 | sock_release(sock); | ||
1602 | return NULL; | ||
1603 | } | ||
1604 | |||
1605 | /* | ||
1606 | * Create an RPC client transport given the protocol and peer address. | 782 | * Create an RPC client transport given the protocol and peer address. |
1607 | */ | 783 | */ |
1608 | struct rpc_xprt * | 784 | struct rpc_xprt * |
@@ -1631,10 +807,6 @@ xprt_shutdown(struct rpc_xprt *xprt) | |||
1631 | rpc_wake_up(&xprt->backlog); | 807 | rpc_wake_up(&xprt->backlog); |
1632 | wake_up(&xprt->cong_wait); | 808 | wake_up(&xprt->cong_wait); |
1633 | del_timer_sync(&xprt->timer); | 809 | del_timer_sync(&xprt->timer); |
1634 | |||
1635 | /* synchronously wait for connect worker to finish */ | ||
1636 | cancel_delayed_work(&xprt->sock_connect); | ||
1637 | flush_scheduled_work(); | ||
1638 | } | 810 | } |
1639 | 811 | ||
1640 | /* | 812 | /* |
@@ -1655,9 +827,7 @@ xprt_destroy(struct rpc_xprt *xprt) | |||
1655 | { | 827 | { |
1656 | dprintk("RPC: destroying transport %p\n", xprt); | 828 | dprintk("RPC: destroying transport %p\n", xprt); |
1657 | xprt_shutdown(xprt); | 829 | xprt_shutdown(xprt); |
1658 | xprt_disconnect(xprt); | 830 | xprt->ops->destroy(xprt); |
1659 | xprt_close(xprt); | ||
1660 | kfree(xprt->slot); | ||
1661 | kfree(xprt); | 831 | kfree(xprt); |
1662 | 832 | ||
1663 | return 0; | 833 | return 0; |