Diffstat (limited to 'net/sunrpc/xprt.c')

 net/sunrpc/xprt.c | 1613 +++++++++----------------------------------------
 1 file changed, 447 insertions(+), 1166 deletions(-)
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 3c654e06b084..6dda3860351f 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -10,12 +10,12 @@
  * one is available. Otherwise, it sleeps on the backlog queue
  * (xprt_reserve).
  * - Next, the caller puts together the RPC message, stuffs it into
- *	the request struct, and calls xprt_call().
- * - xprt_call transmits the message and installs the caller on the
- *	socket's wait list. At the same time, it installs a timer that
+ *	the request struct, and calls xprt_transmit().
+ * - xprt_transmit sends the message and installs the caller on the
+ *	transport's wait list. At the same time, it installs a timer that
  *	is run after the packet's timeout has expired.
  * - When a packet arrives, the data_ready handler walks the list of
- *	pending requests for that socket. If a matching XID is found, the
+ *	pending requests for that transport. If a matching XID is found, the
  *	caller is woken up, and the timer removed.
  * - When no reply arrives within the timeout interval, the timer is
  *	fired by the kernel and runs xprt_timer(). It either adjusts the
@@ -33,36 +33,17 @@
  *
  * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
  *
- * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
- * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
- * TCP NFS related read + write fixes
- * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
- *
- * Rewrite of larges part of the code in order to stabilize TCP stuff.
- * Fix behaviour when socket buffer is full.
- * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
+ * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
  */
 
+#include <linux/module.h>
+
 #include <linux/types.h>
-#include <linux/slab.h>
-#include <linux/capability.h>
-#include <linux/sched.h>
-#include <linux/errno.h>
-#include <linux/socket.h>
-#include <linux/in.h>
-#include <linux/net.h>
-#include <linux/mm.h>
-#include <linux/udp.h>
-#include <linux/tcp.h>
-#include <linux/sunrpc/clnt.h>
-#include <linux/file.h>
+#include <linux/interrupt.h>
 #include <linux/workqueue.h>
 #include <linux/random.h>
 
-#include <net/sock.h>
-#include <net/checksum.h>
-#include <net/udp.h>
-#include <net/tcp.h>
+#include <linux/sunrpc/clnt.h>
 
 /*
  * Local variables
@@ -73,81 +54,90 @@
 # define RPCDBG_FACILITY	RPCDBG_XPRT
 #endif
 
-#define XPRT_MAX_BACKOFF	(8)
-#define XPRT_IDLE_TIMEOUT	(5*60*HZ)
-#define XPRT_MAX_RESVPORT	(800)
-
 /*
  * Local functions
  */
 static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
 static inline void	do_xprt_reserve(struct rpc_task *);
-static void	xprt_disconnect(struct rpc_xprt *);
 static void	xprt_connect_status(struct rpc_task *task);
-static struct rpc_xprt * xprt_setup(int proto, struct sockaddr_in *ap,
-						struct rpc_timeout *to);
-static struct socket *xprt_create_socket(struct rpc_xprt *, int, int);
-static void	xprt_bind_socket(struct rpc_xprt *, struct socket *);
 static int	__xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
 
-static int	xprt_clear_backlog(struct rpc_xprt *xprt);
-
-#ifdef RPC_DEBUG_DATA
 /*
- * Print the buffer contents (first 128 bytes only--just enough for
- * diropres return).
+ * The transport code maintains an estimate on the maximum number of out-
+ * standing RPC requests, using a smoothed version of the congestion
+ * avoidance implemented in 44BSD. This is basically the Van Jacobson
+ * congestion algorithm: If a retransmit occurs, the congestion window is
+ * halved; otherwise, it is incremented by 1/cwnd when
+ *
+ *	- a reply is received and
+ *	- a full number of requests are outstanding and
+ *	- the congestion window hasn't been updated recently.
  */
-static void
-xprt_pktdump(char *msg, u32 *packet, unsigned int count)
-{
-	u8 *buf = (u8 *) packet;
-	int j;
-
-	dprintk("RPC: %s\n", msg);
-	for (j = 0; j < count && j < 128; j += 4) {
-		if (!(j & 31)) {
-			if (j)
-				dprintk("\n");
-			dprintk("0x%04x ", j);
-		}
-		dprintk("%02x%02x%02x%02x ",
-			buf[j], buf[j+1], buf[j+2], buf[j+3]);
-	}
-	dprintk("\n");
-}
-#else
-static inline void
-xprt_pktdump(char *msg, u32 *packet, unsigned int count)
-{
-	/* NOP */
-}
-#endif
+#define RPC_CWNDSHIFT		(8U)
+#define RPC_CWNDSCALE		(1U << RPC_CWNDSHIFT)
+#define RPC_INITCWND		RPC_CWNDSCALE
+#define RPC_MAXCWND(xprt)	((xprt)->max_reqs << RPC_CWNDSHIFT)
 
-/*
- * Look up RPC transport given an INET socket
+#define RPCXPRT_CONGESTED(xprt)	((xprt)->cong >= (xprt)->cwnd)
+
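
The macros above implement the window in fixed point: RPC_CWNDSCALE is one
request's worth of congestion, so cwnd >> RPC_CWNDSHIFT is the window in
whole requests.  A standalone user-space sketch (not part of the patch) of
the same arithmetic, showing the additive-increase/multiplicative-decrease
behaviour:

/* Demo of the fixed-point cwnd arithmetic; the macro values are copied
 * from the patch, everything else is illustrative. */
#include <stdio.h>

#define RPC_CWNDSHIFT	(8U)
#define RPC_CWNDSCALE	(1U << RPC_CWNDSHIFT)

int main(void)
{
	unsigned long cwnd = RPC_CWNDSCALE;	/* RPC_INITCWND: one request */

	/* Success path: grow by roughly 1/cwnd per reply. */
	for (int reply = 1; reply <= 4; reply++) {
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		printf("after reply %d: cwnd = %lu (%lu.%02lu requests)\n",
		       reply, cwnd, cwnd >> RPC_CWNDSHIFT,
		       (cwnd & (RPC_CWNDSCALE - 1)) * 100 / RPC_CWNDSCALE);
	}

	/* Timeout path: halve, with a floor of one request. */
	cwnd >>= 1;
	if (cwnd < RPC_CWNDSCALE)
		cwnd = RPC_CWNDSCALE;
	printf("after timeout: cwnd = %lu\n", cwnd);
	return 0;
}
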
+/**
+ * xprt_reserve_xprt - serialize write access to transports
+ * @task: task that is requesting access to the transport
+ *
+ * This prevents mixing the payload of separate requests, and prevents
+ * transport connects from colliding with writes. No congestion control
+ * is provided.
  */
-static inline struct rpc_xprt *
-xprt_from_sock(struct sock *sk)
+int xprt_reserve_xprt(struct rpc_task *task)
 {
-	return (struct rpc_xprt *) sk->sk_user_data;
+	struct rpc_xprt	*xprt = task->tk_xprt;
+	struct rpc_rqst *req = task->tk_rqstp;
+
+	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
+		if (task == xprt->snd_task)
+			return 1;
+		if (task == NULL)
+			return 0;
+		goto out_sleep;
+	}
+	xprt->snd_task = task;
+	if (req) {
+		req->rq_bytes_sent = 0;
+		req->rq_ntrans++;
+	}
+	return 1;
+
+out_sleep:
+	dprintk("RPC: %4d failed to lock transport %p\n",
+			task->tk_pid, xprt);
+	task->tk_timeout = 0;
+	task->tk_status = -EAGAIN;
+	if (req && req->rq_ntrans)
+		rpc_sleep_on(&xprt->resend, task, NULL, NULL);
+	else
+		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
+	return 0;
 }
 
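
xprt_reserve_xprt() is a non-blocking mutex built from one bit:
test_and_set_bit() both takes the lock and reports contention, and a loser
parks itself on the resend or sending wait queue instead of spinning.  A
simplified user-space analogue using C11 atomics (types and names are
hypothetical, and it returns failure where the kernel code sleeps):

#include <stdatomic.h>
#include <stdbool.h>

struct xprt_like {
	atomic_flag locked;	/* plays the role of the XPRT_LOCKED bit */
	void *snd_task;		/* current owner of the send path */
};

static bool reserve(struct xprt_like *x, void *task)
{
	if (atomic_flag_test_and_set_explicit(&x->locked,
					      memory_order_acquire)) {
		/* Already locked: succeed only for the current owner,
		 * mirroring the "task == xprt->snd_task" check above. */
		return x->snd_task == task;
	}
	x->snd_task = task;
	return true;
}

static void release(struct xprt_like *x, void *task)
{
	if (x->snd_task != task)
		return;
	x->snd_task = NULL;
	atomic_flag_clear_explicit(&x->locked, memory_order_release);
	/* The kernel version would now wake the next queued task. */
}
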
 /*
- * Serialize write access to sockets, in order to prevent different
- * requests from interfering with each other.
- * Also prevents TCP socket connects from colliding with writes.
+ * xprt_reserve_xprt_cong - serialize write access to transports
+ * @task: task that is requesting access to the transport
+ *
+ * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
+ * integrated into the decision of whether a request is allowed to be
+ * woken up and given access to the transport.
  */
-static int
-__xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
+int xprt_reserve_xprt_cong(struct rpc_task *task)
 {
+	struct rpc_xprt	*xprt = task->tk_xprt;
 	struct rpc_rqst *req = task->tk_rqstp;
 
-	if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate)) {
+	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
 		if (task == xprt->snd_task)
 			return 1;
 		goto out_sleep;
 	}
-	if (xprt->nocong || __xprt_get_cong(xprt, task)) {
+	if (__xprt_get_cong(xprt, task)) {
 		xprt->snd_task = task;
 		if (req) {
 			req->rq_bytes_sent = 0;
@@ -156,10 +146,10 @@ __xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
 		return 1;
 	}
 	smp_mb__before_clear_bit();
-	clear_bit(XPRT_LOCKED, &xprt->sockstate);
+	clear_bit(XPRT_LOCKED, &xprt->state);
 	smp_mb__after_clear_bit();
 out_sleep:
-	dprintk("RPC: %4d failed to lock socket %p\n", task->tk_pid, xprt);
+	dprintk("RPC: %4d failed to lock transport %p\n", task->tk_pid, xprt);
 	task->tk_timeout = 0;
 	task->tk_status = -EAGAIN;
 	if (req && req->rq_ntrans)
@@ -169,26 +159,52 @@ out_sleep:
 	return 0;
 }
 
-static inline int
-xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
+static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
 {
 	int retval;
 
-	spin_lock_bh(&xprt->sock_lock);
-	retval = __xprt_lock_write(xprt, task);
-	spin_unlock_bh(&xprt->sock_lock);
+	spin_lock_bh(&xprt->transport_lock);
+	retval = xprt->ops->reserve_xprt(task);
+	spin_unlock_bh(&xprt->transport_lock);
 	return retval;
 }
 
+static void __xprt_lock_write_next(struct rpc_xprt *xprt)
+{
+	struct rpc_task *task;
+	struct rpc_rqst *req;
 
-static void
-__xprt_lock_write_next(struct rpc_xprt *xprt)
+	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
+		return;
+
+	task = rpc_wake_up_next(&xprt->resend);
+	if (!task) {
+		task = rpc_wake_up_next(&xprt->sending);
+		if (!task)
+			goto out_unlock;
+	}
+
+	req = task->tk_rqstp;
+	xprt->snd_task = task;
+	if (req) {
+		req->rq_bytes_sent = 0;
+		req->rq_ntrans++;
+	}
+	return;
+
+out_unlock:
+	smp_mb__before_clear_bit();
+	clear_bit(XPRT_LOCKED, &xprt->state);
+	smp_mb__after_clear_bit();
+}
+
+static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
 {
 	struct rpc_task *task;
 
-	if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate))
+	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
 		return;
-	if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
+	if (RPCXPRT_CONGESTED(xprt))
 		goto out_unlock;
 	task = rpc_wake_up_next(&xprt->resend);
 	if (!task) {
@@ -196,7 +212,7 @@ __xprt_lock_write_next(struct rpc_xprt *xprt)
 		if (!task)
 			goto out_unlock;
 	}
-	if (xprt->nocong || __xprt_get_cong(xprt, task)) {
+	if (__xprt_get_cong(xprt, task)) {
 		struct rpc_rqst *req = task->tk_rqstp;
 		xprt->snd_task = task;
 		if (req) {
@@ -207,87 +223,52 @@ __xprt_lock_write_next(struct rpc_xprt *xprt)
 	}
 out_unlock:
 	smp_mb__before_clear_bit();
-	clear_bit(XPRT_LOCKED, &xprt->sockstate);
+	clear_bit(XPRT_LOCKED, &xprt->state);
 	smp_mb__after_clear_bit();
 }
 
-/*
- * Releases the socket for use by other requests.
+/**
+ * xprt_release_xprt - allow other requests to use a transport
+ * @xprt: transport with other tasks potentially waiting
+ * @task: task that is releasing access to the transport
+ *
+ * Note that "task" can be NULL. No congestion control is provided.
  */
-static void
-__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
+void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 {
 	if (xprt->snd_task == task) {
 		xprt->snd_task = NULL;
 		smp_mb__before_clear_bit();
-		clear_bit(XPRT_LOCKED, &xprt->sockstate);
+		clear_bit(XPRT_LOCKED, &xprt->state);
 		smp_mb__after_clear_bit();
 		__xprt_lock_write_next(xprt);
 	}
 }
 
-static inline void
-xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
-{
-	spin_lock_bh(&xprt->sock_lock);
-	__xprt_release_write(xprt, task);
-	spin_unlock_bh(&xprt->sock_lock);
-}
-
-/*
- * Write data to socket.
+/**
+ * xprt_release_xprt_cong - allow other requests to use a transport
+ * @xprt: transport with other tasks potentially waiting
+ * @task: task that is releasing access to the transport
+ *
+ * Note that "task" can be NULL. Another task is awoken to use the
+ * transport if the transport's congestion window allows it.
  */
-static inline int
-xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
+void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
 {
-	struct socket *sock = xprt->sock;
-	struct xdr_buf *xdr = &req->rq_snd_buf;
-	struct sockaddr *addr = NULL;
-	int addrlen = 0;
-	unsigned int skip;
-	int result;
-
-	if (!sock)
-		return -ENOTCONN;
-
-	xprt_pktdump("packet data:",
-				req->rq_svec->iov_base,
-				req->rq_svec->iov_len);
-
-	/* For UDP, we need to provide an address */
-	if (!xprt->stream) {
-		addr = (struct sockaddr *) &xprt->addr;
-		addrlen = sizeof(xprt->addr);
+	if (xprt->snd_task == task) {
+		xprt->snd_task = NULL;
+		smp_mb__before_clear_bit();
+		clear_bit(XPRT_LOCKED, &xprt->state);
+		smp_mb__after_clear_bit();
+		__xprt_lock_write_next_cong(xprt);
 	}
-	/* Dont repeat bytes */
-	skip = req->rq_bytes_sent;
-
-	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
-	result = xdr_sendpages(sock, addr, addrlen, xdr, skip, MSG_DONTWAIT);
-
-	dprintk("RPC: xprt_sendmsg(%d) = %d\n", xdr->len - skip, result);
-
-	if (result >= 0)
-		return result;
+}
 
-	switch (result) {
-	case -ECONNREFUSED:
-		/* When the server has died, an ICMP port unreachable message
-		 * prompts ECONNREFUSED.
-		 */
-	case -EAGAIN:
-		break;
-	case -ECONNRESET:
-	case -ENOTCONN:
-	case -EPIPE:
-		/* connection broken */
-		if (xprt->stream)
-			result = -ENOTCONN;
-		break;
-	default:
-		printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
-	}
-	return result;
+static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+	spin_lock_bh(&xprt->transport_lock);
+	xprt->ops->release_xprt(xprt, task);
+	spin_unlock_bh(&xprt->transport_lock);
 }
 
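
Every xprt->ops call in this file is the new transport switch at work: the
generic code no longer knows whether the bytes travel over UDP or TCP.
Below is a sketch of the method table implied by the calls made in this
file (the real struct rpc_xprt_ops is declared in
include/linux/sunrpc/xprt.h by this patch series and carries more fields).
A datagram transport would plug xprt_reserve_xprt_cong and
xprt_release_xprt_cong into the first two slots; a stream transport would
use the congestion-free variants above:

struct rpc_xprt_ops_sketch {
	int	(*reserve_xprt)(struct rpc_task *task);	/* lock send path */
	void	(*release_xprt)(struct rpc_xprt *xprt, struct rpc_task *task);
	void	(*connect)(struct rpc_task *task);	/* start a connect */
	int	(*send_request)(struct rpc_task *task);	/* put bytes on wire */
	void	(*timer)(struct rpc_task *task);	/* retransmit-timeout hook */
	void	(*close)(struct rpc_xprt *xprt);	/* tear down transport */
};
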
 /*
@@ -321,26 +302,40 @@ __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
 		return;
 	req->rq_cong = 0;
 	xprt->cong -= RPC_CWNDSCALE;
-	__xprt_lock_write_next(xprt);
+	__xprt_lock_write_next_cong(xprt);
 }
 
-/*
- * Adjust RPC congestion window
+/**
+ * xprt_release_rqst_cong - housekeeping when request is complete
+ * @task: RPC request that recently completed
+ *
+ * Useful for transports that require congestion control.
+ */
+void xprt_release_rqst_cong(struct rpc_task *task)
+{
+	__xprt_put_cong(task->tk_xprt, task->tk_rqstp);
+}
+
+/**
+ * xprt_adjust_cwnd - adjust transport congestion window
+ * @task: recently completed RPC request used to adjust window
+ * @result: result code of completed RPC request
+ *
  * We use a time-smoothed congestion estimator to avoid heavy oscillation.
  */
-static void
-xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
+void xprt_adjust_cwnd(struct rpc_task *task, int result)
 {
-	unsigned long cwnd;
+	struct rpc_rqst *req = task->tk_rqstp;
+	struct rpc_xprt *xprt = task->tk_xprt;
+	unsigned long cwnd = xprt->cwnd;
 
-	cwnd = xprt->cwnd;
 	if (result >= 0 && cwnd <= xprt->cong) {
 		/* The (cwnd >> 1) term makes sure
 		 * the result gets rounded properly. */
 		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
 		if (cwnd > RPC_MAXCWND(xprt))
 			cwnd = RPC_MAXCWND(xprt);
-		__xprt_lock_write_next(xprt);
+		__xprt_lock_write_next_cong(xprt);
 	} else if (result == -ETIMEDOUT) {
 		cwnd >>= 1;
 		if (cwnd < RPC_CWNDSCALE)
@@ -349,11 +344,89 @@ xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
 	dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
 			xprt->cong, xprt->cwnd, cwnd);
 	xprt->cwnd = cwnd;
+	__xprt_put_cong(xprt, req);
+}
+
+/**
+ * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
+ * @xprt: transport with waiting tasks
+ * @status: result code to plant in each task before waking it
+ *
+ */
+void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
+{
+	if (status < 0)
+		rpc_wake_up_status(&xprt->pending, status);
+	else
+		rpc_wake_up(&xprt->pending);
+}
+
+/**
+ * xprt_wait_for_buffer_space - wait for transport output buffer to clear
+ * @task: task to be put to sleep
+ *
+ */
+void xprt_wait_for_buffer_space(struct rpc_task *task)
+{
+	struct rpc_rqst *req = task->tk_rqstp;
+	struct rpc_xprt *xprt = req->rq_xprt;
+
+	task->tk_timeout = req->rq_timeout;
+	rpc_sleep_on(&xprt->pending, task, NULL, NULL);
+}
+
+/**
+ * xprt_write_space - wake the task waiting for transport output buffer space
+ * @xprt: transport with waiting tasks
+ *
+ * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
+ */
+void xprt_write_space(struct rpc_xprt *xprt)
+{
+	if (unlikely(xprt->shutdown))
+		return;
+
+	spin_lock_bh(&xprt->transport_lock);
+	if (xprt->snd_task) {
+		dprintk("RPC: write space: waking waiting task on xprt %p\n",
+				xprt);
+		rpc_wake_up_task(xprt->snd_task);
+	}
+	spin_unlock_bh(&xprt->transport_lock);
+}
+
+/**
+ * xprt_set_retrans_timeout_def - set a request's retransmit timeout
+ * @task: task whose timeout is to be set
+ *
+ * Set a request's retransmit timeout based on the transport's
+ * default timeout parameters.  Used by transports that don't adjust
+ * the retransmit timeout based on round-trip time estimation.
+ */
+void xprt_set_retrans_timeout_def(struct rpc_task *task)
+{
+	task->tk_timeout = task->tk_rqstp->rq_timeout;
 }
 
 /*
- * Reset the major timeout value
+ * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout
+ * @task: task whose timeout is to be set
+ *
+ * Set a request's retransmit timeout using the RTT estimator.
  */
+void xprt_set_retrans_timeout_rtt(struct rpc_task *task)
+{
+	int timer = task->tk_msg.rpc_proc->p_timer;
+	struct rpc_rtt *rtt = task->tk_client->cl_rtt;
+	struct rpc_rqst *req = task->tk_rqstp;
+	unsigned long max_timeout = req->rq_xprt->timeout.to_maxval;
+
+	task->tk_timeout = rpc_calc_rto(rtt, timer);
+	task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
+	if (task->tk_timeout > max_timeout || task->tk_timeout == 0)
+		task->tk_timeout = max_timeout;
+}
+
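
xprt_set_retrans_timeout_rtt() layers exponential backoff on top of the
RTT estimator: the estimated RTO is shifted left once per recorded timeout
for this procedure class plus once per retransmission, then clamped to
to_maxval.  A standalone sketch of the rule with assumed numbers (none of
these values come from the source):

#include <stdio.h>

int main(void)
{
	unsigned long rto = 200;		/* ms; pretend rpc_calc_rto() value */
	unsigned long shift = 3;		/* pretend rpc_ntimeo() + rq_retries */
	unsigned long max_timeout = 60000;	/* pretend to_maxval */

	unsigned long timeout = rto << shift;	/* 200 ms << 3 = 1600 ms */
	if (timeout > max_timeout || timeout == 0)
		timeout = max_timeout;
	printf("retransmit timeout = %lu ms\n", timeout);
	return 0;
}
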
 static void xprt_reset_majortimeo(struct rpc_rqst *req)
 {
 	struct rpc_timeout *to = &req->rq_xprt->timeout;
@@ -368,8 +441,10 @@ static void xprt_reset_majortimeo(struct rpc_rqst *req)
 	req->rq_majortimeo += jiffies;
 }
 
-/*
- * Adjust timeout values etc for next retransmit
+/**
+ * xprt_adjust_timeout - adjust timeout values for next retransmit
+ * @req: RPC request containing parameters to use for the adjustment
+ *
  */
 int xprt_adjust_timeout(struct rpc_rqst *req)
 {
@@ -391,9 +466,9 @@ int xprt_adjust_timeout(struct rpc_rqst *req)
 		req->rq_retries = 0;
 		xprt_reset_majortimeo(req);
 		/* Reset the RTT counters == "slow start" */
-		spin_lock_bh(&xprt->sock_lock);
+		spin_lock_bh(&xprt->transport_lock);
 		rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
-		spin_unlock_bh(&xprt->sock_lock);
+		spin_unlock_bh(&xprt->transport_lock);
 		pprintk("RPC: %lu timeout\n", jiffies);
 		status = -ETIMEDOUT;
 	}
@@ -405,133 +480,52 @@ int xprt_adjust_timeout(struct rpc_rqst *req)
 	return status;
 }
 
-/*
- * Close down a transport socket
- */
-static void
-xprt_close(struct rpc_xprt *xprt)
-{
-	struct socket *sock = xprt->sock;
-	struct sock *sk = xprt->inet;
-
-	if (!sk)
-		return;
-
-	write_lock_bh(&sk->sk_callback_lock);
-	xprt->inet = NULL;
-	xprt->sock = NULL;
-
-	sk->sk_user_data = NULL;
-	sk->sk_data_ready = xprt->old_data_ready;
-	sk->sk_state_change = xprt->old_state_change;
-	sk->sk_write_space = xprt->old_write_space;
-	write_unlock_bh(&sk->sk_callback_lock);
-
-	sk->sk_no_check = 0;
-
-	sock_release(sock);
-}
-
-static void
-xprt_socket_autoclose(void *args)
+static void xprt_autoclose(void *args)
 {
 	struct rpc_xprt *xprt = (struct rpc_xprt *)args;
 
 	xprt_disconnect(xprt);
-	xprt_close(xprt);
+	xprt->ops->close(xprt);
 	xprt_release_write(xprt, NULL);
 }
 
-/*
- * Mark a transport as disconnected
+/**
+ * xprt_disconnect - mark a transport as disconnected
+ * @xprt: transport to flag for disconnect
+ *
  */
-static void
-xprt_disconnect(struct rpc_xprt *xprt)
+void xprt_disconnect(struct rpc_xprt *xprt)
 {
 	dprintk("RPC: disconnected transport %p\n", xprt);
-	spin_lock_bh(&xprt->sock_lock);
+	spin_lock_bh(&xprt->transport_lock);
 	xprt_clear_connected(xprt);
-	rpc_wake_up_status(&xprt->pending, -ENOTCONN);
-	spin_unlock_bh(&xprt->sock_lock);
+	xprt_wake_pending_tasks(xprt, -ENOTCONN);
+	spin_unlock_bh(&xprt->transport_lock);
 }
 
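
xprt_disconnect() is exported now because the socket callbacks that notice
a dropped connection are moving out of this file (see the
tcp_state_change() handler removed further down).  A sketch of how a
socket transport is expected to report state changes back into the generic
code; this handler is illustrative, the real one belongs to the new
net/sunrpc/xprtsock.c:

static void example_state_change(struct sock *sk)
{
	struct rpc_xprt *xprt = sk->sk_user_data;

	if (!xprt)
		return;
	switch (sk->sk_state) {
	case TCP_ESTABLISHED:
		/* mark connected, reset stream state, wake waiters */
		break;
	default:
		/* wakes pending tasks with -ENOTCONN */
		xprt_disconnect(xprt);
	}
}
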
-/*
- * Used to allow disconnection when we've been idle
- */
 static void
 xprt_init_autodisconnect(unsigned long data)
 {
 	struct rpc_xprt *xprt = (struct rpc_xprt *)data;
 
-	spin_lock(&xprt->sock_lock);
+	spin_lock(&xprt->transport_lock);
 	if (!list_empty(&xprt->recv) || xprt->shutdown)
 		goto out_abort;
-	if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate))
+	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
 		goto out_abort;
-	spin_unlock(&xprt->sock_lock);
-	/* Let keventd close the socket */
-	if (test_bit(XPRT_CONNECTING, &xprt->sockstate) != 0)
+	spin_unlock(&xprt->transport_lock);
+	if (xprt_connecting(xprt))
 		xprt_release_write(xprt, NULL);
 	else
 		schedule_work(&xprt->task_cleanup);
 	return;
 out_abort:
-	spin_unlock(&xprt->sock_lock);
-}
-
-static void xprt_socket_connect(void *args)
-{
-	struct rpc_xprt *xprt = (struct rpc_xprt *)args;
-	struct socket *sock = xprt->sock;
-	int status = -EIO;
-
-	if (xprt->shutdown || xprt->addr.sin_port == 0)
-		goto out;
-
-	/*
-	 * Start by resetting any existing state
-	 */
-	xprt_close(xprt);
-	sock = xprt_create_socket(xprt, xprt->prot, xprt->resvport);
-	if (sock == NULL) {
-		/* couldn't create socket or bind to reserved port;
-		 * this is likely a permanent error, so cause an abort */
-		goto out;
-	}
-	xprt_bind_socket(xprt, sock);
-	xprt_sock_setbufsize(xprt);
-
-	status = 0;
-	if (!xprt->stream)
-		goto out;
-
-	/*
-	 * Tell the socket layer to start connecting...
-	 */
-	status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
-			sizeof(xprt->addr), O_NONBLOCK);
-	dprintk("RPC: %p connect status %d connected %d sock state %d\n",
-			xprt, -status, xprt_connected(xprt), sock->sk->sk_state);
-	if (status < 0) {
-		switch (status) {
-			case -EINPROGRESS:
-			case -EALREADY:
-				goto out_clear;
-		}
-	}
-out:
-	if (status < 0)
-		rpc_wake_up_status(&xprt->pending, status);
-	else
-		rpc_wake_up(&xprt->pending);
-out_clear:
-	smp_mb__before_clear_bit();
-	clear_bit(XPRT_CONNECTING, &xprt->sockstate);
-	smp_mb__after_clear_bit();
+	spin_unlock(&xprt->transport_lock);
 }
 
-/*
- * Attempt to connect a TCP socket.
+/**
+ * xprt_connect - schedule a transport connect operation
+ * @task: RPC task that is requesting the connect
  *
  */
 void xprt_connect(struct rpc_task *task)
@@ -552,37 +546,19 @@ void xprt_connect(struct rpc_task *task)
 	if (!xprt_lock_write(xprt, task))
 		return;
 	if (xprt_connected(xprt))
-		goto out_write;
+		xprt_release_write(xprt, task);
+	else {
+		if (task->tk_rqstp)
+			task->tk_rqstp->rq_bytes_sent = 0;
 
-	if (task->tk_rqstp)
-		task->tk_rqstp->rq_bytes_sent = 0;
-
-	task->tk_timeout = RPC_CONNECT_TIMEOUT;
-	rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL);
-	if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate)) {
-		/* Note: if we are here due to a dropped connection
-		 * we delay reconnecting by RPC_REESTABLISH_TIMEOUT/HZ
-		 * seconds
-		 */
-		if (xprt->sock != NULL)
-			schedule_delayed_work(&xprt->sock_connect,
-					RPC_REESTABLISH_TIMEOUT);
-		else {
-			schedule_work(&xprt->sock_connect);
-			if (!RPC_IS_ASYNC(task))
-				flush_scheduled_work();
-		}
+		task->tk_timeout = xprt->connect_timeout;
+		rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL);
+		xprt->ops->connect(task);
 	}
 	return;
-out_write:
-	xprt_release_write(xprt, task);
 }
 
-/*
- * We arrive here when awoken from waiting on connection establishment.
- */
-static void
-xprt_connect_status(struct rpc_task *task)
+static void xprt_connect_status(struct rpc_task *task)
 {
 	struct rpc_xprt *xprt = task->tk_xprt;
 
@@ -592,31 +568,42 @@ xprt_connect_status(struct rpc_task *task)
 		return;
 	}
 
-	/* if soft mounted, just cause this RPC to fail */
-	if (RPC_IS_SOFT(task))
-		task->tk_status = -EIO;
-
 	switch (task->tk_status) {
 	case -ECONNREFUSED:
 	case -ECONNRESET:
+		dprintk("RPC: %4d xprt_connect_status: server %s refused connection\n",
+				task->tk_pid, task->tk_client->cl_server);
+		break;
 	case -ENOTCONN:
-		return;
+		dprintk("RPC: %4d xprt_connect_status: connection broken\n",
+				task->tk_pid);
+		break;
 	case -ETIMEDOUT:
-		dprintk("RPC: %4d xprt_connect_status: timed out\n",
+		dprintk("RPC: %4d xprt_connect_status: connect attempt timed out\n",
 				task->tk_pid);
 		break;
 	default:
-		printk(KERN_ERR "RPC: error %d connecting to server %s\n",
-				-task->tk_status, task->tk_client->cl_server);
+		dprintk("RPC: %4d xprt_connect_status: error %d connecting to server %s\n",
+				task->tk_pid, -task->tk_status, task->tk_client->cl_server);
+		xprt_release_write(xprt, task);
+		task->tk_status = -EIO;
+		return;
+	}
+
+	/* if soft mounted, just cause this RPC to fail */
+	if (RPC_IS_SOFT(task)) {
+		xprt_release_write(xprt, task);
+		task->tk_status = -EIO;
 	}
-	xprt_release_write(xprt, task);
 }
 
-/*
- * Look up the RPC request corresponding to a reply, and then lock it.
+/**
+ * xprt_lookup_rqst - find an RPC request corresponding to an XID
+ * @xprt: transport on which the original request was transmitted
+ * @xid: RPC XID of incoming reply
+ *
  */
-static inline struct rpc_rqst *
-xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
+struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
 {
 	struct list_head *pos;
 	struct rpc_rqst *req = NULL;
@@ -631,556 +618,68 @@ xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
 	return req;
 }
 
-/*
- * Complete reply received.
- * The TCP code relies on us to remove the request from xprt->pending.
- */
+/**
+ * xprt_update_rtt - update an RPC client's RTT state after receiving a reply
+ * @task: RPC request that recently completed
+ *
-static void
-xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
-{
-	struct rpc_task	*task = req->rq_task;
-	struct rpc_clnt *clnt = task->tk_client;
-
-	/* Adjust congestion window */
-	if (!xprt->nocong) {
-		unsigned timer = task->tk_msg.rpc_proc->p_timer;
-		xprt_adjust_cwnd(xprt, copied);
-		__xprt_put_cong(xprt, req);
-		if (timer) {
-			if (req->rq_ntrans == 1)
-				rpc_update_rtt(clnt->cl_rtt, timer,
-						(long)jiffies - req->rq_xtime);
-			rpc_set_timeo(clnt->cl_rtt, timer, req->rq_ntrans - 1);
-		}
-	}
-
-#ifdef RPC_PROFILE
-	/* Profile only reads for now */
-	if (copied > 1024) {
-		static unsigned long	nextstat;
-		static unsigned long	pkt_rtt, pkt_len, pkt_cnt;
-
-		pkt_cnt++;
-		pkt_len += req->rq_slen + copied;
-		pkt_rtt += jiffies - req->rq_xtime;
-		if (time_before(nextstat, jiffies)) {
-			printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
-			printk("RPC: %ld %ld %ld %ld stat\n",
-					jiffies, pkt_cnt, pkt_len, pkt_rtt);
-			pkt_rtt = pkt_len = pkt_cnt = 0;
-			nextstat = jiffies + 5 * HZ;
-		}
-	}
-#endif
-
-	dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
-	list_del_init(&req->rq_list);
-	req->rq_received = req->rq_private_buf.len = copied;
-
-	/* ... and wake up the process. */
-	rpc_wake_up_task(task);
-	return;
-}
-
-static size_t
-skb_read_bits(skb_reader_t *desc, void *to, size_t len)
-{
-	if (len > desc->count)
-		len = desc->count;
-	if (skb_copy_bits(desc->skb, desc->offset, to, len))
-		return 0;
-	desc->count -= len;
-	desc->offset += len;
-	return len;
-}
-
-static size_t
-skb_read_and_csum_bits(skb_reader_t *desc, void *to, size_t len)
-{
-	unsigned int csum2, pos;
-
-	if (len > desc->count)
-		len = desc->count;
-	pos = desc->offset;
-	csum2 = skb_copy_and_csum_bits(desc->skb, pos, to, len, 0);
-	desc->csum = csum_block_add(desc->csum, csum2, pos);
-	desc->count -= len;
-	desc->offset += len;
-	return len;
-}
-
-/*
- * We have set things up such that we perform the checksum of the UDP
- * packet in parallel with the copies into the RPC client iovec. -DaveM
- */
-int
-csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
-{
-	skb_reader_t	desc;
-
-	desc.skb = skb;
-	desc.offset = sizeof(struct udphdr);
-	desc.count = skb->len - desc.offset;
-
-	if (skb->ip_summed == CHECKSUM_UNNECESSARY)
-		goto no_checksum;
-
-	desc.csum = csum_partial(skb->data, desc.offset, skb->csum);
-	if (xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_and_csum_bits) < 0)
-		return -1;
-	if (desc.offset != skb->len) {
-		unsigned int csum2;
-		csum2 = skb_checksum(skb, desc.offset, skb->len - desc.offset, 0);
-		desc.csum = csum_block_add(desc.csum, csum2, desc.offset);
-	}
-	if (desc.count)
-		return -1;
-	if ((unsigned short)csum_fold(desc.csum))
-		return -1;
-	return 0;
-no_checksum:
-	if (xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_bits) < 0)
-		return -1;
-	if (desc.count)
-		return -1;
-	return 0;
-}
-
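
The function removed above folds the UDP checksum into the copy loop so
each reply is traversed only once.  The property it relies on: partial
one's-complement sums over separate chunks can be combined afterwards and
folded once at the end (csum_block_add() takes the chunk's offset because
combining at an odd offset needs a byte-swap correction).  A standalone,
user-space demonstration over even-offset chunks:

#include <stdint.h>
#include <stdio.h>
#include <stddef.h>

static uint32_t csum_accumulate(uint32_t sum, const uint8_t *p, size_t len)
{
	while (len > 1) {
		sum += (uint32_t)p[0] << 8 | p[1];	/* 16-bit words */
		p += 2;
		len -= 2;
	}
	if (len)
		sum += (uint32_t)p[0] << 8;		/* pad odd tail byte */
	return sum;
}

static uint16_t csum_fold16(uint32_t sum)
{
	while (sum >> 16)
		sum = (sum & 0xffff) + (sum >> 16);	/* end-around carry */
	return (uint16_t)~sum;
}

int main(void)
{
	const uint8_t msg[] = "an RPC reply body";

	/* One pass over the whole buffer... */
	uint16_t whole = csum_fold16(csum_accumulate(0, msg, sizeof(msg) - 1));

	/* ...equals two passes over chunks split at an even offset. */
	uint32_t part = csum_accumulate(0, msg, 8);
	part = csum_accumulate(part, msg + 8, sizeof(msg) - 1 - 8);

	printf("whole=%04x chunked=%04x\n", (unsigned)whole,
	       (unsigned)csum_fold16(part));
	return 0;
}
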
-/*
- * Input handler for RPC replies. Called from a bottom half and hence
- * atomic.
- */
-static void
-udp_data_ready(struct sock *sk, int len)
-{
-	struct rpc_task	*task;
-	struct rpc_xprt	*xprt;
-	struct rpc_rqst *rovr;
-	struct sk_buff	*skb;
-	int err, repsize, copied;
-	u32 _xid, *xp;
-
-	read_lock(&sk->sk_callback_lock);
-	dprintk("RPC: udp_data_ready...\n");
-	if (!(xprt = xprt_from_sock(sk))) {
-		printk("RPC: udp_data_ready request not found!\n");
-		goto out;
-	}
-
-	dprintk("RPC: udp_data_ready client %p\n", xprt);
-
-	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
-		goto out;
-
-	if (xprt->shutdown)
-		goto dropit;
-
-	repsize = skb->len - sizeof(struct udphdr);
-	if (repsize < 4) {
-		printk("RPC: impossible RPC reply size %d!\n", repsize);
-		goto dropit;
-	}
-
-	/* Copy the XID from the skb... */
-	xp = skb_header_pointer(skb, sizeof(struct udphdr),
-				sizeof(_xid), &_xid);
-	if (xp == NULL)
-		goto dropit;
-
-	/* Look up and lock the request corresponding to the given XID */
-	spin_lock(&xprt->sock_lock);
-	rovr = xprt_lookup_rqst(xprt, *xp);
-	if (!rovr)
-		goto out_unlock;
-	task = rovr->rq_task;
-
-	dprintk("RPC: %4d received reply\n", task->tk_pid);
-
-	if ((copied = rovr->rq_private_buf.buflen) > repsize)
-		copied = repsize;
-
-	/* Suck it into the iovec, verify checksum if not done by hw. */
-	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb))
-		goto out_unlock;
-
-	/* Something worked... */
-	dst_confirm(skb->dst);
-
-	xprt_complete_rqst(xprt, rovr, copied);
-
- out_unlock:
-	spin_unlock(&xprt->sock_lock);
- dropit:
-	skb_free_datagram(sk, skb);
- out:
-	read_unlock(&sk->sk_callback_lock);
-}
-
-/*
- * Copy from an skb into memory and shrink the skb.
- */
-static inline size_t
-tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
-{
-	if (len > desc->count)
-		len = desc->count;
-	if (skb_copy_bits(desc->skb, desc->offset, p, len)) {
-		dprintk("RPC: failed to copy %zu bytes from skb. %zu bytes remain\n",
-				len, desc->count);
-		return 0;
-	}
-	desc->offset += len;
-	desc->count -= len;
-	dprintk("RPC: copied %zu bytes from skb. %zu bytes remain\n",
-			len, desc->count);
-	return len;
-}
-
-/*
- * TCP read fragment marker
- */
-static inline void
-tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
-{
-	size_t len, used;
-	char *p;
-
-	p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
-	len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
-	used = tcp_copy_data(desc, p, len);
-	xprt->tcp_offset += used;
-	if (used != len)
-		return;
-	xprt->tcp_reclen = ntohl(xprt->tcp_recm);
-	if (xprt->tcp_reclen & 0x80000000)
-		xprt->tcp_flags |= XPRT_LAST_FRAG;
-	else
-		xprt->tcp_flags &= ~XPRT_LAST_FRAG;
-	xprt->tcp_reclen &= 0x7fffffff;
-	xprt->tcp_flags &= ~XPRT_COPY_RECM;
-	xprt->tcp_offset = 0;
-	/* Sanity check of the record length */
-	if (xprt->tcp_reclen < 4) {
-		printk(KERN_ERR "RPC: Invalid TCP record fragment length\n");
-		xprt_disconnect(xprt);
-	}
-	dprintk("RPC: reading TCP record fragment of length %d\n",
-			xprt->tcp_reclen);
-}
-
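
tcp_read_fraghdr() above decodes the RPC-over-TCP record marking of
RFC 1831, section 10: each fragment is preceded by a 4-byte big-endian
word whose top bit flags the last fragment and whose low 31 bits carry the
fragment length (hence the mask with 0x7fffffff, and the sanity check that
a record can at least hold an XID).  A standalone sketch of encode and
decode; it works in host order for brevity, where the real code passes the
word through htonl()/ntohl():

#include <stdint.h>
#include <stdio.h>

#define LAST_FRAG	0x80000000u

int main(void)
{
	uint32_t reclen = 1420;			/* fragment length in bytes */
	uint32_t marker = LAST_FRAG | reclen;	/* encode, as the record-marker
						 * write removed from
						 * xprt_transmit() below did */

	int last = (marker & LAST_FRAG) != 0;	/* decode */
	uint32_t len = marker & 0x7fffffff;

	printf("marker=0x%08x last=%d len=%u\n", marker, last, len);
	return 0;
}
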
-static void
-tcp_check_recm(struct rpc_xprt *xprt)
-{
-	dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n",
-			xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_flags);
-	if (xprt->tcp_offset == xprt->tcp_reclen) {
-		xprt->tcp_flags |= XPRT_COPY_RECM;
-		xprt->tcp_offset = 0;
-		if (xprt->tcp_flags & XPRT_LAST_FRAG) {
-			xprt->tcp_flags &= ~XPRT_COPY_DATA;
-			xprt->tcp_flags |= XPRT_COPY_XID;
-			xprt->tcp_copied = 0;
-		}
-	}
-}
-
-/*
- * TCP read xid
- */
-static inline void
-tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
-{
-	size_t len, used;
-	char *p;
-
-	len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
-	dprintk("RPC: reading XID (%Zu bytes)\n", len);
-	p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
-	used = tcp_copy_data(desc, p, len);
-	xprt->tcp_offset += used;
-	if (used != len)
-		return;
-	xprt->tcp_flags &= ~XPRT_COPY_XID;
-	xprt->tcp_flags |= XPRT_COPY_DATA;
-	xprt->tcp_copied = 4;
-	dprintk("RPC: reading reply for XID %08x\n",
-			ntohl(xprt->tcp_xid));
-	tcp_check_recm(xprt);
-}
-
-/*
- * TCP read and complete request
- */
-static inline void
-tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
-{
-	struct rpc_rqst *req;
-	struct xdr_buf *rcvbuf;
-	size_t len;
-	ssize_t r;
-
-	/* Find and lock the request corresponding to this xid */
-	spin_lock(&xprt->sock_lock);
-	req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
-	if (!req) {
-		xprt->tcp_flags &= ~XPRT_COPY_DATA;
-		dprintk("RPC: XID %08x request not found!\n",
-				ntohl(xprt->tcp_xid));
-		spin_unlock(&xprt->sock_lock);
-		return;
-	}
-
-	rcvbuf = &req->rq_private_buf;
-	len = desc->count;
-	if (len > xprt->tcp_reclen - xprt->tcp_offset) {
-		skb_reader_t my_desc;
-
-		len = xprt->tcp_reclen - xprt->tcp_offset;
-		memcpy(&my_desc, desc, sizeof(my_desc));
-		my_desc.count = len;
-		r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
-					  &my_desc, tcp_copy_data);
-		desc->count -= r;
-		desc->offset += r;
-	} else
-		r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
-					  desc, tcp_copy_data);
-
-	if (r > 0) {
-		xprt->tcp_copied += r;
-		xprt->tcp_offset += r;
-	}
-	if (r != len) {
-		/* Error when copying to the receive buffer,
-		 * usually because we weren't able to allocate
-		 * additional buffer pages. All we can do now
-		 * is turn off XPRT_COPY_DATA, so the request
-		 * will not receive any additional updates,
-		 * and time out.
-		 * Any remaining data from this record will
-		 * be discarded.
-		 */
-		xprt->tcp_flags &= ~XPRT_COPY_DATA;
-		dprintk("RPC: XID %08x truncated request\n",
-				ntohl(xprt->tcp_xid));
-		dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
-				xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen);
-		goto out;
-	}
-
-	dprintk("RPC: XID %08x read %Zd bytes\n",
-			ntohl(xprt->tcp_xid), r);
-	dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
-			xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen);
-
-	if (xprt->tcp_copied == req->rq_private_buf.buflen)
-		xprt->tcp_flags &= ~XPRT_COPY_DATA;
-	else if (xprt->tcp_offset == xprt->tcp_reclen) {
-		if (xprt->tcp_flags & XPRT_LAST_FRAG)
-			xprt->tcp_flags &= ~XPRT_COPY_DATA;
-	}
-
-out:
-	if (!(xprt->tcp_flags & XPRT_COPY_DATA)) {
-		dprintk("RPC: %4d received reply complete\n",
-				req->rq_task->tk_pid);
-		xprt_complete_rqst(xprt, req, xprt->tcp_copied);
-	}
-	spin_unlock(&xprt->sock_lock);
-	tcp_check_recm(xprt);
-}
-
-/*
- * TCP discard extra bytes from a short read
- */
-static inline void
-tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
-{
-	size_t len;
-
-	len = xprt->tcp_reclen - xprt->tcp_offset;
-	if (len > desc->count)
-		len = desc->count;
-	desc->count -= len;
-	desc->offset += len;
-	xprt->tcp_offset += len;
-	dprintk("RPC: discarded %Zu bytes\n", len);
-	tcp_check_recm(xprt);
-}
-
-/*
- * TCP record receive routine
- * We first have to grab the record marker, then the XID, then the data.
  */
-static int
-tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
-		unsigned int offset, size_t len)
-{
-	struct rpc_xprt *xprt = rd_desc->arg.data;
-	skb_reader_t desc = {
-		.skb	= skb,
-		.offset	= offset,
-		.count	= len,
-		.csum	= 0
-	};
-
-	dprintk("RPC: tcp_data_recv\n");
-	do {
-		/* Read in a new fragment marker if necessary */
-		/* Can we ever really expect to get completely empty fragments? */
-		if (xprt->tcp_flags & XPRT_COPY_RECM) {
-			tcp_read_fraghdr(xprt, &desc);
-			continue;
-		}
-		/* Read in the xid if necessary */
-		if (xprt->tcp_flags & XPRT_COPY_XID) {
-			tcp_read_xid(xprt, &desc);
-			continue;
-		}
-		/* Read in the request data */
-		if (xprt->tcp_flags & XPRT_COPY_DATA) {
-			tcp_read_request(xprt, &desc);
-			continue;
-		}
-		/* Skip over any trailing bytes on short reads */
-		tcp_read_discard(xprt, &desc);
-	} while (desc.count);
-	dprintk("RPC: tcp_data_recv done\n");
-	return len - desc.count;
-}
-
-static void tcp_data_ready(struct sock *sk, int bytes)
+void xprt_update_rtt(struct rpc_task *task)
 {
-	struct rpc_xprt *xprt;
-	read_descriptor_t rd_desc;
-
-	read_lock(&sk->sk_callback_lock);
-	dprintk("RPC: tcp_data_ready...\n");
-	if (!(xprt = xprt_from_sock(sk))) {
-		printk("RPC: tcp_data_ready socket info not found!\n");
-		goto out;
-	}
-	if (xprt->shutdown)
-		goto out;
-
-	/* We use rd_desc to pass struct xprt to tcp_data_recv */
-	rd_desc.arg.data = xprt;
-	rd_desc.count = 65536;
-	tcp_read_sock(sk, &rd_desc, tcp_data_recv);
-out:
-	read_unlock(&sk->sk_callback_lock);
-}
-
-static void
-tcp_state_change(struct sock *sk)
-{
-	struct rpc_xprt *xprt;
+	struct rpc_rqst *req = task->tk_rqstp;
+	struct rpc_rtt *rtt = task->tk_client->cl_rtt;
+	unsigned timer = task->tk_msg.rpc_proc->p_timer;
 
-	read_lock(&sk->sk_callback_lock);
-	if (!(xprt = xprt_from_sock(sk)))
-		goto out;
-	dprintk("RPC: tcp_state_change client %p...\n", xprt);
-	dprintk("RPC: state %x conn %d dead %d zapped %d\n",
-			sk->sk_state, xprt_connected(xprt),
-			sock_flag(sk, SOCK_DEAD),
-			sock_flag(sk, SOCK_ZAPPED));
-
-	switch (sk->sk_state) {
-	case TCP_ESTABLISHED:
-		spin_lock_bh(&xprt->sock_lock);
-		if (!xprt_test_and_set_connected(xprt)) {
-			/* Reset TCP record info */
-			xprt->tcp_offset = 0;
-			xprt->tcp_reclen = 0;
-			xprt->tcp_copied = 0;
-			xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;
-			rpc_wake_up(&xprt->pending);
-		}
-		spin_unlock_bh(&xprt->sock_lock);
-		break;
-	case TCP_SYN_SENT:
-	case TCP_SYN_RECV:
-		break;
-	default:
-		xprt_disconnect(xprt);
-		break;
+	if (timer) {
+		if (req->rq_ntrans == 1)
+			rpc_update_rtt(rtt, timer,
+					(long)jiffies - req->rq_xtime);
+		rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
 	}
-out:
-	read_unlock(&sk->sk_callback_lock);
 }
 
-/*
- * Called when more output buffer space is available for this socket.
- * We try not to wake our writers until they can make "significant"
- * progress, otherwise we'll waste resources thrashing sock_sendmsg
- * with a bunch of small requests.
+/**
+ * xprt_complete_rqst - called when reply processing is complete
+ * @task: RPC request that recently completed
+ * @copied: actual number of bytes received from the transport
+ *
+ * Caller holds transport lock.
  */
-static void
-xprt_write_space(struct sock *sk)
+void xprt_complete_rqst(struct rpc_task *task, int copied)
 {
-	struct rpc_xprt	*xprt;
-	struct socket	*sock;
-
-	read_lock(&sk->sk_callback_lock);
-	if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->sk_socket))
-		goto out;
-	if (xprt->shutdown)
-		goto out;
-
-	/* Wait until we have enough socket memory */
-	if (xprt->stream) {
-		/* from net/core/stream.c:sk_stream_write_space */
-		if (sk_stream_wspace(sk) < sk_stream_min_wspace(sk))
-			goto out;
-	} else {
-		/* from net/core/sock.c:sock_def_write_space */
-		if (!sock_writeable(sk))
-			goto out;
-	}
+	struct rpc_rqst *req = task->tk_rqstp;
 
-	if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags))
-		goto out;
+	dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
+			task->tk_pid, ntohl(req->rq_xid), copied);
 
-	spin_lock_bh(&xprt->sock_lock);
-	if (xprt->snd_task)
-		rpc_wake_up_task(xprt->snd_task);
-	spin_unlock_bh(&xprt->sock_lock);
-out:
-	read_unlock(&sk->sk_callback_lock);
+	list_del_init(&req->rq_list);
+	req->rq_received = req->rq_private_buf.len = copied;
+	rpc_wake_up_task(task);
 }
 
-/*
- * RPC receive timeout handler.
- */
-static void
-xprt_timer(struct rpc_task *task)
+static void xprt_timer(struct rpc_task *task)
 {
 	struct rpc_rqst	*req = task->tk_rqstp;
 	struct rpc_xprt *xprt = req->rq_xprt;
 
-	spin_lock(&xprt->sock_lock);
-	if (req->rq_received)
-		goto out;
-
-	xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT);
-	__xprt_put_cong(xprt, req);
+	dprintk("RPC: %4d xprt_timer\n", task->tk_pid);
 
-	dprintk("RPC: %4d xprt_timer (%s request)\n",
-			task->tk_pid, req ? "pending" : "backlogged");
-
-	task->tk_status = -ETIMEDOUT;
-out:
+	spin_lock(&xprt->transport_lock);
+	if (!req->rq_received) {
+		if (xprt->ops->timer)
+			xprt->ops->timer(task);
+		task->tk_status = -ETIMEDOUT;
+	}
 	task->tk_timeout = 0;
 	rpc_wake_up_task(task);
-	spin_unlock(&xprt->sock_lock);
+	spin_unlock(&xprt->transport_lock);
 }
 
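
The xprt->ops->timer hook replaces the congestion-window and RTT
bookkeeping that used to be hard-wired into the old xprt_timer(), so only
transports that want congestion control pay for it.  A plausible method
for a datagram transport (illustrative; the real implementation belongs to
net/sunrpc/xprtsock.c):

static void example_udp_timer(struct rpc_task *task)
{
	/* Shrink the congestion window on a retransmit timeout. */
	xprt_adjust_cwnd(task, -ETIMEDOUT);
}
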
-/*
- * Place the actual RPC call.
- * We have to copy the iovec because sendmsg fiddles with its contents.
+/**
+ * xprt_prepare_transmit - reserve the transport before sending a request
+ * @task: RPC task about to send a request
+ *
  */
-int
-xprt_prepare_transmit(struct rpc_task *task)
+int xprt_prepare_transmit(struct rpc_task *task)
 {
 	struct rpc_rqst	*req = task->tk_rqstp;
 	struct rpc_xprt	*xprt = req->rq_xprt;
@@ -1191,12 +690,12 @@ xprt_prepare_transmit(struct rpc_task *task)
 	if (xprt->shutdown)
 		return -EIO;
 
-	spin_lock_bh(&xprt->sock_lock);
+	spin_lock_bh(&xprt->transport_lock);
 	if (req->rq_received && !req->rq_bytes_sent) {
 		err = req->rq_received;
 		goto out_unlock;
 	}
-	if (!__xprt_lock_write(xprt, task)) {
+	if (!xprt->ops->reserve_xprt(task)) {
 		err = -EAGAIN;
 		goto out_unlock;
 	}
@@ -1206,39 +705,42 @@ xprt_prepare_transmit(struct rpc_task *task) | |||
1206 | goto out_unlock; | 705 | goto out_unlock; |
1207 | } | 706 | } |
1208 | out_unlock: | 707 | out_unlock: |
1209 | spin_unlock_bh(&xprt->sock_lock); | 708 | spin_unlock_bh(&xprt->transport_lock); |
1210 | return err; | 709 | return err; |
1211 | } | 710 | } |
1212 | 711 | ||
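xprt_prepare_transmit() has two early exits before any bytes move: a request whose reply has already arrived (and which never started sending) returns that status immediately, and a transport whose write lock is held by another task returns -EAGAIN. A sketch of that ladder, assuming reserve_xprt() reports lock ownership as a boolean:

#include <stdio.h>
#include <errno.h>

/* 1 if this task got the transport write lock, 0 if another holds it. */
static int reserve_xprt_model(void) { return 1; }

static int prepare_transmit_model(int rq_received, int rq_bytes_sent)
{
	if (rq_received && !rq_bytes_sent)
		return rq_received;     /* reply already here: skip the send */
	if (!reserve_xprt_model())
		return -EAGAIN;         /* transport busy: caller retries */
	return 0;                       /* locked and clear to transmit */
}

int main(void)
{
	printf("status %d\n", prepare_transmit_model(0, 0));
	return 0;
}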
1213 | void | 712 | void |
1214 | xprt_transmit(struct rpc_task *task) | 713 | xprt_abort_transmit(struct rpc_task *task) |
714 | { | ||
715 | struct rpc_xprt *xprt = task->tk_xprt; | ||
716 | |||
717 | xprt_release_write(xprt, task); | ||
718 | } | ||
719 | |||
720 | /** | ||
721 | * xprt_transmit - send an RPC request on a transport | ||
722 | * @task: controlling RPC task | ||
723 | * | ||
724 | * We have to copy the iovec because sendmsg fiddles with its contents. | ||
725 | */ | ||
726 | void xprt_transmit(struct rpc_task *task) | ||
1215 | { | 727 | { |
1216 | struct rpc_clnt *clnt = task->tk_client; | ||
1217 | struct rpc_rqst *req = task->tk_rqstp; | 728 | struct rpc_rqst *req = task->tk_rqstp; |
1218 | struct rpc_xprt *xprt = req->rq_xprt; | 729 | struct rpc_xprt *xprt = req->rq_xprt; |
1219 | int status, retry = 0; | 730 | int status; |
1220 | |||
1221 | 731 | ||
1222 | dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); | 732 | dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); |
1223 | 733 | ||
1224 | /* set up everything as needed. */ | ||
1225 | /* Write the record marker */ | ||
1226 | if (xprt->stream) { | ||
1227 | u32 *marker = req->rq_svec[0].iov_base; | ||
1228 | |||
1229 | *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker))); | ||
1230 | } | ||
1231 | |||
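The stream branch deleted above wrote the record marker defined by RFC 1831 record marking: the top bit flags the last fragment and the low 31 bits carry the fragment length, which excludes the 4-byte marker itself (hence rq_slen - sizeof(*marker)). That framing now lives inside the stream transport implementation; a user-space illustration:

#include <stdio.h>
#include <stdint.h>
#include <arpa/inet.h>   /* htonl/ntohl */

int main(void)
{
	uint32_t rq_slen = 132;   /* send buffer length, marker included */
	uint32_t marker  = htonl(0x80000000U | (rq_slen - sizeof(marker)));

	printf("marker %08x: last fragment, %u payload bytes follow\n",
	       (unsigned)ntohl(marker),
	       (unsigned)(ntohl(marker) & 0x7fffffffU));
	return 0;
}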
1232 | smp_rmb(); | 734 | smp_rmb(); |
1233 | if (!req->rq_received) { | 735 | if (!req->rq_received) { |
1234 | if (list_empty(&req->rq_list)) { | 736 | if (list_empty(&req->rq_list)) { |
1235 | spin_lock_bh(&xprt->sock_lock); | 737 | spin_lock_bh(&xprt->transport_lock); |
1236 | /* Update the softirq receive buffer */ | 738 | /* Update the softirq receive buffer */ |
1237 | memcpy(&req->rq_private_buf, &req->rq_rcv_buf, | 739 | memcpy(&req->rq_private_buf, &req->rq_rcv_buf, |
1238 | sizeof(req->rq_private_buf)); | 740 | sizeof(req->rq_private_buf)); |
1239 | /* Add request to the receive list */ | 741 | /* Add request to the receive list */ |
1240 | list_add_tail(&req->rq_list, &xprt->recv); | 742 | list_add_tail(&req->rq_list, &xprt->recv); |
1241 | spin_unlock_bh(&xprt->sock_lock); | 743 | spin_unlock_bh(&xprt->transport_lock); |
1242 | xprt_reset_majortimeo(req); | 744 | xprt_reset_majortimeo(req); |
1243 | /* Turn off autodisconnect */ | 745 | /* Turn off autodisconnect */ |
1244 | del_singleshot_timer_sync(&xprt->timer); | 746 | del_singleshot_timer_sync(&xprt->timer); |
@@ -1246,40 +748,19 @@ xprt_transmit(struct rpc_task *task) | |||
1246 | } else if (!req->rq_bytes_sent) | 748 | } else if (!req->rq_bytes_sent) |
1247 | return; | 749 | return; |
1248 | 750 | ||
1249 | /* Continue transmitting the packet/record. We must be careful | 751 | status = xprt->ops->send_request(task); |
1250 | * to cope with writespace callbacks arriving _after_ we have | 752 | if (status == 0) { |
1251 | * called xprt_sendmsg(). | 753 | dprintk("RPC: %4d xmit complete\n", task->tk_pid); |
1252 | */ | 754 | spin_lock_bh(&xprt->transport_lock); |
1253 | while (1) { | 755 | xprt->ops->set_retrans_timeout(task); |
1254 | req->rq_xtime = jiffies; | 756 | /* Don't race with disconnect */ |
1255 | status = xprt_sendmsg(xprt, req); | 757 | if (!xprt_connected(xprt)) |
1256 | 758 | task->tk_status = -ENOTCONN; | |
1257 | if (status < 0) | 759 | else if (!req->rq_received) |
1258 | break; | 760 | rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer); |
1259 | 761 | xprt->ops->release_xprt(xprt, task); | |
1260 | if (xprt->stream) { | 762 | spin_unlock_bh(&xprt->transport_lock); |
1261 | req->rq_bytes_sent += status; | 763 | return; |
1262 | |||
1263 | /* If we've sent the entire packet, immediately | ||
1264 | * reset the count of bytes sent. */ | ||
1265 | if (req->rq_bytes_sent >= req->rq_slen) { | ||
1266 | req->rq_bytes_sent = 0; | ||
1267 | goto out_receive; | ||
1268 | } | ||
1269 | } else { | ||
1270 | if (status >= req->rq_slen) | ||
1271 | goto out_receive; | ||
1272 | status = -EAGAIN; | ||
1273 | break; | ||
1274 | } | ||
1275 | |||
1276 | dprintk("RPC: %4d xmit incomplete (%d left of %d)\n", | ||
1277 | task->tk_pid, req->rq_slen - req->rq_bytes_sent, | ||
1278 | req->rq_slen); | ||
1279 | |||
1280 | status = -EAGAIN; | ||
1281 | if (retry++ > 50) | ||
1282 | break; | ||
1283 | } | 764 | } |
1284 | 765 | ||
1285 | /* Note: at this point, task->tk_sleeping has not yet been set, | 766 | /* Note: at this point, task->tk_sleeping has not yet been set, |
@@ -1289,60 +770,19 @@ xprt_transmit(struct rpc_task *task) | |||
1289 | task->tk_status = status; | 770 | task->tk_status = status; |
1290 | 771 | ||
1291 | switch (status) { | 772 | switch (status) { |
1292 | case -EAGAIN: | ||
1293 | if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) { | ||
1294 | /* Protect against races with xprt_write_space */ | ||
1295 | spin_lock_bh(&xprt->sock_lock); | ||
1296 | /* Don't race with disconnect */ | ||
1297 | if (!xprt_connected(xprt)) | ||
1298 | task->tk_status = -ENOTCONN; | ||
1299 | else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) { | ||
1300 | task->tk_timeout = req->rq_timeout; | ||
1301 | rpc_sleep_on(&xprt->pending, task, NULL, NULL); | ||
1302 | } | ||
1303 | spin_unlock_bh(&xprt->sock_lock); | ||
1304 | return; | ||
1305 | } | ||
1306 | /* Keep holding the socket if it is blocked */ | ||
1307 | rpc_delay(task, HZ>>4); | ||
1308 | return; | ||
1309 | case -ECONNREFUSED: | 773 | case -ECONNREFUSED: |
1310 | task->tk_timeout = RPC_REESTABLISH_TIMEOUT; | ||
1311 | rpc_sleep_on(&xprt->sending, task, NULL, NULL); | 774 | rpc_sleep_on(&xprt->sending, task, NULL, NULL); |
775 | case -EAGAIN: | ||
1312 | case -ENOTCONN: | 776 | case -ENOTCONN: |
1313 | return; | 777 | return; |
1314 | default: | 778 | default: |
1315 | if (xprt->stream) | 779 | break; |
1316 | xprt_disconnect(xprt); | ||
1317 | } | 780 | } |
1318 | xprt_release_write(xprt, task); | 781 | xprt_release_write(xprt, task); |
1319 | return; | 782 | return; |
1320 | out_receive: | ||
1321 | dprintk("RPC: %4d xmit complete\n", task->tk_pid); | ||
1322 | /* Set the task's receive timeout value */ | ||
1323 | spin_lock_bh(&xprt->sock_lock); | ||
1324 | if (!xprt->nocong) { | ||
1325 | int timer = task->tk_msg.rpc_proc->p_timer; | ||
1326 | task->tk_timeout = rpc_calc_rto(clnt->cl_rtt, timer); | ||
1327 | task->tk_timeout <<= rpc_ntimeo(clnt->cl_rtt, timer) + req->rq_retries; | ||
1328 | if (task->tk_timeout > xprt->timeout.to_maxval || task->tk_timeout == 0) | ||
1329 | task->tk_timeout = xprt->timeout.to_maxval; | ||
1330 | } else | ||
1331 | task->tk_timeout = req->rq_timeout; | ||
1332 | /* Don't race with disconnect */ | ||
1333 | if (!xprt_connected(xprt)) | ||
1334 | task->tk_status = -ENOTCONN; | ||
1335 | else if (!req->rq_received) | ||
1336 | rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer); | ||
1337 | __xprt_release_write(xprt, task); | ||
1338 | spin_unlock_bh(&xprt->sock_lock); | ||
1339 | } | 783 | } |
1340 | 784 | ||
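This hunk is the core of the transport switch API named in the header: xprt_transmit() no longer loops over xprt_sendmsg() with per-protocol branches, it calls through xprt->ops (send_request, set_retrans_timeout, release_xprt), so UDP and TCP behavior lives behind a method table. A minimal model of that indirection; the struct layout and stub methods here are illustrative, not the kernel's:

#include <stdio.h>

struct rpc_xprt;

struct rpc_xprt_ops {
	int  (*send_request)(struct rpc_xprt *xprt);
	void (*set_retrans_timeout)(struct rpc_xprt *xprt);
};

struct rpc_xprt {
	const struct rpc_xprt_ops *ops;
	const char *name;
};

static int udp_send_request(struct rpc_xprt *xprt)
{
	printf("%s: whole datagram handed to the socket\n", xprt->name);
	return 0;   /* 0 = fully sent; the kernel also returns -EAGAIN etc. */
}

static void udp_set_retrans_timeout(struct rpc_xprt *xprt)
{
	printf("%s: RTT-estimated retransmit timeout armed\n", xprt->name);
}

int main(void)
{
	static const struct rpc_xprt_ops udp_ops = {
		udp_send_request,
		udp_set_retrans_timeout,
	};
	struct rpc_xprt xprt = { &udp_ops, "udp" };

	/* the shape of the new xprt_transmit() success path */
	if (xprt.ops->send_request(&xprt) == 0)
		xprt.ops->set_retrans_timeout(&xprt);
	return 0;
}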
1341 | /* | 785 | static inline void do_xprt_reserve(struct rpc_task *task) |
1342 | * Reserve an RPC call slot. | ||
1343 | */ | ||
1344 | static inline void | ||
1345 | do_xprt_reserve(struct rpc_task *task) | ||
1346 | { | 786 | { |
1347 | struct rpc_xprt *xprt = task->tk_xprt; | 787 | struct rpc_xprt *xprt = task->tk_xprt; |
1348 | 788 | ||
@@ -1362,22 +802,25 @@ do_xprt_reserve(struct rpc_task *task) | |||
1362 | rpc_sleep_on(&xprt->backlog, task, NULL, NULL); | 802 | rpc_sleep_on(&xprt->backlog, task, NULL, NULL); |
1363 | } | 803 | } |
1364 | 804 | ||
1365 | void | 805 | /** |
1366 | xprt_reserve(struct rpc_task *task) | 806 | * xprt_reserve - allocate an RPC request slot |
807 | * @task: RPC task requesting a slot allocation | ||
808 | * | ||
809 | * If no more slots are available, place the task on the transport's | ||
810 | * backlog queue. | ||
811 | */ | ||
812 | void xprt_reserve(struct rpc_task *task) | ||
1367 | { | 813 | { |
1368 | struct rpc_xprt *xprt = task->tk_xprt; | 814 | struct rpc_xprt *xprt = task->tk_xprt; |
1369 | 815 | ||
1370 | task->tk_status = -EIO; | 816 | task->tk_status = -EIO; |
1371 | if (!xprt->shutdown) { | 817 | if (!xprt->shutdown) { |
1372 | spin_lock(&xprt->xprt_lock); | 818 | spin_lock(&xprt->reserve_lock); |
1373 | do_xprt_reserve(task); | 819 | do_xprt_reserve(task); |
1374 | spin_unlock(&xprt->xprt_lock); | 820 | spin_unlock(&xprt->reserve_lock); |
1375 | } | 821 | } |
1376 | } | 822 | } |
1377 | 823 | ||
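do_xprt_reserve()/xprt_reserve() hand out request slots from the transport's free list under reserve_lock; a task that finds the list empty sleeps on the backlog queue until xprt_release() returns a slot and wakes the next waiter. A toy model with an index stack standing in for the list_head free list, where -1 plays the role of "sleep on backlog":

#include <stdio.h>

#define MAX_REQS 4

static int free_slots[MAX_REQS] = { 0, 1, 2, 3 };
static int nfree = MAX_REQS;

/* Returns a slot index, or -1 meaning "sleep on the backlog queue". */
static int do_reserve(void)
{
	return nfree > 0 ? free_slots[--nfree] : -1;
}

/* xprt_release() returns the slot and wakes the next backlogged task. */
static void release_slot(int slot)
{
	free_slots[nfree++] = slot;
}

int main(void)
{
	for (int i = 0; i < 5; i++)
		printf("task %d -> slot %d\n", i, do_reserve());
	release_slot(2);
	printf("after a release -> slot %d\n", do_reserve());
	return 0;
}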
1378 | /* | ||
1379 | * Allocate a 'unique' XID | ||
1380 | */ | ||
1381 | static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt) | 824 | static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt) |
1382 | { | 825 | { |
1383 | return xprt->xid++; | 826 | return xprt->xid++; |
@@ -1388,11 +831,7 @@ static inline void xprt_init_xid(struct rpc_xprt *xprt) | |||
1388 | get_random_bytes(&xprt->xid, sizeof(xprt->xid)); | 831 | get_random_bytes(&xprt->xid, sizeof(xprt->xid)); |
1389 | } | 832 | } |
1390 | 833 | ||
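XID handling is unchanged by the rewrite: xprt_init_xid() seeds the counter with random bytes once per transport, so XIDs do not repeat across reboots and stale replies or server duplicate-request caches cannot match new calls, and xprt_alloc_xid() then hands them out by post-increment. The same scheme in user space, with getrandom(2) assumed as a stand-in for get_random_bytes():

#include <stdio.h>
#include <stdint.h>
#include <sys/random.h>   /* getrandom(2); glibc >= 2.25 assumed */

static uint32_t xid;

static void init_xid(void)
{
	if (getrandom(&xid, sizeof(xid), 0) != (ssize_t)sizeof(xid))
		xid = 0x6b6f6b6f;   /* arbitrary fallback for the demo */
}

static uint32_t alloc_xid(void)
{
	return xid++;   /* distinct for every in-flight request */
}

int main(void)
{
	init_xid();
	printf("%08x %08x %08x\n",
	       (unsigned)alloc_xid(), (unsigned)alloc_xid(), (unsigned)alloc_xid());
	return 0;
}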
1391 | /* | 834 | static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) |
1392 | * Initialize RPC request | ||
1393 | */ | ||
1394 | static void | ||
1395 | xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) | ||
1396 | { | 835 | { |
1397 | struct rpc_rqst *req = task->tk_rqstp; | 836 | struct rpc_rqst *req = task->tk_rqstp; |
1398 | 837 | ||
@@ -1400,128 +839,104 @@ xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) | |||
1400 | req->rq_task = task; | 839 | req->rq_task = task; |
1401 | req->rq_xprt = xprt; | 840 | req->rq_xprt = xprt; |
1402 | req->rq_xid = xprt_alloc_xid(xprt); | 841 | req->rq_xid = xprt_alloc_xid(xprt); |
842 | req->rq_release_snd_buf = NULL; | ||
1403 | dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, | 843 | dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, |
1404 | req, ntohl(req->rq_xid)); | 844 | req, ntohl(req->rq_xid)); |
1405 | } | 845 | } |
1406 | 846 | ||
1407 | /* | 847 | /** |
1408 | * Release an RPC call slot | 848 | * xprt_release - release an RPC request slot |
849 | * @task: task which is finished with the slot | ||
850 | * | ||
1409 | */ | 851 | */ |
1410 | void | 852 | void xprt_release(struct rpc_task *task) |
1411 | xprt_release(struct rpc_task *task) | ||
1412 | { | 853 | { |
1413 | struct rpc_xprt *xprt = task->tk_xprt; | 854 | struct rpc_xprt *xprt = task->tk_xprt; |
1414 | struct rpc_rqst *req; | 855 | struct rpc_rqst *req; |
1415 | 856 | ||
1416 | if (!(req = task->tk_rqstp)) | 857 | if (!(req = task->tk_rqstp)) |
1417 | return; | 858 | return; |
1418 | spin_lock_bh(&xprt->sock_lock); | 859 | spin_lock_bh(&xprt->transport_lock); |
1419 | __xprt_release_write(xprt, task); | 860 | xprt->ops->release_xprt(xprt, task); |
1420 | __xprt_put_cong(xprt, req); | 861 | if (xprt->ops->release_request) |
862 | xprt->ops->release_request(task); | ||
1421 | if (!list_empty(&req->rq_list)) | 863 | if (!list_empty(&req->rq_list)) |
1422 | list_del(&req->rq_list); | 864 | list_del(&req->rq_list); |
1423 | xprt->last_used = jiffies; | 865 | xprt->last_used = jiffies; |
1424 | if (list_empty(&xprt->recv) && !xprt->shutdown) | 866 | if (list_empty(&xprt->recv) && !xprt->shutdown) |
1425 | mod_timer(&xprt->timer, xprt->last_used + XPRT_IDLE_TIMEOUT); | 867 | mod_timer(&xprt->timer, |
1426 | spin_unlock_bh(&xprt->sock_lock); | 868 | xprt->last_used + xprt->idle_timeout); |
869 | spin_unlock_bh(&xprt->transport_lock); | ||
1427 | task->tk_rqstp = NULL; | 870 | task->tk_rqstp = NULL; |
871 | if (req->rq_release_snd_buf) | ||
872 | req->rq_release_snd_buf(req); | ||
1428 | memset(req, 0, sizeof(*req)); /* mark unused */ | 873 | memset(req, 0, sizeof(*req)); /* mark unused */ |
1429 | 874 | ||
1430 | dprintk("RPC: %4d release request %p\n", task->tk_pid, req); | 875 | dprintk("RPC: %4d release request %p\n", task->tk_pid, req); |
1431 | 876 | ||
1432 | spin_lock(&xprt->xprt_lock); | 877 | spin_lock(&xprt->reserve_lock); |
1433 | list_add(&req->rq_list, &xprt->free); | 878 | list_add(&req->rq_list, &xprt->free); |
1434 | xprt_clear_backlog(xprt); | 879 | rpc_wake_up_next(&xprt->backlog); |
1435 | spin_unlock(&xprt->xprt_lock); | 880 | spin_unlock(&xprt->reserve_lock); |
1436 | } | ||
1437 | |||
1438 | /* | ||
1439 | * Set default timeout parameters | ||
1440 | */ | ||
1441 | static void | ||
1442 | xprt_default_timeout(struct rpc_timeout *to, int proto) | ||
1443 | { | ||
1444 | if (proto == IPPROTO_UDP) | ||
1445 | xprt_set_timeout(to, 5, 5 * HZ); | ||
1446 | else | ||
1447 | xprt_set_timeout(to, 5, 60 * HZ); | ||
1448 | } | 881 | } |
1449 | 882 | ||
1450 | /* | 883 | /** |
1451 | * Set constant timeout | 884 | * xprt_set_timeout - set constant RPC timeout |
885 | * @to: RPC timeout parameters to set up | ||
886 | * @retr: number of retries | ||
887 | * @incr: amount of increase after each retry | ||
888 | * | ||
1452 | */ | 889 | */ |
1453 | void | 890 | void xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr) |
1454 | xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr) | ||
1455 | { | 891 | { |
1456 | to->to_initval = | 892 | to->to_initval = |
1457 | to->to_increment = incr; | 893 | to->to_increment = incr; |
1458 | to->to_maxval = incr * retr; | 894 | to->to_maxval = to->to_initval + (incr * retr); |
1459 | to->to_retries = retr; | 895 | to->to_retries = retr; |
1460 | to->to_exponential = 0; | 896 | to->to_exponential = 0; |
1461 | } | 897 | } |
1462 | 898 | ||
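xprt_set_timeout() configures a linear backoff: the first timeout is to_initval and each of the retr retries adds to_increment. The change above sets to_maxval to to_initval + (incr * retr) so the cap covers the whole schedule; the old incr * retr clamped the final retry. Worked through with retr = 5 and incr = 5 seconds (HZ scaling elided):

#include <stdio.h>

int main(void)
{
	unsigned int retr = 5;
	unsigned long incr = 5;                         /* seconds */
	unsigned long initval = incr;                   /* as in the code */
	unsigned long maxval = initval + incr * retr;   /* 30s; was 25s */

	for (unsigned int i = 0; i <= retr; i++)
		printf("attempt %u: timeout %lus (cap %lus)\n",
		       i + 1, initval + i * incr, maxval);
	return 0;
}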
1463 | unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE; | 899 | static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) |
1464 | unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE; | ||
1465 | |||
1466 | /* | ||
1467 | * Initialize an RPC client | ||
1468 | */ | ||
1469 | static struct rpc_xprt * | ||
1470 | xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) | ||
1471 | { | 900 | { |
901 | int result; | ||
1472 | struct rpc_xprt *xprt; | 902 | struct rpc_xprt *xprt; |
1473 | unsigned int entries; | ||
1474 | size_t slot_table_size; | ||
1475 | struct rpc_rqst *req; | 903 | struct rpc_rqst *req; |
1476 | 904 | ||
1477 | dprintk("RPC: setting up %s transport...\n", | ||
1478 | proto == IPPROTO_UDP? "UDP" : "TCP"); | ||
1479 | |||
1480 | entries = (proto == IPPROTO_TCP)? | ||
1481 | xprt_tcp_slot_table_entries : xprt_udp_slot_table_entries; | ||
1482 | |||
1483 | if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL) | 905 | if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL) |
1484 | return ERR_PTR(-ENOMEM); | 906 | return ERR_PTR(-ENOMEM); |
1485 | memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */ | 907 | memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */ |
1486 | xprt->max_reqs = entries; | ||
1487 | slot_table_size = entries * sizeof(xprt->slot[0]); | ||
1488 | xprt->slot = kmalloc(slot_table_size, GFP_KERNEL); | ||
1489 | if (xprt->slot == NULL) { | ||
1490 | kfree(xprt); | ||
1491 | return ERR_PTR(-ENOMEM); | ||
1492 | } | ||
1493 | memset(xprt->slot, 0, slot_table_size); | ||
1494 | 908 | ||
1495 | xprt->addr = *ap; | 909 | xprt->addr = *ap; |
1496 | xprt->prot = proto; | 910 | |
1497 | xprt->stream = (proto == IPPROTO_TCP)? 1 : 0; | 911 | switch (proto) { |
1498 | if (xprt->stream) { | 912 | case IPPROTO_UDP: |
1499 | xprt->cwnd = RPC_MAXCWND(xprt); | 913 | result = xs_setup_udp(xprt, to); |
1500 | xprt->nocong = 1; | 914 | break; |
1501 | xprt->max_payload = (1U << 31) - 1; | 915 | case IPPROTO_TCP: |
1502 | } else { | 916 | result = xs_setup_tcp(xprt, to); |
1503 | xprt->cwnd = RPC_INITCWND; | 917 | break; |
1504 | xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); | 918 | default: |
919 | printk(KERN_ERR "RPC: unrecognized transport protocol: %d\n", | ||
920 | proto); | ||
921 | result = -EIO; | ||
922 | break; | ||
923 | } | ||
924 | if (result) { | ||
925 | kfree(xprt); | ||
926 | return ERR_PTR(result); | ||
1505 | } | 927 | } |
1506 | spin_lock_init(&xprt->sock_lock); | 928 | |
1507 | spin_lock_init(&xprt->xprt_lock); | 929 | spin_lock_init(&xprt->transport_lock); |
1508 | init_waitqueue_head(&xprt->cong_wait); | 930 | spin_lock_init(&xprt->reserve_lock); |
1509 | 931 | ||
1510 | INIT_LIST_HEAD(&xprt->free); | 932 | INIT_LIST_HEAD(&xprt->free); |
1511 | INIT_LIST_HEAD(&xprt->recv); | 933 | INIT_LIST_HEAD(&xprt->recv); |
1512 | INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt); | 934 | INIT_WORK(&xprt->task_cleanup, xprt_autoclose, xprt); |
1513 | INIT_WORK(&xprt->task_cleanup, xprt_socket_autoclose, xprt); | ||
1514 | init_timer(&xprt->timer); | 935 | init_timer(&xprt->timer); |
1515 | xprt->timer.function = xprt_init_autodisconnect; | 936 | xprt->timer.function = xprt_init_autodisconnect; |
1516 | xprt->timer.data = (unsigned long) xprt; | 937 | xprt->timer.data = (unsigned long) xprt; |
1517 | xprt->last_used = jiffies; | 938 | xprt->last_used = jiffies; |
1518 | xprt->port = XPRT_MAX_RESVPORT; | 939 | xprt->cwnd = RPC_INITCWND; |
1519 | |||
1520 | /* Set timeout parameters */ | ||
1521 | if (to) { | ||
1522 | xprt->timeout = *to; | ||
1523 | } else | ||
1524 | xprt_default_timeout(&xprt->timeout, xprt->prot); | ||
1525 | 940 | ||
1526 | rpc_init_wait_queue(&xprt->pending, "xprt_pending"); | 941 | rpc_init_wait_queue(&xprt->pending, "xprt_pending"); |
1527 | rpc_init_wait_queue(&xprt->sending, "xprt_sending"); | 942 | rpc_init_wait_queue(&xprt->sending, "xprt_sending"); |
@@ -1529,139 +944,25 @@ xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) | |||
1529 | rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); | 944 | rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); |
1530 | 945 | ||
1531 | /* initialize free list */ | 946 | /* initialize free list */ |
1532 | for (req = &xprt->slot[entries-1]; req >= &xprt->slot[0]; req--) | 947 | for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--) |
1533 | list_add(&req->rq_list, &xprt->free); | 948 | list_add(&req->rq_list, &xprt->free); |
1534 | 949 | ||
1535 | xprt_init_xid(xprt); | 950 | xprt_init_xid(xprt); |
1536 | 951 | ||
1537 | /* Check whether we want to use a reserved port */ | ||
1538 | xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0; | ||
1539 | |||
1540 | dprintk("RPC: created transport %p with %u slots\n", xprt, | 952 | dprintk("RPC: created transport %p with %u slots\n", xprt, |
1541 | xprt->max_reqs); | 953 | xprt->max_reqs); |
1542 | 954 | ||
1543 | return xprt; | 955 | return xprt; |
1544 | } | 956 | } |
1545 | 957 | ||
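xprt_setup() now knows nothing about sockets: a switch on the protocol defers to xs_setup_udp() or xs_setup_tcp(), rejects anything else with -EIO, and frees the half-built xprt on failure before the generic fields (locks, lists, timer, wait queues, slot free list) are initialized. The shape of that dispatch, modelled with stub setup functions:

#include <stdio.h>
#include <errno.h>
#include <netinet/in.h>   /* IPPROTO_UDP, IPPROTO_TCP */

static int xs_setup_udp_stub(void) { puts("UDP transport set up"); return 0; }
static int xs_setup_tcp_stub(void) { puts("TCP transport set up"); return 0; }

static int xprt_setup_model(int proto)
{
	switch (proto) {
	case IPPROTO_UDP:
		return xs_setup_udp_stub();
	case IPPROTO_TCP:
		return xs_setup_tcp_stub();
	default:
		fprintf(stderr, "unrecognized transport protocol: %d\n", proto);
		return -EIO;   /* caller frees the xprt and returns this */
	}
}

int main(void)
{
	xprt_setup_model(IPPROTO_TCP);
	xprt_setup_model(99);
	return 0;
}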
1546 | /* | 958 | /** |
1547 | * Bind to a reserved port | 959 | * xprt_create_proto - create an RPC client transport |
1548 | */ | 960 | * @proto: requested transport protocol |
1549 | static inline int xprt_bindresvport(struct rpc_xprt *xprt, struct socket *sock) | 961 | * @sap: remote peer's address |
1550 | { | 962 | * @to: timeout parameters for new transport |
1551 | struct sockaddr_in myaddr = { | 963 | * |
1552 | .sin_family = AF_INET, | ||
1553 | }; | ||
1554 | int err, port; | ||
1555 | |||
1556 | /* Were we already bound to a given port? Try to reuse it */ | ||
1557 | port = xprt->port; | ||
1558 | do { | ||
1559 | myaddr.sin_port = htons(port); | ||
1560 | err = sock->ops->bind(sock, (struct sockaddr *) &myaddr, | ||
1561 | sizeof(myaddr)); | ||
1562 | if (err == 0) { | ||
1563 | xprt->port = port; | ||
1564 | return 0; | ||
1565 | } | ||
1566 | if (--port == 0) | ||
1567 | port = XPRT_MAX_RESVPORT; | ||
1568 | } while (err == -EADDRINUSE && port != xprt->port); | ||
1569 | |||
1570 | printk("RPC: Can't bind to reserved port (%d).\n", -err); | ||
1571 | return err; | ||
1572 | } | ||
1573 | |||
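The deleted xprt_bindresvport() probed privileged ports downward from the last port this transport used, wrapping back to XPRT_MAX_RESVPORT at zero, and stopped on a successful bind, on any error other than -EADDRINUSE, or after coming full circle. The probe order in isolation, with a small stand-in constant so the demo stays short:

#include <stdio.h>

#define DEMO_MAX_RESVPORT 10   /* stand-in; the kernel constant is larger */

int main(void)
{
	int start = 7, port = start;

	do {
		/* the real loop calls sock->ops->bind() here and returns
		 * on success or on any error other than -EADDRINUSE */
		printf("try port %d\n", port);
		if (--port == 0)
			port = DEMO_MAX_RESVPORT;
	} while (port != start);   /* full circle: every port was busy */
	return 0;
}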
1574 | static void | ||
1575 | xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock) | ||
1576 | { | ||
1577 | struct sock *sk = sock->sk; | ||
1578 | |||
1579 | if (xprt->inet) | ||
1580 | return; | ||
1581 | |||
1582 | write_lock_bh(&sk->sk_callback_lock); | ||
1583 | sk->sk_user_data = xprt; | ||
1584 | xprt->old_data_ready = sk->sk_data_ready; | ||
1585 | xprt->old_state_change = sk->sk_state_change; | ||
1586 | xprt->old_write_space = sk->sk_write_space; | ||
1587 | if (xprt->prot == IPPROTO_UDP) { | ||
1588 | sk->sk_data_ready = udp_data_ready; | ||
1589 | sk->sk_no_check = UDP_CSUM_NORCV; | ||
1590 | xprt_set_connected(xprt); | ||
1591 | } else { | ||
1592 | tcp_sk(sk)->nonagle = 1; /* disable Nagle's algorithm */ | ||
1593 | sk->sk_data_ready = tcp_data_ready; | ||
1594 | sk->sk_state_change = tcp_state_change; | ||
1595 | xprt_clear_connected(xprt); | ||
1596 | } | ||
1597 | sk->sk_write_space = xprt_write_space; | ||
1598 | |||
1599 | /* Reset to new socket */ | ||
1600 | xprt->sock = sock; | ||
1601 | xprt->inet = sk; | ||
1602 | write_unlock_bh(&sk->sk_callback_lock); | ||
1603 | |||
1604 | return; | ||
1605 | } | ||
1606 | |||
1607 | /* | ||
1608 | * Set socket buffer length | ||
1609 | */ | ||
1610 | void | ||
1611 | xprt_sock_setbufsize(struct rpc_xprt *xprt) | ||
1612 | { | ||
1613 | struct sock *sk = xprt->inet; | ||
1614 | |||
1615 | if (xprt->stream) | ||
1616 | return; | ||
1617 | if (xprt->rcvsize) { | ||
1618 | sk->sk_userlocks |= SOCK_RCVBUF_LOCK; | ||
1619 | sk->sk_rcvbuf = xprt->rcvsize * xprt->max_reqs * 2; | ||
1620 | } | ||
1621 | if (xprt->sndsize) { | ||
1622 | sk->sk_userlocks |= SOCK_SNDBUF_LOCK; | ||
1623 | sk->sk_sndbuf = xprt->sndsize * xprt->max_reqs * 2; | ||
1624 | sk->sk_write_space(sk); | ||
1625 | } | ||
1626 | } | ||
1627 | |||
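The deleted xprt_sock_setbufsize() applied only to datagram transports and sized each socket buffer to hold two full windows of traffic: the per-request size times the slot count, doubled, with SOCK_SNDBUF_LOCK/SOCK_RCVBUF_LOCK pinning the values against autotuning. The arithmetic with illustrative numbers (4 KB requests, 16 slots):

#include <stdio.h>

int main(void)
{
	unsigned int max_reqs = 16;     /* slot table entries, example */
	unsigned int sndsize  = 4096;   /* bytes per request, example */

	printf("sk_sndbuf = %u bytes\n", sndsize * max_reqs * 2);   /* 131072 */
	return 0;
}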
1628 | /* | ||
1629 | * Datastream sockets are created here, but xprt_connect will create | ||
1630 | * and connect stream sockets. | ||
1631 | */ | ||
1632 | static struct socket * xprt_create_socket(struct rpc_xprt *xprt, int proto, int resvport) | ||
1633 | { | ||
1634 | struct socket *sock; | ||
1635 | int type, err; | ||
1636 | |||
1637 | dprintk("RPC: xprt_create_socket(%s %d)\n", | ||
1638 | (proto == IPPROTO_UDP)? "udp" : "tcp", proto); | ||
1639 | |||
1640 | type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM; | ||
1641 | |||
1642 | if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) { | ||
1643 | printk("RPC: can't create socket (%d).\n", -err); | ||
1644 | return NULL; | ||
1645 | } | ||
1646 | |||
1647 | /* If the caller has the capability, bind to a reserved port */ | ||
1648 | if (resvport && xprt_bindresvport(xprt, sock) < 0) { | ||
1649 | printk("RPC: can't bind to reserved port.\n"); | ||
1650 | goto failed; | ||
1651 | } | ||
1652 | |||
1653 | return sock; | ||
1654 | |||
1655 | failed: | ||
1656 | sock_release(sock); | ||
1657 | return NULL; | ||
1658 | } | ||
1659 | |||
1660 | /* | ||
1661 | * Create an RPC client transport given the protocol and peer address. | ||
1662 | */ | 964 | */ |
1663 | struct rpc_xprt * | 965 | struct rpc_xprt *xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to) |
1664 | xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to) | ||
1665 | { | 966 | { |
1666 | struct rpc_xprt *xprt; | 967 | struct rpc_xprt *xprt; |
1667 | 968 | ||
@@ -1673,46 +974,26 @@ xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to) | |||
1673 | return xprt; | 974 | return xprt; |
1674 | } | 975 | } |
1675 | 976 | ||
1676 | /* | 977 | static void xprt_shutdown(struct rpc_xprt *xprt) |
1677 | * Prepare for transport shutdown. | ||
1678 | */ | ||
1679 | static void | ||
1680 | xprt_shutdown(struct rpc_xprt *xprt) | ||
1681 | { | 978 | { |
1682 | xprt->shutdown = 1; | 979 | xprt->shutdown = 1; |
1683 | rpc_wake_up(&xprt->sending); | 980 | rpc_wake_up(&xprt->sending); |
1684 | rpc_wake_up(&xprt->resend); | 981 | rpc_wake_up(&xprt->resend); |
1685 | rpc_wake_up(&xprt->pending); | 982 | xprt_wake_pending_tasks(xprt, -EIO); |
1686 | rpc_wake_up(&xprt->backlog); | 983 | rpc_wake_up(&xprt->backlog); |
1687 | wake_up(&xprt->cong_wait); | ||
1688 | del_timer_sync(&xprt->timer); | 984 | del_timer_sync(&xprt->timer); |
1689 | |||
1690 | /* synchronously wait for connect worker to finish */ | ||
1691 | cancel_delayed_work(&xprt->sock_connect); | ||
1692 | flush_scheduled_work(); | ||
1693 | } | 985 | } |
1694 | 986 | ||
1695 | /* | 987 | /** |
1696 | * Clear the xprt backlog queue | 988 | * xprt_destroy - destroy an RPC transport, killing off all requests. |
1697 | */ | 989 | * @xprt: transport to destroy |
1698 | static int | 990 | * |
1699 | xprt_clear_backlog(struct rpc_xprt *xprt) { | ||
1700 | rpc_wake_up_next(&xprt->backlog); | ||
1701 | wake_up(&xprt->cong_wait); | ||
1702 | return 1; | ||
1703 | } | ||
1704 | |||
1705 | /* | ||
1706 | * Destroy an RPC transport, killing off all requests. | ||
1707 | */ | 991 | */ |
1708 | int | 992 | int xprt_destroy(struct rpc_xprt *xprt) |
1709 | xprt_destroy(struct rpc_xprt *xprt) | ||
1710 | { | 993 | { |
1711 | dprintk("RPC: destroying transport %p\n", xprt); | 994 | dprintk("RPC: destroying transport %p\n", xprt); |
1712 | xprt_shutdown(xprt); | 995 | xprt_shutdown(xprt); |
1713 | xprt_disconnect(xprt); | 996 | xprt->ops->destroy(xprt); |
1714 | xprt_close(xprt); | ||
1715 | kfree(xprt->slot); | ||
1716 | kfree(xprt); | 997 | kfree(xprt); |
1717 | 998 | ||
1718 | return 0; | 999 | return 0; |