path: root/net/sunrpc
Diffstat (limited to 'net/sunrpc')
-rw-r--r--  net/sunrpc/clnt.c                |  18
-rw-r--r--  net/sunrpc/sched.c               | 106
-rw-r--r--  net/sunrpc/svcsock.c             |  32
-rw-r--r--  net/sunrpc/xprt.c                |  25
-rw-r--r--  net/sunrpc/xprtrdma/rpc_rdma.c   |  86
-rw-r--r--  net/sunrpc/xprtrdma/verbs.c      |  53
-rw-r--r--  net/sunrpc/xprtrdma/xprt_rdma.h  |   1
-rw-r--r--  net/sunrpc/xprtsock.c            |   3
8 files changed, 202 insertions(+), 122 deletions(-)
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 57d344cf2256..e7a96e478f63 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -436,7 +436,9 @@ void rpc_killall_tasks(struct rpc_clnt *clnt)
                 if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
                         rovr->tk_flags |= RPC_TASK_KILLED;
                         rpc_exit(rovr, -EIO);
-                        rpc_wake_up_queued_task(rovr->tk_waitqueue, rovr);
+                        if (RPC_IS_QUEUED(rovr))
+                                rpc_wake_up_queued_task(rovr->tk_waitqueue,
+                                                rovr);
                 }
         }
         spin_unlock(&clnt->cl_lock);
@@ -597,6 +599,14 @@ void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
         }
 }
 
+void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt)
+{
+        rpc_task_release_client(task);
+        rpc_task_set_client(task, clnt);
+}
+EXPORT_SYMBOL_GPL(rpc_task_reset_client);
+
+
 static void
 rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
 {
@@ -636,12 +646,6 @@ struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
         rpc_task_set_client(task, task_setup_data->rpc_client);
         rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
 
-        if (task->tk_status != 0) {
-                int ret = task->tk_status;
-                rpc_put_task(task);
-                return ERR_PTR(ret);
-        }
-
         if (task->tk_action == NULL)
                 rpc_call_start(task);
 
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 243fc09b164e..ffb687671da0 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -252,23 +252,37 @@ static void rpc_set_active(struct rpc_task *task)
 
 /*
  * Mark an RPC call as having completed by clearing the 'active' bit
+ * and then waking up all tasks that were sleeping.
  */
-static void rpc_mark_complete_task(struct rpc_task *task)
+static int rpc_complete_task(struct rpc_task *task)
 {
-        smp_mb__before_clear_bit();
+        void *m = &task->tk_runstate;
+        wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
+        struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
+        unsigned long flags;
+        int ret;
+
+        spin_lock_irqsave(&wq->lock, flags);
         clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
-        smp_mb__after_clear_bit();
-        wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
+        ret = atomic_dec_and_test(&task->tk_count);
+        if (waitqueue_active(wq))
+                __wake_up_locked_key(wq, TASK_NORMAL, &k);
+        spin_unlock_irqrestore(&wq->lock, flags);
+        return ret;
 }
 
 /*
  * Allow callers to wait for completion of an RPC call
+ *
+ * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
+ * to enforce taking of the wq->lock and hence avoid races with
+ * rpc_complete_task().
  */
 int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
 {
         if (action == NULL)
                 action = rpc_wait_bit_killable;
-        return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
+        return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
                         action, TASK_KILLABLE);
 }
 EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
@@ -285,15 +299,8 @@ static void rpc_make_runnable(struct rpc_task *task)
         if (rpc_test_and_set_running(task))
                 return;
         if (RPC_IS_ASYNC(task)) {
-                int status;
-
                 INIT_WORK(&task->u.tk_work, rpc_async_schedule);
-                status = queue_work(rpciod_workqueue, &task->u.tk_work);
-                if (status < 0) {
-                        printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
-                        task->tk_status = status;
-                        return;
-                }
+                queue_work(rpciod_workqueue, &task->u.tk_work);
         } else
                 wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
 }
@@ -623,14 +630,12 @@ static void __rpc_execute(struct rpc_task *task)
                         save_callback = task->tk_callback;
                         task->tk_callback = NULL;
                         save_callback(task);
-                }
-
-                /*
-                 * Perform the next FSM step.
-                 * tk_action may be NULL when the task has been killed
-                 * by someone else.
-                 */
-                if (!RPC_IS_QUEUED(task)) {
+                } else {
+                        /*
+                         * Perform the next FSM step.
+                         * tk_action may be NULL when the task has been killed
+                         * by someone else.
+                         */
                         if (task->tk_action == NULL)
                                 break;
                         task->tk_action(task);
@@ -829,12 +834,6 @@ struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data)
         }
 
         rpc_init_task(task, setup_data);
-        if (task->tk_status < 0) {
-                int err = task->tk_status;
-                rpc_put_task(task);
-                return ERR_PTR(err);
-        }
-
         task->tk_flags |= flags;
         dprintk("RPC: allocated task %p\n", task);
         return task;
@@ -857,34 +856,67 @@ static void rpc_async_release(struct work_struct *work)
         rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
 }
 
-void rpc_put_task(struct rpc_task *task)
+static void rpc_release_resources_task(struct rpc_task *task)
 {
-        if (!atomic_dec_and_test(&task->tk_count))
-                return;
-        /* Release resources */
         if (task->tk_rqstp)
                 xprt_release(task);
         if (task->tk_msg.rpc_cred)
                 put_rpccred(task->tk_msg.rpc_cred);
         rpc_task_release_client(task);
-        if (task->tk_workqueue != NULL) {
+}
+
+static void rpc_final_put_task(struct rpc_task *task,
+                struct workqueue_struct *q)
+{
+        if (q != NULL) {
                 INIT_WORK(&task->u.tk_work, rpc_async_release);
-                queue_work(task->tk_workqueue, &task->u.tk_work);
+                queue_work(q, &task->u.tk_work);
         } else
                 rpc_free_task(task);
 }
+
+static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q)
+{
+        if (atomic_dec_and_test(&task->tk_count)) {
+                rpc_release_resources_task(task);
+                rpc_final_put_task(task, q);
+        }
+}
+
+void rpc_put_task(struct rpc_task *task)
+{
+        rpc_do_put_task(task, NULL);
+}
 EXPORT_SYMBOL_GPL(rpc_put_task);
 
+void rpc_put_task_async(struct rpc_task *task)
+{
+        rpc_do_put_task(task, task->tk_workqueue);
+}
+EXPORT_SYMBOL_GPL(rpc_put_task_async);
+
 static void rpc_release_task(struct rpc_task *task)
 {
         dprintk("RPC: %5u release task\n", task->tk_pid);
 
         BUG_ON (RPC_IS_QUEUED(task));
 
-        /* Wake up anyone who is waiting for task completion */
-        rpc_mark_complete_task(task);
+        rpc_release_resources_task(task);
 
-        rpc_put_task(task);
+        /*
+         * Note: at this point we have been removed from rpc_clnt->cl_tasks,
+         * so it should be safe to use task->tk_count as a test for whether
+         * or not any other processes still hold references to our rpc_task.
+         */
+        if (atomic_read(&task->tk_count) != 1 + !RPC_IS_ASYNC(task)) {
+                /* Wake up anyone who may be waiting for task completion */
+                if (!rpc_complete_task(task))
+                        return;
+        } else {
+                if (!atomic_dec_and_test(&task->tk_count))
+                        return;
+        }
+        rpc_final_put_task(task, task->tk_workqueue);
 }
 
 int rpciod_up(void)
@@ -908,7 +940,7 @@ static int rpciod_start(void)
          * Create the rpciod thread and wait for it to start.
          */
        dprintk("RPC: creating workqueue rpciod\n");
-       wq = alloc_workqueue("rpciod", WQ_RESCUER, 0);
+       wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0);
        rpciod_workqueue = wq;
        return rpciod_workqueue != NULL;
 }
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index d802e941d365..b7d435c3f19e 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -420,6 +420,7 @@ static void svc_sock_setbufsize(struct socket *sock, unsigned int snd,
 static void svc_udp_data_ready(struct sock *sk, int count)
 {
         struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data;
+        wait_queue_head_t *wq = sk_sleep(sk);
 
         if (svsk) {
                 dprintk("svc: socket %p(inet %p), count=%d, busy=%d\n",
@@ -428,8 +429,8 @@ static void svc_udp_data_ready(struct sock *sk, int count)
                 set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
                 svc_xprt_enqueue(&svsk->sk_xprt);
         }
-        if (sk_sleep(sk) && waitqueue_active(sk_sleep(sk)))
-                wake_up_interruptible(sk_sleep(sk));
+        if (wq && waitqueue_active(wq))
+                wake_up_interruptible(wq);
 }
 
 /*
@@ -438,6 +439,7 @@ static void svc_udp_data_ready(struct sock *sk, int count)
 static void svc_write_space(struct sock *sk)
 {
         struct svc_sock *svsk = (struct svc_sock *)(sk->sk_user_data);
+        wait_queue_head_t *wq = sk_sleep(sk);
 
         if (svsk) {
                 dprintk("svc: socket %p(inet %p), write_space busy=%d\n",
@@ -445,10 +447,10 @@ static void svc_write_space(struct sock *sk)
                 svc_xprt_enqueue(&svsk->sk_xprt);
         }
 
-        if (sk_sleep(sk) && waitqueue_active(sk_sleep(sk))) {
+        if (wq && waitqueue_active(wq)) {
                 dprintk("RPC svc_write_space: someone sleeping on %p\n",
                        svsk);
-                wake_up_interruptible(sk_sleep(sk));
+                wake_up_interruptible(wq);
         }
 }
 
@@ -739,6 +741,7 @@ static void svc_udp_init(struct svc_sock *svsk, struct svc_serv *serv)
 static void svc_tcp_listen_data_ready(struct sock *sk, int count_unused)
 {
         struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data;
+        wait_queue_head_t *wq;
 
         dprintk("svc: socket %p TCP (listen) state change %d\n",
                 sk, sk->sk_state);
@@ -761,8 +764,9 @@ static void svc_tcp_listen_data_ready(struct sock *sk, int count_unused)
                 printk("svc: socket %p: no user data\n", sk);
         }
 
-        if (sk_sleep(sk) && waitqueue_active(sk_sleep(sk)))
-                wake_up_interruptible_all(sk_sleep(sk));
+        wq = sk_sleep(sk);
+        if (wq && waitqueue_active(wq))
+                wake_up_interruptible_all(wq);
 }
 
 /*
@@ -771,6 +775,7 @@ static void svc_tcp_listen_data_ready(struct sock *sk, int count_unused)
 static void svc_tcp_state_change(struct sock *sk)
 {
         struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data;
+        wait_queue_head_t *wq = sk_sleep(sk);
 
         dprintk("svc: socket %p TCP (connected) state change %d (svsk %p)\n",
                 sk, sk->sk_state, sk->sk_user_data);
@@ -781,13 +786,14 @@ static void svc_tcp_state_change(struct sock *sk)
                 set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
                 svc_xprt_enqueue(&svsk->sk_xprt);
         }
-        if (sk_sleep(sk) && waitqueue_active(sk_sleep(sk)))
-                wake_up_interruptible_all(sk_sleep(sk));
+        if (wq && waitqueue_active(wq))
+                wake_up_interruptible_all(wq);
 }
 
 static void svc_tcp_data_ready(struct sock *sk, int count)
 {
         struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data;
+        wait_queue_head_t *wq = sk_sleep(sk);
 
         dprintk("svc: socket %p TCP data ready (svsk %p)\n",
                 sk, sk->sk_user_data);
@@ -795,8 +801,8 @@ static void svc_tcp_data_ready(struct sock *sk, int count)
                 set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
                 svc_xprt_enqueue(&svsk->sk_xprt);
         }
-        if (sk_sleep(sk) && waitqueue_active(sk_sleep(sk)))
-                wake_up_interruptible(sk_sleep(sk));
+        if (wq && waitqueue_active(wq))
+                wake_up_interruptible(wq);
 }
 
 /*
@@ -1531,6 +1537,7 @@ static void svc_sock_detach(struct svc_xprt *xprt)
 {
         struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
         struct sock *sk = svsk->sk_sk;
+        wait_queue_head_t *wq;
 
         dprintk("svc: svc_sock_detach(%p)\n", svsk);
 
@@ -1539,8 +1546,9 @@ static void svc_sock_detach(struct svc_xprt *xprt)
         sk->sk_data_ready = svsk->sk_odata;
         sk->sk_write_space = svsk->sk_owspace;
 
-        if (sk_sleep(sk) && waitqueue_active(sk_sleep(sk)))
-                wake_up_interruptible(sk_sleep(sk));
+        wq = sk_sleep(sk);
+        if (wq && waitqueue_active(wq))
+                wake_up_interruptible(wq);
 }
 
 /*
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 856274d7e85c..9494c3767356 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -202,10 +202,9 @@ int xprt_reserve_xprt(struct rpc_task *task)
                 goto out_sleep;
         }
         xprt->snd_task = task;
-        if (req) {
-                req->rq_bytes_sent = 0;
-                req->rq_ntrans++;
-        }
+        req->rq_bytes_sent = 0;
+        req->rq_ntrans++;
+
         return 1;
 
 out_sleep:
@@ -213,7 +212,7 @@ out_sleep:
                         task->tk_pid, xprt);
         task->tk_timeout = 0;
         task->tk_status = -EAGAIN;
-        if (req && req->rq_ntrans)
+        if (req->rq_ntrans)
                 rpc_sleep_on(&xprt->resend, task, NULL);
         else
                 rpc_sleep_on(&xprt->sending, task, NULL);
@@ -965,7 +964,7 @@ struct rpc_xprt *xprt_alloc(struct net *net, int size, int max_req)
         xprt = kzalloc(size, GFP_KERNEL);
         if (xprt == NULL)
                 goto out;
-        kref_init(&xprt->kref);
+        atomic_set(&xprt->count, 1);
 
         xprt->max_reqs = max_req;
         xprt->slot = kcalloc(max_req, sizeof(struct rpc_rqst), GFP_KERNEL);
@@ -1145,13 +1144,11 @@ found:
 
 /**
  * xprt_destroy - destroy an RPC transport, killing off all requests.
- * @kref: kref for the transport to destroy
+ * @xprt: transport to destroy
  *
  */
-static void xprt_destroy(struct kref *kref)
+static void xprt_destroy(struct rpc_xprt *xprt)
 {
-        struct rpc_xprt *xprt = container_of(kref, struct rpc_xprt, kref);
-
         dprintk("RPC: destroying transport %p\n", xprt);
         xprt->shutdown = 1;
         del_timer_sync(&xprt->timer);
@@ -1175,7 +1172,8 @@ static void xprt_destroy(struct kref *kref)
  */
 void xprt_put(struct rpc_xprt *xprt)
 {
-        kref_put(&xprt->kref, xprt_destroy);
+        if (atomic_dec_and_test(&xprt->count))
+                xprt_destroy(xprt);
 }
 
 /**
@@ -1185,6 +1183,7 @@ void xprt_put(struct rpc_xprt *xprt)
  */
 struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
 {
-        kref_get(&xprt->kref);
-        return xprt;
+        if (atomic_inc_not_zero(&xprt->count))
+                return xprt;
+        return NULL;
 }
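The xprt.c hunks swap the transport's kref for a bare atomic counter: xprt_get() only takes a reference while at least one is still held (atomic_inc_not_zero()), and xprt_put() destroys the transport on the final decrement. A small userspace sketch of the same get/put discipline using C11 atomics; the xprt_like names are illustrative, not the kernel's:

/* "get" refuses to resurrect an object whose count already hit zero;
 * "put" frees on the 1 -> 0 transition. Illustrative sketch only. */
#include <stdatomic.h>
#include <stdlib.h>

struct xprt_like {
        atomic_int count;
        /* ... transport state ... */
};

static void xprt_like_destroy(struct xprt_like *x)
{
        free(x);
}

/* Take a reference only if at least one is still held. */
static struct xprt_like *xprt_like_get(struct xprt_like *x)
{
        int old = atomic_load(&x->count);

        while (old != 0) {
                if (atomic_compare_exchange_weak(&x->count, &old, old + 1))
                        return x;       /* reference taken */
        }
        return NULL;                    /* object already being torn down */
}

static void xprt_like_put(struct xprt_like *x)
{
        if (atomic_fetch_sub(&x->count, 1) == 1)
                xprt_like_destroy(x);
}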
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 2ac3f6e8adff..554d0814c875 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -87,6 +87,8 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
         enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs)
 {
         int len, n = 0, p;
+        int page_base;
+        struct page **ppages;
 
         if (pos == 0 && xdrbuf->head[0].iov_len) {
                 seg[n].mr_page = NULL;
@@ -95,34 +97,32 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
                 ++n;
         }
 
-        if (xdrbuf->page_len && (xdrbuf->pages[0] != NULL)) {
-                if (n == nsegs)
-                        return 0;
-                seg[n].mr_page = xdrbuf->pages[0];
-                seg[n].mr_offset = (void *)(unsigned long) xdrbuf->page_base;
-                seg[n].mr_len = min_t(u32,
-                        PAGE_SIZE - xdrbuf->page_base, xdrbuf->page_len);
-                len = xdrbuf->page_len - seg[n].mr_len;
+        len = xdrbuf->page_len;
+        ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
+        page_base = xdrbuf->page_base & ~PAGE_MASK;
+        p = 0;
+        while (len && n < nsegs) {
+                seg[n].mr_page = ppages[p];
+                seg[n].mr_offset = (void *)(unsigned long) page_base;
+                seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
+                BUG_ON(seg[n].mr_len > PAGE_SIZE);
+                len -= seg[n].mr_len;
                 ++n;
-                p = 1;
-                while (len > 0) {
-                        if (n == nsegs)
-                                return 0;
-                        seg[n].mr_page = xdrbuf->pages[p];
-                        seg[n].mr_offset = NULL;
-                        seg[n].mr_len = min_t(u32, PAGE_SIZE, len);
-                        len -= seg[n].mr_len;
-                        ++n;
-                        ++p;
-                }
+                ++p;
+                page_base = 0;  /* page offset only applies to first page */
         }
 
+        /* Message overflows the seg array */
+        if (len && n == nsegs)
+                return 0;
+
         if (xdrbuf->tail[0].iov_len) {
                 /* the rpcrdma protocol allows us to omit any trailing
                  * xdr pad bytes, saving the server an RDMA operation. */
                 if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
                         return n;
                 if (n == nsegs)
+                        /* Tail remains, but we're out of segments */
                         return 0;
                 seg[n].mr_page = NULL;
                 seg[n].mr_offset = xdrbuf->tail[0].iov_base;
@@ -296,6 +296,8 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
         int copy_len;
         unsigned char *srcp, *destp;
         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
+        int page_base;
+        struct page **ppages;
 
         destp = rqst->rq_svec[0].iov_base;
         curlen = rqst->rq_svec[0].iov_len;
@@ -324,28 +326,25 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
                         __func__, destp + copy_len, curlen);
                 rqst->rq_svec[0].iov_len += curlen;
         }
-
         r_xprt->rx_stats.pullup_copy_count += copy_len;
-        npages = PAGE_ALIGN(rqst->rq_snd_buf.page_base+copy_len) >> PAGE_SHIFT;
+
+        page_base = rqst->rq_snd_buf.page_base;
+        ppages = rqst->rq_snd_buf.pages + (page_base >> PAGE_SHIFT);
+        page_base &= ~PAGE_MASK;
+        npages = PAGE_ALIGN(page_base+copy_len) >> PAGE_SHIFT;
         for (i = 0; copy_len && i < npages; i++) {
-                if (i == 0)
-                        curlen = PAGE_SIZE - rqst->rq_snd_buf.page_base;
-                else
-                        curlen = PAGE_SIZE;
+                curlen = PAGE_SIZE - page_base;
                 if (curlen > copy_len)
                         curlen = copy_len;
                 dprintk("RPC: %s: page %d destp 0x%p len %d curlen %d\n",
                         __func__, i, destp, copy_len, curlen);
-                srcp = kmap_atomic(rqst->rq_snd_buf.pages[i],
-                        KM_SKB_SUNRPC_DATA);
-                if (i == 0)
-                        memcpy(destp, srcp+rqst->rq_snd_buf.page_base, curlen);
-                else
-                        memcpy(destp, srcp, curlen);
+                srcp = kmap_atomic(ppages[i], KM_SKB_SUNRPC_DATA);
+                memcpy(destp, srcp+page_base, curlen);
                 kunmap_atomic(srcp, KM_SKB_SUNRPC_DATA);
                 rqst->rq_svec[0].iov_len += curlen;
                 destp += curlen;
                 copy_len -= curlen;
+                page_base = 0;
         }
         /* header now contains entire send message */
         return pad;
@@ -606,6 +605,8 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
 {
         int i, npages, curlen, olen;
         char *destp;
+        struct page **ppages;
+        int page_base;
 
         curlen = rqst->rq_rcv_buf.head[0].iov_len;
         if (curlen > copy_len) {        /* write chunk header fixup */
@@ -624,32 +625,29 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
         olen = copy_len;
         i = 0;
         rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.fixup_copy_count += olen;
+        page_base = rqst->rq_rcv_buf.page_base;
+        ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT);
+        page_base &= ~PAGE_MASK;
+
         if (copy_len && rqst->rq_rcv_buf.page_len) {
-                npages = PAGE_ALIGN(rqst->rq_rcv_buf.page_base +
+                npages = PAGE_ALIGN(page_base +
                         rqst->rq_rcv_buf.page_len) >> PAGE_SHIFT;
                 for (; i < npages; i++) {
-                        if (i == 0)
-                                curlen = PAGE_SIZE - rqst->rq_rcv_buf.page_base;
-                        else
-                                curlen = PAGE_SIZE;
+                        curlen = PAGE_SIZE - page_base;
                         if (curlen > copy_len)
                                 curlen = copy_len;
                         dprintk("RPC: %s: page %d"
                                 " srcp 0x%p len %d curlen %d\n",
                                 __func__, i, srcp, copy_len, curlen);
-                        destp = kmap_atomic(rqst->rq_rcv_buf.pages[i],
-                                KM_SKB_SUNRPC_DATA);
-                        if (i == 0)
-                                memcpy(destp + rqst->rq_rcv_buf.page_base,
-                                        srcp, curlen);
-                        else
-                                memcpy(destp, srcp, curlen);
-                        flush_dcache_page(rqst->rq_rcv_buf.pages[i]);
+                        destp = kmap_atomic(ppages[i], KM_SKB_SUNRPC_DATA);
+                        memcpy(destp + page_base, srcp, curlen);
+                        flush_dcache_page(ppages[i]);
                         kunmap_atomic(destp, KM_SKB_SUNRPC_DATA);
                         srcp += curlen;
                         copy_len -= curlen;
                         if (copy_len == 0)
                                 break;
+                        page_base = 0;
                 }
                 rqst->rq_rcv_buf.page_len = olen - copy_len;
         } else
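The rpc_rdma.c hunks replace the special-cased handling of the first page with a single walk: start at the page that contains page_base, apply the in-page offset only to that first page, and clamp every segment at a page boundary. A stand-alone sketch of that walk (hypothetical helper, not the kernel function):

/* Sketch of the page-walk pattern adopted above: the in-page offset is
 * consumed by the first page only, and each segment is clamped to the
 * page boundary. Names and the return convention are illustrative. */
#define SKETCH_PAGE_SIZE  4096u
#define SKETCH_PAGE_MASK  (SKETCH_PAGE_SIZE - 1)

struct seg { void *page; unsigned int offset; unsigned int len; };

static int convert_pages(void **pages, unsigned int page_base,
                         unsigned int page_len, struct seg *seg, int nsegs)
{
        unsigned int len = page_len;
        void **ppages = pages + (page_base / SKETCH_PAGE_SIZE);
        unsigned int base = page_base & SKETCH_PAGE_MASK;
        int n = 0, p = 0;

        while (len && n < nsegs) {
                seg[n].page   = ppages[p];
                seg[n].offset = base;
                seg[n].len    = len < SKETCH_PAGE_SIZE - base ?
                                        len : SKETCH_PAGE_SIZE - base;
                len -= seg[n].len;
                ++n;
                ++p;
                base = 0;       /* offset applies to the first page only */
        }
        return (len && n == nsegs) ? 0 : n;     /* 0: seg array overflowed */
}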
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 5f4c7b3bc711..d4297dc43dc4 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -144,6 +144,7 @@ rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
 static inline
 void rpcrdma_event_process(struct ib_wc *wc)
 {
+        struct rpcrdma_mw *frmr;
         struct rpcrdma_rep *rep =
                         (struct rpcrdma_rep *)(unsigned long) wc->wr_id;
 
@@ -154,15 +155,23 @@ void rpcrdma_event_process(struct ib_wc *wc)
                 return;
 
         if (IB_WC_SUCCESS != wc->status) {
-                dprintk("RPC: %s: %s WC status %X, connection lost\n",
-                        __func__, (wc->opcode & IB_WC_RECV) ? "recv" : "send",
-                        wc->status);
+                dprintk("RPC: %s: WC opcode %d status %X, connection lost\n",
+                        __func__, wc->opcode, wc->status);
                 rep->rr_len = ~0U;
-                rpcrdma_schedule_tasklet(rep);
+                if (wc->opcode != IB_WC_FAST_REG_MR && wc->opcode != IB_WC_LOCAL_INV)
+                        rpcrdma_schedule_tasklet(rep);
                 return;
         }
 
         switch (wc->opcode) {
+        case IB_WC_FAST_REG_MR:
+                frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
+                frmr->r.frmr.state = FRMR_IS_VALID;
+                break;
+        case IB_WC_LOCAL_INV:
+                frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
+                frmr->r.frmr.state = FRMR_IS_INVALID;
+                break;
         case IB_WC_RECV:
                 rep->rr_len = wc->byte_len;
                 ib_dma_sync_single_for_cpu(
@@ -1450,6 +1459,12 @@ rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
         seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
                                 seg->mr_offset,
                                 seg->mr_dmalen, seg->mr_dir);
+        if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) {
+                dprintk("RPC: %s: mr_dma %llx mr_offset %p mr_dma_len %zu\n",
+                        __func__,
+                        (unsigned long long)seg->mr_dma,
+                        seg->mr_offset, seg->mr_dmalen);
+        }
 }
 
 static void
@@ -1469,7 +1484,8 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
                         struct rpcrdma_xprt *r_xprt)
 {
         struct rpcrdma_mr_seg *seg1 = seg;
-        struct ib_send_wr frmr_wr, *bad_wr;
+        struct ib_send_wr invalidate_wr, frmr_wr, *bad_wr, *post_wr;
+
         u8 key;
         int len, pageoff;
         int i, rc;
@@ -1484,6 +1500,7 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
                 rpcrdma_map_one(ia, seg, writing);
                 seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->page_list[i] = seg->mr_dma;
                 len += seg->mr_len;
+                BUG_ON(seg->mr_len > PAGE_SIZE);
                 ++seg;
                 ++i;
                 /* Check for holes */
@@ -1494,26 +1511,45 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
         dprintk("RPC: %s: Using frmr %p to map %d segments\n",
                 __func__, seg1->mr_chunk.rl_mw, i);
 
+        if (unlikely(seg1->mr_chunk.rl_mw->r.frmr.state == FRMR_IS_VALID)) {
+                dprintk("RPC: %s: frmr %x left valid, posting invalidate.\n",
+                        __func__,
+                        seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey);
+                /* Invalidate before using. */
+                memset(&invalidate_wr, 0, sizeof invalidate_wr);
+                invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
+                invalidate_wr.next = &frmr_wr;
+                invalidate_wr.opcode = IB_WR_LOCAL_INV;
+                invalidate_wr.send_flags = IB_SEND_SIGNALED;
+                invalidate_wr.ex.invalidate_rkey =
+                        seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
+                DECR_CQCOUNT(&r_xprt->rx_ep);
+                post_wr = &invalidate_wr;
+        } else
+                post_wr = &frmr_wr;
+
         /* Bump the key */
         key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
         ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
 
         /* Prepare FRMR WR */
         memset(&frmr_wr, 0, sizeof frmr_wr);
+        frmr_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
         frmr_wr.opcode = IB_WR_FAST_REG_MR;
-        frmr_wr.send_flags = 0;                 /* unsignaled */
+        frmr_wr.send_flags = IB_SEND_SIGNALED;
         frmr_wr.wr.fast_reg.iova_start = seg1->mr_dma;
         frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
         frmr_wr.wr.fast_reg.page_list_len = i;
         frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
         frmr_wr.wr.fast_reg.length = i << PAGE_SHIFT;
+        BUG_ON(frmr_wr.wr.fast_reg.length < len);
         frmr_wr.wr.fast_reg.access_flags = (writing ?
                         IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
                         IB_ACCESS_REMOTE_READ);
         frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
         DECR_CQCOUNT(&r_xprt->rx_ep);
 
-        rc = ib_post_send(ia->ri_id->qp, &frmr_wr, &bad_wr);
+        rc = ib_post_send(ia->ri_id->qp, post_wr, &bad_wr);
 
         if (rc) {
                 dprintk("RPC: %s: failed ib_post_send for register,"
@@ -1542,8 +1578,9 @@ rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
                 rpcrdma_unmap_one(ia, seg++);
 
         memset(&invalidate_wr, 0, sizeof invalidate_wr);
+        invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
         invalidate_wr.opcode = IB_WR_LOCAL_INV;
-        invalidate_wr.send_flags = 0;           /* unsignaled */
+        invalidate_wr.send_flags = IB_SEND_SIGNALED;
         invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
         DECR_CQCOUNT(&r_xprt->rx_ep);
 
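The verbs.c hunks track each FRMR's registration state from completion events and, when an FRMR was left FRMR_IS_VALID by an earlier registration, chain a LOCAL_INV work request ahead of the new FAST_REG_MR so the HCA invalidates it before reuse. A plain-C sketch of just that chaining decision (stand-in types, not the IB verbs API):

/* Stand-in for struct ib_send_wr; only the linkage matters here. */
enum frmr_state_sketch { SKETCH_FRMR_IS_INVALID, SKETCH_FRMR_IS_VALID };

struct wr_sketch {
        struct wr_sketch *next;
        int opcode;             /* e.g. a LOCAL_INV or FAST_REG_MR stand-in */
};

/* Returns the first WR to post: the invalidate chained in front of the
 * fast-reg WR when the region was left valid, otherwise the fast-reg WR. */
static struct wr_sketch *choose_post_wr(enum frmr_state_sketch state,
                                        struct wr_sketch *invalidate_wr,
                                        struct wr_sketch *frmr_wr)
{
        if (state == SKETCH_FRMR_IS_VALID) {
                invalidate_wr->next = frmr_wr;  /* invalidate before re-registering */
                return invalidate_wr;
        }
        return frmr_wr;
}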
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index c7a7eba991bc..cae761a8536c 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -164,6 +164,7 @@ struct rpcrdma_mr_seg { /* chunk descriptors */
                 struct {
                         struct ib_fast_reg_page_list *fr_pgl;
                         struct ib_mr *fr_mr;
+                        enum { FRMR_IS_INVALID, FRMR_IS_VALID } state;
                 } frmr;
         } r;
         struct list_head mw_list;
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index c431f5a57960..be96d429b475 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1631,7 +1631,8 @@ static struct socket *xs_create_sock(struct rpc_xprt *xprt,
         }
         xs_reclassify_socket(family, sock);
 
-        if (xs_bind(transport, sock)) {
+        err = xs_bind(transport, sock);
+        if (err) {
                 sock_release(sock);
                 goto out;
         }