aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTrond Myklebust <trond.myklebust@primarydata.com>2014-02-10 11:18:39 -0500
committerTrond Myklebust <trond.myklebust@primarydata.com>2014-02-11 14:01:20 -0500
commit2ea24497a1b30dd03dd42b873fa5097913587f4d (patch)
tree1e1b3d1c95e712dcce68be4d011620b46a6271c7
parent628356791b04ea988fee070f66a748a823d001bb (diff)
SUNRPC: RPC callbacks may be split across several TCP segments
Since TCP is a stream protocol, our callback read code needs to take into account the fact that RPC callbacks are not always confined to a single TCP segment. This patch adds support for multiple TCP segments by ensuring that we only remove the rpc_rqst structure from the 'free backchannel requests' list once the data has been completely received. We rely on the fact that TCP data is ordered for the duration of the connection. Reported-by: shaobingqing <shaobingqing@bwstor.com.cn> Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
-rw-r--r--include/linux/sunrpc/bc_xprt.h3
-rw-r--r--net/sunrpc/backchannel_rqst.c93
-rw-r--r--net/sunrpc/xprtsock.c28
3 files changed, 74 insertions, 50 deletions
diff --git a/include/linux/sunrpc/bc_xprt.h b/include/linux/sunrpc/bc_xprt.h
index 969c0a671dbf..2ca67b55e0fe 100644
--- a/include/linux/sunrpc/bc_xprt.h
+++ b/include/linux/sunrpc/bc_xprt.h
@@ -32,7 +32,8 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32#include <linux/sunrpc/sched.h> 32#include <linux/sunrpc/sched.h>
33 33
34#ifdef CONFIG_SUNRPC_BACKCHANNEL 34#ifdef CONFIG_SUNRPC_BACKCHANNEL
35struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt); 35struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid);
36void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied);
36void xprt_free_bc_request(struct rpc_rqst *req); 37void xprt_free_bc_request(struct rpc_rqst *req);
37int xprt_setup_backchannel(struct rpc_xprt *, unsigned int min_reqs); 38int xprt_setup_backchannel(struct rpc_xprt *, unsigned int min_reqs);
38void xprt_destroy_backchannel(struct rpc_xprt *, unsigned int max_reqs); 39void xprt_destroy_backchannel(struct rpc_xprt *, unsigned int max_reqs);
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index e860d4f7ed2a..3513d559bc45 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -212,39 +212,23 @@ out:
212} 212}
213EXPORT_SYMBOL_GPL(xprt_destroy_backchannel); 213EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
214 214
215/* 215static struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt, __be32 xid)
216 * One or more rpc_rqst structure have been preallocated during the
217 * backchannel setup. Buffer space for the send and private XDR buffers
218 * has been preallocated as well. Use xprt_alloc_bc_request to allocate
219 * to this request. Use xprt_free_bc_request to return it.
220 *
221 * We know that we're called in soft interrupt context, grab the spin_lock
222 * since there is no need to grab the bottom half spin_lock.
223 *
224 * Return an available rpc_rqst, otherwise NULL if none are available.
225 */
226struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt)
227{ 216{
228 struct rpc_rqst *req; 217 struct rpc_rqst *req = NULL;
229 218
230 dprintk("RPC: allocate a backchannel request\n"); 219 dprintk("RPC: allocate a backchannel request\n");
231 spin_lock(&xprt->bc_pa_lock); 220 if (list_empty(&xprt->bc_pa_list))
232 if (!list_empty(&xprt->bc_pa_list)) { 221 goto not_found;
233 req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
234 rq_bc_pa_list);
235 list_del(&req->rq_bc_pa_list);
236 } else {
237 req = NULL;
238 }
239 spin_unlock(&xprt->bc_pa_lock);
240 222
241 if (req != NULL) { 223 req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
242 set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state); 224 rq_bc_pa_list);
243 req->rq_reply_bytes_recvd = 0; 225 req->rq_reply_bytes_recvd = 0;
244 req->rq_bytes_sent = 0; 226 req->rq_bytes_sent = 0;
245 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 227 memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
246 sizeof(req->rq_private_buf)); 228 sizeof(req->rq_private_buf));
247 } 229 req->rq_xid = xid;
230 req->rq_connect_cookie = xprt->connect_cookie;
231not_found:
248 dprintk("RPC: backchannel req=%p\n", req); 232 dprintk("RPC: backchannel req=%p\n", req);
249 return req; 233 return req;
250} 234}
@@ -259,6 +243,7 @@ void xprt_free_bc_request(struct rpc_rqst *req)
259 243
260 dprintk("RPC: free backchannel req=%p\n", req); 244 dprintk("RPC: free backchannel req=%p\n", req);
261 245
246 req->rq_connect_cookie = xprt->connect_cookie - 1;
262 smp_mb__before_clear_bit(); 247 smp_mb__before_clear_bit();
263 WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state)); 248 WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));
264 clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state); 249 clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
@@ -281,7 +266,57 @@ void xprt_free_bc_request(struct rpc_rqst *req)
281 * may be reused by a new callback request. 266 * may be reused by a new callback request.
282 */ 267 */
283 spin_lock_bh(&xprt->bc_pa_lock); 268 spin_lock_bh(&xprt->bc_pa_lock);
284 list_add(&req->rq_bc_pa_list, &xprt->bc_pa_list); 269 list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
285 spin_unlock_bh(&xprt->bc_pa_lock); 270 spin_unlock_bh(&xprt->bc_pa_lock);
286} 271}
287 272
273/*
274 * One or more rpc_rqst structure have been preallocated during the
275 * backchannel setup. Buffer space for the send and private XDR buffers
276 * has been preallocated as well. Use xprt_alloc_bc_request to allocate
277 * to this request. Use xprt_free_bc_request to return it.
278 *
279 * We know that we're called in soft interrupt context, grab the spin_lock
280 * since there is no need to grab the bottom half spin_lock.
281 *
282 * Return an available rpc_rqst, otherwise NULL if none are available.
283 */
284struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid)
285{
286 struct rpc_rqst *req;
287
288 spin_lock(&xprt->bc_pa_lock);
289 list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) {
290 if (req->rq_connect_cookie != xprt->connect_cookie)
291 continue;
292 if (req->rq_xid == xid)
293 goto found;
294 }
295 req = xprt_alloc_bc_request(xprt, xid);
296found:
297 spin_unlock(&xprt->bc_pa_lock);
298 return req;
299}
300
301/*
302 * Add callback request to callback list. The callback
303 * service sleeps on the sv_cb_waitq waiting for new
304 * requests. Wake it up after enqueuing the
305 * request.
306 */
307void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
308{
309 struct rpc_xprt *xprt = req->rq_xprt;
310 struct svc_serv *bc_serv = xprt->bc_serv;
311
312 req->rq_private_buf.len = copied;
313 set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
314
315 dprintk("RPC: add callback request to list\n");
316 spin_lock(&bc_serv->sv_cb_lock);
317 list_del(&req->rq_bc_pa_list);
318 list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
319 wake_up(&bc_serv->sv_cb_waitq);
320 spin_unlock(&bc_serv->sv_cb_lock);
321}
322
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 0addefca8e77..966763d735e9 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1306,41 +1306,29 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
1306 * If we're unable to obtain the rpc_rqst we schedule the closing of the 1306 * If we're unable to obtain the rpc_rqst we schedule the closing of the
1307 * connection and return -1. 1307 * connection and return -1.
1308 */ 1308 */
1309static inline int xs_tcp_read_callback(struct rpc_xprt *xprt, 1309static int xs_tcp_read_callback(struct rpc_xprt *xprt,
1310 struct xdr_skb_reader *desc) 1310 struct xdr_skb_reader *desc)
1311{ 1311{
1312 struct sock_xprt *transport = 1312 struct sock_xprt *transport =
1313 container_of(xprt, struct sock_xprt, xprt); 1313 container_of(xprt, struct sock_xprt, xprt);
1314 struct rpc_rqst *req; 1314 struct rpc_rqst *req;
1315 1315
1316 req = xprt_alloc_bc_request(xprt); 1316 /* Look up and lock the request corresponding to the given XID */
1317 spin_lock(&xprt->transport_lock);
1318 req = xprt_lookup_bc_request(xprt, transport->tcp_xid);
1317 if (req == NULL) { 1319 if (req == NULL) {
1320 spin_unlock(&xprt->transport_lock);
1318 printk(KERN_WARNING "Callback slot table overflowed\n"); 1321 printk(KERN_WARNING "Callback slot table overflowed\n");
1319 xprt_force_disconnect(xprt); 1322 xprt_force_disconnect(xprt);
1320 return -1; 1323 return -1;
1321 } 1324 }
1322 1325
1323 req->rq_xid = transport->tcp_xid;
1324 dprintk("RPC: read callback XID %08x\n", ntohl(req->rq_xid)); 1326 dprintk("RPC: read callback XID %08x\n", ntohl(req->rq_xid));
1325 xs_tcp_read_common(xprt, desc, req); 1327 xs_tcp_read_common(xprt, desc, req);
1326 1328
1327 if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) { 1329 if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1328 struct svc_serv *bc_serv = xprt->bc_serv; 1330 xprt_complete_bc_request(req, transport->tcp_copied);
1329 1331 spin_unlock(&xprt->transport_lock);
1330 /*
1331 * Add callback request to callback list. The callback
1332 * service sleeps on the sv_cb_waitq waiting for new
1333 * requests. Wake it up after enqueuing the
1334 * request.
1335 */
1336 dprintk("RPC: add callback request to list\n");
1337 spin_lock(&bc_serv->sv_cb_lock);
1338 list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
1339 spin_unlock(&bc_serv->sv_cb_lock);
1340 wake_up(&bc_serv->sv_cb_waitq);
1341 }
1342
1343 req->rq_private_buf.len = transport->tcp_copied;
1344 1332
1345 return 0; 1333 return 0;
1346} 1334}