aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorChuck Lever <cel@citi.umich.edu>2005-08-11 16:25:56 -0400
committerTrond Myklebust <Trond.Myklebust@netapp.com>2005-09-23 12:38:31 -0400
commit262965f53defd312a294b45366ea17907b6a616b (patch)
treea5e0f194c02f230ff12d9d5b0933bad9eb8810ea /net
parentb0d93ad511ce2f37823a07c7a3258117a431f5fb (diff)
[PATCH] RPC: separate TCP and UDP socket write paths
Split the RPC client's main socket write path into a TCP version and a UDP version to eliminate another dependency on the "xprt->stream" variable. Compiler optimization removes unneeded code from xs_sendpages, as this function is now called with some constant arguments. We can now cleanly perform transport protocol-specific return code testing and error recovery in each path. Test-plan: Millions of fsx operations. Performance characterization such as "sio" or "iozone". Examine oprofile results for any changes before and after this patch is applied. Version: Thu, 11 Aug 2005 16:08:46 -0400 Signed-off-by: Chuck Lever <cel@netapp.com> Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
Diffstat (limited to 'net')
-rw-r--r--net/sunrpc/xprtsock.c215
1 files changed, 128 insertions, 87 deletions
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index f91529787b9b..57988300640a 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -40,6 +40,12 @@
40 */ 40 */
41#define XS_MAX_RESVPORT (800U) 41#define XS_MAX_RESVPORT (800U)
42 42
43/*
44 * How many times to try sending a request on a socket before waiting
45 * for the socket buffer to clear.
46 */
47#define XS_SENDMSG_RETRY (10U)
48
43#ifdef RPC_DEBUG 49#ifdef RPC_DEBUG
44# undef RPC_DEBUG_DATA 50# undef RPC_DEBUG_DATA
45# define RPCDBG_FACILITY RPCDBG_TRANS 51# define RPCDBG_FACILITY RPCDBG_TRANS
@@ -114,13 +120,18 @@ static int xs_send_tail(struct socket *sock, struct xdr_buf *xdr, unsigned int b
114 * @base: starting position in the buffer 120 * @base: starting position in the buffer
115 * 121 *
116 */ 122 */
117static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base) 123static inline int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
118{ 124{
119 struct page **ppage = xdr->pages; 125 struct page **ppage = xdr->pages;
120 unsigned int len, pglen = xdr->page_len; 126 unsigned int len, pglen = xdr->page_len;
121 int err, ret = 0; 127 int err, ret = 0;
122 ssize_t (*sendpage)(struct socket *, struct page *, int, size_t, int); 128 ssize_t (*sendpage)(struct socket *, struct page *, int, size_t, int);
123 129
130 if (unlikely(!sock))
131 return -ENOTCONN;
132
133 clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
134
124 len = xdr->head[0].iov_len; 135 len = xdr->head[0].iov_len;
125 if (base < len || (addr != NULL && base == 0)) { 136 if (base < len || (addr != NULL && base == 0)) {
126 err = xs_send_head(sock, addr, addrlen, xdr, base, len); 137 err = xs_send_head(sock, addr, addrlen, xdr, base, len);
@@ -187,140 +198,162 @@ out:
187} 198}
188 199
189/** 200/**
190 * xs_sendmsg - write an RPC request to a socket 201 * xs_nospace - place task on wait queue if transmit was incomplete
191 * @xprt: generic transport 202 * @task: task to put to sleep
192 * @req: the RPC request to write
193 * 203 *
194 */ 204 */
195static int xs_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req) 205static void xs_nospace(struct rpc_task *task)
196{ 206{
197 struct socket *sock = xprt->sock; 207 struct rpc_rqst *req = task->tk_rqstp;
198 struct xdr_buf *xdr = &req->rq_snd_buf; 208 struct rpc_xprt *xprt = req->rq_xprt;
199 struct sockaddr *addr = NULL;
200 int addrlen = 0;
201 unsigned int skip;
202 int result;
203 209
204 if (!sock) 210 dprintk("RPC: %4d xmit incomplete (%u left of %u)\n",
205 return -ENOTCONN; 211 task->tk_pid, req->rq_slen - req->rq_bytes_sent,
212 req->rq_slen);
213
214 if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
215 /* Protect against races with write_space */
216 spin_lock_bh(&xprt->transport_lock);
217
218 /* Don't race with disconnect */
219 if (!xprt_connected(xprt))
220 task->tk_status = -ENOTCONN;
221 else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags))
222 xprt_wait_for_buffer_space(task);
223
224 spin_unlock_bh(&xprt->transport_lock);
225 } else
226 /* Keep holding the socket if it is blocked */
227 rpc_delay(task, HZ>>4);
228}
229
230/**
231 * xs_udp_send_request - write an RPC request to a UDP socket
232 * @task: address of RPC task that manages the state of an RPC request
233 *
234 * Return values:
235 * 0: The request has been sent
236 * EAGAIN: The socket was blocked, please call again later to
237 * complete the request
238 * ENOTCONN: Caller needs to invoke connect logic then call again
239 * other: Some other error occured, the request was not sent
240 */
241static int xs_udp_send_request(struct rpc_task *task)
242{
243 struct rpc_rqst *req = task->tk_rqstp;
244 struct rpc_xprt *xprt = req->rq_xprt;
245 struct xdr_buf *xdr = &req->rq_snd_buf;
246 int status;
206 247
207 xs_pktdump("packet data:", 248 xs_pktdump("packet data:",
208 req->rq_svec->iov_base, 249 req->rq_svec->iov_base,
209 req->rq_svec->iov_len); 250 req->rq_svec->iov_len);
210 251
211 /* For UDP, we need to provide an address */ 252 req->rq_xtime = jiffies;
212 if (!xprt->stream) { 253 status = xs_sendpages(xprt->sock, (struct sockaddr *) &xprt->addr,
213 addr = (struct sockaddr *) &xprt->addr; 254 sizeof(xprt->addr), xdr, req->rq_bytes_sent);
214 addrlen = sizeof(xprt->addr);
215 }
216 /* Don't repeat bytes */
217 skip = req->rq_bytes_sent;
218 255
219 clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags); 256 dprintk("RPC: xs_udp_send_request(%u) = %d\n",
220 result = xs_sendpages(sock, addr, addrlen, xdr, skip); 257 xdr->len - req->rq_bytes_sent, status);
221 258
222 dprintk("RPC: xs_sendmsg(%d) = %d\n", xdr->len - skip, result); 259 if (likely(status >= (int) req->rq_slen))
260 return 0;
223 261
224 if (result >= 0) 262 /* Still some bytes left; set up for a retry later. */
225 return result; 263 if (status > 0)
264 status = -EAGAIN;
226 265
227 switch (result) { 266 switch (status) {
267 case -ENETUNREACH:
268 case -EPIPE:
228 case -ECONNREFUSED: 269 case -ECONNREFUSED:
229 /* When the server has died, an ICMP port unreachable message 270 /* When the server has died, an ICMP port unreachable message
230 * prompts ECONNREFUSED. */ 271 * prompts ECONNREFUSED. */
231 case -EAGAIN:
232 break; 272 break;
233 case -ECONNRESET: 273 case -EAGAIN:
234 case -ENOTCONN: 274 xs_nospace(task);
235 case -EPIPE:
236 /* connection broken */
237 if (xprt->stream)
238 result = -ENOTCONN;
239 break; 275 break;
240 default: 276 default:
277 dprintk("RPC: sendmsg returned unrecognized error %d\n",
278 -status);
241 break; 279 break;
242 } 280 }
243 return result; 281
282 return status;
244} 283}
245 284
246/** 285/**
247 * xs_send_request - write an RPC request to a socket 286 * xs_tcp_send_request - write an RPC request to a TCP socket
248 * @task: address of RPC task that manages the state of an RPC request 287 * @task: address of RPC task that manages the state of an RPC request
249 * 288 *
250 * Return values: 289 * Return values:
251 * 0: The request has been sent 290 * 0: The request has been sent
252 * EAGAIN: The socket was blocked, please call again later to 291 * EAGAIN: The socket was blocked, please call again later to
253 * complete the request 292 * complete the request
254 * other: Some other error occured, the request was not sent 293 * ENOTCONN: Caller needs to invoke connect logic then call again
294 * other: Some other error occured, the request was not sent
255 * 295 *
256 * XXX: In the case of soft timeouts, should we eventually give up 296 * XXX: In the case of soft timeouts, should we eventually give up
257 * if the socket is not able to make progress? 297 * if sendmsg is not able to make progress?
258 */ 298 */
259static int xs_send_request(struct rpc_task *task) 299static int xs_tcp_send_request(struct rpc_task *task)
260{ 300{
261 struct rpc_rqst *req = task->tk_rqstp; 301 struct rpc_rqst *req = task->tk_rqstp;
262 struct rpc_xprt *xprt = req->rq_xprt; 302 struct rpc_xprt *xprt = req->rq_xprt;
303 struct xdr_buf *xdr = &req->rq_snd_buf;
304 u32 *marker = req->rq_svec[0].iov_base;
263 int status, retry = 0; 305 int status, retry = 0;
264 306
265 /* set up everything as needed. */
266 /* Write the record marker */ 307 /* Write the record marker */
267 if (xprt->stream) { 308 *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
268 u32 *marker = req->rq_svec[0].iov_base;
269 309
270 *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker))); 310 xs_pktdump("packet data:",
271 } 311 req->rq_svec->iov_base,
312 req->rq_svec->iov_len);
272 313
273 /* Continue transmitting the packet/record. We must be careful 314 /* Continue transmitting the packet/record. We must be careful
274 * to cope with writespace callbacks arriving _after_ we have 315 * to cope with writespace callbacks arriving _after_ we have
275 * called sendmsg(). 316 * called sendmsg(). */
276 */
277 while (1) { 317 while (1) {
278 req->rq_xtime = jiffies; 318 req->rq_xtime = jiffies;
279 status = xs_sendmsg(xprt, req); 319 status = xs_sendpages(xprt->sock, NULL, 0, xdr,
320 req->rq_bytes_sent);
280 321
281 if (status < 0) 322 dprintk("RPC: xs_tcp_send_request(%u) = %d\n",
282 break; 323 xdr->len - req->rq_bytes_sent, status);
283 324
284 if (xprt->stream) { 325 if (unlikely(status < 0))
285 req->rq_bytes_sent += status;
286
287 /* If we've sent the entire packet, immediately
288 * reset the count of bytes sent. */
289 if (req->rq_bytes_sent >= req->rq_slen) {
290 req->rq_bytes_sent = 0;
291 return 0;
292 }
293 } else {
294 if (status >= req->rq_slen)
295 return 0;
296 status = -EAGAIN;
297 break; 326 break;
298 }
299 327
300 dprintk("RPC: %4d xmit incomplete (%d left of %d)\n", 328 /* If we've sent the entire packet, immediately
301 task->tk_pid, req->rq_slen - req->rq_bytes_sent, 329 * reset the count of bytes sent. */
302 req->rq_slen); 330 req->rq_bytes_sent += status;
331 if (likely(req->rq_bytes_sent >= req->rq_slen)) {
332 req->rq_bytes_sent = 0;
333 return 0;
334 }
303 335
304 status = -EAGAIN; 336 status = -EAGAIN;
305 if (retry++ > 50) 337 if (retry++ > XS_SENDMSG_RETRY)
306 break; 338 break;
307 } 339 }
308 340
309 if (status == -EAGAIN) { 341 switch (status) {
310 if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) { 342 case -EAGAIN:
311 /* Protect against races with write_space */ 343 xs_nospace(task);
312 spin_lock_bh(&xprt->transport_lock); 344 break;
313 /* Don't race with disconnect */ 345 case -ECONNREFUSED:
314 if (!xprt_connected(xprt)) 346 case -ECONNRESET:
315 task->tk_status = -ENOTCONN; 347 case -ENOTCONN:
316 else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) 348 case -EPIPE:
317 xprt_wait_for_buffer_space(task); 349 status = -ENOTCONN;
318 spin_unlock_bh(&xprt->transport_lock); 350 break;
319 return status; 351 default:
320 } 352 dprintk("RPC: sendmsg returned unrecognized error %d\n",
321 /* Keep holding the socket if it is blocked */ 353 -status);
322 rpc_delay(task, HZ>>4); 354 break;
323 } 355 }
356
324 return status; 357 return status;
325} 358}
326 359
@@ -992,10 +1025,18 @@ static void xs_connect(struct rpc_task *task)
992 } 1025 }
993} 1026}
994 1027
995static struct rpc_xprt_ops xs_ops = { 1028static struct rpc_xprt_ops xs_udp_ops = {
1029 .set_buffer_size = xs_set_buffer_size,
1030 .connect = xs_connect,
1031 .send_request = xs_udp_send_request,
1032 .close = xs_close,
1033 .destroy = xs_destroy,
1034};
1035
1036static struct rpc_xprt_ops xs_tcp_ops = {
996 .set_buffer_size = xs_set_buffer_size, 1037 .set_buffer_size = xs_set_buffer_size,
997 .connect = xs_connect, 1038 .connect = xs_connect,
998 .send_request = xs_send_request, 1039 .send_request = xs_tcp_send_request,
999 .close = xs_close, 1040 .close = xs_close,
1000 .destroy = xs_destroy, 1041 .destroy = xs_destroy,
1001}; 1042};
@@ -1033,7 +1074,7 @@ int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
1033 1074
1034 INIT_WORK(&xprt->connect_worker, xs_udp_connect_worker, xprt); 1075 INIT_WORK(&xprt->connect_worker, xs_udp_connect_worker, xprt);
1035 1076
1036 xprt->ops = &xs_ops; 1077 xprt->ops = &xs_udp_ops;
1037 1078
1038 if (to) 1079 if (to)
1039 xprt->timeout = *to; 1080 xprt->timeout = *to;
@@ -1072,7 +1113,7 @@ int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to)
1072 1113
1073 INIT_WORK(&xprt->connect_worker, xs_tcp_connect_worker, xprt); 1114 INIT_WORK(&xprt->connect_worker, xs_tcp_connect_worker, xprt);
1074 1115
1075 xprt->ops = &xs_ops; 1116 xprt->ops = &xs_tcp_ops;
1076 1117
1077 if (to) 1118 if (to)
1078 xprt->timeout = *to; 1119 xprt->timeout = *to;