aboutsummaryrefslogtreecommitdiffstats
path: root/net/sunrpc/xprtsock.c
diff options
context:
space:
mode:
authorTrond Myklebust <trond.myklebust@hammerspace.com>2018-09-14 14:32:45 -0400
committerTrond Myklebust <trond.myklebust@hammerspace.com>2018-09-30 15:35:16 -0400
commit550aebfe1c573518c35ae85d6ffbdc2d44c92703 (patch)
tree41d2dbb649d488986e78ee4290a1fa0313765133 /net/sunrpc/xprtsock.c
parentc50b8ee02f1cb9506ac06d22e8414e9fef7d6890 (diff)
SUNRPC: Allow AF_LOCAL sockets to use the generic stream receive
Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
Diffstat (limited to 'net/sunrpc/xprtsock.c')
-rw-r--r--net/sunrpc/xprtsock.c137
1 files changed, 16 insertions, 121 deletions
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 55df1fadab27..90d4c92177b7 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -670,6 +670,17 @@ static void xs_stream_data_receive_workfn(struct work_struct *work)
670 xs_stream_data_receive(transport); 670 xs_stream_data_receive(transport);
671} 671}
672 672
673static void
674xs_stream_reset_connect(struct sock_xprt *transport)
675{
676 transport->recv.offset = 0;
677 transport->recv.len = 0;
678 transport->recv.copied = 0;
679 transport->xmit.offset = 0;
680 transport->xprt.stat.connect_count++;
681 transport->xprt.stat.connect_start = jiffies;
682}
683
673#define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL) 684#define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL)
674 685
675static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more) 686static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
@@ -1266,114 +1277,6 @@ static void xs_destroy(struct rpc_xprt *xprt)
1266 module_put(THIS_MODULE); 1277 module_put(THIS_MODULE);
1267} 1278}
1268 1279
1269static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
1270{
1271 struct xdr_skb_reader desc = {
1272 .skb = skb,
1273 .offset = sizeof(rpc_fraghdr),
1274 .count = skb->len - sizeof(rpc_fraghdr),
1275 };
1276
1277 if (xdr_partial_copy_from_skb(xdr, 0, &desc, xdr_skb_read_bits) < 0)
1278 return -1;
1279 if (desc.count)
1280 return -1;
1281 return 0;
1282}
1283
1284/**
1285 * xs_local_data_read_skb
1286 * @xprt: transport
1287 * @sk: socket
1288 * @skb: skbuff
1289 *
1290 * Currently this assumes we can read the whole reply in a single gulp.
1291 */
1292static void xs_local_data_read_skb(struct rpc_xprt *xprt,
1293 struct sock *sk,
1294 struct sk_buff *skb)
1295{
1296 struct rpc_task *task;
1297 struct rpc_rqst *rovr;
1298 int repsize, copied;
1299 u32 _xid;
1300 __be32 *xp;
1301
1302 repsize = skb->len - sizeof(rpc_fraghdr);
1303 if (repsize < 4) {
1304 dprintk("RPC: impossible RPC reply size %d\n", repsize);
1305 return;
1306 }
1307
1308 /* Copy the XID from the skb... */
1309 xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid);
1310 if (xp == NULL)
1311 return;
1312
1313 /* Look up and lock the request corresponding to the given XID */
1314 spin_lock(&xprt->queue_lock);
1315 rovr = xprt_lookup_rqst(xprt, *xp);
1316 if (!rovr)
1317 goto out_unlock;
1318 xprt_pin_rqst(rovr);
1319 spin_unlock(&xprt->queue_lock);
1320 task = rovr->rq_task;
1321
1322 copied = rovr->rq_private_buf.buflen;
1323 if (copied > repsize)
1324 copied = repsize;
1325
1326 if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
1327 dprintk("RPC: sk_buff copy failed\n");
1328 spin_lock(&xprt->queue_lock);
1329 goto out_unpin;
1330 }
1331
1332 spin_lock(&xprt->queue_lock);
1333 xprt_complete_rqst(task, copied);
1334out_unpin:
1335 xprt_unpin_rqst(rovr);
1336 out_unlock:
1337 spin_unlock(&xprt->queue_lock);
1338}
1339
1340static void xs_local_data_receive(struct sock_xprt *transport)
1341{
1342 struct sk_buff *skb;
1343 struct sock *sk;
1344 int err;
1345
1346restart:
1347 mutex_lock(&transport->recv_mutex);
1348 sk = transport->inet;
1349 if (sk == NULL)
1350 goto out;
1351 for (;;) {
1352 skb = skb_recv_datagram(sk, 0, 1, &err);
1353 if (skb != NULL) {
1354 xs_local_data_read_skb(&transport->xprt, sk, skb);
1355 skb_free_datagram(sk, skb);
1356 continue;
1357 }
1358 if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
1359 break;
1360 if (need_resched()) {
1361 mutex_unlock(&transport->recv_mutex);
1362 cond_resched();
1363 goto restart;
1364 }
1365 }
1366out:
1367 mutex_unlock(&transport->recv_mutex);
1368}
1369
1370static void xs_local_data_receive_workfn(struct work_struct *work)
1371{
1372 struct sock_xprt *transport =
1373 container_of(work, struct sock_xprt, recv_worker);
1374 xs_local_data_receive(transport);
1375}
1376
1377/** 1280/**
1378 * xs_udp_data_read_skb - receive callback for UDP sockets 1281 * xs_udp_data_read_skb - receive callback for UDP sockets
1379 * @xprt: transport 1282 * @xprt: transport
@@ -1974,11 +1877,8 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
1974 write_unlock_bh(&sk->sk_callback_lock); 1877 write_unlock_bh(&sk->sk_callback_lock);
1975 } 1878 }
1976 1879
1977 transport->xmit.offset = 0; 1880 xs_stream_reset_connect(transport);
1978 1881
1979 /* Tell the socket layer to start connecting... */
1980 xprt->stat.connect_count++;
1981 xprt->stat.connect_start = jiffies;
1982 return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0); 1882 return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0);
1983} 1883}
1984 1884
@@ -2335,14 +2235,9 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
2335 xs_set_memalloc(xprt); 2235 xs_set_memalloc(xprt);
2336 2236
2337 /* Reset TCP record info */ 2237 /* Reset TCP record info */
2338 transport->recv.offset = 0; 2238 xs_stream_reset_connect(transport);
2339 transport->recv.len = 0;
2340 transport->recv.copied = 0;
2341 transport->xmit.offset = 0;
2342 2239
2343 /* Tell the socket layer to start connecting... */ 2240 /* Tell the socket layer to start connecting... */
2344 xprt->stat.connect_count++;
2345 xprt->stat.connect_start = jiffies;
2346 set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); 2241 set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
2347 ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK); 2242 ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
2348 switch (ret) { 2243 switch (ret) {
@@ -2717,6 +2612,7 @@ static const struct rpc_xprt_ops xs_local_ops = {
2717 .connect = xs_local_connect, 2612 .connect = xs_local_connect,
2718 .buf_alloc = rpc_malloc, 2613 .buf_alloc = rpc_malloc,
2719 .buf_free = rpc_free, 2614 .buf_free = rpc_free,
2615 .prepare_request = xs_stream_prepare_request,
2720 .send_request = xs_local_send_request, 2616 .send_request = xs_local_send_request,
2721 .set_retrans_timeout = xprt_set_retrans_timeout_def, 2617 .set_retrans_timeout = xprt_set_retrans_timeout_def,
2722 .close = xs_close, 2618 .close = xs_close,
@@ -2901,9 +2797,8 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
2901 xprt->ops = &xs_local_ops; 2797 xprt->ops = &xs_local_ops;
2902 xprt->timeout = &xs_local_default_timeout; 2798 xprt->timeout = &xs_local_default_timeout;
2903 2799
2904 INIT_WORK(&transport->recv_worker, xs_local_data_receive_workfn); 2800 INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
2905 INIT_DELAYED_WORK(&transport->connect_worker, 2801 INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket);
2906 xs_dummy_setup_socket);
2907 2802
2908 switch (sun->sun_family) { 2803 switch (sun->sun_family) {
2909 case AF_LOCAL: 2804 case AF_LOCAL: