diff options
author | Chuck Lever <cel@citi.umich.edu> | 2005-08-11 16:25:23 -0400 |
---|---|---|
committer | Trond Myklebust <Trond.Myklebust@netapp.com> | 2005-09-23 12:38:12 -0400 |
commit | a246b0105bbd9a70a698f69baae2042996f2a0e9 (patch) | |
tree | 6c8831d8579a7fdc5201d3e9c20270cb1420eeda /net/sunrpc/xprtsock.c | |
parent | 094bb20b9fcab3a1652a77741caba6b78097d622 (diff) |
[PATCH] RPC: introduce client-side transport switch
Move the bulk of client-side socket-specific code into a separate source
file, net/sunrpc/xprtsock.c.
Test-plan:
Millions of fsx operations. Performance characterization such as "sio" or
"iozone". Destructive testing (unplugging the network temporarily, server
reboots). Connectathon with v2, v3, and v4.
Version: Thu, 11 Aug 2005 16:03:38 -0400
Signed-off-by: Chuck Lever <cel@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
Diffstat (limited to 'net/sunrpc/xprtsock.c')
-rw-r--r-- | net/sunrpc/xprtsock.c | 1021 |
1 files changed, 1021 insertions, 0 deletions
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c new file mode 100644 index 000000000000..fa1180ac4823 --- /dev/null +++ b/net/sunrpc/xprtsock.c | |||
@@ -0,0 +1,1021 @@ | |||
1 | /* | ||
2 | * linux/net/sunrpc/xprtsock.c | ||
3 | * | ||
4 | * Client-side transport implementation for sockets. | ||
5 | * | ||
6 | * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com> | ||
7 | * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com> | ||
8 | * TCP NFS related read + write fixes | ||
9 | * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie> | ||
10 | * | ||
11 | * Rewrite of larges part of the code in order to stabilize TCP stuff. | ||
12 | * Fix behaviour when socket buffer is full. | ||
13 | * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no> | ||
14 | */ | ||
15 | |||
16 | #include <linux/types.h> | ||
17 | #include <linux/slab.h> | ||
18 | #include <linux/capability.h> | ||
19 | #include <linux/sched.h> | ||
20 | #include <linux/pagemap.h> | ||
21 | #include <linux/errno.h> | ||
22 | #include <linux/socket.h> | ||
23 | #include <linux/in.h> | ||
24 | #include <linux/net.h> | ||
25 | #include <linux/mm.h> | ||
26 | #include <linux/udp.h> | ||
27 | #include <linux/tcp.h> | ||
28 | #include <linux/sunrpc/clnt.h> | ||
29 | #include <linux/file.h> | ||
30 | |||
31 | #include <net/sock.h> | ||
32 | #include <net/checksum.h> | ||
33 | #include <net/udp.h> | ||
34 | #include <net/tcp.h> | ||
35 | |||
36 | #ifdef RPC_DEBUG | ||
37 | # undef RPC_DEBUG_DATA | ||
38 | # define RPCDBG_FACILITY RPCDBG_XPRT | ||
39 | #endif | ||
40 | |||
41 | #define XPRT_MAX_RESVPORT (800) | ||
42 | |||
43 | #ifdef RPC_DEBUG_DATA | ||
44 | /* | ||
45 | * Print the buffer contents (first 128 bytes only--just enough for | ||
46 | * diropres return). | ||
47 | */ | ||
48 | static void | ||
49 | xprt_pktdump(char *msg, u32 *packet, unsigned int count) | ||
50 | { | ||
51 | u8 *buf = (u8 *) packet; | ||
52 | int j; | ||
53 | |||
54 | dprintk("RPC: %s\n", msg); | ||
55 | for (j = 0; j < count && j < 128; j += 4) { | ||
56 | if (!(j & 31)) { | ||
57 | if (j) | ||
58 | dprintk("\n"); | ||
59 | dprintk("0x%04x ", j); | ||
60 | } | ||
61 | dprintk("%02x%02x%02x%02x ", | ||
62 | buf[j], buf[j+1], buf[j+2], buf[j+3]); | ||
63 | } | ||
64 | dprintk("\n"); | ||
65 | } | ||
66 | #else | ||
67 | static inline void | ||
68 | xprt_pktdump(char *msg, u32 *packet, unsigned int count) | ||
69 | { | ||
70 | /* NOP */ | ||
71 | } | ||
72 | #endif | ||
73 | |||
74 | /* | ||
75 | * Look up RPC transport given an INET socket | ||
76 | */ | ||
77 | static inline struct rpc_xprt * | ||
78 | xprt_from_sock(struct sock *sk) | ||
79 | { | ||
80 | return (struct rpc_xprt *) sk->sk_user_data; | ||
81 | } | ||
82 | |||
83 | static int | ||
84 | xdr_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, | ||
85 | struct xdr_buf *xdr, unsigned int base, int msgflags) | ||
86 | { | ||
87 | struct page **ppage = xdr->pages; | ||
88 | unsigned int len, pglen = xdr->page_len; | ||
89 | int err, ret = 0; | ||
90 | ssize_t (*sendpage)(struct socket *, struct page *, int, size_t, int); | ||
91 | |||
92 | len = xdr->head[0].iov_len; | ||
93 | if (base < len || (addr != NULL && base == 0)) { | ||
94 | struct kvec iov = { | ||
95 | .iov_base = xdr->head[0].iov_base + base, | ||
96 | .iov_len = len - base, | ||
97 | }; | ||
98 | struct msghdr msg = { | ||
99 | .msg_name = addr, | ||
100 | .msg_namelen = addrlen, | ||
101 | .msg_flags = msgflags, | ||
102 | }; | ||
103 | if (xdr->len > len) | ||
104 | msg.msg_flags |= MSG_MORE; | ||
105 | |||
106 | if (iov.iov_len != 0) | ||
107 | err = kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len); | ||
108 | else | ||
109 | err = kernel_sendmsg(sock, &msg, NULL, 0, 0); | ||
110 | if (ret == 0) | ||
111 | ret = err; | ||
112 | else if (err > 0) | ||
113 | ret += err; | ||
114 | if (err != iov.iov_len) | ||
115 | goto out; | ||
116 | base = 0; | ||
117 | } else | ||
118 | base -= len; | ||
119 | |||
120 | if (pglen == 0) | ||
121 | goto copy_tail; | ||
122 | if (base >= pglen) { | ||
123 | base -= pglen; | ||
124 | goto copy_tail; | ||
125 | } | ||
126 | if (base || xdr->page_base) { | ||
127 | pglen -= base; | ||
128 | base += xdr->page_base; | ||
129 | ppage += base >> PAGE_CACHE_SHIFT; | ||
130 | base &= ~PAGE_CACHE_MASK; | ||
131 | } | ||
132 | |||
133 | sendpage = sock->ops->sendpage ? : sock_no_sendpage; | ||
134 | do { | ||
135 | int flags = msgflags; | ||
136 | |||
137 | len = PAGE_CACHE_SIZE; | ||
138 | if (base) | ||
139 | len -= base; | ||
140 | if (pglen < len) | ||
141 | len = pglen; | ||
142 | |||
143 | if (pglen != len || xdr->tail[0].iov_len != 0) | ||
144 | flags |= MSG_MORE; | ||
145 | |||
146 | /* Hmm... We might be dealing with highmem pages */ | ||
147 | if (PageHighMem(*ppage)) | ||
148 | sendpage = sock_no_sendpage; | ||
149 | err = sendpage(sock, *ppage, base, len, flags); | ||
150 | if (ret == 0) | ||
151 | ret = err; | ||
152 | else if (err > 0) | ||
153 | ret += err; | ||
154 | if (err != len) | ||
155 | goto out; | ||
156 | base = 0; | ||
157 | ppage++; | ||
158 | } while ((pglen -= len) != 0); | ||
159 | copy_tail: | ||
160 | len = xdr->tail[0].iov_len; | ||
161 | if (base < len) { | ||
162 | struct kvec iov = { | ||
163 | .iov_base = xdr->tail[0].iov_base + base, | ||
164 | .iov_len = len - base, | ||
165 | }; | ||
166 | struct msghdr msg = { | ||
167 | .msg_flags = msgflags, | ||
168 | }; | ||
169 | err = kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len); | ||
170 | if (ret == 0) | ||
171 | ret = err; | ||
172 | else if (err > 0) | ||
173 | ret += err; | ||
174 | } | ||
175 | out: | ||
176 | return ret; | ||
177 | } | ||
178 | |||
179 | /* | ||
180 | * Write data to socket. | ||
181 | */ | ||
182 | static inline int | ||
183 | xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req) | ||
184 | { | ||
185 | struct socket *sock = xprt->sock; | ||
186 | struct xdr_buf *xdr = &req->rq_snd_buf; | ||
187 | struct sockaddr *addr = NULL; | ||
188 | int addrlen = 0; | ||
189 | unsigned int skip; | ||
190 | int result; | ||
191 | |||
192 | if (!sock) | ||
193 | return -ENOTCONN; | ||
194 | |||
195 | xprt_pktdump("packet data:", | ||
196 | req->rq_svec->iov_base, | ||
197 | req->rq_svec->iov_len); | ||
198 | |||
199 | /* For UDP, we need to provide an address */ | ||
200 | if (!xprt->stream) { | ||
201 | addr = (struct sockaddr *) &xprt->addr; | ||
202 | addrlen = sizeof(xprt->addr); | ||
203 | } | ||
204 | /* Dont repeat bytes */ | ||
205 | skip = req->rq_bytes_sent; | ||
206 | |||
207 | clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags); | ||
208 | result = xdr_sendpages(sock, addr, addrlen, xdr, skip, MSG_DONTWAIT); | ||
209 | |||
210 | dprintk("RPC: xprt_sendmsg(%d) = %d\n", xdr->len - skip, result); | ||
211 | |||
212 | if (result >= 0) | ||
213 | return result; | ||
214 | |||
215 | switch (result) { | ||
216 | case -ECONNREFUSED: | ||
217 | /* When the server has died, an ICMP port unreachable message | ||
218 | * prompts ECONNREFUSED. | ||
219 | */ | ||
220 | case -EAGAIN: | ||
221 | break; | ||
222 | case -ECONNRESET: | ||
223 | case -ENOTCONN: | ||
224 | case -EPIPE: | ||
225 | /* connection broken */ | ||
226 | if (xprt->stream) | ||
227 | result = -ENOTCONN; | ||
228 | break; | ||
229 | default: | ||
230 | printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result); | ||
231 | } | ||
232 | return result; | ||
233 | } | ||
234 | |||
235 | static int | ||
236 | xprt_send_request(struct rpc_task *task) | ||
237 | { | ||
238 | struct rpc_rqst *req = task->tk_rqstp; | ||
239 | struct rpc_xprt *xprt = req->rq_xprt; | ||
240 | int status, retry = 0; | ||
241 | |||
242 | /* set up everything as needed. */ | ||
243 | /* Write the record marker */ | ||
244 | if (xprt->stream) { | ||
245 | u32 *marker = req->rq_svec[0].iov_base; | ||
246 | |||
247 | *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker))); | ||
248 | } | ||
249 | |||
250 | /* Continue transmitting the packet/record. We must be careful | ||
251 | * to cope with writespace callbacks arriving _after_ we have | ||
252 | * called xprt_sendmsg(). | ||
253 | */ | ||
254 | while (1) { | ||
255 | req->rq_xtime = jiffies; | ||
256 | status = xprt_sendmsg(xprt, req); | ||
257 | |||
258 | if (status < 0) | ||
259 | break; | ||
260 | |||
261 | if (xprt->stream) { | ||
262 | req->rq_bytes_sent += status; | ||
263 | |||
264 | /* If we've sent the entire packet, immediately | ||
265 | * reset the count of bytes sent. */ | ||
266 | if (req->rq_bytes_sent >= req->rq_slen) { | ||
267 | req->rq_bytes_sent = 0; | ||
268 | return 0; | ||
269 | } | ||
270 | } else { | ||
271 | if (status >= req->rq_slen) | ||
272 | return 0; | ||
273 | status = -EAGAIN; | ||
274 | break; | ||
275 | } | ||
276 | |||
277 | dprintk("RPC: %4d xmit incomplete (%d left of %d)\n", | ||
278 | task->tk_pid, req->rq_slen - req->rq_bytes_sent, | ||
279 | req->rq_slen); | ||
280 | |||
281 | status = -EAGAIN; | ||
282 | if (retry++ > 50) | ||
283 | break; | ||
284 | } | ||
285 | |||
286 | if (status == -EAGAIN) { | ||
287 | if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) { | ||
288 | /* Protect against races with xprt_write_space */ | ||
289 | spin_lock_bh(&xprt->sock_lock); | ||
290 | /* Don't race with disconnect */ | ||
291 | if (!xprt_connected(xprt)) | ||
292 | task->tk_status = -ENOTCONN; | ||
293 | else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) { | ||
294 | task->tk_timeout = req->rq_timeout; | ||
295 | rpc_sleep_on(&xprt->pending, task, NULL, NULL); | ||
296 | } | ||
297 | spin_unlock_bh(&xprt->sock_lock); | ||
298 | return status; | ||
299 | } | ||
300 | /* Keep holding the socket if it is blocked */ | ||
301 | rpc_delay(task, HZ>>4); | ||
302 | } | ||
303 | return status; | ||
304 | } | ||
305 | |||
306 | /* | ||
307 | * Close down a transport socket | ||
308 | */ | ||
309 | static void | ||
310 | xprt_close(struct rpc_xprt *xprt) | ||
311 | { | ||
312 | struct socket *sock = xprt->sock; | ||
313 | struct sock *sk = xprt->inet; | ||
314 | |||
315 | if (!sk) | ||
316 | return; | ||
317 | |||
318 | write_lock_bh(&sk->sk_callback_lock); | ||
319 | xprt->inet = NULL; | ||
320 | xprt->sock = NULL; | ||
321 | |||
322 | sk->sk_user_data = NULL; | ||
323 | sk->sk_data_ready = xprt->old_data_ready; | ||
324 | sk->sk_state_change = xprt->old_state_change; | ||
325 | sk->sk_write_space = xprt->old_write_space; | ||
326 | write_unlock_bh(&sk->sk_callback_lock); | ||
327 | |||
328 | sk->sk_no_check = 0; | ||
329 | |||
330 | sock_release(sock); | ||
331 | } | ||
332 | |||
333 | static void xprt_socket_destroy(struct rpc_xprt *xprt) | ||
334 | { | ||
335 | cancel_delayed_work(&xprt->sock_connect); | ||
336 | flush_scheduled_work(); | ||
337 | |||
338 | xprt_disconnect(xprt); | ||
339 | xprt_close(xprt); | ||
340 | kfree(xprt->slot); | ||
341 | } | ||
342 | |||
343 | /* | ||
344 | * Input handler for RPC replies. Called from a bottom half and hence | ||
345 | * atomic. | ||
346 | */ | ||
347 | static void | ||
348 | udp_data_ready(struct sock *sk, int len) | ||
349 | { | ||
350 | struct rpc_task *task; | ||
351 | struct rpc_xprt *xprt; | ||
352 | struct rpc_rqst *rovr; | ||
353 | struct sk_buff *skb; | ||
354 | int err, repsize, copied; | ||
355 | u32 _xid, *xp; | ||
356 | |||
357 | read_lock(&sk->sk_callback_lock); | ||
358 | dprintk("RPC: udp_data_ready...\n"); | ||
359 | if (!(xprt = xprt_from_sock(sk))) { | ||
360 | printk("RPC: udp_data_ready request not found!\n"); | ||
361 | goto out; | ||
362 | } | ||
363 | |||
364 | dprintk("RPC: udp_data_ready client %p\n", xprt); | ||
365 | |||
366 | if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL) | ||
367 | goto out; | ||
368 | |||
369 | if (xprt->shutdown) | ||
370 | goto dropit; | ||
371 | |||
372 | repsize = skb->len - sizeof(struct udphdr); | ||
373 | if (repsize < 4) { | ||
374 | printk("RPC: impossible RPC reply size %d!\n", repsize); | ||
375 | goto dropit; | ||
376 | } | ||
377 | |||
378 | /* Copy the XID from the skb... */ | ||
379 | xp = skb_header_pointer(skb, sizeof(struct udphdr), | ||
380 | sizeof(_xid), &_xid); | ||
381 | if (xp == NULL) | ||
382 | goto dropit; | ||
383 | |||
384 | /* Look up and lock the request corresponding to the given XID */ | ||
385 | spin_lock(&xprt->sock_lock); | ||
386 | rovr = xprt_lookup_rqst(xprt, *xp); | ||
387 | if (!rovr) | ||
388 | goto out_unlock; | ||
389 | task = rovr->rq_task; | ||
390 | |||
391 | dprintk("RPC: %4d received reply\n", task->tk_pid); | ||
392 | |||
393 | if ((copied = rovr->rq_private_buf.buflen) > repsize) | ||
394 | copied = repsize; | ||
395 | |||
396 | /* Suck it into the iovec, verify checksum if not done by hw. */ | ||
397 | if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) | ||
398 | goto out_unlock; | ||
399 | |||
400 | /* Something worked... */ | ||
401 | dst_confirm(skb->dst); | ||
402 | |||
403 | xprt_complete_rqst(xprt, rovr, copied); | ||
404 | |||
405 | out_unlock: | ||
406 | spin_unlock(&xprt->sock_lock); | ||
407 | dropit: | ||
408 | skb_free_datagram(sk, skb); | ||
409 | out: | ||
410 | read_unlock(&sk->sk_callback_lock); | ||
411 | } | ||
412 | |||
413 | /* | ||
414 | * Copy from an skb into memory and shrink the skb. | ||
415 | */ | ||
416 | static inline size_t | ||
417 | tcp_copy_data(skb_reader_t *desc, void *p, size_t len) | ||
418 | { | ||
419 | if (len > desc->count) | ||
420 | len = desc->count; | ||
421 | if (skb_copy_bits(desc->skb, desc->offset, p, len)) { | ||
422 | dprintk("RPC: failed to copy %zu bytes from skb. %zu bytes remain\n", | ||
423 | len, desc->count); | ||
424 | return 0; | ||
425 | } | ||
426 | desc->offset += len; | ||
427 | desc->count -= len; | ||
428 | dprintk("RPC: copied %zu bytes from skb. %zu bytes remain\n", | ||
429 | len, desc->count); | ||
430 | return len; | ||
431 | } | ||
432 | |||
433 | /* | ||
434 | * TCP read fragment marker | ||
435 | */ | ||
436 | static inline void | ||
437 | tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc) | ||
438 | { | ||
439 | size_t len, used; | ||
440 | char *p; | ||
441 | |||
442 | p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset; | ||
443 | len = sizeof(xprt->tcp_recm) - xprt->tcp_offset; | ||
444 | used = tcp_copy_data(desc, p, len); | ||
445 | xprt->tcp_offset += used; | ||
446 | if (used != len) | ||
447 | return; | ||
448 | xprt->tcp_reclen = ntohl(xprt->tcp_recm); | ||
449 | if (xprt->tcp_reclen & 0x80000000) | ||
450 | xprt->tcp_flags |= XPRT_LAST_FRAG; | ||
451 | else | ||
452 | xprt->tcp_flags &= ~XPRT_LAST_FRAG; | ||
453 | xprt->tcp_reclen &= 0x7fffffff; | ||
454 | xprt->tcp_flags &= ~XPRT_COPY_RECM; | ||
455 | xprt->tcp_offset = 0; | ||
456 | /* Sanity check of the record length */ | ||
457 | if (xprt->tcp_reclen < 4) { | ||
458 | printk(KERN_ERR "RPC: Invalid TCP record fragment length\n"); | ||
459 | xprt_disconnect(xprt); | ||
460 | } | ||
461 | dprintk("RPC: reading TCP record fragment of length %d\n", | ||
462 | xprt->tcp_reclen); | ||
463 | } | ||
464 | |||
465 | static void | ||
466 | tcp_check_recm(struct rpc_xprt *xprt) | ||
467 | { | ||
468 | dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n", | ||
469 | xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_flags); | ||
470 | if (xprt->tcp_offset == xprt->tcp_reclen) { | ||
471 | xprt->tcp_flags |= XPRT_COPY_RECM; | ||
472 | xprt->tcp_offset = 0; | ||
473 | if (xprt->tcp_flags & XPRT_LAST_FRAG) { | ||
474 | xprt->tcp_flags &= ~XPRT_COPY_DATA; | ||
475 | xprt->tcp_flags |= XPRT_COPY_XID; | ||
476 | xprt->tcp_copied = 0; | ||
477 | } | ||
478 | } | ||
479 | } | ||
480 | |||
481 | /* | ||
482 | * TCP read xid | ||
483 | */ | ||
484 | static inline void | ||
485 | tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc) | ||
486 | { | ||
487 | size_t len, used; | ||
488 | char *p; | ||
489 | |||
490 | len = sizeof(xprt->tcp_xid) - xprt->tcp_offset; | ||
491 | dprintk("RPC: reading XID (%Zu bytes)\n", len); | ||
492 | p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset; | ||
493 | used = tcp_copy_data(desc, p, len); | ||
494 | xprt->tcp_offset += used; | ||
495 | if (used != len) | ||
496 | return; | ||
497 | xprt->tcp_flags &= ~XPRT_COPY_XID; | ||
498 | xprt->tcp_flags |= XPRT_COPY_DATA; | ||
499 | xprt->tcp_copied = 4; | ||
500 | dprintk("RPC: reading reply for XID %08x\n", | ||
501 | ntohl(xprt->tcp_xid)); | ||
502 | tcp_check_recm(xprt); | ||
503 | } | ||
504 | |||
505 | /* | ||
506 | * TCP read and complete request | ||
507 | */ | ||
508 | static inline void | ||
509 | tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc) | ||
510 | { | ||
511 | struct rpc_rqst *req; | ||
512 | struct xdr_buf *rcvbuf; | ||
513 | size_t len; | ||
514 | ssize_t r; | ||
515 | |||
516 | /* Find and lock the request corresponding to this xid */ | ||
517 | spin_lock(&xprt->sock_lock); | ||
518 | req = xprt_lookup_rqst(xprt, xprt->tcp_xid); | ||
519 | if (!req) { | ||
520 | xprt->tcp_flags &= ~XPRT_COPY_DATA; | ||
521 | dprintk("RPC: XID %08x request not found!\n", | ||
522 | ntohl(xprt->tcp_xid)); | ||
523 | spin_unlock(&xprt->sock_lock); | ||
524 | return; | ||
525 | } | ||
526 | |||
527 | rcvbuf = &req->rq_private_buf; | ||
528 | len = desc->count; | ||
529 | if (len > xprt->tcp_reclen - xprt->tcp_offset) { | ||
530 | skb_reader_t my_desc; | ||
531 | |||
532 | len = xprt->tcp_reclen - xprt->tcp_offset; | ||
533 | memcpy(&my_desc, desc, sizeof(my_desc)); | ||
534 | my_desc.count = len; | ||
535 | r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied, | ||
536 | &my_desc, tcp_copy_data); | ||
537 | desc->count -= r; | ||
538 | desc->offset += r; | ||
539 | } else | ||
540 | r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied, | ||
541 | desc, tcp_copy_data); | ||
542 | |||
543 | if (r > 0) { | ||
544 | xprt->tcp_copied += r; | ||
545 | xprt->tcp_offset += r; | ||
546 | } | ||
547 | if (r != len) { | ||
548 | /* Error when copying to the receive buffer, | ||
549 | * usually because we weren't able to allocate | ||
550 | * additional buffer pages. All we can do now | ||
551 | * is turn off XPRT_COPY_DATA, so the request | ||
552 | * will not receive any additional updates, | ||
553 | * and time out. | ||
554 | * Any remaining data from this record will | ||
555 | * be discarded. | ||
556 | */ | ||
557 | xprt->tcp_flags &= ~XPRT_COPY_DATA; | ||
558 | dprintk("RPC: XID %08x truncated request\n", | ||
559 | ntohl(xprt->tcp_xid)); | ||
560 | dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n", | ||
561 | xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen); | ||
562 | goto out; | ||
563 | } | ||
564 | |||
565 | dprintk("RPC: XID %08x read %Zd bytes\n", | ||
566 | ntohl(xprt->tcp_xid), r); | ||
567 | dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n", | ||
568 | xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen); | ||
569 | |||
570 | if (xprt->tcp_copied == req->rq_private_buf.buflen) | ||
571 | xprt->tcp_flags &= ~XPRT_COPY_DATA; | ||
572 | else if (xprt->tcp_offset == xprt->tcp_reclen) { | ||
573 | if (xprt->tcp_flags & XPRT_LAST_FRAG) | ||
574 | xprt->tcp_flags &= ~XPRT_COPY_DATA; | ||
575 | } | ||
576 | |||
577 | out: | ||
578 | if (!(xprt->tcp_flags & XPRT_COPY_DATA)) { | ||
579 | dprintk("RPC: %4d received reply complete\n", | ||
580 | req->rq_task->tk_pid); | ||
581 | xprt_complete_rqst(xprt, req, xprt->tcp_copied); | ||
582 | } | ||
583 | spin_unlock(&xprt->sock_lock); | ||
584 | tcp_check_recm(xprt); | ||
585 | } | ||
586 | |||
587 | /* | ||
588 | * TCP discard extra bytes from a short read | ||
589 | */ | ||
590 | static inline void | ||
591 | tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc) | ||
592 | { | ||
593 | size_t len; | ||
594 | |||
595 | len = xprt->tcp_reclen - xprt->tcp_offset; | ||
596 | if (len > desc->count) | ||
597 | len = desc->count; | ||
598 | desc->count -= len; | ||
599 | desc->offset += len; | ||
600 | xprt->tcp_offset += len; | ||
601 | dprintk("RPC: discarded %Zu bytes\n", len); | ||
602 | tcp_check_recm(xprt); | ||
603 | } | ||
604 | |||
605 | /* | ||
606 | * TCP record receive routine | ||
607 | * We first have to grab the record marker, then the XID, then the data. | ||
608 | */ | ||
609 | static int | ||
610 | tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, | ||
611 | unsigned int offset, size_t len) | ||
612 | { | ||
613 | struct rpc_xprt *xprt = rd_desc->arg.data; | ||
614 | skb_reader_t desc = { | ||
615 | .skb = skb, | ||
616 | .offset = offset, | ||
617 | .count = len, | ||
618 | .csum = 0 | ||
619 | }; | ||
620 | |||
621 | dprintk("RPC: tcp_data_recv\n"); | ||
622 | do { | ||
623 | /* Read in a new fragment marker if necessary */ | ||
624 | /* Can we ever really expect to get completely empty fragments? */ | ||
625 | if (xprt->tcp_flags & XPRT_COPY_RECM) { | ||
626 | tcp_read_fraghdr(xprt, &desc); | ||
627 | continue; | ||
628 | } | ||
629 | /* Read in the xid if necessary */ | ||
630 | if (xprt->tcp_flags & XPRT_COPY_XID) { | ||
631 | tcp_read_xid(xprt, &desc); | ||
632 | continue; | ||
633 | } | ||
634 | /* Read in the request data */ | ||
635 | if (xprt->tcp_flags & XPRT_COPY_DATA) { | ||
636 | tcp_read_request(xprt, &desc); | ||
637 | continue; | ||
638 | } | ||
639 | /* Skip over any trailing bytes on short reads */ | ||
640 | tcp_read_discard(xprt, &desc); | ||
641 | } while (desc.count); | ||
642 | dprintk("RPC: tcp_data_recv done\n"); | ||
643 | return len - desc.count; | ||
644 | } | ||
645 | |||
646 | static void tcp_data_ready(struct sock *sk, int bytes) | ||
647 | { | ||
648 | struct rpc_xprt *xprt; | ||
649 | read_descriptor_t rd_desc; | ||
650 | |||
651 | read_lock(&sk->sk_callback_lock); | ||
652 | dprintk("RPC: tcp_data_ready...\n"); | ||
653 | if (!(xprt = xprt_from_sock(sk))) { | ||
654 | printk("RPC: tcp_data_ready socket info not found!\n"); | ||
655 | goto out; | ||
656 | } | ||
657 | if (xprt->shutdown) | ||
658 | goto out; | ||
659 | |||
660 | /* We use rd_desc to pass struct xprt to tcp_data_recv */ | ||
661 | rd_desc.arg.data = xprt; | ||
662 | rd_desc.count = 65536; | ||
663 | tcp_read_sock(sk, &rd_desc, tcp_data_recv); | ||
664 | out: | ||
665 | read_unlock(&sk->sk_callback_lock); | ||
666 | } | ||
667 | |||
668 | static void | ||
669 | tcp_state_change(struct sock *sk) | ||
670 | { | ||
671 | struct rpc_xprt *xprt; | ||
672 | |||
673 | read_lock(&sk->sk_callback_lock); | ||
674 | if (!(xprt = xprt_from_sock(sk))) | ||
675 | goto out; | ||
676 | dprintk("RPC: tcp_state_change client %p...\n", xprt); | ||
677 | dprintk("RPC: state %x conn %d dead %d zapped %d\n", | ||
678 | sk->sk_state, xprt_connected(xprt), | ||
679 | sock_flag(sk, SOCK_DEAD), | ||
680 | sock_flag(sk, SOCK_ZAPPED)); | ||
681 | |||
682 | switch (sk->sk_state) { | ||
683 | case TCP_ESTABLISHED: | ||
684 | spin_lock_bh(&xprt->sock_lock); | ||
685 | if (!xprt_test_and_set_connected(xprt)) { | ||
686 | /* Reset TCP record info */ | ||
687 | xprt->tcp_offset = 0; | ||
688 | xprt->tcp_reclen = 0; | ||
689 | xprt->tcp_copied = 0; | ||
690 | xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID; | ||
691 | rpc_wake_up(&xprt->pending); | ||
692 | } | ||
693 | spin_unlock_bh(&xprt->sock_lock); | ||
694 | break; | ||
695 | case TCP_SYN_SENT: | ||
696 | case TCP_SYN_RECV: | ||
697 | break; | ||
698 | default: | ||
699 | xprt_disconnect(xprt); | ||
700 | break; | ||
701 | } | ||
702 | out: | ||
703 | read_unlock(&sk->sk_callback_lock); | ||
704 | } | ||
705 | |||
706 | /* | ||
707 | * Called when more output buffer space is available for this socket. | ||
708 | * We try not to wake our writers until they can make "significant" | ||
709 | * progress, otherwise we'll waste resources thrashing sock_sendmsg | ||
710 | * with a bunch of small requests. | ||
711 | */ | ||
712 | static void | ||
713 | xprt_write_space(struct sock *sk) | ||
714 | { | ||
715 | struct rpc_xprt *xprt; | ||
716 | struct socket *sock; | ||
717 | |||
718 | read_lock(&sk->sk_callback_lock); | ||
719 | if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->sk_socket)) | ||
720 | goto out; | ||
721 | if (xprt->shutdown) | ||
722 | goto out; | ||
723 | |||
724 | /* Wait until we have enough socket memory */ | ||
725 | if (xprt->stream) { | ||
726 | /* from net/core/stream.c:sk_stream_write_space */ | ||
727 | if (sk_stream_wspace(sk) < sk_stream_min_wspace(sk)) | ||
728 | goto out; | ||
729 | } else { | ||
730 | /* from net/core/sock.c:sock_def_write_space */ | ||
731 | if (!sock_writeable(sk)) | ||
732 | goto out; | ||
733 | } | ||
734 | |||
735 | if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)) | ||
736 | goto out; | ||
737 | |||
738 | spin_lock_bh(&xprt->sock_lock); | ||
739 | if (xprt->snd_task) | ||
740 | rpc_wake_up_task(xprt->snd_task); | ||
741 | spin_unlock_bh(&xprt->sock_lock); | ||
742 | out: | ||
743 | read_unlock(&sk->sk_callback_lock); | ||
744 | } | ||
745 | |||
746 | /* | ||
747 | * Set socket buffer length | ||
748 | */ | ||
749 | static void | ||
750 | xprt_sock_setbufsize(struct rpc_xprt *xprt) | ||
751 | { | ||
752 | struct sock *sk = xprt->inet; | ||
753 | |||
754 | if (xprt->stream) | ||
755 | return; | ||
756 | if (xprt->rcvsize) { | ||
757 | sk->sk_userlocks |= SOCK_RCVBUF_LOCK; | ||
758 | sk->sk_rcvbuf = xprt->rcvsize * xprt->max_reqs * 2; | ||
759 | } | ||
760 | if (xprt->sndsize) { | ||
761 | sk->sk_userlocks |= SOCK_SNDBUF_LOCK; | ||
762 | sk->sk_sndbuf = xprt->sndsize * xprt->max_reqs * 2; | ||
763 | sk->sk_write_space(sk); | ||
764 | } | ||
765 | } | ||
766 | |||
767 | /* | ||
768 | * Bind to a reserved port | ||
769 | */ | ||
770 | static inline int xprt_bindresvport(struct rpc_xprt *xprt, struct socket *sock) | ||
771 | { | ||
772 | struct sockaddr_in myaddr = { | ||
773 | .sin_family = AF_INET, | ||
774 | }; | ||
775 | int err, port; | ||
776 | |||
777 | /* Were we already bound to a given port? Try to reuse it */ | ||
778 | port = xprt->port; | ||
779 | do { | ||
780 | myaddr.sin_port = htons(port); | ||
781 | err = sock->ops->bind(sock, (struct sockaddr *) &myaddr, | ||
782 | sizeof(myaddr)); | ||
783 | if (err == 0) { | ||
784 | xprt->port = port; | ||
785 | return 0; | ||
786 | } | ||
787 | if (--port == 0) | ||
788 | port = XPRT_MAX_RESVPORT; | ||
789 | } while (err == -EADDRINUSE && port != xprt->port); | ||
790 | |||
791 | printk("RPC: Can't bind to reserved port (%d).\n", -err); | ||
792 | return err; | ||
793 | } | ||
794 | |||
795 | static void | ||
796 | xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock) | ||
797 | { | ||
798 | struct sock *sk = sock->sk; | ||
799 | |||
800 | if (xprt->inet) | ||
801 | return; | ||
802 | |||
803 | write_lock_bh(&sk->sk_callback_lock); | ||
804 | sk->sk_user_data = xprt; | ||
805 | xprt->old_data_ready = sk->sk_data_ready; | ||
806 | xprt->old_state_change = sk->sk_state_change; | ||
807 | xprt->old_write_space = sk->sk_write_space; | ||
808 | if (xprt->prot == IPPROTO_UDP) { | ||
809 | sk->sk_data_ready = udp_data_ready; | ||
810 | sk->sk_no_check = UDP_CSUM_NORCV; | ||
811 | xprt_set_connected(xprt); | ||
812 | } else { | ||
813 | tcp_sk(sk)->nonagle = 1; /* disable Nagle's algorithm */ | ||
814 | sk->sk_data_ready = tcp_data_ready; | ||
815 | sk->sk_state_change = tcp_state_change; | ||
816 | xprt_clear_connected(xprt); | ||
817 | } | ||
818 | sk->sk_write_space = xprt_write_space; | ||
819 | |||
820 | /* Reset to new socket */ | ||
821 | xprt->sock = sock; | ||
822 | xprt->inet = sk; | ||
823 | write_unlock_bh(&sk->sk_callback_lock); | ||
824 | |||
825 | return; | ||
826 | } | ||
827 | |||
828 | /* | ||
829 | * Datastream sockets are created here, but xprt_connect will create | ||
830 | * and connect stream sockets. | ||
831 | */ | ||
832 | static struct socket * xprt_create_socket(struct rpc_xprt *xprt, int proto, int resvport) | ||
833 | { | ||
834 | struct socket *sock; | ||
835 | int type, err; | ||
836 | |||
837 | dprintk("RPC: xprt_create_socket(%s %d)\n", | ||
838 | (proto == IPPROTO_UDP)? "udp" : "tcp", proto); | ||
839 | |||
840 | type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM; | ||
841 | |||
842 | if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) { | ||
843 | printk("RPC: can't create socket (%d).\n", -err); | ||
844 | return NULL; | ||
845 | } | ||
846 | |||
847 | /* If the caller has the capability, bind to a reserved port */ | ||
848 | if (resvport && xprt_bindresvport(xprt, sock) < 0) { | ||
849 | printk("RPC: can't bind to reserved port.\n"); | ||
850 | goto failed; | ||
851 | } | ||
852 | |||
853 | return sock; | ||
854 | |||
855 | failed: | ||
856 | sock_release(sock); | ||
857 | return NULL; | ||
858 | } | ||
859 | |||
860 | static void xprt_socket_connect(void *args) | ||
861 | { | ||
862 | struct rpc_xprt *xprt = (struct rpc_xprt *)args; | ||
863 | struct socket *sock = xprt->sock; | ||
864 | int status = -EIO; | ||
865 | |||
866 | if (xprt->shutdown || xprt->addr.sin_port == 0) | ||
867 | goto out; | ||
868 | |||
869 | /* | ||
870 | * Start by resetting any existing state | ||
871 | */ | ||
872 | xprt_close(xprt); | ||
873 | sock = xprt_create_socket(xprt, xprt->prot, xprt->resvport); | ||
874 | if (sock == NULL) { | ||
875 | /* couldn't create socket or bind to reserved port; | ||
876 | * this is likely a permanent error, so cause an abort */ | ||
877 | goto out; | ||
878 | } | ||
879 | xprt_bind_socket(xprt, sock); | ||
880 | xprt_sock_setbufsize(xprt); | ||
881 | |||
882 | status = 0; | ||
883 | if (!xprt->stream) | ||
884 | goto out; | ||
885 | |||
886 | /* | ||
887 | * Tell the socket layer to start connecting... | ||
888 | */ | ||
889 | status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr, | ||
890 | sizeof(xprt->addr), O_NONBLOCK); | ||
891 | dprintk("RPC: %p connect status %d connected %d sock state %d\n", | ||
892 | xprt, -status, xprt_connected(xprt), sock->sk->sk_state); | ||
893 | if (status < 0) { | ||
894 | switch (status) { | ||
895 | case -EINPROGRESS: | ||
896 | case -EALREADY: | ||
897 | goto out_clear; | ||
898 | } | ||
899 | } | ||
900 | out: | ||
901 | if (status < 0) | ||
902 | rpc_wake_up_status(&xprt->pending, status); | ||
903 | else | ||
904 | rpc_wake_up(&xprt->pending); | ||
905 | out_clear: | ||
906 | smp_mb__before_clear_bit(); | ||
907 | clear_bit(XPRT_CONNECTING, &xprt->sockstate); | ||
908 | smp_mb__after_clear_bit(); | ||
909 | } | ||
910 | |||
911 | static void | ||
912 | xprt_connect_sock(struct rpc_task *task) | ||
913 | { | ||
914 | struct rpc_xprt *xprt = task->tk_xprt; | ||
915 | |||
916 | if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate)) { | ||
917 | /* Note: if we are here due to a dropped connection | ||
918 | * we delay reconnecting by RPC_REESTABLISH_TIMEOUT/HZ | ||
919 | * seconds | ||
920 | */ | ||
921 | if (xprt->sock != NULL) | ||
922 | schedule_delayed_work(&xprt->sock_connect, | ||
923 | RPC_REESTABLISH_TIMEOUT); | ||
924 | else { | ||
925 | schedule_work(&xprt->sock_connect); | ||
926 | /* flush_scheduled_work can sleep... */ | ||
927 | if (!RPC_IS_ASYNC(task)) | ||
928 | flush_scheduled_work(); | ||
929 | } | ||
930 | } | ||
931 | } | ||
932 | |||
933 | /* | ||
934 | * Set default timeout parameters | ||
935 | */ | ||
936 | static void | ||
937 | xprt_default_timeout(struct rpc_timeout *to, int proto) | ||
938 | { | ||
939 | if (proto == IPPROTO_UDP) | ||
940 | xprt_set_timeout(to, 5, 5 * HZ); | ||
941 | else | ||
942 | xprt_set_timeout(to, 2, 60 * HZ); | ||
943 | } | ||
944 | |||
945 | static struct rpc_xprt_ops xprt_socket_ops = { | ||
946 | .set_buffer_size = xprt_sock_setbufsize, | ||
947 | .connect = xprt_connect_sock, | ||
948 | .send_request = xprt_send_request, | ||
949 | .close = xprt_close, | ||
950 | .destroy = xprt_socket_destroy, | ||
951 | }; | ||
952 | |||
953 | extern unsigned int xprt_udp_slot_table_entries; | ||
954 | extern unsigned int xprt_tcp_slot_table_entries; | ||
955 | |||
956 | int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to) | ||
957 | { | ||
958 | size_t slot_table_size; | ||
959 | |||
960 | dprintk("RPC: setting up udp-ipv4 transport...\n"); | ||
961 | |||
962 | xprt->max_reqs = xprt_udp_slot_table_entries; | ||
963 | slot_table_size = xprt->max_reqs * sizeof(xprt->slot[0]); | ||
964 | xprt->slot = kmalloc(slot_table_size, GFP_KERNEL); | ||
965 | if (xprt->slot == NULL) | ||
966 | return -ENOMEM; | ||
967 | memset(xprt->slot, 0, slot_table_size); | ||
968 | |||
969 | xprt->prot = IPPROTO_UDP; | ||
970 | xprt->port = XPRT_MAX_RESVPORT; | ||
971 | xprt->stream = 0; | ||
972 | xprt->nocong = 0; | ||
973 | xprt->cwnd = RPC_INITCWND; | ||
974 | xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0; | ||
975 | /* XXX: header size can vary due to auth type, IPv6, etc. */ | ||
976 | xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); | ||
977 | |||
978 | INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt); | ||
979 | |||
980 | xprt->ops = &xprt_socket_ops; | ||
981 | |||
982 | if (to) | ||
983 | xprt->timeout = *to; | ||
984 | else | ||
985 | xprt_default_timeout(to, xprt->prot); | ||
986 | |||
987 | return 0; | ||
988 | } | ||
989 | |||
990 | int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to) | ||
991 | { | ||
992 | size_t slot_table_size; | ||
993 | |||
994 | dprintk("RPC: setting up tcp-ipv4 transport...\n"); | ||
995 | |||
996 | xprt->max_reqs = xprt_tcp_slot_table_entries; | ||
997 | slot_table_size = xprt->max_reqs * sizeof(xprt->slot[0]); | ||
998 | xprt->slot = kmalloc(slot_table_size, GFP_KERNEL); | ||
999 | if (xprt->slot == NULL) | ||
1000 | return -ENOMEM; | ||
1001 | memset(xprt->slot, 0, slot_table_size); | ||
1002 | |||
1003 | xprt->prot = IPPROTO_TCP; | ||
1004 | xprt->port = XPRT_MAX_RESVPORT; | ||
1005 | xprt->stream = 1; | ||
1006 | xprt->nocong = 1; | ||
1007 | xprt->cwnd = RPC_MAXCWND(xprt); | ||
1008 | xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0; | ||
1009 | xprt->max_payload = (1U << 31) - 1; | ||
1010 | |||
1011 | INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt); | ||
1012 | |||
1013 | xprt->ops = &xprt_socket_ops; | ||
1014 | |||
1015 | if (to) | ||
1016 | xprt->timeout = *to; | ||
1017 | else | ||
1018 | xprt_default_timeout(to, xprt->prot); | ||
1019 | |||
1020 | return 0; | ||
1021 | } | ||