diff options
author | Chuck Lever <cel@citi.umich.edu> | 2005-08-11 16:25:23 -0400 |
---|---|---|
committer | Trond Myklebust <Trond.Myklebust@netapp.com> | 2005-09-23 12:38:12 -0400 |
commit | a246b0105bbd9a70a698f69baae2042996f2a0e9 (patch) | |
tree | 6c8831d8579a7fdc5201d3e9c20270cb1420eeda /net/sunrpc | |
parent | 094bb20b9fcab3a1652a77741caba6b78097d622 (diff) |
[PATCH] RPC: introduce client-side transport switch
Move the bulk of client-side socket-specific code into a separate source
file, net/sunrpc/xprtsock.c.
Test-plan:
Millions of fsx operations. Performance characterization such as "sio" or
"iozone". Destructive testing (unplugging the network temporarily, server
reboots). Connectathon with v2, v3, and v4.
Version: Thu, 11 Aug 2005 16:03:38 -0400
Signed-off-by: Chuck Lever <cel@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
Diffstat (limited to 'net/sunrpc')
-rw-r--r-- | net/sunrpc/Makefile | 2 | ||||
-rw-r--r-- | net/sunrpc/clnt.c | 3 | ||||
-rw-r--r-- | net/sunrpc/sysctl.c | 3 | ||||
-rw-r--r-- | net/sunrpc/xdr.c | 102 | ||||
-rw-r--r-- | net/sunrpc/xprt.c | 916 | ||||
-rw-r--r-- | net/sunrpc/xprtsock.c | 1021 |
6 files changed, 1070 insertions, 977 deletions
diff --git a/net/sunrpc/Makefile b/net/sunrpc/Makefile index f0a955627177..cdcab9ca4c60 100644 --- a/net/sunrpc/Makefile +++ b/net/sunrpc/Makefile | |||
@@ -6,7 +6,7 @@ | |||
6 | obj-$(CONFIG_SUNRPC) += sunrpc.o | 6 | obj-$(CONFIG_SUNRPC) += sunrpc.o |
7 | obj-$(CONFIG_SUNRPC_GSS) += auth_gss/ | 7 | obj-$(CONFIG_SUNRPC_GSS) += auth_gss/ |
8 | 8 | ||
9 | sunrpc-y := clnt.o xprt.o socklib.o sched.o \ | 9 | sunrpc-y := clnt.o xprt.o socklib.o xprtsock.o sched.o \ |
10 | auth.o auth_null.o auth_unix.o \ | 10 | auth.o auth_null.o auth_unix.o \ |
11 | svc.o svcsock.o svcauth.o svcauth_unix.o \ | 11 | svc.o svcsock.o svcauth.o svcauth_unix.o \ |
12 | pmap_clnt.o timer.o xdr.o \ | 12 | pmap_clnt.o timer.o xdr.o \ |
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 2d3cf0a52d82..ab50c3c9e6a8 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c | |||
@@ -525,8 +525,7 @@ rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize | |||
525 | xprt->rcvsize = 0; | 525 | xprt->rcvsize = 0; |
526 | if (rcvsize) | 526 | if (rcvsize) |
527 | xprt->rcvsize = rcvsize + RPC_SLACK_SPACE; | 527 | xprt->rcvsize = rcvsize + RPC_SLACK_SPACE; |
528 | if (xprt_connected(xprt)) | 528 | xprt->ops->set_buffer_size(xprt); |
529 | xprt_sock_setbufsize(xprt); | ||
530 | } | 529 | } |
531 | 530 | ||
532 | /* | 531 | /* |
diff --git a/net/sunrpc/sysctl.c b/net/sunrpc/sysctl.c index 1b9616a12e24..ef483262f17f 100644 --- a/net/sunrpc/sysctl.c +++ b/net/sunrpc/sysctl.c | |||
@@ -119,6 +119,9 @@ done: | |||
119 | return 0; | 119 | return 0; |
120 | } | 120 | } |
121 | 121 | ||
122 | unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE; | ||
123 | unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE; | ||
124 | |||
122 | static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE; | 125 | static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE; |
123 | static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE; | 126 | static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE; |
124 | 127 | ||
diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c index 9cc12aeed22c..32df43372ee9 100644 --- a/net/sunrpc/xdr.c +++ b/net/sunrpc/xdr.c | |||
@@ -6,15 +6,12 @@ | |||
6 | * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de> | 6 | * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de> |
7 | */ | 7 | */ |
8 | 8 | ||
9 | #include <linux/module.h> | ||
9 | #include <linux/types.h> | 10 | #include <linux/types.h> |
10 | #include <linux/socket.h> | ||
11 | #include <linux/string.h> | 11 | #include <linux/string.h> |
12 | #include <linux/kernel.h> | 12 | #include <linux/kernel.h> |
13 | #include <linux/pagemap.h> | 13 | #include <linux/pagemap.h> |
14 | #include <linux/errno.h> | 14 | #include <linux/errno.h> |
15 | #include <linux/in.h> | ||
16 | #include <linux/net.h> | ||
17 | #include <net/sock.h> | ||
18 | #include <linux/sunrpc/xdr.h> | 15 | #include <linux/sunrpc/xdr.h> |
19 | #include <linux/sunrpc/msg_prot.h> | 16 | #include <linux/sunrpc/msg_prot.h> |
20 | 17 | ||
@@ -177,103 +174,6 @@ xdr_inline_pages(struct xdr_buf *xdr, unsigned int offset, | |||
177 | } | 174 | } |
178 | 175 | ||
179 | 176 | ||
180 | int | ||
181 | xdr_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, | ||
182 | struct xdr_buf *xdr, unsigned int base, int msgflags) | ||
183 | { | ||
184 | struct page **ppage = xdr->pages; | ||
185 | unsigned int len, pglen = xdr->page_len; | ||
186 | int err, ret = 0; | ||
187 | ssize_t (*sendpage)(struct socket *, struct page *, int, size_t, int); | ||
188 | |||
189 | len = xdr->head[0].iov_len; | ||
190 | if (base < len || (addr != NULL && base == 0)) { | ||
191 | struct kvec iov = { | ||
192 | .iov_base = xdr->head[0].iov_base + base, | ||
193 | .iov_len = len - base, | ||
194 | }; | ||
195 | struct msghdr msg = { | ||
196 | .msg_name = addr, | ||
197 | .msg_namelen = addrlen, | ||
198 | .msg_flags = msgflags, | ||
199 | }; | ||
200 | if (xdr->len > len) | ||
201 | msg.msg_flags |= MSG_MORE; | ||
202 | |||
203 | if (iov.iov_len != 0) | ||
204 | err = kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len); | ||
205 | else | ||
206 | err = kernel_sendmsg(sock, &msg, NULL, 0, 0); | ||
207 | if (ret == 0) | ||
208 | ret = err; | ||
209 | else if (err > 0) | ||
210 | ret += err; | ||
211 | if (err != iov.iov_len) | ||
212 | goto out; | ||
213 | base = 0; | ||
214 | } else | ||
215 | base -= len; | ||
216 | |||
217 | if (pglen == 0) | ||
218 | goto copy_tail; | ||
219 | if (base >= pglen) { | ||
220 | base -= pglen; | ||
221 | goto copy_tail; | ||
222 | } | ||
223 | if (base || xdr->page_base) { | ||
224 | pglen -= base; | ||
225 | base += xdr->page_base; | ||
226 | ppage += base >> PAGE_CACHE_SHIFT; | ||
227 | base &= ~PAGE_CACHE_MASK; | ||
228 | } | ||
229 | |||
230 | sendpage = sock->ops->sendpage ? : sock_no_sendpage; | ||
231 | do { | ||
232 | int flags = msgflags; | ||
233 | |||
234 | len = PAGE_CACHE_SIZE; | ||
235 | if (base) | ||
236 | len -= base; | ||
237 | if (pglen < len) | ||
238 | len = pglen; | ||
239 | |||
240 | if (pglen != len || xdr->tail[0].iov_len != 0) | ||
241 | flags |= MSG_MORE; | ||
242 | |||
243 | /* Hmm... We might be dealing with highmem pages */ | ||
244 | if (PageHighMem(*ppage)) | ||
245 | sendpage = sock_no_sendpage; | ||
246 | err = sendpage(sock, *ppage, base, len, flags); | ||
247 | if (ret == 0) | ||
248 | ret = err; | ||
249 | else if (err > 0) | ||
250 | ret += err; | ||
251 | if (err != len) | ||
252 | goto out; | ||
253 | base = 0; | ||
254 | ppage++; | ||
255 | } while ((pglen -= len) != 0); | ||
256 | copy_tail: | ||
257 | len = xdr->tail[0].iov_len; | ||
258 | if (base < len) { | ||
259 | struct kvec iov = { | ||
260 | .iov_base = xdr->tail[0].iov_base + base, | ||
261 | .iov_len = len - base, | ||
262 | }; | ||
263 | struct msghdr msg = { | ||
264 | .msg_flags = msgflags, | ||
265 | }; | ||
266 | err = kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len); | ||
267 | if (ret == 0) | ||
268 | ret = err; | ||
269 | else if (err > 0) | ||
270 | ret += err; | ||
271 | } | ||
272 | out: | ||
273 | return ret; | ||
274 | } | ||
275 | |||
276 | |||
277 | /* | 177 | /* |
278 | * Helper routines for doing 'memmove' like operations on a struct xdr_buf | 178 | * Helper routines for doing 'memmove' like operations on a struct xdr_buf |
279 | * | 179 | * |
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 67444f494fea..4342acf4d1cd 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c | |||
@@ -32,37 +32,16 @@ | |||
32 | * tasks that rely on callbacks. | 32 | * tasks that rely on callbacks. |
33 | * | 33 | * |
34 | * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> | 34 | * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> |
35 | * | ||
36 | * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com> | ||
37 | * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com> | ||
38 | * TCP NFS related read + write fixes | ||
39 | * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie> | ||
40 | * | ||
41 | * Rewrite of larges part of the code in order to stabilize TCP stuff. | ||
42 | * Fix behaviour when socket buffer is full. | ||
43 | * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no> | ||
44 | */ | 35 | */ |
45 | 36 | ||
37 | #include <linux/module.h> | ||
38 | |||
46 | #include <linux/types.h> | 39 | #include <linux/types.h> |
47 | #include <linux/slab.h> | 40 | #include <linux/interrupt.h> |
48 | #include <linux/capability.h> | ||
49 | #include <linux/sched.h> | ||
50 | #include <linux/errno.h> | ||
51 | #include <linux/socket.h> | ||
52 | #include <linux/in.h> | ||
53 | #include <linux/net.h> | ||
54 | #include <linux/mm.h> | ||
55 | #include <linux/udp.h> | ||
56 | #include <linux/tcp.h> | ||
57 | #include <linux/sunrpc/clnt.h> | ||
58 | #include <linux/file.h> | ||
59 | #include <linux/workqueue.h> | 41 | #include <linux/workqueue.h> |
60 | #include <linux/random.h> | 42 | #include <linux/random.h> |
61 | 43 | ||
62 | #include <net/sock.h> | 44 | #include <linux/sunrpc/clnt.h> |
63 | #include <net/checksum.h> | ||
64 | #include <net/udp.h> | ||
65 | #include <net/tcp.h> | ||
66 | 45 | ||
67 | /* | 46 | /* |
68 | * Local variables | 47 | * Local variables |
@@ -74,64 +53,17 @@ | |||
74 | #endif | 53 | #endif |
75 | 54 | ||
76 | #define XPRT_MAX_BACKOFF (8) | 55 | #define XPRT_MAX_BACKOFF (8) |
77 | #define XPRT_IDLE_TIMEOUT (5*60*HZ) | ||
78 | #define XPRT_MAX_RESVPORT (800) | ||
79 | 56 | ||
80 | /* | 57 | /* |
81 | * Local functions | 58 | * Local functions |
82 | */ | 59 | */ |
83 | static void xprt_request_init(struct rpc_task *, struct rpc_xprt *); | 60 | static void xprt_request_init(struct rpc_task *, struct rpc_xprt *); |
84 | static inline void do_xprt_reserve(struct rpc_task *); | 61 | static inline void do_xprt_reserve(struct rpc_task *); |
85 | static void xprt_disconnect(struct rpc_xprt *); | ||
86 | static void xprt_connect_status(struct rpc_task *task); | 62 | static void xprt_connect_status(struct rpc_task *task); |
87 | static struct rpc_xprt * xprt_setup(int proto, struct sockaddr_in *ap, | ||
88 | struct rpc_timeout *to); | ||
89 | static struct socket *xprt_create_socket(struct rpc_xprt *, int, int); | ||
90 | static void xprt_bind_socket(struct rpc_xprt *, struct socket *); | ||
91 | static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *); | 63 | static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *); |
92 | 64 | ||
93 | static int xprt_clear_backlog(struct rpc_xprt *xprt); | 65 | static int xprt_clear_backlog(struct rpc_xprt *xprt); |
94 | 66 | ||
95 | #ifdef RPC_DEBUG_DATA | ||
96 | /* | ||
97 | * Print the buffer contents (first 128 bytes only--just enough for | ||
98 | * diropres return). | ||
99 | */ | ||
100 | static void | ||
101 | xprt_pktdump(char *msg, u32 *packet, unsigned int count) | ||
102 | { | ||
103 | u8 *buf = (u8 *) packet; | ||
104 | int j; | ||
105 | |||
106 | dprintk("RPC: %s\n", msg); | ||
107 | for (j = 0; j < count && j < 128; j += 4) { | ||
108 | if (!(j & 31)) { | ||
109 | if (j) | ||
110 | dprintk("\n"); | ||
111 | dprintk("0x%04x ", j); | ||
112 | } | ||
113 | dprintk("%02x%02x%02x%02x ", | ||
114 | buf[j], buf[j+1], buf[j+2], buf[j+3]); | ||
115 | } | ||
116 | dprintk("\n"); | ||
117 | } | ||
118 | #else | ||
119 | static inline void | ||
120 | xprt_pktdump(char *msg, u32 *packet, unsigned int count) | ||
121 | { | ||
122 | /* NOP */ | ||
123 | } | ||
124 | #endif | ||
125 | |||
126 | /* | ||
127 | * Look up RPC transport given an INET socket | ||
128 | */ | ||
129 | static inline struct rpc_xprt * | ||
130 | xprt_from_sock(struct sock *sk) | ||
131 | { | ||
132 | return (struct rpc_xprt *) sk->sk_user_data; | ||
133 | } | ||
134 | |||
135 | /* | 67 | /* |
136 | * Serialize write access to sockets, in order to prevent different | 68 | * Serialize write access to sockets, in order to prevent different |
137 | * requests from interfering with each other. | 69 | * requests from interfering with each other. |
@@ -235,62 +167,6 @@ xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) | |||
235 | } | 167 | } |
236 | 168 | ||
237 | /* | 169 | /* |
238 | * Write data to socket. | ||
239 | */ | ||
240 | static inline int | ||
241 | xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req) | ||
242 | { | ||
243 | struct socket *sock = xprt->sock; | ||
244 | struct xdr_buf *xdr = &req->rq_snd_buf; | ||
245 | struct sockaddr *addr = NULL; | ||
246 | int addrlen = 0; | ||
247 | unsigned int skip; | ||
248 | int result; | ||
249 | |||
250 | if (!sock) | ||
251 | return -ENOTCONN; | ||
252 | |||
253 | xprt_pktdump("packet data:", | ||
254 | req->rq_svec->iov_base, | ||
255 | req->rq_svec->iov_len); | ||
256 | |||
257 | /* For UDP, we need to provide an address */ | ||
258 | if (!xprt->stream) { | ||
259 | addr = (struct sockaddr *) &xprt->addr; | ||
260 | addrlen = sizeof(xprt->addr); | ||
261 | } | ||
262 | /* Dont repeat bytes */ | ||
263 | skip = req->rq_bytes_sent; | ||
264 | |||
265 | clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags); | ||
266 | result = xdr_sendpages(sock, addr, addrlen, xdr, skip, MSG_DONTWAIT); | ||
267 | |||
268 | dprintk("RPC: xprt_sendmsg(%d) = %d\n", xdr->len - skip, result); | ||
269 | |||
270 | if (result >= 0) | ||
271 | return result; | ||
272 | |||
273 | switch (result) { | ||
274 | case -ECONNREFUSED: | ||
275 | /* When the server has died, an ICMP port unreachable message | ||
276 | * prompts ECONNREFUSED. | ||
277 | */ | ||
278 | case -EAGAIN: | ||
279 | break; | ||
280 | case -ECONNRESET: | ||
281 | case -ENOTCONN: | ||
282 | case -EPIPE: | ||
283 | /* connection broken */ | ||
284 | if (xprt->stream) | ||
285 | result = -ENOTCONN; | ||
286 | break; | ||
287 | default: | ||
288 | printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result); | ||
289 | } | ||
290 | return result; | ||
291 | } | ||
292 | |||
293 | /* | ||
294 | * Van Jacobson congestion avoidance. Check if the congestion window | 170 | * Van Jacobson congestion avoidance. Check if the congestion window |
295 | * overflowed. Put the task to sleep if this is the case. | 171 | * overflowed. Put the task to sleep if this is the case. |
296 | */ | 172 | */ |
@@ -405,48 +281,20 @@ int xprt_adjust_timeout(struct rpc_rqst *req) | |||
405 | return status; | 281 | return status; |
406 | } | 282 | } |
407 | 283 | ||
408 | /* | ||
409 | * Close down a transport socket | ||
410 | */ | ||
411 | static void | ||
412 | xprt_close(struct rpc_xprt *xprt) | ||
413 | { | ||
414 | struct socket *sock = xprt->sock; | ||
415 | struct sock *sk = xprt->inet; | ||
416 | |||
417 | if (!sk) | ||
418 | return; | ||
419 | |||
420 | write_lock_bh(&sk->sk_callback_lock); | ||
421 | xprt->inet = NULL; | ||
422 | xprt->sock = NULL; | ||
423 | |||
424 | sk->sk_user_data = NULL; | ||
425 | sk->sk_data_ready = xprt->old_data_ready; | ||
426 | sk->sk_state_change = xprt->old_state_change; | ||
427 | sk->sk_write_space = xprt->old_write_space; | ||
428 | write_unlock_bh(&sk->sk_callback_lock); | ||
429 | |||
430 | sk->sk_no_check = 0; | ||
431 | |||
432 | sock_release(sock); | ||
433 | } | ||
434 | |||
435 | static void | 284 | static void |
436 | xprt_socket_autoclose(void *args) | 285 | xprt_socket_autoclose(void *args) |
437 | { | 286 | { |
438 | struct rpc_xprt *xprt = (struct rpc_xprt *)args; | 287 | struct rpc_xprt *xprt = (struct rpc_xprt *)args; |
439 | 288 | ||
440 | xprt_disconnect(xprt); | 289 | xprt_disconnect(xprt); |
441 | xprt_close(xprt); | 290 | xprt->ops->close(xprt); |
442 | xprt_release_write(xprt, NULL); | 291 | xprt_release_write(xprt, NULL); |
443 | } | 292 | } |
444 | 293 | ||
445 | /* | 294 | /* |
446 | * Mark a transport as disconnected | 295 | * Mark a transport as disconnected |
447 | */ | 296 | */ |
448 | static void | 297 | void xprt_disconnect(struct rpc_xprt *xprt) |
449 | xprt_disconnect(struct rpc_xprt *xprt) | ||
450 | { | 298 | { |
451 | dprintk("RPC: disconnected transport %p\n", xprt); | 299 | dprintk("RPC: disconnected transport %p\n", xprt); |
452 | spin_lock_bh(&xprt->sock_lock); | 300 | spin_lock_bh(&xprt->sock_lock); |
@@ -479,57 +327,6 @@ out_abort: | |||
479 | spin_unlock(&xprt->sock_lock); | 327 | spin_unlock(&xprt->sock_lock); |
480 | } | 328 | } |
481 | 329 | ||
482 | static void xprt_socket_connect(void *args) | ||
483 | { | ||
484 | struct rpc_xprt *xprt = (struct rpc_xprt *)args; | ||
485 | struct socket *sock = xprt->sock; | ||
486 | int status = -EIO; | ||
487 | |||
488 | if (xprt->shutdown || xprt->addr.sin_port == 0) | ||
489 | goto out; | ||
490 | |||
491 | /* | ||
492 | * Start by resetting any existing state | ||
493 | */ | ||
494 | xprt_close(xprt); | ||
495 | sock = xprt_create_socket(xprt, xprt->prot, xprt->resvport); | ||
496 | if (sock == NULL) { | ||
497 | /* couldn't create socket or bind to reserved port; | ||
498 | * this is likely a permanent error, so cause an abort */ | ||
499 | goto out; | ||
500 | } | ||
501 | xprt_bind_socket(xprt, sock); | ||
502 | xprt_sock_setbufsize(xprt); | ||
503 | |||
504 | status = 0; | ||
505 | if (!xprt->stream) | ||
506 | goto out; | ||
507 | |||
508 | /* | ||
509 | * Tell the socket layer to start connecting... | ||
510 | */ | ||
511 | status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr, | ||
512 | sizeof(xprt->addr), O_NONBLOCK); | ||
513 | dprintk("RPC: %p connect status %d connected %d sock state %d\n", | ||
514 | xprt, -status, xprt_connected(xprt), sock->sk->sk_state); | ||
515 | if (status < 0) { | ||
516 | switch (status) { | ||
517 | case -EINPROGRESS: | ||
518 | case -EALREADY: | ||
519 | goto out_clear; | ||
520 | } | ||
521 | } | ||
522 | out: | ||
523 | if (status < 0) | ||
524 | rpc_wake_up_status(&xprt->pending, status); | ||
525 | else | ||
526 | rpc_wake_up(&xprt->pending); | ||
527 | out_clear: | ||
528 | smp_mb__before_clear_bit(); | ||
529 | clear_bit(XPRT_CONNECTING, &xprt->sockstate); | ||
530 | smp_mb__after_clear_bit(); | ||
531 | } | ||
532 | |||
533 | /* | 330 | /* |
534 | * Attempt to connect a TCP socket. | 331 | * Attempt to connect a TCP socket. |
535 | * | 332 | * |
@@ -552,30 +349,16 @@ void xprt_connect(struct rpc_task *task) | |||
552 | if (!xprt_lock_write(xprt, task)) | 349 | if (!xprt_lock_write(xprt, task)) |
553 | return; | 350 | return; |
554 | if (xprt_connected(xprt)) | 351 | if (xprt_connected(xprt)) |
555 | goto out_write; | 352 | xprt_release_write(xprt, task); |
353 | else { | ||
354 | if (task->tk_rqstp) | ||
355 | task->tk_rqstp->rq_bytes_sent = 0; | ||
556 | 356 | ||
557 | if (task->tk_rqstp) | 357 | task->tk_timeout = RPC_CONNECT_TIMEOUT; |
558 | task->tk_rqstp->rq_bytes_sent = 0; | 358 | rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL); |
559 | 359 | xprt->ops->connect(task); | |
560 | task->tk_timeout = RPC_CONNECT_TIMEOUT; | ||
561 | rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL); | ||
562 | if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate)) { | ||
563 | /* Note: if we are here due to a dropped connection | ||
564 | * we delay reconnecting by RPC_REESTABLISH_TIMEOUT/HZ | ||
565 | * seconds | ||
566 | */ | ||
567 | if (xprt->sock != NULL) | ||
568 | schedule_delayed_work(&xprt->sock_connect, | ||
569 | RPC_REESTABLISH_TIMEOUT); | ||
570 | else { | ||
571 | schedule_work(&xprt->sock_connect); | ||
572 | if (!RPC_IS_ASYNC(task)) | ||
573 | flush_scheduled_work(); | ||
574 | } | ||
575 | } | 360 | } |
576 | return; | 361 | return; |
577 | out_write: | ||
578 | xprt_release_write(xprt, task); | ||
579 | } | 362 | } |
580 | 363 | ||
581 | /* | 364 | /* |
@@ -624,8 +407,7 @@ xprt_connect_status(struct rpc_task *task) | |||
624 | /* | 407 | /* |
625 | * Look up the RPC request corresponding to a reply, and then lock it. | 408 | * Look up the RPC request corresponding to a reply, and then lock it. |
626 | */ | 409 | */ |
627 | static inline struct rpc_rqst * | 410 | struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid) |
628 | xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid) | ||
629 | { | 411 | { |
630 | struct list_head *pos; | 412 | struct list_head *pos; |
631 | struct rpc_rqst *req = NULL; | 413 | struct rpc_rqst *req = NULL; |
@@ -644,8 +426,7 @@ xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid) | |||
644 | * Complete reply received. | 426 | * Complete reply received. |
645 | * The TCP code relies on us to remove the request from xprt->pending. | 427 | * The TCP code relies on us to remove the request from xprt->pending. |
646 | */ | 428 | */ |
647 | static void | 429 | void xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied) |
648 | xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied) | ||
649 | { | 430 | { |
650 | struct rpc_task *task = req->rq_task; | 431 | struct rpc_task *task = req->rq_task; |
651 | struct rpc_clnt *clnt = task->tk_client; | 432 | struct rpc_clnt *clnt = task->tk_client; |
@@ -692,409 +473,6 @@ xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied) | |||
692 | } | 473 | } |
693 | 474 | ||
694 | /* | 475 | /* |
695 | * Input handler for RPC replies. Called from a bottom half and hence | ||
696 | * atomic. | ||
697 | */ | ||
698 | static void | ||
699 | udp_data_ready(struct sock *sk, int len) | ||
700 | { | ||
701 | struct rpc_task *task; | ||
702 | struct rpc_xprt *xprt; | ||
703 | struct rpc_rqst *rovr; | ||
704 | struct sk_buff *skb; | ||
705 | int err, repsize, copied; | ||
706 | u32 _xid, *xp; | ||
707 | |||
708 | read_lock(&sk->sk_callback_lock); | ||
709 | dprintk("RPC: udp_data_ready...\n"); | ||
710 | if (!(xprt = xprt_from_sock(sk))) { | ||
711 | printk("RPC: udp_data_ready request not found!\n"); | ||
712 | goto out; | ||
713 | } | ||
714 | |||
715 | dprintk("RPC: udp_data_ready client %p\n", xprt); | ||
716 | |||
717 | if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL) | ||
718 | goto out; | ||
719 | |||
720 | if (xprt->shutdown) | ||
721 | goto dropit; | ||
722 | |||
723 | repsize = skb->len - sizeof(struct udphdr); | ||
724 | if (repsize < 4) { | ||
725 | printk("RPC: impossible RPC reply size %d!\n", repsize); | ||
726 | goto dropit; | ||
727 | } | ||
728 | |||
729 | /* Copy the XID from the skb... */ | ||
730 | xp = skb_header_pointer(skb, sizeof(struct udphdr), | ||
731 | sizeof(_xid), &_xid); | ||
732 | if (xp == NULL) | ||
733 | goto dropit; | ||
734 | |||
735 | /* Look up and lock the request corresponding to the given XID */ | ||
736 | spin_lock(&xprt->sock_lock); | ||
737 | rovr = xprt_lookup_rqst(xprt, *xp); | ||
738 | if (!rovr) | ||
739 | goto out_unlock; | ||
740 | task = rovr->rq_task; | ||
741 | |||
742 | dprintk("RPC: %4d received reply\n", task->tk_pid); | ||
743 | |||
744 | if ((copied = rovr->rq_private_buf.buflen) > repsize) | ||
745 | copied = repsize; | ||
746 | |||
747 | /* Suck it into the iovec, verify checksum if not done by hw. */ | ||
748 | if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) | ||
749 | goto out_unlock; | ||
750 | |||
751 | /* Something worked... */ | ||
752 | dst_confirm(skb->dst); | ||
753 | |||
754 | xprt_complete_rqst(xprt, rovr, copied); | ||
755 | |||
756 | out_unlock: | ||
757 | spin_unlock(&xprt->sock_lock); | ||
758 | dropit: | ||
759 | skb_free_datagram(sk, skb); | ||
760 | out: | ||
761 | read_unlock(&sk->sk_callback_lock); | ||
762 | } | ||
763 | |||
764 | /* | ||
765 | * Copy from an skb into memory and shrink the skb. | ||
766 | */ | ||
767 | static inline size_t | ||
768 | tcp_copy_data(skb_reader_t *desc, void *p, size_t len) | ||
769 | { | ||
770 | if (len > desc->count) | ||
771 | len = desc->count; | ||
772 | if (skb_copy_bits(desc->skb, desc->offset, p, len)) { | ||
773 | dprintk("RPC: failed to copy %zu bytes from skb. %zu bytes remain\n", | ||
774 | len, desc->count); | ||
775 | return 0; | ||
776 | } | ||
777 | desc->offset += len; | ||
778 | desc->count -= len; | ||
779 | dprintk("RPC: copied %zu bytes from skb. %zu bytes remain\n", | ||
780 | len, desc->count); | ||
781 | return len; | ||
782 | } | ||
783 | |||
784 | /* | ||
785 | * TCP read fragment marker | ||
786 | */ | ||
787 | static inline void | ||
788 | tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc) | ||
789 | { | ||
790 | size_t len, used; | ||
791 | char *p; | ||
792 | |||
793 | p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset; | ||
794 | len = sizeof(xprt->tcp_recm) - xprt->tcp_offset; | ||
795 | used = tcp_copy_data(desc, p, len); | ||
796 | xprt->tcp_offset += used; | ||
797 | if (used != len) | ||
798 | return; | ||
799 | xprt->tcp_reclen = ntohl(xprt->tcp_recm); | ||
800 | if (xprt->tcp_reclen & 0x80000000) | ||
801 | xprt->tcp_flags |= XPRT_LAST_FRAG; | ||
802 | else | ||
803 | xprt->tcp_flags &= ~XPRT_LAST_FRAG; | ||
804 | xprt->tcp_reclen &= 0x7fffffff; | ||
805 | xprt->tcp_flags &= ~XPRT_COPY_RECM; | ||
806 | xprt->tcp_offset = 0; | ||
807 | /* Sanity check of the record length */ | ||
808 | if (xprt->tcp_reclen < 4) { | ||
809 | printk(KERN_ERR "RPC: Invalid TCP record fragment length\n"); | ||
810 | xprt_disconnect(xprt); | ||
811 | } | ||
812 | dprintk("RPC: reading TCP record fragment of length %d\n", | ||
813 | xprt->tcp_reclen); | ||
814 | } | ||
815 | |||
816 | static void | ||
817 | tcp_check_recm(struct rpc_xprt *xprt) | ||
818 | { | ||
819 | dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n", | ||
820 | xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_flags); | ||
821 | if (xprt->tcp_offset == xprt->tcp_reclen) { | ||
822 | xprt->tcp_flags |= XPRT_COPY_RECM; | ||
823 | xprt->tcp_offset = 0; | ||
824 | if (xprt->tcp_flags & XPRT_LAST_FRAG) { | ||
825 | xprt->tcp_flags &= ~XPRT_COPY_DATA; | ||
826 | xprt->tcp_flags |= XPRT_COPY_XID; | ||
827 | xprt->tcp_copied = 0; | ||
828 | } | ||
829 | } | ||
830 | } | ||
831 | |||
832 | /* | ||
833 | * TCP read xid | ||
834 | */ | ||
835 | static inline void | ||
836 | tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc) | ||
837 | { | ||
838 | size_t len, used; | ||
839 | char *p; | ||
840 | |||
841 | len = sizeof(xprt->tcp_xid) - xprt->tcp_offset; | ||
842 | dprintk("RPC: reading XID (%Zu bytes)\n", len); | ||
843 | p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset; | ||
844 | used = tcp_copy_data(desc, p, len); | ||
845 | xprt->tcp_offset += used; | ||
846 | if (used != len) | ||
847 | return; | ||
848 | xprt->tcp_flags &= ~XPRT_COPY_XID; | ||
849 | xprt->tcp_flags |= XPRT_COPY_DATA; | ||
850 | xprt->tcp_copied = 4; | ||
851 | dprintk("RPC: reading reply for XID %08x\n", | ||
852 | ntohl(xprt->tcp_xid)); | ||
853 | tcp_check_recm(xprt); | ||
854 | } | ||
855 | |||
856 | /* | ||
857 | * TCP read and complete request | ||
858 | */ | ||
859 | static inline void | ||
860 | tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc) | ||
861 | { | ||
862 | struct rpc_rqst *req; | ||
863 | struct xdr_buf *rcvbuf; | ||
864 | size_t len; | ||
865 | ssize_t r; | ||
866 | |||
867 | /* Find and lock the request corresponding to this xid */ | ||
868 | spin_lock(&xprt->sock_lock); | ||
869 | req = xprt_lookup_rqst(xprt, xprt->tcp_xid); | ||
870 | if (!req) { | ||
871 | xprt->tcp_flags &= ~XPRT_COPY_DATA; | ||
872 | dprintk("RPC: XID %08x request not found!\n", | ||
873 | ntohl(xprt->tcp_xid)); | ||
874 | spin_unlock(&xprt->sock_lock); | ||
875 | return; | ||
876 | } | ||
877 | |||
878 | rcvbuf = &req->rq_private_buf; | ||
879 | len = desc->count; | ||
880 | if (len > xprt->tcp_reclen - xprt->tcp_offset) { | ||
881 | skb_reader_t my_desc; | ||
882 | |||
883 | len = xprt->tcp_reclen - xprt->tcp_offset; | ||
884 | memcpy(&my_desc, desc, sizeof(my_desc)); | ||
885 | my_desc.count = len; | ||
886 | r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied, | ||
887 | &my_desc, tcp_copy_data); | ||
888 | desc->count -= r; | ||
889 | desc->offset += r; | ||
890 | } else | ||
891 | r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied, | ||
892 | desc, tcp_copy_data); | ||
893 | |||
894 | if (r > 0) { | ||
895 | xprt->tcp_copied += r; | ||
896 | xprt->tcp_offset += r; | ||
897 | } | ||
898 | if (r != len) { | ||
899 | /* Error when copying to the receive buffer, | ||
900 | * usually because we weren't able to allocate | ||
901 | * additional buffer pages. All we can do now | ||
902 | * is turn off XPRT_COPY_DATA, so the request | ||
903 | * will not receive any additional updates, | ||
904 | * and time out. | ||
905 | * Any remaining data from this record will | ||
906 | * be discarded. | ||
907 | */ | ||
908 | xprt->tcp_flags &= ~XPRT_COPY_DATA; | ||
909 | dprintk("RPC: XID %08x truncated request\n", | ||
910 | ntohl(xprt->tcp_xid)); | ||
911 | dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n", | ||
912 | xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen); | ||
913 | goto out; | ||
914 | } | ||
915 | |||
916 | dprintk("RPC: XID %08x read %Zd bytes\n", | ||
917 | ntohl(xprt->tcp_xid), r); | ||
918 | dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n", | ||
919 | xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen); | ||
920 | |||
921 | if (xprt->tcp_copied == req->rq_private_buf.buflen) | ||
922 | xprt->tcp_flags &= ~XPRT_COPY_DATA; | ||
923 | else if (xprt->tcp_offset == xprt->tcp_reclen) { | ||
924 | if (xprt->tcp_flags & XPRT_LAST_FRAG) | ||
925 | xprt->tcp_flags &= ~XPRT_COPY_DATA; | ||
926 | } | ||
927 | |||
928 | out: | ||
929 | if (!(xprt->tcp_flags & XPRT_COPY_DATA)) { | ||
930 | dprintk("RPC: %4d received reply complete\n", | ||
931 | req->rq_task->tk_pid); | ||
932 | xprt_complete_rqst(xprt, req, xprt->tcp_copied); | ||
933 | } | ||
934 | spin_unlock(&xprt->sock_lock); | ||
935 | tcp_check_recm(xprt); | ||
936 | } | ||
937 | |||
938 | /* | ||
939 | * TCP discard extra bytes from a short read | ||
940 | */ | ||
941 | static inline void | ||
942 | tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc) | ||
943 | { | ||
944 | size_t len; | ||
945 | |||
946 | len = xprt->tcp_reclen - xprt->tcp_offset; | ||
947 | if (len > desc->count) | ||
948 | len = desc->count; | ||
949 | desc->count -= len; | ||
950 | desc->offset += len; | ||
951 | xprt->tcp_offset += len; | ||
952 | dprintk("RPC: discarded %Zu bytes\n", len); | ||
953 | tcp_check_recm(xprt); | ||
954 | } | ||
955 | |||
956 | /* | ||
957 | * TCP record receive routine | ||
958 | * We first have to grab the record marker, then the XID, then the data. | ||
959 | */ | ||
960 | static int | ||
961 | tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, | ||
962 | unsigned int offset, size_t len) | ||
963 | { | ||
964 | struct rpc_xprt *xprt = rd_desc->arg.data; | ||
965 | skb_reader_t desc = { | ||
966 | .skb = skb, | ||
967 | .offset = offset, | ||
968 | .count = len, | ||
969 | .csum = 0 | ||
970 | }; | ||
971 | |||
972 | dprintk("RPC: tcp_data_recv\n"); | ||
973 | do { | ||
974 | /* Read in a new fragment marker if necessary */ | ||
975 | /* Can we ever really expect to get completely empty fragments? */ | ||
976 | if (xprt->tcp_flags & XPRT_COPY_RECM) { | ||
977 | tcp_read_fraghdr(xprt, &desc); | ||
978 | continue; | ||
979 | } | ||
980 | /* Read in the xid if necessary */ | ||
981 | if (xprt->tcp_flags & XPRT_COPY_XID) { | ||
982 | tcp_read_xid(xprt, &desc); | ||
983 | continue; | ||
984 | } | ||
985 | /* Read in the request data */ | ||
986 | if (xprt->tcp_flags & XPRT_COPY_DATA) { | ||
987 | tcp_read_request(xprt, &desc); | ||
988 | continue; | ||
989 | } | ||
990 | /* Skip over any trailing bytes on short reads */ | ||
991 | tcp_read_discard(xprt, &desc); | ||
992 | } while (desc.count); | ||
993 | dprintk("RPC: tcp_data_recv done\n"); | ||
994 | return len - desc.count; | ||
995 | } | ||
996 | |||
997 | static void tcp_data_ready(struct sock *sk, int bytes) | ||
998 | { | ||
999 | struct rpc_xprt *xprt; | ||
1000 | read_descriptor_t rd_desc; | ||
1001 | |||
1002 | read_lock(&sk->sk_callback_lock); | ||
1003 | dprintk("RPC: tcp_data_ready...\n"); | ||
1004 | if (!(xprt = xprt_from_sock(sk))) { | ||
1005 | printk("RPC: tcp_data_ready socket info not found!\n"); | ||
1006 | goto out; | ||
1007 | } | ||
1008 | if (xprt->shutdown) | ||
1009 | goto out; | ||
1010 | |||
1011 | /* We use rd_desc to pass struct xprt to tcp_data_recv */ | ||
1012 | rd_desc.arg.data = xprt; | ||
1013 | rd_desc.count = 65536; | ||
1014 | tcp_read_sock(sk, &rd_desc, tcp_data_recv); | ||
1015 | out: | ||
1016 | read_unlock(&sk->sk_callback_lock); | ||
1017 | } | ||
1018 | |||
1019 | static void | ||
1020 | tcp_state_change(struct sock *sk) | ||
1021 | { | ||
1022 | struct rpc_xprt *xprt; | ||
1023 | |||
1024 | read_lock(&sk->sk_callback_lock); | ||
1025 | if (!(xprt = xprt_from_sock(sk))) | ||
1026 | goto out; | ||
1027 | dprintk("RPC: tcp_state_change client %p...\n", xprt); | ||
1028 | dprintk("RPC: state %x conn %d dead %d zapped %d\n", | ||
1029 | sk->sk_state, xprt_connected(xprt), | ||
1030 | sock_flag(sk, SOCK_DEAD), | ||
1031 | sock_flag(sk, SOCK_ZAPPED)); | ||
1032 | |||
1033 | switch (sk->sk_state) { | ||
1034 | case TCP_ESTABLISHED: | ||
1035 | spin_lock_bh(&xprt->sock_lock); | ||
1036 | if (!xprt_test_and_set_connected(xprt)) { | ||
1037 | /* Reset TCP record info */ | ||
1038 | xprt->tcp_offset = 0; | ||
1039 | xprt->tcp_reclen = 0; | ||
1040 | xprt->tcp_copied = 0; | ||
1041 | xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID; | ||
1042 | rpc_wake_up(&xprt->pending); | ||
1043 | } | ||
1044 | spin_unlock_bh(&xprt->sock_lock); | ||
1045 | break; | ||
1046 | case TCP_SYN_SENT: | ||
1047 | case TCP_SYN_RECV: | ||
1048 | break; | ||
1049 | default: | ||
1050 | xprt_disconnect(xprt); | ||
1051 | break; | ||
1052 | } | ||
1053 | out: | ||
1054 | read_unlock(&sk->sk_callback_lock); | ||
1055 | } | ||
1056 | |||
1057 | /* | ||
1058 | * Called when more output buffer space is available for this socket. | ||
1059 | * We try not to wake our writers until they can make "significant" | ||
1060 | * progress, otherwise we'll waste resources thrashing sock_sendmsg | ||
1061 | * with a bunch of small requests. | ||
1062 | */ | ||
1063 | static void | ||
1064 | xprt_write_space(struct sock *sk) | ||
1065 | { | ||
1066 | struct rpc_xprt *xprt; | ||
1067 | struct socket *sock; | ||
1068 | |||
1069 | read_lock(&sk->sk_callback_lock); | ||
1070 | if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->sk_socket)) | ||
1071 | goto out; | ||
1072 | if (xprt->shutdown) | ||
1073 | goto out; | ||
1074 | |||
1075 | /* Wait until we have enough socket memory */ | ||
1076 | if (xprt->stream) { | ||
1077 | /* from net/core/stream.c:sk_stream_write_space */ | ||
1078 | if (sk_stream_wspace(sk) < sk_stream_min_wspace(sk)) | ||
1079 | goto out; | ||
1080 | } else { | ||
1081 | /* from net/core/sock.c:sock_def_write_space */ | ||
1082 | if (!sock_writeable(sk)) | ||
1083 | goto out; | ||
1084 | } | ||
1085 | |||
1086 | if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)) | ||
1087 | goto out; | ||
1088 | |||
1089 | spin_lock_bh(&xprt->sock_lock); | ||
1090 | if (xprt->snd_task) | ||
1091 | rpc_wake_up_task(xprt->snd_task); | ||
1092 | spin_unlock_bh(&xprt->sock_lock); | ||
1093 | out: | ||
1094 | read_unlock(&sk->sk_callback_lock); | ||
1095 | } | ||
1096 | |||
1097 | /* | ||
1098 | * RPC receive timeout handler. | 476 | * RPC receive timeout handler. |
1099 | */ | 477 | */ |
1100 | static void | 478 | static void |
@@ -1161,19 +539,10 @@ xprt_transmit(struct rpc_task *task) | |||
1161 | struct rpc_clnt *clnt = task->tk_client; | 539 | struct rpc_clnt *clnt = task->tk_client; |
1162 | struct rpc_rqst *req = task->tk_rqstp; | 540 | struct rpc_rqst *req = task->tk_rqstp; |
1163 | struct rpc_xprt *xprt = req->rq_xprt; | 541 | struct rpc_xprt *xprt = req->rq_xprt; |
1164 | int status, retry = 0; | 542 | int status; |
1165 | |||
1166 | 543 | ||
1167 | dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); | 544 | dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); |
1168 | 545 | ||
1169 | /* set up everything as needed. */ | ||
1170 | /* Write the record marker */ | ||
1171 | if (xprt->stream) { | ||
1172 | u32 *marker = req->rq_svec[0].iov_base; | ||
1173 | |||
1174 | *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker))); | ||
1175 | } | ||
1176 | |||
1177 | smp_rmb(); | 546 | smp_rmb(); |
1178 | if (!req->rq_received) { | 547 | if (!req->rq_received) { |
1179 | if (list_empty(&req->rq_list)) { | 548 | if (list_empty(&req->rq_list)) { |
@@ -1191,41 +560,9 @@ xprt_transmit(struct rpc_task *task) | |||
1191 | } else if (!req->rq_bytes_sent) | 560 | } else if (!req->rq_bytes_sent) |
1192 | return; | 561 | return; |
1193 | 562 | ||
1194 | /* Continue transmitting the packet/record. We must be careful | 563 | status = xprt->ops->send_request(task); |
1195 | * to cope with writespace callbacks arriving _after_ we have | 564 | if (!status) |
1196 | * called xprt_sendmsg(). | 565 | goto out_receive; |
1197 | */ | ||
1198 | while (1) { | ||
1199 | req->rq_xtime = jiffies; | ||
1200 | status = xprt_sendmsg(xprt, req); | ||
1201 | |||
1202 | if (status < 0) | ||
1203 | break; | ||
1204 | |||
1205 | if (xprt->stream) { | ||
1206 | req->rq_bytes_sent += status; | ||
1207 | |||
1208 | /* If we've sent the entire packet, immediately | ||
1209 | * reset the count of bytes sent. */ | ||
1210 | if (req->rq_bytes_sent >= req->rq_slen) { | ||
1211 | req->rq_bytes_sent = 0; | ||
1212 | goto out_receive; | ||
1213 | } | ||
1214 | } else { | ||
1215 | if (status >= req->rq_slen) | ||
1216 | goto out_receive; | ||
1217 | status = -EAGAIN; | ||
1218 | break; | ||
1219 | } | ||
1220 | |||
1221 | dprintk("RPC: %4d xmit incomplete (%d left of %d)\n", | ||
1222 | task->tk_pid, req->rq_slen - req->rq_bytes_sent, | ||
1223 | req->rq_slen); | ||
1224 | |||
1225 | status = -EAGAIN; | ||
1226 | if (retry++ > 50) | ||
1227 | break; | ||
1228 | } | ||
1229 | 566 | ||
1230 | /* Note: at this point, task->tk_sleeping has not yet been set, | 567 | /* Note: at this point, task->tk_sleeping has not yet been set, |
1231 | * hence there is no danger of the waking up task being put on | 568 | * hence there is no danger of the waking up task being put on |
@@ -1234,26 +571,10 @@ xprt_transmit(struct rpc_task *task) | |||
1234 | task->tk_status = status; | 571 | task->tk_status = status; |
1235 | 572 | ||
1236 | switch (status) { | 573 | switch (status) { |
1237 | case -EAGAIN: | ||
1238 | if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) { | ||
1239 | /* Protect against races with xprt_write_space */ | ||
1240 | spin_lock_bh(&xprt->sock_lock); | ||
1241 | /* Don't race with disconnect */ | ||
1242 | if (!xprt_connected(xprt)) | ||
1243 | task->tk_status = -ENOTCONN; | ||
1244 | else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) { | ||
1245 | task->tk_timeout = req->rq_timeout; | ||
1246 | rpc_sleep_on(&xprt->pending, task, NULL, NULL); | ||
1247 | } | ||
1248 | spin_unlock_bh(&xprt->sock_lock); | ||
1249 | return; | ||
1250 | } | ||
1251 | /* Keep holding the socket if it is blocked */ | ||
1252 | rpc_delay(task, HZ>>4); | ||
1253 | return; | ||
1254 | case -ECONNREFUSED: | 574 | case -ECONNREFUSED: |
1255 | task->tk_timeout = RPC_REESTABLISH_TIMEOUT; | 575 | task->tk_timeout = RPC_REESTABLISH_TIMEOUT; |
1256 | rpc_sleep_on(&xprt->sending, task, NULL, NULL); | 576 | rpc_sleep_on(&xprt->sending, task, NULL, NULL); |
577 | case -EAGAIN: | ||
1257 | case -ENOTCONN: | 578 | case -ENOTCONN: |
1258 | return; | 579 | return; |
1259 | default: | 580 | default: |
@@ -1367,7 +688,8 @@ xprt_release(struct rpc_task *task) | |||
1367 | list_del(&req->rq_list); | 688 | list_del(&req->rq_list); |
1368 | xprt->last_used = jiffies; | 689 | xprt->last_used = jiffies; |
1369 | if (list_empty(&xprt->recv) && !xprt->shutdown) | 690 | if (list_empty(&xprt->recv) && !xprt->shutdown) |
1370 | mod_timer(&xprt->timer, xprt->last_used + XPRT_IDLE_TIMEOUT); | 691 | mod_timer(&xprt->timer, |
692 | xprt->last_used + RPC_IDLE_DISCONNECT_TIMEOUT); | ||
1371 | spin_unlock_bh(&xprt->sock_lock); | 693 | spin_unlock_bh(&xprt->sock_lock); |
1372 | task->tk_rqstp = NULL; | 694 | task->tk_rqstp = NULL; |
1373 | memset(req, 0, sizeof(*req)); /* mark unused */ | 695 | memset(req, 0, sizeof(*req)); /* mark unused */ |
@@ -1381,18 +703,6 @@ xprt_release(struct rpc_task *task) | |||
1381 | } | 703 | } |
1382 | 704 | ||
1383 | /* | 705 | /* |
1384 | * Set default timeout parameters | ||
1385 | */ | ||
1386 | static void | ||
1387 | xprt_default_timeout(struct rpc_timeout *to, int proto) | ||
1388 | { | ||
1389 | if (proto == IPPROTO_UDP) | ||
1390 | xprt_set_timeout(to, 5, 5 * HZ); | ||
1391 | else | ||
1392 | xprt_set_timeout(to, 2, 60 * HZ); | ||
1393 | } | ||
1394 | |||
1395 | /* | ||
1396 | * Set constant timeout | 706 | * Set constant timeout |
1397 | */ | 707 | */ |
1398 | void | 708 | void |
@@ -1405,68 +715,51 @@ xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr) | |||
1405 | to->to_exponential = 0; | 715 | to->to_exponential = 0; |
1406 | } | 716 | } |
1407 | 717 | ||
1408 | unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE; | ||
1409 | unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE; | ||
1410 | |||
1411 | /* | 718 | /* |
1412 | * Initialize an RPC client | 719 | * Initialize an RPC client |
1413 | */ | 720 | */ |
1414 | static struct rpc_xprt * | 721 | static struct rpc_xprt * |
1415 | xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) | 722 | xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) |
1416 | { | 723 | { |
724 | int result; | ||
1417 | struct rpc_xprt *xprt; | 725 | struct rpc_xprt *xprt; |
1418 | unsigned int entries; | ||
1419 | size_t slot_table_size; | ||
1420 | struct rpc_rqst *req; | 726 | struct rpc_rqst *req; |
1421 | 727 | ||
1422 | dprintk("RPC: setting up %s transport...\n", | ||
1423 | proto == IPPROTO_UDP? "UDP" : "TCP"); | ||
1424 | |||
1425 | entries = (proto == IPPROTO_TCP)? | ||
1426 | xprt_tcp_slot_table_entries : xprt_udp_slot_table_entries; | ||
1427 | |||
1428 | if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL) | 728 | if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL) |
1429 | return ERR_PTR(-ENOMEM); | 729 | return ERR_PTR(-ENOMEM); |
1430 | memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */ | 730 | memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */ |
1431 | xprt->max_reqs = entries; | ||
1432 | slot_table_size = entries * sizeof(xprt->slot[0]); | ||
1433 | xprt->slot = kmalloc(slot_table_size, GFP_KERNEL); | ||
1434 | if (xprt->slot == NULL) { | ||
1435 | kfree(xprt); | ||
1436 | return ERR_PTR(-ENOMEM); | ||
1437 | } | ||
1438 | memset(xprt->slot, 0, slot_table_size); | ||
1439 | 731 | ||
1440 | xprt->addr = *ap; | 732 | xprt->addr = *ap; |
1441 | xprt->prot = proto; | 733 | |
1442 | xprt->stream = (proto == IPPROTO_TCP)? 1 : 0; | 734 | switch (proto) { |
1443 | if (xprt->stream) { | 735 | case IPPROTO_UDP: |
1444 | xprt->cwnd = RPC_MAXCWND(xprt); | 736 | result = xs_setup_udp(xprt, to); |
1445 | xprt->nocong = 1; | 737 | break; |
1446 | xprt->max_payload = (1U << 31) - 1; | 738 | case IPPROTO_TCP: |
1447 | } else { | 739 | result = xs_setup_tcp(xprt, to); |
1448 | xprt->cwnd = RPC_INITCWND; | 740 | break; |
1449 | xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); | 741 | default: |
742 | printk(KERN_ERR "RPC: unrecognized transport protocol: %d\n", | ||
743 | proto); | ||
744 | result = -EIO; | ||
745 | break; | ||
746 | } | ||
747 | if (result) { | ||
748 | kfree(xprt); | ||
749 | return ERR_PTR(result); | ||
1450 | } | 750 | } |
751 | |||
1451 | spin_lock_init(&xprt->sock_lock); | 752 | spin_lock_init(&xprt->sock_lock); |
1452 | spin_lock_init(&xprt->xprt_lock); | 753 | spin_lock_init(&xprt->xprt_lock); |
1453 | init_waitqueue_head(&xprt->cong_wait); | 754 | init_waitqueue_head(&xprt->cong_wait); |
1454 | 755 | ||
1455 | INIT_LIST_HEAD(&xprt->free); | 756 | INIT_LIST_HEAD(&xprt->free); |
1456 | INIT_LIST_HEAD(&xprt->recv); | 757 | INIT_LIST_HEAD(&xprt->recv); |
1457 | INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt); | ||
1458 | INIT_WORK(&xprt->task_cleanup, xprt_socket_autoclose, xprt); | 758 | INIT_WORK(&xprt->task_cleanup, xprt_socket_autoclose, xprt); |
1459 | init_timer(&xprt->timer); | 759 | init_timer(&xprt->timer); |
1460 | xprt->timer.function = xprt_init_autodisconnect; | 760 | xprt->timer.function = xprt_init_autodisconnect; |
1461 | xprt->timer.data = (unsigned long) xprt; | 761 | xprt->timer.data = (unsigned long) xprt; |
1462 | xprt->last_used = jiffies; | 762 | xprt->last_used = jiffies; |
1463 | xprt->port = XPRT_MAX_RESVPORT; | ||
1464 | |||
1465 | /* Set timeout parameters */ | ||
1466 | if (to) { | ||
1467 | xprt->timeout = *to; | ||
1468 | } else | ||
1469 | xprt_default_timeout(&xprt->timeout, xprt->prot); | ||
1470 | 763 | ||
1471 | rpc_init_wait_queue(&xprt->pending, "xprt_pending"); | 764 | rpc_init_wait_queue(&xprt->pending, "xprt_pending"); |
1472 | rpc_init_wait_queue(&xprt->sending, "xprt_sending"); | 765 | rpc_init_wait_queue(&xprt->sending, "xprt_sending"); |
@@ -1474,14 +767,11 @@ xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) | |||
1474 | rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); | 767 | rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); |
1475 | 768 | ||
1476 | /* initialize free list */ | 769 | /* initialize free list */ |
1477 | for (req = &xprt->slot[entries-1]; req >= &xprt->slot[0]; req--) | 770 | for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--) |
1478 | list_add(&req->rq_list, &xprt->free); | 771 | list_add(&req->rq_list, &xprt->free); |
1479 | 772 | ||
1480 | xprt_init_xid(xprt); | 773 | xprt_init_xid(xprt); |
1481 | 774 | ||
1482 | /* Check whether we want to use a reserved port */ | ||
1483 | xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0; | ||
1484 | |||
1485 | dprintk("RPC: created transport %p with %u slots\n", xprt, | 775 | dprintk("RPC: created transport %p with %u slots\n", xprt, |
1486 | xprt->max_reqs); | 776 | xprt->max_reqs); |
1487 | 777 | ||
@@ -1489,120 +779,6 @@ xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) | |||
1489 | } | 779 | } |
1490 | 780 | ||
1491 | /* | 781 | /* |
1492 | * Bind to a reserved port | ||
1493 | */ | ||
1494 | static inline int xprt_bindresvport(struct rpc_xprt *xprt, struct socket *sock) | ||
1495 | { | ||
1496 | struct sockaddr_in myaddr = { | ||
1497 | .sin_family = AF_INET, | ||
1498 | }; | ||
1499 | int err, port; | ||
1500 | |||
1501 | /* Were we already bound to a given port? Try to reuse it */ | ||
1502 | port = xprt->port; | ||
1503 | do { | ||
1504 | myaddr.sin_port = htons(port); | ||
1505 | err = sock->ops->bind(sock, (struct sockaddr *) &myaddr, | ||
1506 | sizeof(myaddr)); | ||
1507 | if (err == 0) { | ||
1508 | xprt->port = port; | ||
1509 | return 0; | ||
1510 | } | ||
1511 | if (--port == 0) | ||
1512 | port = XPRT_MAX_RESVPORT; | ||
1513 | } while (err == -EADDRINUSE && port != xprt->port); | ||
1514 | |||
1515 | printk("RPC: Can't bind to reserved port (%d).\n", -err); | ||
1516 | return err; | ||
1517 | } | ||
1518 | |||
1519 | static void | ||
1520 | xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock) | ||
1521 | { | ||
1522 | struct sock *sk = sock->sk; | ||
1523 | |||
1524 | if (xprt->inet) | ||
1525 | return; | ||
1526 | |||
1527 | write_lock_bh(&sk->sk_callback_lock); | ||
1528 | sk->sk_user_data = xprt; | ||
1529 | xprt->old_data_ready = sk->sk_data_ready; | ||
1530 | xprt->old_state_change = sk->sk_state_change; | ||
1531 | xprt->old_write_space = sk->sk_write_space; | ||
1532 | if (xprt->prot == IPPROTO_UDP) { | ||
1533 | sk->sk_data_ready = udp_data_ready; | ||
1534 | sk->sk_no_check = UDP_CSUM_NORCV; | ||
1535 | xprt_set_connected(xprt); | ||
1536 | } else { | ||
1537 | tcp_sk(sk)->nonagle = 1; /* disable Nagle's algorithm */ | ||
1538 | sk->sk_data_ready = tcp_data_ready; | ||
1539 | sk->sk_state_change = tcp_state_change; | ||
1540 | xprt_clear_connected(xprt); | ||
1541 | } | ||
1542 | sk->sk_write_space = xprt_write_space; | ||
1543 | |||
1544 | /* Reset to new socket */ | ||
1545 | xprt->sock = sock; | ||
1546 | xprt->inet = sk; | ||
1547 | write_unlock_bh(&sk->sk_callback_lock); | ||
1548 | |||
1549 | return; | ||
1550 | } | ||
1551 | |||
1552 | /* | ||
1553 | * Set socket buffer length | ||
1554 | */ | ||
1555 | void | ||
1556 | xprt_sock_setbufsize(struct rpc_xprt *xprt) | ||
1557 | { | ||
1558 | struct sock *sk = xprt->inet; | ||
1559 | |||
1560 | if (xprt->stream) | ||
1561 | return; | ||
1562 | if (xprt->rcvsize) { | ||
1563 | sk->sk_userlocks |= SOCK_RCVBUF_LOCK; | ||
1564 | sk->sk_rcvbuf = xprt->rcvsize * xprt->max_reqs * 2; | ||
1565 | } | ||
1566 | if (xprt->sndsize) { | ||
1567 | sk->sk_userlocks |= SOCK_SNDBUF_LOCK; | ||
1568 | sk->sk_sndbuf = xprt->sndsize * xprt->max_reqs * 2; | ||
1569 | sk->sk_write_space(sk); | ||
1570 | } | ||
1571 | } | ||
1572 | |||
1573 | /* | ||
1574 | * Datastream sockets are created here, but xprt_connect will create | ||
1575 | * and connect stream sockets. | ||
1576 | */ | ||
1577 | static struct socket * xprt_create_socket(struct rpc_xprt *xprt, int proto, int resvport) | ||
1578 | { | ||
1579 | struct socket *sock; | ||
1580 | int type, err; | ||
1581 | |||
1582 | dprintk("RPC: xprt_create_socket(%s %d)\n", | ||
1583 | (proto == IPPROTO_UDP)? "udp" : "tcp", proto); | ||
1584 | |||
1585 | type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM; | ||
1586 | |||
1587 | if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) { | ||
1588 | printk("RPC: can't create socket (%d).\n", -err); | ||
1589 | return NULL; | ||
1590 | } | ||
1591 | |||
1592 | /* If the caller has the capability, bind to a reserved port */ | ||
1593 | if (resvport && xprt_bindresvport(xprt, sock) < 0) { | ||
1594 | printk("RPC: can't bind to reserved port.\n"); | ||
1595 | goto failed; | ||
1596 | } | ||
1597 | |||
1598 | return sock; | ||
1599 | |||
1600 | failed: | ||
1601 | sock_release(sock); | ||
1602 | return NULL; | ||
1603 | } | ||
1604 | |||
1605 | /* | ||
1606 | * Create an RPC client transport given the protocol and peer address. | 782 | * Create an RPC client transport given the protocol and peer address. |
1607 | */ | 783 | */ |
1608 | struct rpc_xprt * | 784 | struct rpc_xprt * |
@@ -1631,10 +807,6 @@ xprt_shutdown(struct rpc_xprt *xprt) | |||
1631 | rpc_wake_up(&xprt->backlog); | 807 | rpc_wake_up(&xprt->backlog); |
1632 | wake_up(&xprt->cong_wait); | 808 | wake_up(&xprt->cong_wait); |
1633 | del_timer_sync(&xprt->timer); | 809 | del_timer_sync(&xprt->timer); |
1634 | |||
1635 | /* synchronously wait for connect worker to finish */ | ||
1636 | cancel_delayed_work(&xprt->sock_connect); | ||
1637 | flush_scheduled_work(); | ||
1638 | } | 810 | } |
1639 | 811 | ||
1640 | /* | 812 | /* |
@@ -1655,9 +827,7 @@ xprt_destroy(struct rpc_xprt *xprt) | |||
1655 | { | 827 | { |
1656 | dprintk("RPC: destroying transport %p\n", xprt); | 828 | dprintk("RPC: destroying transport %p\n", xprt); |
1657 | xprt_shutdown(xprt); | 829 | xprt_shutdown(xprt); |
1658 | xprt_disconnect(xprt); | 830 | xprt->ops->destroy(xprt); |
1659 | xprt_close(xprt); | ||
1660 | kfree(xprt->slot); | ||
1661 | kfree(xprt); | 831 | kfree(xprt); |
1662 | 832 | ||
1663 | return 0; | 833 | return 0; |
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c new file mode 100644 index 000000000000..fa1180ac4823 --- /dev/null +++ b/net/sunrpc/xprtsock.c | |||
@@ -0,0 +1,1021 @@ | |||
1 | /* | ||
2 | * linux/net/sunrpc/xprtsock.c | ||
3 | * | ||
4 | * Client-side transport implementation for sockets. | ||
5 | * | ||
6 | * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com> | ||
7 | * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com> | ||
8 | * TCP NFS related read + write fixes | ||
9 | * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie> | ||
10 | * | ||
11 | * Rewrite of larges part of the code in order to stabilize TCP stuff. | ||
12 | * Fix behaviour when socket buffer is full. | ||
13 | * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no> | ||
14 | */ | ||
15 | |||
16 | #include <linux/types.h> | ||
17 | #include <linux/slab.h> | ||
18 | #include <linux/capability.h> | ||
19 | #include <linux/sched.h> | ||
20 | #include <linux/pagemap.h> | ||
21 | #include <linux/errno.h> | ||
22 | #include <linux/socket.h> | ||
23 | #include <linux/in.h> | ||
24 | #include <linux/net.h> | ||
25 | #include <linux/mm.h> | ||
26 | #include <linux/udp.h> | ||
27 | #include <linux/tcp.h> | ||
28 | #include <linux/sunrpc/clnt.h> | ||
29 | #include <linux/file.h> | ||
30 | |||
31 | #include <net/sock.h> | ||
32 | #include <net/checksum.h> | ||
33 | #include <net/udp.h> | ||
34 | #include <net/tcp.h> | ||
35 | |||
36 | #ifdef RPC_DEBUG | ||
37 | # undef RPC_DEBUG_DATA | ||
38 | # define RPCDBG_FACILITY RPCDBG_XPRT | ||
39 | #endif | ||
40 | |||
41 | #define XPRT_MAX_RESVPORT (800) | ||
42 | |||
43 | #ifdef RPC_DEBUG_DATA | ||
44 | /* | ||
45 | * Print the buffer contents (first 128 bytes only--just enough for | ||
46 | * diropres return). | ||
47 | */ | ||
48 | static void | ||
49 | xprt_pktdump(char *msg, u32 *packet, unsigned int count) | ||
50 | { | ||
51 | u8 *buf = (u8 *) packet; | ||
52 | int j; | ||
53 | |||
54 | dprintk("RPC: %s\n", msg); | ||
55 | for (j = 0; j < count && j < 128; j += 4) { | ||
56 | if (!(j & 31)) { | ||
57 | if (j) | ||
58 | dprintk("\n"); | ||
59 | dprintk("0x%04x ", j); | ||
60 | } | ||
61 | dprintk("%02x%02x%02x%02x ", | ||
62 | buf[j], buf[j+1], buf[j+2], buf[j+3]); | ||
63 | } | ||
64 | dprintk("\n"); | ||
65 | } | ||
66 | #else | ||
67 | static inline void | ||
68 | xprt_pktdump(char *msg, u32 *packet, unsigned int count) | ||
69 | { | ||
70 | /* NOP */ | ||
71 | } | ||
72 | #endif | ||
73 | |||
74 | /* | ||
75 | * Look up RPC transport given an INET socket | ||
76 | */ | ||
77 | static inline struct rpc_xprt * | ||
78 | xprt_from_sock(struct sock *sk) | ||
79 | { | ||
80 | return (struct rpc_xprt *) sk->sk_user_data; | ||
81 | } | ||
82 | |||
83 | static int | ||
84 | xdr_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, | ||
85 | struct xdr_buf *xdr, unsigned int base, int msgflags) | ||
86 | { | ||
87 | struct page **ppage = xdr->pages; | ||
88 | unsigned int len, pglen = xdr->page_len; | ||
89 | int err, ret = 0; | ||
90 | ssize_t (*sendpage)(struct socket *, struct page *, int, size_t, int); | ||
91 | |||
92 | len = xdr->head[0].iov_len; | ||
93 | if (base < len || (addr != NULL && base == 0)) { | ||
94 | struct kvec iov = { | ||
95 | .iov_base = xdr->head[0].iov_base + base, | ||
96 | .iov_len = len - base, | ||
97 | }; | ||
98 | struct msghdr msg = { | ||
99 | .msg_name = addr, | ||
100 | .msg_namelen = addrlen, | ||
101 | .msg_flags = msgflags, | ||
102 | }; | ||
103 | if (xdr->len > len) | ||
104 | msg.msg_flags |= MSG_MORE; | ||
105 | |||
106 | if (iov.iov_len != 0) | ||
107 | err = kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len); | ||
108 | else | ||
109 | err = kernel_sendmsg(sock, &msg, NULL, 0, 0); | ||
110 | if (ret == 0) | ||
111 | ret = err; | ||
112 | else if (err > 0) | ||
113 | ret += err; | ||
114 | if (err != iov.iov_len) | ||
115 | goto out; | ||
116 | base = 0; | ||
117 | } else | ||
118 | base -= len; | ||
119 | |||
120 | if (pglen == 0) | ||
121 | goto copy_tail; | ||
122 | if (base >= pglen) { | ||
123 | base -= pglen; | ||
124 | goto copy_tail; | ||
125 | } | ||
126 | if (base || xdr->page_base) { | ||
127 | pglen -= base; | ||
128 | base += xdr->page_base; | ||
129 | ppage += base >> PAGE_CACHE_SHIFT; | ||
130 | base &= ~PAGE_CACHE_MASK; | ||
131 | } | ||
132 | |||
133 | sendpage = sock->ops->sendpage ? : sock_no_sendpage; | ||
134 | do { | ||
135 | int flags = msgflags; | ||
136 | |||
137 | len = PAGE_CACHE_SIZE; | ||
138 | if (base) | ||
139 | len -= base; | ||
140 | if (pglen < len) | ||
141 | len = pglen; | ||
142 | |||
143 | if (pglen != len || xdr->tail[0].iov_len != 0) | ||
144 | flags |= MSG_MORE; | ||
145 | |||
146 | /* Hmm... We might be dealing with highmem pages */ | ||
147 | if (PageHighMem(*ppage)) | ||
148 | sendpage = sock_no_sendpage; | ||
149 | err = sendpage(sock, *ppage, base, len, flags); | ||
150 | if (ret == 0) | ||
151 | ret = err; | ||
152 | else if (err > 0) | ||
153 | ret += err; | ||
154 | if (err != len) | ||
155 | goto out; | ||
156 | base = 0; | ||
157 | ppage++; | ||
158 | } while ((pglen -= len) != 0); | ||
159 | copy_tail: | ||
160 | len = xdr->tail[0].iov_len; | ||
161 | if (base < len) { | ||
162 | struct kvec iov = { | ||
163 | .iov_base = xdr->tail[0].iov_base + base, | ||
164 | .iov_len = len - base, | ||
165 | }; | ||
166 | struct msghdr msg = { | ||
167 | .msg_flags = msgflags, | ||
168 | }; | ||
169 | err = kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len); | ||
170 | if (ret == 0) | ||
171 | ret = err; | ||
172 | else if (err > 0) | ||
173 | ret += err; | ||
174 | } | ||
175 | out: | ||
176 | return ret; | ||
177 | } | ||
178 | |||
179 | /* | ||
180 | * Write data to socket. | ||
181 | */ | ||
182 | static inline int | ||
183 | xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req) | ||
184 | { | ||
185 | struct socket *sock = xprt->sock; | ||
186 | struct xdr_buf *xdr = &req->rq_snd_buf; | ||
187 | struct sockaddr *addr = NULL; | ||
188 | int addrlen = 0; | ||
189 | unsigned int skip; | ||
190 | int result; | ||
191 | |||
192 | if (!sock) | ||
193 | return -ENOTCONN; | ||
194 | |||
195 | xprt_pktdump("packet data:", | ||
196 | req->rq_svec->iov_base, | ||
197 | req->rq_svec->iov_len); | ||
198 | |||
199 | /* For UDP, we need to provide an address */ | ||
200 | if (!xprt->stream) { | ||
201 | addr = (struct sockaddr *) &xprt->addr; | ||
202 | addrlen = sizeof(xprt->addr); | ||
203 | } | ||
204 | /* Dont repeat bytes */ | ||
205 | skip = req->rq_bytes_sent; | ||
206 | |||
207 | clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags); | ||
208 | result = xdr_sendpages(sock, addr, addrlen, xdr, skip, MSG_DONTWAIT); | ||
209 | |||
210 | dprintk("RPC: xprt_sendmsg(%d) = %d\n", xdr->len - skip, result); | ||
211 | |||
212 | if (result >= 0) | ||
213 | return result; | ||
214 | |||
215 | switch (result) { | ||
216 | case -ECONNREFUSED: | ||
217 | /* When the server has died, an ICMP port unreachable message | ||
218 | * prompts ECONNREFUSED. | ||
219 | */ | ||
220 | case -EAGAIN: | ||
221 | break; | ||
222 | case -ECONNRESET: | ||
223 | case -ENOTCONN: | ||
224 | case -EPIPE: | ||
225 | /* connection broken */ | ||
226 | if (xprt->stream) | ||
227 | result = -ENOTCONN; | ||
228 | break; | ||
229 | default: | ||
230 | printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result); | ||
231 | } | ||
232 | return result; | ||
233 | } | ||
234 | |||
235 | static int | ||
236 | xprt_send_request(struct rpc_task *task) | ||
237 | { | ||
238 | struct rpc_rqst *req = task->tk_rqstp; | ||
239 | struct rpc_xprt *xprt = req->rq_xprt; | ||
240 | int status, retry = 0; | ||
241 | |||
242 | /* set up everything as needed. */ | ||
243 | /* Write the record marker */ | ||
244 | if (xprt->stream) { | ||
245 | u32 *marker = req->rq_svec[0].iov_base; | ||
246 | |||
247 | *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker))); | ||
248 | } | ||
249 | |||
250 | /* Continue transmitting the packet/record. We must be careful | ||
251 | * to cope with writespace callbacks arriving _after_ we have | ||
252 | * called xprt_sendmsg(). | ||
253 | */ | ||
254 | while (1) { | ||
255 | req->rq_xtime = jiffies; | ||
256 | status = xprt_sendmsg(xprt, req); | ||
257 | |||
258 | if (status < 0) | ||
259 | break; | ||
260 | |||
261 | if (xprt->stream) { | ||
262 | req->rq_bytes_sent += status; | ||
263 | |||
264 | /* If we've sent the entire packet, immediately | ||
265 | * reset the count of bytes sent. */ | ||
266 | if (req->rq_bytes_sent >= req->rq_slen) { | ||
267 | req->rq_bytes_sent = 0; | ||
268 | return 0; | ||
269 | } | ||
270 | } else { | ||
271 | if (status >= req->rq_slen) | ||
272 | return 0; | ||
273 | status = -EAGAIN; | ||
274 | break; | ||
275 | } | ||
276 | |||
277 | dprintk("RPC: %4d xmit incomplete (%d left of %d)\n", | ||
278 | task->tk_pid, req->rq_slen - req->rq_bytes_sent, | ||
279 | req->rq_slen); | ||
280 | |||
281 | status = -EAGAIN; | ||
282 | if (retry++ > 50) | ||
283 | break; | ||
284 | } | ||
285 | |||
286 | if (status == -EAGAIN) { | ||
287 | if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) { | ||
288 | /* Protect against races with xprt_write_space */ | ||
289 | spin_lock_bh(&xprt->sock_lock); | ||
290 | /* Don't race with disconnect */ | ||
291 | if (!xprt_connected(xprt)) | ||
292 | task->tk_status = -ENOTCONN; | ||
293 | else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) { | ||
294 | task->tk_timeout = req->rq_timeout; | ||
295 | rpc_sleep_on(&xprt->pending, task, NULL, NULL); | ||
296 | } | ||
297 | spin_unlock_bh(&xprt->sock_lock); | ||
298 | return status; | ||
299 | } | ||
300 | /* Keep holding the socket if it is blocked */ | ||
301 | rpc_delay(task, HZ>>4); | ||
302 | } | ||
303 | return status; | ||
304 | } | ||
305 | |||
306 | /* | ||
307 | * Close down a transport socket | ||
308 | */ | ||
309 | static void | ||
310 | xprt_close(struct rpc_xprt *xprt) | ||
311 | { | ||
312 | struct socket *sock = xprt->sock; | ||
313 | struct sock *sk = xprt->inet; | ||
314 | |||
315 | if (!sk) | ||
316 | return; | ||
317 | |||
318 | write_lock_bh(&sk->sk_callback_lock); | ||
319 | xprt->inet = NULL; | ||
320 | xprt->sock = NULL; | ||
321 | |||
322 | sk->sk_user_data = NULL; | ||
323 | sk->sk_data_ready = xprt->old_data_ready; | ||
324 | sk->sk_state_change = xprt->old_state_change; | ||
325 | sk->sk_write_space = xprt->old_write_space; | ||
326 | write_unlock_bh(&sk->sk_callback_lock); | ||
327 | |||
328 | sk->sk_no_check = 0; | ||
329 | |||
330 | sock_release(sock); | ||
331 | } | ||
332 | |||
333 | static void xprt_socket_destroy(struct rpc_xprt *xprt) | ||
334 | { | ||
335 | cancel_delayed_work(&xprt->sock_connect); | ||
336 | flush_scheduled_work(); | ||
337 | |||
338 | xprt_disconnect(xprt); | ||
339 | xprt_close(xprt); | ||
340 | kfree(xprt->slot); | ||
341 | } | ||
342 | |||
343 | /* | ||
344 | * Input handler for RPC replies. Called from a bottom half and hence | ||
345 | * atomic. | ||
346 | */ | ||
347 | static void | ||
348 | udp_data_ready(struct sock *sk, int len) | ||
349 | { | ||
350 | struct rpc_task *task; | ||
351 | struct rpc_xprt *xprt; | ||
352 | struct rpc_rqst *rovr; | ||
353 | struct sk_buff *skb; | ||
354 | int err, repsize, copied; | ||
355 | u32 _xid, *xp; | ||
356 | |||
357 | read_lock(&sk->sk_callback_lock); | ||
358 | dprintk("RPC: udp_data_ready...\n"); | ||
359 | if (!(xprt = xprt_from_sock(sk))) { | ||
360 | printk("RPC: udp_data_ready request not found!\n"); | ||
361 | goto out; | ||
362 | } | ||
363 | |||
364 | dprintk("RPC: udp_data_ready client %p\n", xprt); | ||
365 | |||
366 | if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL) | ||
367 | goto out; | ||
368 | |||
369 | if (xprt->shutdown) | ||
370 | goto dropit; | ||
371 | |||
372 | repsize = skb->len - sizeof(struct udphdr); | ||
373 | if (repsize < 4) { | ||
374 | printk("RPC: impossible RPC reply size %d!\n", repsize); | ||
375 | goto dropit; | ||
376 | } | ||
377 | |||
378 | /* Copy the XID from the skb... */ | ||
379 | xp = skb_header_pointer(skb, sizeof(struct udphdr), | ||
380 | sizeof(_xid), &_xid); | ||
381 | if (xp == NULL) | ||
382 | goto dropit; | ||
383 | |||
384 | /* Look up and lock the request corresponding to the given XID */ | ||
385 | spin_lock(&xprt->sock_lock); | ||
386 | rovr = xprt_lookup_rqst(xprt, *xp); | ||
387 | if (!rovr) | ||
388 | goto out_unlock; | ||
389 | task = rovr->rq_task; | ||
390 | |||
391 | dprintk("RPC: %4d received reply\n", task->tk_pid); | ||
392 | |||
393 | if ((copied = rovr->rq_private_buf.buflen) > repsize) | ||
394 | copied = repsize; | ||
395 | |||
396 | /* Suck it into the iovec, verify checksum if not done by hw. */ | ||
397 | if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) | ||
398 | goto out_unlock; | ||
399 | |||
400 | /* Something worked... */ | ||
401 | dst_confirm(skb->dst); | ||
402 | |||
403 | xprt_complete_rqst(xprt, rovr, copied); | ||
404 | |||
405 | out_unlock: | ||
406 | spin_unlock(&xprt->sock_lock); | ||
407 | dropit: | ||
408 | skb_free_datagram(sk, skb); | ||
409 | out: | ||
410 | read_unlock(&sk->sk_callback_lock); | ||
411 | } | ||
412 | |||
413 | /* | ||
414 | * Copy from an skb into memory and shrink the skb. | ||
415 | */ | ||
416 | static inline size_t | ||
417 | tcp_copy_data(skb_reader_t *desc, void *p, size_t len) | ||
418 | { | ||
419 | if (len > desc->count) | ||
420 | len = desc->count; | ||
421 | if (skb_copy_bits(desc->skb, desc->offset, p, len)) { | ||
422 | dprintk("RPC: failed to copy %zu bytes from skb. %zu bytes remain\n", | ||
423 | len, desc->count); | ||
424 | return 0; | ||
425 | } | ||
426 | desc->offset += len; | ||
427 | desc->count -= len; | ||
428 | dprintk("RPC: copied %zu bytes from skb. %zu bytes remain\n", | ||
429 | len, desc->count); | ||
430 | return len; | ||
431 | } | ||
432 | |||
433 | /* | ||
434 | * TCP read fragment marker | ||
435 | */ | ||
436 | static inline void | ||
437 | tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc) | ||
438 | { | ||
439 | size_t len, used; | ||
440 | char *p; | ||
441 | |||
442 | p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset; | ||
443 | len = sizeof(xprt->tcp_recm) - xprt->tcp_offset; | ||
444 | used = tcp_copy_data(desc, p, len); | ||
445 | xprt->tcp_offset += used; | ||
446 | if (used != len) | ||
447 | return; | ||
448 | xprt->tcp_reclen = ntohl(xprt->tcp_recm); | ||
449 | if (xprt->tcp_reclen & 0x80000000) | ||
450 | xprt->tcp_flags |= XPRT_LAST_FRAG; | ||
451 | else | ||
452 | xprt->tcp_flags &= ~XPRT_LAST_FRAG; | ||
453 | xprt->tcp_reclen &= 0x7fffffff; | ||
454 | xprt->tcp_flags &= ~XPRT_COPY_RECM; | ||
455 | xprt->tcp_offset = 0; | ||
456 | /* Sanity check of the record length */ | ||
457 | if (xprt->tcp_reclen < 4) { | ||
458 | printk(KERN_ERR "RPC: Invalid TCP record fragment length\n"); | ||
459 | xprt_disconnect(xprt); | ||
460 | } | ||
461 | dprintk("RPC: reading TCP record fragment of length %d\n", | ||
462 | xprt->tcp_reclen); | ||
463 | } | ||
464 | |||
465 | static void | ||
466 | tcp_check_recm(struct rpc_xprt *xprt) | ||
467 | { | ||
468 | dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n", | ||
469 | xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_flags); | ||
470 | if (xprt->tcp_offset == xprt->tcp_reclen) { | ||
471 | xprt->tcp_flags |= XPRT_COPY_RECM; | ||
472 | xprt->tcp_offset = 0; | ||
473 | if (xprt->tcp_flags & XPRT_LAST_FRAG) { | ||
474 | xprt->tcp_flags &= ~XPRT_COPY_DATA; | ||
475 | xprt->tcp_flags |= XPRT_COPY_XID; | ||
476 | xprt->tcp_copied = 0; | ||
477 | } | ||
478 | } | ||
479 | } | ||
480 | |||
481 | /* | ||
482 | * TCP read xid | ||
483 | */ | ||
484 | static inline void | ||
485 | tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc) | ||
486 | { | ||
487 | size_t len, used; | ||
488 | char *p; | ||
489 | |||
490 | len = sizeof(xprt->tcp_xid) - xprt->tcp_offset; | ||
491 | dprintk("RPC: reading XID (%Zu bytes)\n", len); | ||
492 | p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset; | ||
493 | used = tcp_copy_data(desc, p, len); | ||
494 | xprt->tcp_offset += used; | ||
495 | if (used != len) | ||
496 | return; | ||
497 | xprt->tcp_flags &= ~XPRT_COPY_XID; | ||
498 | xprt->tcp_flags |= XPRT_COPY_DATA; | ||
499 | xprt->tcp_copied = 4; | ||
500 | dprintk("RPC: reading reply for XID %08x\n", | ||
501 | ntohl(xprt->tcp_xid)); | ||
502 | tcp_check_recm(xprt); | ||
503 | } | ||
504 | |||
505 | /* | ||
506 | * TCP read and complete request | ||
507 | */ | ||
508 | static inline void | ||
509 | tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc) | ||
510 | { | ||
511 | struct rpc_rqst *req; | ||
512 | struct xdr_buf *rcvbuf; | ||
513 | size_t len; | ||
514 | ssize_t r; | ||
515 | |||
516 | /* Find and lock the request corresponding to this xid */ | ||
517 | spin_lock(&xprt->sock_lock); | ||
518 | req = xprt_lookup_rqst(xprt, xprt->tcp_xid); | ||
519 | if (!req) { | ||
520 | xprt->tcp_flags &= ~XPRT_COPY_DATA; | ||
521 | dprintk("RPC: XID %08x request not found!\n", | ||
522 | ntohl(xprt->tcp_xid)); | ||
523 | spin_unlock(&xprt->sock_lock); | ||
524 | return; | ||
525 | } | ||
526 | |||
527 | rcvbuf = &req->rq_private_buf; | ||
528 | len = desc->count; | ||
529 | if (len > xprt->tcp_reclen - xprt->tcp_offset) { | ||
530 | skb_reader_t my_desc; | ||
531 | |||
532 | len = xprt->tcp_reclen - xprt->tcp_offset; | ||
533 | memcpy(&my_desc, desc, sizeof(my_desc)); | ||
534 | my_desc.count = len; | ||
535 | r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied, | ||
536 | &my_desc, tcp_copy_data); | ||
537 | desc->count -= r; | ||
538 | desc->offset += r; | ||
539 | } else | ||
540 | r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied, | ||
541 | desc, tcp_copy_data); | ||
542 | |||
543 | if (r > 0) { | ||
544 | xprt->tcp_copied += r; | ||
545 | xprt->tcp_offset += r; | ||
546 | } | ||
547 | if (r != len) { | ||
548 | /* Error when copying to the receive buffer, | ||
549 | * usually because we weren't able to allocate | ||
550 | * additional buffer pages. All we can do now | ||
551 | * is turn off XPRT_COPY_DATA, so the request | ||
552 | * will not receive any additional updates, | ||
553 | * and time out. | ||
554 | * Any remaining data from this record will | ||
555 | * be discarded. | ||
556 | */ | ||
557 | xprt->tcp_flags &= ~XPRT_COPY_DATA; | ||
558 | dprintk("RPC: XID %08x truncated request\n", | ||
559 | ntohl(xprt->tcp_xid)); | ||
560 | dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n", | ||
561 | xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen); | ||
562 | goto out; | ||
563 | } | ||
564 | |||
565 | dprintk("RPC: XID %08x read %Zd bytes\n", | ||
566 | ntohl(xprt->tcp_xid), r); | ||
567 | dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n", | ||
568 | xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen); | ||
569 | |||
570 | if (xprt->tcp_copied == req->rq_private_buf.buflen) | ||
571 | xprt->tcp_flags &= ~XPRT_COPY_DATA; | ||
572 | else if (xprt->tcp_offset == xprt->tcp_reclen) { | ||
573 | if (xprt->tcp_flags & XPRT_LAST_FRAG) | ||
574 | xprt->tcp_flags &= ~XPRT_COPY_DATA; | ||
575 | } | ||
576 | |||
577 | out: | ||
578 | if (!(xprt->tcp_flags & XPRT_COPY_DATA)) { | ||
579 | dprintk("RPC: %4d received reply complete\n", | ||
580 | req->rq_task->tk_pid); | ||
581 | xprt_complete_rqst(xprt, req, xprt->tcp_copied); | ||
582 | } | ||
583 | spin_unlock(&xprt->sock_lock); | ||
584 | tcp_check_recm(xprt); | ||
585 | } | ||
586 | |||
587 | /* | ||
588 | * TCP discard extra bytes from a short read | ||
589 | */ | ||
590 | static inline void | ||
591 | tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc) | ||
592 | { | ||
593 | size_t len; | ||
594 | |||
595 | len = xprt->tcp_reclen - xprt->tcp_offset; | ||
596 | if (len > desc->count) | ||
597 | len = desc->count; | ||
598 | desc->count -= len; | ||
599 | desc->offset += len; | ||
600 | xprt->tcp_offset += len; | ||
601 | dprintk("RPC: discarded %Zu bytes\n", len); | ||
602 | tcp_check_recm(xprt); | ||
603 | } | ||
604 | |||
605 | /* | ||
606 | * TCP record receive routine | ||
607 | * We first have to grab the record marker, then the XID, then the data. | ||
608 | */ | ||
609 | static int | ||
610 | tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, | ||
611 | unsigned int offset, size_t len) | ||
612 | { | ||
613 | struct rpc_xprt *xprt = rd_desc->arg.data; | ||
614 | skb_reader_t desc = { | ||
615 | .skb = skb, | ||
616 | .offset = offset, | ||
617 | .count = len, | ||
618 | .csum = 0 | ||
619 | }; | ||
620 | |||
621 | dprintk("RPC: tcp_data_recv\n"); | ||
622 | do { | ||
623 | /* Read in a new fragment marker if necessary */ | ||
624 | /* Can we ever really expect to get completely empty fragments? */ | ||
625 | if (xprt->tcp_flags & XPRT_COPY_RECM) { | ||
626 | tcp_read_fraghdr(xprt, &desc); | ||
627 | continue; | ||
628 | } | ||
629 | /* Read in the xid if necessary */ | ||
630 | if (xprt->tcp_flags & XPRT_COPY_XID) { | ||
631 | tcp_read_xid(xprt, &desc); | ||
632 | continue; | ||
633 | } | ||
634 | /* Read in the request data */ | ||
635 | if (xprt->tcp_flags & XPRT_COPY_DATA) { | ||
636 | tcp_read_request(xprt, &desc); | ||
637 | continue; | ||
638 | } | ||
639 | /* Skip over any trailing bytes on short reads */ | ||
640 | tcp_read_discard(xprt, &desc); | ||
641 | } while (desc.count); | ||
642 | dprintk("RPC: tcp_data_recv done\n"); | ||
643 | return len - desc.count; | ||
644 | } | ||
645 | |||
646 | static void tcp_data_ready(struct sock *sk, int bytes) | ||
647 | { | ||
648 | struct rpc_xprt *xprt; | ||
649 | read_descriptor_t rd_desc; | ||
650 | |||
651 | read_lock(&sk->sk_callback_lock); | ||
652 | dprintk("RPC: tcp_data_ready...\n"); | ||
653 | if (!(xprt = xprt_from_sock(sk))) { | ||
654 | printk("RPC: tcp_data_ready socket info not found!\n"); | ||
655 | goto out; | ||
656 | } | ||
657 | if (xprt->shutdown) | ||
658 | goto out; | ||
659 | |||
660 | /* We use rd_desc to pass struct xprt to tcp_data_recv */ | ||
661 | rd_desc.arg.data = xprt; | ||
662 | rd_desc.count = 65536; | ||
663 | tcp_read_sock(sk, &rd_desc, tcp_data_recv); | ||
664 | out: | ||
665 | read_unlock(&sk->sk_callback_lock); | ||
666 | } | ||
667 | |||
668 | static void | ||
669 | tcp_state_change(struct sock *sk) | ||
670 | { | ||
671 | struct rpc_xprt *xprt; | ||
672 | |||
673 | read_lock(&sk->sk_callback_lock); | ||
674 | if (!(xprt = xprt_from_sock(sk))) | ||
675 | goto out; | ||
676 | dprintk("RPC: tcp_state_change client %p...\n", xprt); | ||
677 | dprintk("RPC: state %x conn %d dead %d zapped %d\n", | ||
678 | sk->sk_state, xprt_connected(xprt), | ||
679 | sock_flag(sk, SOCK_DEAD), | ||
680 | sock_flag(sk, SOCK_ZAPPED)); | ||
681 | |||
682 | switch (sk->sk_state) { | ||
683 | case TCP_ESTABLISHED: | ||
684 | spin_lock_bh(&xprt->sock_lock); | ||
685 | if (!xprt_test_and_set_connected(xprt)) { | ||
686 | /* Reset TCP record info */ | ||
687 | xprt->tcp_offset = 0; | ||
688 | xprt->tcp_reclen = 0; | ||
689 | xprt->tcp_copied = 0; | ||
690 | xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID; | ||
691 | rpc_wake_up(&xprt->pending); | ||
692 | } | ||
693 | spin_unlock_bh(&xprt->sock_lock); | ||
694 | break; | ||
695 | case TCP_SYN_SENT: | ||
696 | case TCP_SYN_RECV: | ||
697 | break; | ||
698 | default: | ||
699 | xprt_disconnect(xprt); | ||
700 | break; | ||
701 | } | ||
702 | out: | ||
703 | read_unlock(&sk->sk_callback_lock); | ||
704 | } | ||
705 | |||
706 | /* | ||
707 | * Called when more output buffer space is available for this socket. | ||
708 | * We try not to wake our writers until they can make "significant" | ||
709 | * progress, otherwise we'll waste resources thrashing sock_sendmsg | ||
710 | * with a bunch of small requests. | ||
711 | */ | ||
712 | static void | ||
713 | xprt_write_space(struct sock *sk) | ||
714 | { | ||
715 | struct rpc_xprt *xprt; | ||
716 | struct socket *sock; | ||
717 | |||
718 | read_lock(&sk->sk_callback_lock); | ||
719 | if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->sk_socket)) | ||
720 | goto out; | ||
721 | if (xprt->shutdown) | ||
722 | goto out; | ||
723 | |||
724 | /* Wait until we have enough socket memory */ | ||
725 | if (xprt->stream) { | ||
726 | /* from net/core/stream.c:sk_stream_write_space */ | ||
727 | if (sk_stream_wspace(sk) < sk_stream_min_wspace(sk)) | ||
728 | goto out; | ||
729 | } else { | ||
730 | /* from net/core/sock.c:sock_def_write_space */ | ||
731 | if (!sock_writeable(sk)) | ||
732 | goto out; | ||
733 | } | ||
734 | |||
735 | if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)) | ||
736 | goto out; | ||
737 | |||
738 | spin_lock_bh(&xprt->sock_lock); | ||
739 | if (xprt->snd_task) | ||
740 | rpc_wake_up_task(xprt->snd_task); | ||
741 | spin_unlock_bh(&xprt->sock_lock); | ||
742 | out: | ||
743 | read_unlock(&sk->sk_callback_lock); | ||
744 | } | ||
745 | |||
746 | /* | ||
747 | * Set socket buffer length | ||
748 | */ | ||
749 | static void | ||
750 | xprt_sock_setbufsize(struct rpc_xprt *xprt) | ||
751 | { | ||
752 | struct sock *sk = xprt->inet; | ||
753 | |||
754 | if (xprt->stream) | ||
755 | return; | ||
756 | if (xprt->rcvsize) { | ||
757 | sk->sk_userlocks |= SOCK_RCVBUF_LOCK; | ||
758 | sk->sk_rcvbuf = xprt->rcvsize * xprt->max_reqs * 2; | ||
759 | } | ||
760 | if (xprt->sndsize) { | ||
761 | sk->sk_userlocks |= SOCK_SNDBUF_LOCK; | ||
762 | sk->sk_sndbuf = xprt->sndsize * xprt->max_reqs * 2; | ||
763 | sk->sk_write_space(sk); | ||
764 | } | ||
765 | } | ||
766 | |||
767 | /* | ||
768 | * Bind to a reserved port | ||
769 | */ | ||
770 | static inline int xprt_bindresvport(struct rpc_xprt *xprt, struct socket *sock) | ||
771 | { | ||
772 | struct sockaddr_in myaddr = { | ||
773 | .sin_family = AF_INET, | ||
774 | }; | ||
775 | int err, port; | ||
776 | |||
777 | /* Were we already bound to a given port? Try to reuse it */ | ||
778 | port = xprt->port; | ||
779 | do { | ||
780 | myaddr.sin_port = htons(port); | ||
781 | err = sock->ops->bind(sock, (struct sockaddr *) &myaddr, | ||
782 | sizeof(myaddr)); | ||
783 | if (err == 0) { | ||
784 | xprt->port = port; | ||
785 | return 0; | ||
786 | } | ||
787 | if (--port == 0) | ||
788 | port = XPRT_MAX_RESVPORT; | ||
789 | } while (err == -EADDRINUSE && port != xprt->port); | ||
790 | |||
791 | printk("RPC: Can't bind to reserved port (%d).\n", -err); | ||
792 | return err; | ||
793 | } | ||
794 | |||
795 | static void | ||
796 | xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock) | ||
797 | { | ||
798 | struct sock *sk = sock->sk; | ||
799 | |||
800 | if (xprt->inet) | ||
801 | return; | ||
802 | |||
803 | write_lock_bh(&sk->sk_callback_lock); | ||
804 | sk->sk_user_data = xprt; | ||
805 | xprt->old_data_ready = sk->sk_data_ready; | ||
806 | xprt->old_state_change = sk->sk_state_change; | ||
807 | xprt->old_write_space = sk->sk_write_space; | ||
808 | if (xprt->prot == IPPROTO_UDP) { | ||
809 | sk->sk_data_ready = udp_data_ready; | ||
810 | sk->sk_no_check = UDP_CSUM_NORCV; | ||
811 | xprt_set_connected(xprt); | ||
812 | } else { | ||
813 | tcp_sk(sk)->nonagle = 1; /* disable Nagle's algorithm */ | ||
814 | sk->sk_data_ready = tcp_data_ready; | ||
815 | sk->sk_state_change = tcp_state_change; | ||
816 | xprt_clear_connected(xprt); | ||
817 | } | ||
818 | sk->sk_write_space = xprt_write_space; | ||
819 | |||
820 | /* Reset to new socket */ | ||
821 | xprt->sock = sock; | ||
822 | xprt->inet = sk; | ||
823 | write_unlock_bh(&sk->sk_callback_lock); | ||
824 | |||
825 | return; | ||
826 | } | ||
827 | |||
828 | /* | ||
829 | * Datastream sockets are created here, but xprt_connect will create | ||
830 | * and connect stream sockets. | ||
831 | */ | ||
832 | static struct socket * xprt_create_socket(struct rpc_xprt *xprt, int proto, int resvport) | ||
833 | { | ||
834 | struct socket *sock; | ||
835 | int type, err; | ||
836 | |||
837 | dprintk("RPC: xprt_create_socket(%s %d)\n", | ||
838 | (proto == IPPROTO_UDP)? "udp" : "tcp", proto); | ||
839 | |||
840 | type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM; | ||
841 | |||
842 | if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) { | ||
843 | printk("RPC: can't create socket (%d).\n", -err); | ||
844 | return NULL; | ||
845 | } | ||
846 | |||
847 | /* If the caller has the capability, bind to a reserved port */ | ||
848 | if (resvport && xprt_bindresvport(xprt, sock) < 0) { | ||
849 | printk("RPC: can't bind to reserved port.\n"); | ||
850 | goto failed; | ||
851 | } | ||
852 | |||
853 | return sock; | ||
854 | |||
855 | failed: | ||
856 | sock_release(sock); | ||
857 | return NULL; | ||
858 | } | ||
859 | |||
860 | static void xprt_socket_connect(void *args) | ||
861 | { | ||
862 | struct rpc_xprt *xprt = (struct rpc_xprt *)args; | ||
863 | struct socket *sock = xprt->sock; | ||
864 | int status = -EIO; | ||
865 | |||
866 | if (xprt->shutdown || xprt->addr.sin_port == 0) | ||
867 | goto out; | ||
868 | |||
869 | /* | ||
870 | * Start by resetting any existing state | ||
871 | */ | ||
872 | xprt_close(xprt); | ||
873 | sock = xprt_create_socket(xprt, xprt->prot, xprt->resvport); | ||
874 | if (sock == NULL) { | ||
875 | /* couldn't create socket or bind to reserved port; | ||
876 | * this is likely a permanent error, so cause an abort */ | ||
877 | goto out; | ||
878 | } | ||
879 | xprt_bind_socket(xprt, sock); | ||
880 | xprt_sock_setbufsize(xprt); | ||
881 | |||
882 | status = 0; | ||
883 | if (!xprt->stream) | ||
884 | goto out; | ||
885 | |||
886 | /* | ||
887 | * Tell the socket layer to start connecting... | ||
888 | */ | ||
889 | status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr, | ||
890 | sizeof(xprt->addr), O_NONBLOCK); | ||
891 | dprintk("RPC: %p connect status %d connected %d sock state %d\n", | ||
892 | xprt, -status, xprt_connected(xprt), sock->sk->sk_state); | ||
893 | if (status < 0) { | ||
894 | switch (status) { | ||
895 | case -EINPROGRESS: | ||
896 | case -EALREADY: | ||
897 | goto out_clear; | ||
898 | } | ||
899 | } | ||
900 | out: | ||
901 | if (status < 0) | ||
902 | rpc_wake_up_status(&xprt->pending, status); | ||
903 | else | ||
904 | rpc_wake_up(&xprt->pending); | ||
905 | out_clear: | ||
906 | smp_mb__before_clear_bit(); | ||
907 | clear_bit(XPRT_CONNECTING, &xprt->sockstate); | ||
908 | smp_mb__after_clear_bit(); | ||
909 | } | ||
910 | |||
911 | static void | ||
912 | xprt_connect_sock(struct rpc_task *task) | ||
913 | { | ||
914 | struct rpc_xprt *xprt = task->tk_xprt; | ||
915 | |||
916 | if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate)) { | ||
917 | /* Note: if we are here due to a dropped connection | ||
918 | * we delay reconnecting by RPC_REESTABLISH_TIMEOUT/HZ | ||
919 | * seconds | ||
920 | */ | ||
921 | if (xprt->sock != NULL) | ||
922 | schedule_delayed_work(&xprt->sock_connect, | ||
923 | RPC_REESTABLISH_TIMEOUT); | ||
924 | else { | ||
925 | schedule_work(&xprt->sock_connect); | ||
926 | /* flush_scheduled_work can sleep... */ | ||
927 | if (!RPC_IS_ASYNC(task)) | ||
928 | flush_scheduled_work(); | ||
929 | } | ||
930 | } | ||
931 | } | ||
932 | |||
933 | /* | ||
934 | * Set default timeout parameters | ||
935 | */ | ||
936 | static void | ||
937 | xprt_default_timeout(struct rpc_timeout *to, int proto) | ||
938 | { | ||
939 | if (proto == IPPROTO_UDP) | ||
940 | xprt_set_timeout(to, 5, 5 * HZ); | ||
941 | else | ||
942 | xprt_set_timeout(to, 2, 60 * HZ); | ||
943 | } | ||
944 | |||
945 | static struct rpc_xprt_ops xprt_socket_ops = { | ||
946 | .set_buffer_size = xprt_sock_setbufsize, | ||
947 | .connect = xprt_connect_sock, | ||
948 | .send_request = xprt_send_request, | ||
949 | .close = xprt_close, | ||
950 | .destroy = xprt_socket_destroy, | ||
951 | }; | ||
952 | |||
953 | extern unsigned int xprt_udp_slot_table_entries; | ||
954 | extern unsigned int xprt_tcp_slot_table_entries; | ||
955 | |||
956 | int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to) | ||
957 | { | ||
958 | size_t slot_table_size; | ||
959 | |||
960 | dprintk("RPC: setting up udp-ipv4 transport...\n"); | ||
961 | |||
962 | xprt->max_reqs = xprt_udp_slot_table_entries; | ||
963 | slot_table_size = xprt->max_reqs * sizeof(xprt->slot[0]); | ||
964 | xprt->slot = kmalloc(slot_table_size, GFP_KERNEL); | ||
965 | if (xprt->slot == NULL) | ||
966 | return -ENOMEM; | ||
967 | memset(xprt->slot, 0, slot_table_size); | ||
968 | |||
969 | xprt->prot = IPPROTO_UDP; | ||
970 | xprt->port = XPRT_MAX_RESVPORT; | ||
971 | xprt->stream = 0; | ||
972 | xprt->nocong = 0; | ||
973 | xprt->cwnd = RPC_INITCWND; | ||
974 | xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0; | ||
975 | /* XXX: header size can vary due to auth type, IPv6, etc. */ | ||
976 | xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); | ||
977 | |||
978 | INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt); | ||
979 | |||
980 | xprt->ops = &xprt_socket_ops; | ||
981 | |||
982 | if (to) | ||
983 | xprt->timeout = *to; | ||
984 | else | ||
985 | xprt_default_timeout(to, xprt->prot); | ||
986 | |||
987 | return 0; | ||
988 | } | ||
989 | |||
990 | int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to) | ||
991 | { | ||
992 | size_t slot_table_size; | ||
993 | |||
994 | dprintk("RPC: setting up tcp-ipv4 transport...\n"); | ||
995 | |||
996 | xprt->max_reqs = xprt_tcp_slot_table_entries; | ||
997 | slot_table_size = xprt->max_reqs * sizeof(xprt->slot[0]); | ||
998 | xprt->slot = kmalloc(slot_table_size, GFP_KERNEL); | ||
999 | if (xprt->slot == NULL) | ||
1000 | return -ENOMEM; | ||
1001 | memset(xprt->slot, 0, slot_table_size); | ||
1002 | |||
1003 | xprt->prot = IPPROTO_TCP; | ||
1004 | xprt->port = XPRT_MAX_RESVPORT; | ||
1005 | xprt->stream = 1; | ||
1006 | xprt->nocong = 1; | ||
1007 | xprt->cwnd = RPC_MAXCWND(xprt); | ||
1008 | xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0; | ||
1009 | xprt->max_payload = (1U << 31) - 1; | ||
1010 | |||
1011 | INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt); | ||
1012 | |||
1013 | xprt->ops = &xprt_socket_ops; | ||
1014 | |||
1015 | if (to) | ||
1016 | xprt->timeout = *to; | ||
1017 | else | ||
1018 | xprt_default_timeout(to, xprt->prot); | ||
1019 | |||
1020 | return 0; | ||
1021 | } | ||