author     Chuck Lever <cel@citi.umich.edu>              2005-08-11 16:25:26 -0400
committer  Trond Myklebust <Trond.Myklebust@netapp.com>  2005-09-23 12:38:14 -0400
commit     9903cd1c27a1f30e8efea75e125be3b2002f7cb9 (patch)
tree       ad684d0ce47793f161839e2321f118206ef707f3 /net/sunrpc/xprtsock.c
parent     a246b0105bbd9a70a698f69baae2042996f2a0e9 (diff)
[PATCH] RPC: transport switch function naming
Introduce block header comments and a function naming convention to the
socket transport implementation. Provide a debug setting for transports
that is separate from RPCDBG_XPRT. Eliminate xprt_default_timeout().

Provide block comments for exposed interfaces in xprt.c, and eliminate
the useless obvious comments.

Convert printk's to dprintk's.

Test-plan:
Compile kernel with CONFIG_NFS enabled.

Version: Thu, 11 Aug 2005 16:04:04 -0400

Signed-off-by: Chuck Lever <cel@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
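As background on the new debug facility mentioned above: each sunrpc source file selects its facility by defining RPCDBG_FACILITY, and dprintk() output is compiled in under RPC_DEBUG and gated at run time by the corresponding bit in the rpc_debug sysctl mask (/proc/sys/sunrpc/rpc_debug). The stand-alone sketch below mimics that mechanism in user space; the bit values and the demo main() are made up for illustration and are not taken from this patch or from include/linux/sunrpc/debug.h.

/* Illustrative sketch only -- not from this patch.  It mimics how a
 * per-file RPCDBG_FACILITY gates dprintk() output in the RPC client. */
#include <stdio.h>

#define RPCDBG_XPRT	0x01		/* hypothetical bit values */
#define RPCDBG_TRANS	0x02

static unsigned int rpc_debug;		/* stands in for the sysctl mask */

#define RPCDBG_FACILITY	RPCDBG_TRANS	/* each .c file picks one, as xprtsock.c now does */
#define dprintk(args...) \
	do { if (rpc_debug & RPCDBG_FACILITY) printf(args); } while (0)

int main(void)
{
	dprintk("RPC: silent, transport debugging is off\n");
	rpc_debug |= RPCDBG_TRANS;	/* like writing the bit to /proc/sys/sunrpc/rpc_debug */
	dprintk("RPC: xs_connect scheduled\n");
	return 0;
}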
Diffstat (limited to 'net/sunrpc/xprtsock.c')
-rw-r--r--   net/sunrpc/xprtsock.c   464
1 file changed, 238 insertions, 226 deletions
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index fa1180ac4823..80222de3afa4 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -33,23 +33,21 @@
 #include <net/udp.h>
 #include <net/tcp.h>
 
+/*
+ * Maximum port number to use when requesting a reserved port.
+ */
+#define XS_MAX_RESVPORT	(800U)
+
 #ifdef RPC_DEBUG
 # undef  RPC_DEBUG_DATA
-# define RPCDBG_FACILITY	RPCDBG_XPRT
+# define RPCDBG_FACILITY	RPCDBG_TRANS
 #endif
 
-#define XPRT_MAX_RESVPORT	(800)
-
 #ifdef RPC_DEBUG_DATA
-/*
- * Print the buffer contents (first 128 bytes only--just enough for
- * diropres return).
- */
-static void
-xprt_pktdump(char *msg, u32 *packet, unsigned int count)
+static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
 {
 	u8 *buf = (u8 *) packet;
 	int j;
 
 	dprintk("RPC: %s\n", msg);
 	for (j = 0; j < count && j < 128; j += 4) {
@@ -64,25 +62,22 @@ xprt_pktdump(char *msg, u32 *packet, unsigned int count)
 	dprintk("\n");
 }
 #else
-static inline void
-xprt_pktdump(char *msg, u32 *packet, unsigned int count)
+static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
 {
 	/* NOP */
 }
 #endif
 
-/*
- * Look up RPC transport given an INET socket
+/**
+ * xs_sendpages - write pages directly to a socket
+ * @sock: socket to send on
+ * @addr: UDP only -- address of destination
+ * @addrlen: UDP only -- length of destination address
+ * @xdr: buffer containing this request
+ * @base: starting position in the buffer
+ *
  */
-static inline struct rpc_xprt *
-xprt_from_sock(struct sock *sk)
-{
-	return (struct rpc_xprt *) sk->sk_user_data;
-}
-
-static int
-xdr_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen,
-		struct xdr_buf *xdr, unsigned int base, int msgflags)
+static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, int msgflags)
 {
 	struct page **ppage = xdr->pages;
 	unsigned int len, pglen = xdr->page_len;
@@ -125,7 +120,7 @@ xdr_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen,
 	}
 	if (base || xdr->page_base) {
 		pglen -= base;
-		base  += xdr->page_base;
+		base += xdr->page_base;
 		ppage += base >> PAGE_CACHE_SHIFT;
 		base &= ~PAGE_CACHE_MASK;
 	}
@@ -176,23 +171,25 @@ out:
 	return ret;
 }
 
-/*
- * Write data to socket.
+/**
+ * xs_sendmsg - write an RPC request to a socket
+ * @xprt: generic transport
+ * @req: the RPC request to write
+ *
  */
-static inline int
-xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
+static int xs_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
 {
 	struct socket *sock = xprt->sock;
 	struct xdr_buf *xdr = &req->rq_snd_buf;
 	struct sockaddr *addr = NULL;
 	int addrlen = 0;
 	unsigned int skip;
 	int result;
 
 	if (!sock)
 		return -ENOTCONN;
 
-	xprt_pktdump("packet data:",
+	xs_pktdump("packet data:",
 			req->rq_svec->iov_base,
 			req->rq_svec->iov_len);
 
@@ -201,13 +198,13 @@ xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
 		addr = (struct sockaddr *) &xprt->addr;
 		addrlen = sizeof(xprt->addr);
 	}
-	/* Dont repeat bytes */
+	/* Don't repeat bytes */
 	skip = req->rq_bytes_sent;
 
 	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
-	result = xdr_sendpages(sock, addr, addrlen, xdr, skip, MSG_DONTWAIT);
+	result = xs_sendpages(sock, addr, addrlen, xdr, skip, MSG_DONTWAIT);
 
-	dprintk("RPC: xprt_sendmsg(%d) = %d\n", xdr->len - skip, result);
+	dprintk("RPC: xs_sendmsg(%d) = %d\n", xdr->len - skip, result);
 
 	if (result >= 0)
 		return result;
@@ -215,8 +212,7 @@ xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
 	switch (result) {
 	case -ECONNREFUSED:
 		/* When the server has died, an ICMP port unreachable message
-		 * prompts ECONNREFUSED.
-		 */
+		 * prompts ECONNREFUSED. */
 	case -EAGAIN:
 		break;
 	case -ECONNRESET:
@@ -227,13 +223,25 @@ xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
 		result = -ENOTCONN;
 		break;
 	default:
-		printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
+		break;
 	}
 	return result;
 }
 
-static int
-xprt_send_request(struct rpc_task *task)
+/**
+ * xs_send_request - write an RPC request to a socket
+ * @task: address of RPC task that manages the state of an RPC request
+ *
+ * Return values:
+ *      0: The request has been sent
+ * EAGAIN: The socket was blocked, please call again later to
+ *         complete the request
+ *  other: Some other error occured, the request was not sent
+ *
+ * XXX: In the case of soft timeouts, should we eventually give up
+ *      if the socket is not able to make progress?
+ */
+static int xs_send_request(struct rpc_task *task)
 {
 	struct rpc_rqst *req = task->tk_rqstp;
 	struct rpc_xprt *xprt = req->rq_xprt;
@@ -242,18 +250,18 @@ xprt_send_request(struct rpc_task *task)
 	/* set up everything as needed. */
 	/* Write the record marker */
 	if (xprt->stream) {
 		u32 *marker = req->rq_svec[0].iov_base;
 
 		*marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
 	}
 
 	/* Continue transmitting the packet/record. We must be careful
 	 * to cope with writespace callbacks arriving _after_ we have
-	 * called xprt_sendmsg().
+	 * called sendmsg().
 	 */
 	while (1) {
 		req->rq_xtime = jiffies;
-		status = xprt_sendmsg(xprt, req);
+		status = xs_sendmsg(xprt, req);
 
 		if (status < 0)
 			break;
@@ -285,7 +293,7 @@ xprt_send_request(struct rpc_task *task)
 
 	if (status == -EAGAIN) {
 		if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
-			/* Protect against races with xprt_write_space */
+			/* Protect against races with xs_write_space */
 			spin_lock_bh(&xprt->sock_lock);
 			/* Don't race with disconnect */
 			if (!xprt_connected(xprt))
@@ -303,65 +311,77 @@ xprt_send_request(struct rpc_task *task)
 	return status;
 }
 
-/*
- * Close down a transport socket
+/**
+ * xs_close - close a socket
+ * @xprt: transport
+ *
  */
-static void
-xprt_close(struct rpc_xprt *xprt)
+static void xs_close(struct rpc_xprt *xprt)
 {
 	struct socket *sock = xprt->sock;
 	struct sock *sk = xprt->inet;
 
 	if (!sk)
 		return;
 
+	dprintk("RPC: xs_close xprt %p\n", xprt);
+
 	write_lock_bh(&sk->sk_callback_lock);
 	xprt->inet = NULL;
 	xprt->sock = NULL;
 
 	sk->sk_user_data = NULL;
 	sk->sk_data_ready = xprt->old_data_ready;
 	sk->sk_state_change = xprt->old_state_change;
 	sk->sk_write_space = xprt->old_write_space;
 	write_unlock_bh(&sk->sk_callback_lock);
 
 	sk->sk_no_check = 0;
 
 	sock_release(sock);
 }
 
-static void xprt_socket_destroy(struct rpc_xprt *xprt)
+/**
+ * xs_destroy - prepare to shutdown a transport
+ * @xprt: doomed transport
+ *
+ */
+static void xs_destroy(struct rpc_xprt *xprt)
 {
+	dprintk("RPC: xs_destroy xprt %p\n", xprt);
+
 	cancel_delayed_work(&xprt->sock_connect);
 	flush_scheduled_work();
 
 	xprt_disconnect(xprt);
-	xprt_close(xprt);
+	xs_close(xprt);
 	kfree(xprt->slot);
 }
 
-/*
- * Input handler for RPC replies. Called from a bottom half and hence
- * atomic.
+static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
+{
+	return (struct rpc_xprt *) sk->sk_user_data;
+}
+
+/**
+ * xs_udp_data_ready - "data ready" callback for UDP sockets
+ * @sk: socket with data to read
+ * @len: how much data to read
+ *
  */
-static void
-udp_data_ready(struct sock *sk, int len)
+static void xs_udp_data_ready(struct sock *sk, int len)
 {
 	struct rpc_task *task;
 	struct rpc_xprt *xprt;
 	struct rpc_rqst *rovr;
 	struct sk_buff *skb;
 	int err, repsize, copied;
 	u32 _xid, *xp;
 
 	read_lock(&sk->sk_callback_lock);
-	dprintk("RPC: udp_data_ready...\n");
-	if (!(xprt = xprt_from_sock(sk))) {
-		printk("RPC: udp_data_ready request not found!\n");
+	dprintk("RPC: xs_udp_data_ready...\n");
+	if (!(xprt = xprt_from_sock(sk)))
 		goto out;
-	}
-
-	dprintk("RPC: udp_data_ready client %p\n", xprt);
 
 	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
 		goto out;
@@ -371,7 +391,7 @@ udp_data_ready(struct sock *sk, int len)
 
 	repsize = skb->len - sizeof(struct udphdr);
 	if (repsize < 4) {
-		printk("RPC: impossible RPC reply size %d!\n", repsize);
+		dprintk("RPC: impossible RPC reply size %d!\n", repsize);
 		goto dropit;
 	}
 
@@ -410,11 +430,7 @@ udp_data_ready(struct sock *sk, int len)
 	read_unlock(&sk->sk_callback_lock);
 }
 
-/*
- * Copy from an skb into memory and shrink the skb.
- */
-static inline size_t
-tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
+static inline size_t xs_tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
 {
 	if (len > desc->count)
 		len = desc->count;
@@ -430,18 +446,14 @@ tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
 	return len;
 }
 
-/*
- * TCP read fragment marker
- */
-static inline void
-tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
+static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
 {
 	size_t len, used;
 	char *p;
 
 	p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
 	len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
-	used = tcp_copy_data(desc, p, len);
+	used = xs_tcp_copy_data(desc, p, len);
 	xprt->tcp_offset += used;
 	if (used != len)
 		return;
@@ -455,15 +467,15 @@ tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
 	xprt->tcp_offset = 0;
 	/* Sanity check of the record length */
 	if (xprt->tcp_reclen < 4) {
-		printk(KERN_ERR "RPC: Invalid TCP record fragment length\n");
+		dprintk("RPC: invalid TCP record fragment length\n");
 		xprt_disconnect(xprt);
+		return;
 	}
 	dprintk("RPC: reading TCP record fragment of length %d\n",
 			xprt->tcp_reclen);
 }
 
-static void
-tcp_check_recm(struct rpc_xprt *xprt)
+static void xs_tcp_check_recm(struct rpc_xprt *xprt)
 {
 	dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n",
 			xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_flags);
@@ -478,11 +490,7 @@ tcp_check_recm(struct rpc_xprt *xprt)
 	}
 }
 
-/*
- * TCP read xid
- */
-static inline void
-tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
+static inline void xs_tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
 {
 	size_t len, used;
 	char *p;
@@ -490,7 +498,7 @@ tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
 	len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
 	dprintk("RPC: reading XID (%Zu bytes)\n", len);
 	p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
-	used = tcp_copy_data(desc, p, len);
+	used = xs_tcp_copy_data(desc, p, len);
 	xprt->tcp_offset += used;
 	if (used != len)
 		return;
@@ -499,14 +507,10 @@ tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
 	xprt->tcp_copied = 4;
 	dprintk("RPC: reading reply for XID %08x\n",
 			ntohl(xprt->tcp_xid));
-	tcp_check_recm(xprt);
+	xs_tcp_check_recm(xprt);
 }
 
-/*
- * TCP read and complete request
- */
-static inline void
-tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
+static inline void xs_tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
 {
 	struct rpc_rqst *req;
 	struct xdr_buf *rcvbuf;
@@ -533,12 +537,12 @@ tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
 		memcpy(&my_desc, desc, sizeof(my_desc));
 		my_desc.count = len;
 		r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
-					&my_desc, tcp_copy_data);
+					&my_desc, xs_tcp_copy_data);
 		desc->count -= r;
 		desc->offset += r;
 	} else
 		r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
-					desc, tcp_copy_data);
+					desc, xs_tcp_copy_data);
 
 	if (r > 0) {
 		xprt->tcp_copied += r;
@@ -581,14 +585,10 @@ out:
 		xprt_complete_rqst(xprt, req, xprt->tcp_copied);
 	}
 	spin_unlock(&xprt->sock_lock);
-	tcp_check_recm(xprt);
+	xs_tcp_check_recm(xprt);
 }
 
-/*
- * TCP discard extra bytes from a short read
- */
-static inline void
-tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
+static inline void xs_tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
 {
 	size_t len;
 
@@ -599,16 +599,10 @@ tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
 	desc->offset += len;
 	xprt->tcp_offset += len;
 	dprintk("RPC: discarded %Zu bytes\n", len);
-	tcp_check_recm(xprt);
+	xs_tcp_check_recm(xprt);
 }
 
-/*
- * TCP record receive routine
- * We first have to grab the record marker, then the XID, then the data.
- */
-static int
-tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
-		unsigned int offset, size_t len)
+static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
 {
 	struct rpc_xprt *xprt = rd_desc->arg.data;
 	skb_reader_t desc = {
@@ -616,64 +610,72 @@ tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
 		.offset = offset,
 		.count = len,
 		.csum = 0
 	};
 
-	dprintk("RPC: tcp_data_recv\n");
+	dprintk("RPC: xs_tcp_data_recv started\n");
 	do {
 		/* Read in a new fragment marker if necessary */
 		/* Can we ever really expect to get completely empty fragments? */
 		if (xprt->tcp_flags & XPRT_COPY_RECM) {
-			tcp_read_fraghdr(xprt, &desc);
+			xs_tcp_read_fraghdr(xprt, &desc);
 			continue;
 		}
 		/* Read in the xid if necessary */
 		if (xprt->tcp_flags & XPRT_COPY_XID) {
-			tcp_read_xid(xprt, &desc);
+			xs_tcp_read_xid(xprt, &desc);
 			continue;
 		}
 		/* Read in the request data */
 		if (xprt->tcp_flags & XPRT_COPY_DATA) {
-			tcp_read_request(xprt, &desc);
+			xs_tcp_read_request(xprt, &desc);
 			continue;
 		}
 		/* Skip over any trailing bytes on short reads */
-		tcp_read_discard(xprt, &desc);
+		xs_tcp_read_discard(xprt, &desc);
 	} while (desc.count);
-	dprintk("RPC: tcp_data_recv done\n");
+	dprintk("RPC: xs_tcp_data_recv done\n");
 	return len - desc.count;
 }
 
-static void tcp_data_ready(struct sock *sk, int bytes)
+/**
+ * xs_tcp_data_ready - "data ready" callback for TCP sockets
+ * @sk: socket with data to read
+ * @bytes: how much data to read
+ *
+ */
+static void xs_tcp_data_ready(struct sock *sk, int bytes)
 {
 	struct rpc_xprt *xprt;
 	read_descriptor_t rd_desc;
 
 	read_lock(&sk->sk_callback_lock);
-	dprintk("RPC: tcp_data_ready...\n");
-	if (!(xprt = xprt_from_sock(sk))) {
-		printk("RPC: tcp_data_ready socket info not found!\n");
+	dprintk("RPC: xs_tcp_data_ready...\n");
+	if (!(xprt = xprt_from_sock(sk)))
 		goto out;
-	}
 	if (xprt->shutdown)
 		goto out;
 
-	/* We use rd_desc to pass struct xprt to tcp_data_recv */
+	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
 	rd_desc.arg.data = xprt;
 	rd_desc.count = 65536;
-	tcp_read_sock(sk, &rd_desc, tcp_data_recv);
+	tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
 out:
 	read_unlock(&sk->sk_callback_lock);
 }
 
-static void
-tcp_state_change(struct sock *sk)
+/**
+ * xs_tcp_state_change - callback to handle TCP socket state changes
+ * @sk: socket whose state has changed
+ *
+ */
+static void xs_tcp_state_change(struct sock *sk)
 {
 	struct rpc_xprt *xprt;
 
 	read_lock(&sk->sk_callback_lock);
 	if (!(xprt = xprt_from_sock(sk)))
 		goto out;
-	dprintk("RPC: tcp_state_change client %p...\n", xprt);
+	dprintk("RPC: xs_tcp_state_change client %p...\n", xprt);
 	dprintk("RPC: state %x conn %d dead %d zapped %d\n",
 			sk->sk_state, xprt_connected(xprt),
 			sock_flag(sk, SOCK_DEAD),
@@ -703,17 +705,20 @@ tcp_state_change(struct sock *sk)
 	read_unlock(&sk->sk_callback_lock);
 }
 
-/*
+/**
+ * xs_write_space - callback invoked when socket buffer space becomes
+ *                  available
+ * @sk: socket whose state has changed
+ *
  * Called when more output buffer space is available for this socket.
  * We try not to wake our writers until they can make "significant"
  * progress, otherwise we'll waste resources thrashing sock_sendmsg
  * with a bunch of small requests.
  */
-static void
-xprt_write_space(struct sock *sk)
+static void xs_write_space(struct sock *sk)
 {
 	struct rpc_xprt *xprt;
 	struct socket *sock;
 
 	read_lock(&sk->sk_callback_lock);
 	if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->sk_socket))
@@ -743,11 +748,15 @@ out:
 	read_unlock(&sk->sk_callback_lock);
 }
 
-/*
- * Set socket buffer length
+/**
+ * xs_set_buffer_size - set send and receive limits
+ * @xprt: generic transport
+ *
+ * Set socket send and receive limits based on the
+ * sndsize and rcvsize fields in the generic transport
+ * structure. This applies only to UDP sockets.
  */
-static void
-xprt_sock_setbufsize(struct rpc_xprt *xprt)
+static void xs_set_buffer_size(struct rpc_xprt *xprt)
 {
 	struct sock *sk = xprt->inet;
 
@@ -764,15 +773,12 @@ xprt_sock_setbufsize(struct rpc_xprt *xprt)
 	}
 }
 
-/*
- * Bind to a reserved port
- */
-static inline int xprt_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
+static int xs_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
 {
 	struct sockaddr_in myaddr = {
 		.sin_family = AF_INET,
 	};
 	int err, port;
 
 	/* Were we already bound to a given port? Try to reuse it */
 	port = xprt->port;
@@ -782,20 +788,47 @@ static inline int xprt_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
 					sizeof(myaddr));
 		if (err == 0) {
 			xprt->port = port;
+			dprintk("RPC: xs_bindresvport bound to port %u\n",
+					port);
 			return 0;
 		}
 		if (--port == 0)
-			port = XPRT_MAX_RESVPORT;
+			port = XS_MAX_RESVPORT;
 	} while (err == -EADDRINUSE && port != xprt->port);
 
-	printk("RPC: Can't bind to reserved port (%d).\n", -err);
+	dprintk("RPC: can't bind to reserved port (%d).\n", -err);
 	return err;
 }
 
-static void
-xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
+static struct socket *xs_create(struct rpc_xprt *xprt, int proto, int resvport)
 {
-	struct sock *sk = sock->sk;
+	struct socket *sock;
+	int type, err;
+
+	dprintk("RPC: xs_create(%s %d)\n",
+			(proto == IPPROTO_UDP)? "udp" : "tcp", proto);
+
+	type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
+
+	if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) {
+		dprintk("RPC: can't create socket (%d).\n", -err);
+		return NULL;
+	}
+
+	/* If the caller has the capability, bind to a reserved port */
+	if (resvport && xs_bindresvport(xprt, sock) < 0)
+		goto failed;
+
+	return sock;
+
+failed:
+	sock_release(sock);
+	return NULL;
+}
+
+static void xs_bind(struct rpc_xprt *xprt, struct socket *sock)
+{
+	struct sock *sk = sock->sk;
 
 	if (xprt->inet)
 		return;
@@ -806,16 +839,16 @@ xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
 	xprt->old_state_change = sk->sk_state_change;
 	xprt->old_write_space = sk->sk_write_space;
 	if (xprt->prot == IPPROTO_UDP) {
-		sk->sk_data_ready = udp_data_ready;
+		sk->sk_data_ready = xs_udp_data_ready;
 		sk->sk_no_check = UDP_CSUM_NORCV;
 		xprt_set_connected(xprt);
 	} else {
 		tcp_sk(sk)->nonagle = 1;	/* disable Nagle's algorithm */
-		sk->sk_data_ready = tcp_data_ready;
-		sk->sk_state_change = tcp_state_change;
+		sk->sk_data_ready = xs_tcp_data_ready;
+		sk->sk_state_change = xs_tcp_state_change;
 		xprt_clear_connected(xprt);
 	}
-	sk->sk_write_space = xprt_write_space;
+	sk->sk_write_space = xs_write_space;
 
 	/* Reset to new socket */
 	xprt->sock = sock;
@@ -825,39 +858,13 @@ xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
 	return;
 }
 
-/*
- * Datastream sockets are created here, but xprt_connect will create
- * and connect stream sockets.
+/**
+ * xs_connect_worker - try to connect a socket to a remote endpoint
+ * @args: RPC transport to connect
+ *
+ * Invoked by a work queue tasklet.
  */
-static struct socket * xprt_create_socket(struct rpc_xprt *xprt, int proto, int resvport)
-{
-	struct socket *sock;
-	int type, err;
-
-	dprintk("RPC: xprt_create_socket(%s %d)\n",
-			(proto == IPPROTO_UDP)? "udp" : "tcp", proto);
-
-	type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
-
-	if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) {
-		printk("RPC: can't create socket (%d).\n", -err);
-		return NULL;
-	}
-
-	/* If the caller has the capability, bind to a reserved port */
-	if (resvport && xprt_bindresvport(xprt, sock) < 0) {
-		printk("RPC: can't bind to reserved port.\n");
-		goto failed;
-	}
-
-	return sock;
-
-failed:
-	sock_release(sock);
-	return NULL;
-}
-
-static void xprt_socket_connect(void *args)
+static void xs_connect_worker(void *args)
 {
 	struct rpc_xprt *xprt = (struct rpc_xprt *)args;
 	struct socket *sock = xprt->sock;
@@ -866,18 +873,20 @@ static void xprt_socket_connect(void *args)
 	if (xprt->shutdown || xprt->addr.sin_port == 0)
 		goto out;
 
+	dprintk("RPC: xs_connect_worker xprt %p\n", xprt);
+
 	/*
 	 * Start by resetting any existing state
 	 */
-	xprt_close(xprt);
-	sock = xprt_create_socket(xprt, xprt->prot, xprt->resvport);
+	xs_close(xprt);
+	sock = xs_create(xprt, xprt->prot, xprt->resvport);
 	if (sock == NULL) {
 		/* couldn't create socket or bind to reserved port;
 		 * this is likely a permanent error, so cause an abort */
 		goto out;
 	}
-	xprt_bind_socket(xprt, sock);
-	xprt_sock_setbufsize(xprt);
+	xs_bind(xprt, sock);
+	xs_set_buffer_size(xprt);
 
 	status = 0;
 	if (!xprt->stream)
@@ -908,20 +917,23 @@ out_clear:
 	smp_mb__after_clear_bit();
 }
 
-static void
-xprt_connect_sock(struct rpc_task *task)
+/**
+ * xs_connect - connect a socket to a remote endpoint
+ * @task: address of RPC task that manages state of connect request
+ *
+ * TCP: If the remote end dropped the connection, delay reconnecting.
+ */
+static void xs_connect(struct rpc_task *task)
 {
 	struct rpc_xprt *xprt = task->tk_xprt;
 
 	if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate)) {
-		/* Note: if we are here due to a dropped connection
-		 * we delay reconnecting by RPC_REESTABLISH_TIMEOUT/HZ
-		 * seconds
-		 */
-		if (xprt->sock != NULL)
+		if (xprt->sock != NULL) {
+			dprintk("RPC: xs_connect delayed xprt %p\n", xprt);
 			schedule_delayed_work(&xprt->sock_connect,
 					RPC_REESTABLISH_TIMEOUT);
-		else {
+		} else {
+			dprintk("RPC: xs_connect scheduled xprt %p\n", xprt);
 			schedule_work(&xprt->sock_connect);
 			/* flush_scheduled_work can sleep... */
 			if (!RPC_IS_ASYNC(task))
@@ -930,29 +942,23 @@ xprt_connect_sock(struct rpc_task *task)
 	}
 }
 
-/*
- * Set default timeout parameters
- */
-static void
-xprt_default_timeout(struct rpc_timeout *to, int proto)
-{
-	if (proto == IPPROTO_UDP)
-		xprt_set_timeout(to, 5, 5 * HZ);
-	else
-		xprt_set_timeout(to, 2, 60 * HZ);
-}
-
-static struct rpc_xprt_ops xprt_socket_ops = {
-	.set_buffer_size = xprt_sock_setbufsize,
-	.connect = xprt_connect_sock,
-	.send_request = xprt_send_request,
-	.close = xprt_close,
-	.destroy = xprt_socket_destroy,
+static struct rpc_xprt_ops xs_ops = {
+	.set_buffer_size = xs_set_buffer_size,
+	.connect = xs_connect,
+	.send_request = xs_send_request,
+	.close = xs_close,
+	.destroy = xs_destroy,
 };
 
 extern unsigned int xprt_udp_slot_table_entries;
 extern unsigned int xprt_tcp_slot_table_entries;
 
+/**
+ * xs_setup_udp - Set up transport to use a UDP socket
+ * @xprt: transport to set up
+ * @to: timeout parameters
+ *
+ */
 int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
 {
 	size_t slot_table_size;
@@ -967,7 +973,7 @@ int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
 	memset(xprt->slot, 0, slot_table_size);
 
 	xprt->prot = IPPROTO_UDP;
-	xprt->port = XPRT_MAX_RESVPORT;
+	xprt->port = XS_MAX_RESVPORT;
 	xprt->stream = 0;
 	xprt->nocong = 0;
 	xprt->cwnd = RPC_INITCWND;
@@ -975,18 +981,24 @@ int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
 	/* XXX: header size can vary due to auth type, IPv6, etc. */
 	xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
 
-	INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt);
+	INIT_WORK(&xprt->sock_connect, xs_connect_worker, xprt);
 
-	xprt->ops = &xprt_socket_ops;
+	xprt->ops = &xs_ops;
 
 	if (to)
 		xprt->timeout = *to;
 	else
-		xprt_default_timeout(to, xprt->prot);
+		xprt_set_timeout(&xprt->timeout, 5, 5 * HZ);
 
 	return 0;
 }
 
+/**
+ * xs_setup_tcp - Set up transport to use a TCP socket
+ * @xprt: transport to set up
+ * @to: timeout parameters
+ *
+ */
 int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to)
 {
 	size_t slot_table_size;
@@ -1001,21 +1013,21 @@ int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to)
 	memset(xprt->slot, 0, slot_table_size);
 
 	xprt->prot = IPPROTO_TCP;
-	xprt->port = XPRT_MAX_RESVPORT;
+	xprt->port = XS_MAX_RESVPORT;
 	xprt->stream = 1;
 	xprt->nocong = 1;
 	xprt->cwnd = RPC_MAXCWND(xprt);
 	xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
 	xprt->max_payload = (1U << 31) - 1;
 
-	INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt);
+	INIT_WORK(&xprt->sock_connect, xs_connect_worker, xprt);
 
-	xprt->ops = &xprt_socket_ops;
+	xprt->ops = &xs_ops;
 
 	if (to)
 		xprt->timeout = *to;
 	else
-		xprt_default_timeout(to, xprt->prot);
+		xprt_set_timeout(&xprt->timeout, 2, 60 * HZ);
 
 	return 0;
 }