aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRicardo Labiaga <Ricardo.Labiaga@netapp.com>2009-04-01 09:23:03 -0400
committerBenny Halevy <bhalevy@panasas.com>2009-06-17 17:11:24 -0400
commit55ae1aabfb108106dd095de2578ceef1c755a8b8 (patch)
tree7ccb91f5aac0a3646f7d4e8fbe913955aa300f95
parent44b98efdd0a205bdca2cb63493350d06ff6804b1 (diff)
nfs41: Add backchannel processing support to RPC state machine
Adds rpc_run_bc_task() which is called by the NFS callback service to process backchannel requests. It performs similar work to rpc_run_task() though "schedules" the backchannel task to be executed starting at the call_trasmit state in the RPC state machine. It also introduces some miscellaneous updates to the argument validation, call_transmit, and transport cleanup functions to take into account that there are now forechannel and backchannel tasks. Backchannel requests do not carry an RPC message structure, since the payload has already been XDR encoded using the existing NFSv4 callback mechanism. Introduce a new transmit state for the client to reply on to backchannel requests. This new state simply reserves the transport and issues the reply. In case of a connection related error, disconnects the transport and drops the reply. It requires the forechannel to re-establish the connection and the server to retransmit the request, as stated in NFSv4.1 section 2.9.2 "Client and Server Transport Behavior". Note: There is no need to loop attempting to reserve the transport. If EAGAIN is returned by xprt_prepare_transmit(), return with tk_status == 0, setting tk_action to call_bc_transmit. rpc_execute() will invoke it again after the task is taken off the sleep queue. [nfs41: rpc_run_bc_task() need not be exported outside RPC module] [nfs41: New call_bc_transmit RPC state] Signed-off-by: Ricardo Labiaga <Ricardo.Labiaga@netapp.com> Signed-off-by: Benny Halevy <bhalevy@panasas.com> [nfs41: Backchannel: No need to loop in call_bc_transmit()] Signed-off-by: Andy Adamson <andros@netapp.com> Signed-off-by: Ricardo Labiaga <Ricardo.Labiaga@netapp.com> Signed-off-by: Benny Halevy <bhalevy@panasas.com> [rpc_count_iostats incorrectly exits early] Signed-off-by: Ricardo Labiaga <Ricardo.Labiaga@netapp.com> Signed-off-by: Benny Halevy <bhalevy@panasas.com> [Convert rpc_reply_expected() to inline function] [Remove unnecessary BUG_ON()] [Rename variable] Signed-off-by: Ricardo Labiaga <Ricardo.Labiaga@netapp.com> Signed-off-by: Benny Halevy <bhalevy@panasas.com>
-rw-r--r--include/linux/sunrpc/sched.h2
-rw-r--r--include/linux/sunrpc/xprt.h12
-rw-r--r--net/sunrpc/clnt.c117
-rw-r--r--net/sunrpc/stats.c6
-rw-r--r--net/sunrpc/sunrpc.h37
-rw-r--r--net/sunrpc/xprt.c38
6 files changed, 203 insertions, 9 deletions
diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 177376880fab..401097781fc0 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -210,6 +210,8 @@ struct rpc_wait_queue {
210 */ 210 */
211struct rpc_task *rpc_new_task(const struct rpc_task_setup *); 211struct rpc_task *rpc_new_task(const struct rpc_task_setup *);
212struct rpc_task *rpc_run_task(const struct rpc_task_setup *); 212struct rpc_task *rpc_run_task(const struct rpc_task_setup *);
213struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
214 const struct rpc_call_ops *ops);
213void rpc_put_task(struct rpc_task *); 215void rpc_put_task(struct rpc_task *);
214void rpc_exit_task(struct rpc_task *); 216void rpc_exit_task(struct rpc_task *);
215void rpc_release_calldata(const struct rpc_call_ops *, void *); 217void rpc_release_calldata(const struct rpc_call_ops *, void *);
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index beae030e80b5..55c6c37e249e 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -215,6 +215,18 @@ struct rpc_xprt {
215 /* buffer in use */ 215 /* buffer in use */
216#endif /* CONFIG_NFS_V4_1 */ 216#endif /* CONFIG_NFS_V4_1 */
217 217
218#if defined(CONFIG_NFS_V4_1)
219static inline int bc_prealloc(struct rpc_rqst *req)
220{
221 return test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
222}
223#else
224static inline int bc_prealloc(struct rpc_rqst *req)
225{
226 return 0;
227}
228#endif /* CONFIG_NFS_V4_1 */
229
218struct xprt_create { 230struct xprt_create {
219 int ident; /* XPRT_TRANSPORT identifier */ 231 int ident; /* XPRT_TRANSPORT identifier */
220 struct sockaddr * srcaddr; /* optional local address */ 232 struct sockaddr * srcaddr; /* optional local address */
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index aca3ab6fc140..f3e93b8eb90f 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -36,7 +36,9 @@
36#include <linux/sunrpc/clnt.h> 36#include <linux/sunrpc/clnt.h>
37#include <linux/sunrpc/rpc_pipe_fs.h> 37#include <linux/sunrpc/rpc_pipe_fs.h>
38#include <linux/sunrpc/metrics.h> 38#include <linux/sunrpc/metrics.h>
39#include <linux/sunrpc/bc_xprt.h>
39 40
41#include "sunrpc.h"
40 42
41#ifdef RPC_DEBUG 43#ifdef RPC_DEBUG
42# define RPCDBG_FACILITY RPCDBG_CALL 44# define RPCDBG_FACILITY RPCDBG_CALL
@@ -63,6 +65,9 @@ static void call_decode(struct rpc_task *task);
63static void call_bind(struct rpc_task *task); 65static void call_bind(struct rpc_task *task);
64static void call_bind_status(struct rpc_task *task); 66static void call_bind_status(struct rpc_task *task);
65static void call_transmit(struct rpc_task *task); 67static void call_transmit(struct rpc_task *task);
68#if defined(CONFIG_NFS_V4_1)
69static void call_bc_transmit(struct rpc_task *task);
70#endif /* CONFIG_NFS_V4_1 */
66static void call_status(struct rpc_task *task); 71static void call_status(struct rpc_task *task);
67static void call_transmit_status(struct rpc_task *task); 72static void call_transmit_status(struct rpc_task *task);
68static void call_refresh(struct rpc_task *task); 73static void call_refresh(struct rpc_task *task);
@@ -613,6 +618,50 @@ rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
613} 618}
614EXPORT_SYMBOL_GPL(rpc_call_async); 619EXPORT_SYMBOL_GPL(rpc_call_async);
615 620
621#if defined(CONFIG_NFS_V4_1)
622/**
623 * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
624 * rpc_execute against it
625 * @ops: RPC call ops
626 */
627struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
628 const struct rpc_call_ops *tk_ops)
629{
630 struct rpc_task *task;
631 struct xdr_buf *xbufp = &req->rq_snd_buf;
632 struct rpc_task_setup task_setup_data = {
633 .callback_ops = tk_ops,
634 };
635
636 dprintk("RPC: rpc_run_bc_task req= %p\n", req);
637 /*
638 * Create an rpc_task to send the data
639 */
640 task = rpc_new_task(&task_setup_data);
641 if (!task) {
642 xprt_free_bc_request(req);
643 goto out;
644 }
645 task->tk_rqstp = req;
646
647 /*
648 * Set up the xdr_buf length.
649 * This also indicates that the buffer is XDR encoded already.
650 */
651 xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
652 xbufp->tail[0].iov_len;
653
654 task->tk_action = call_bc_transmit;
655 atomic_inc(&task->tk_count);
656 BUG_ON(atomic_read(&task->tk_count) != 2);
657 rpc_execute(task);
658
659out:
660 dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
661 return task;
662}
663#endif /* CONFIG_NFS_V4_1 */
664
616void 665void
617rpc_call_start(struct rpc_task *task) 666rpc_call_start(struct rpc_task *task)
618{ 667{
@@ -1098,7 +1147,7 @@ call_transmit(struct rpc_task *task)
1098 * in order to allow access to the socket to other RPC requests. 1147 * in order to allow access to the socket to other RPC requests.
1099 */ 1148 */
1100 call_transmit_status(task); 1149 call_transmit_status(task);
1101 if (task->tk_msg.rpc_proc->p_decode != NULL) 1150 if (rpc_reply_expected(task))
1102 return; 1151 return;
1103 task->tk_action = rpc_exit_task; 1152 task->tk_action = rpc_exit_task;
1104 rpc_wake_up_queued_task(&task->tk_xprt->pending, task); 1153 rpc_wake_up_queued_task(&task->tk_xprt->pending, task);
@@ -1133,6 +1182,72 @@ call_transmit_status(struct rpc_task *task)
1133 } 1182 }
1134} 1183}
1135 1184
1185#if defined(CONFIG_NFS_V4_1)
1186/*
1187 * 5b. Send the backchannel RPC reply. On error, drop the reply. In
1188 * addition, disconnect on connectivity errors.
1189 */
1190static void
1191call_bc_transmit(struct rpc_task *task)
1192{
1193 struct rpc_rqst *req = task->tk_rqstp;
1194
1195 BUG_ON(task->tk_status != 0);
1196 task->tk_status = xprt_prepare_transmit(task);
1197 if (task->tk_status == -EAGAIN) {
1198 /*
1199 * Could not reserve the transport. Try again after the
1200 * transport is released.
1201 */
1202 task->tk_status = 0;
1203 task->tk_action = call_bc_transmit;
1204 return;
1205 }
1206
1207 task->tk_action = rpc_exit_task;
1208 if (task->tk_status < 0) {
1209 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1210 "error: %d\n", task->tk_status);
1211 return;
1212 }
1213
1214 xprt_transmit(task);
1215 xprt_end_transmit(task);
1216 dprint_status(task);
1217 switch (task->tk_status) {
1218 case 0:
1219 /* Success */
1220 break;
1221 case -EHOSTDOWN:
1222 case -EHOSTUNREACH:
1223 case -ENETUNREACH:
1224 case -ETIMEDOUT:
1225 /*
1226 * Problem reaching the server. Disconnect and let the
1227 * forechannel reestablish the connection. The server will
1228 * have to retransmit the backchannel request and we'll
1229 * reprocess it. Since these ops are idempotent, there's no
1230 * need to cache our reply at this time.
1231 */
1232 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1233 "error: %d\n", task->tk_status);
1234 xprt_conditional_disconnect(task->tk_xprt,
1235 req->rq_connect_cookie);
1236 break;
1237 default:
1238 /*
1239 * We were unable to reply and will have to drop the
1240 * request. The server should reconnect and retransmit.
1241 */
1242 BUG_ON(task->tk_status == -EAGAIN);
1243 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1244 "error: %d\n", task->tk_status);
1245 break;
1246 }
1247 rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
1248}
1249#endif /* CONFIG_NFS_V4_1 */
1250
1136/* 1251/*
1137 * 6. Sort out the RPC call status 1252 * 6. Sort out the RPC call status
1138 */ 1253 */
diff --git a/net/sunrpc/stats.c b/net/sunrpc/stats.c
index 1ef6e46d9da2..8487aa0f1f5a 100644
--- a/net/sunrpc/stats.c
+++ b/net/sunrpc/stats.c
@@ -141,12 +141,14 @@ EXPORT_SYMBOL_GPL(rpc_free_iostats);
141void rpc_count_iostats(struct rpc_task *task) 141void rpc_count_iostats(struct rpc_task *task)
142{ 142{
143 struct rpc_rqst *req = task->tk_rqstp; 143 struct rpc_rqst *req = task->tk_rqstp;
144 struct rpc_iostats *stats = task->tk_client->cl_metrics; 144 struct rpc_iostats *stats;
145 struct rpc_iostats *op_metrics; 145 struct rpc_iostats *op_metrics;
146 long rtt, execute, queue; 146 long rtt, execute, queue;
147 147
148 if (!stats || !req) 148 if (!task->tk_client || !task->tk_client->cl_metrics || !req)
149 return; 149 return;
150
151 stats = task->tk_client->cl_metrics;
150 op_metrics = &stats[task->tk_msg.rpc_proc->p_statidx]; 152 op_metrics = &stats[task->tk_msg.rpc_proc->p_statidx];
151 153
152 op_metrics->om_ops++; 154 op_metrics->om_ops++;
diff --git a/net/sunrpc/sunrpc.h b/net/sunrpc/sunrpc.h
new file mode 100644
index 000000000000..5d9dd742264b
--- /dev/null
+++ b/net/sunrpc/sunrpc.h
@@ -0,0 +1,37 @@
1/******************************************************************************
2
3(c) 2008 NetApp. All Rights Reserved.
4
5NetApp provides this source code under the GPL v2 License.
6The GPL v2 license is available at
7http://opensource.org/licenses/gpl-license.php.
8
9THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
10"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
11LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
12A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
13CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
14EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
15PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
16PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
17LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
18NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
19SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
20
21******************************************************************************/
22
23/*
24 * Functions and macros used internally by RPC
25 */
26
27#ifndef _NET_SUNRPC_SUNRPC_H
28#define _NET_SUNRPC_SUNRPC_H
29
30static inline int rpc_reply_expected(struct rpc_task *task)
31{
32 return (task->tk_msg.rpc_proc != NULL) &&
33 (task->tk_msg.rpc_proc->p_decode != NULL);
34}
35
36#endif /* _NET_SUNRPC_SUNRPC_H */
37
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 52739f82df1e..0eea2bfe111b 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -12,8 +12,9 @@
12 * - Next, the caller puts together the RPC message, stuffs it into 12 * - Next, the caller puts together the RPC message, stuffs it into
13 * the request struct, and calls xprt_transmit(). 13 * the request struct, and calls xprt_transmit().
14 * - xprt_transmit sends the message and installs the caller on the 14 * - xprt_transmit sends the message and installs the caller on the
15 * transport's wait list. At the same time, it installs a timer that 15 * transport's wait list. At the same time, if a reply is expected,
16 * is run after the packet's timeout has expired. 16 * it installs a timer that is run after the packet's timeout has
17 * expired.
17 * - When a packet arrives, the data_ready handler walks the list of 18 * - When a packet arrives, the data_ready handler walks the list of
18 * pending requests for that transport. If a matching XID is found, the 19 * pending requests for that transport. If a matching XID is found, the
19 * caller is woken up, and the timer removed. 20 * caller is woken up, and the timer removed.
@@ -46,6 +47,8 @@
46#include <linux/sunrpc/clnt.h> 47#include <linux/sunrpc/clnt.h>
47#include <linux/sunrpc/metrics.h> 48#include <linux/sunrpc/metrics.h>
48 49
50#include "sunrpc.h"
51
49/* 52/*
50 * Local variables 53 * Local variables
51 */ 54 */
@@ -873,7 +876,10 @@ void xprt_transmit(struct rpc_task *task)
873 dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); 876 dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
874 877
875 if (!req->rq_received) { 878 if (!req->rq_received) {
876 if (list_empty(&req->rq_list)) { 879 if (list_empty(&req->rq_list) && rpc_reply_expected(task)) {
880 /*
881 * Add to the list only if we're expecting a reply
882 */
877 spin_lock_bh(&xprt->transport_lock); 883 spin_lock_bh(&xprt->transport_lock);
878 /* Update the softirq receive buffer */ 884 /* Update the softirq receive buffer */
879 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 885 memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
@@ -908,8 +914,13 @@ void xprt_transmit(struct rpc_task *task)
908 /* Don't race with disconnect */ 914 /* Don't race with disconnect */
909 if (!xprt_connected(xprt)) 915 if (!xprt_connected(xprt))
910 task->tk_status = -ENOTCONN; 916 task->tk_status = -ENOTCONN;
911 else if (!req->rq_received) 917 else if (!req->rq_received && rpc_reply_expected(task)) {
918 /*
919 * Sleep on the pending queue since
920 * we're expecting a reply.
921 */
912 rpc_sleep_on(&xprt->pending, task, xprt_timer); 922 rpc_sleep_on(&xprt->pending, task, xprt_timer);
923 }
913 spin_unlock_bh(&xprt->transport_lock); 924 spin_unlock_bh(&xprt->transport_lock);
914} 925}
915 926
@@ -982,11 +993,17 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
982 */ 993 */
983void xprt_release(struct rpc_task *task) 994void xprt_release(struct rpc_task *task)
984{ 995{
985 struct rpc_xprt *xprt = task->tk_xprt; 996 struct rpc_xprt *xprt;
986 struct rpc_rqst *req; 997 struct rpc_rqst *req;
998 int is_bc_request;
987 999
988 if (!(req = task->tk_rqstp)) 1000 if (!(req = task->tk_rqstp))
989 return; 1001 return;
1002
1003 /* Preallocated backchannel request? */
1004 is_bc_request = bc_prealloc(req);
1005
1006 xprt = req->rq_xprt;
990 rpc_count_iostats(task); 1007 rpc_count_iostats(task);
991 spin_lock_bh(&xprt->transport_lock); 1008 spin_lock_bh(&xprt->transport_lock);
992 xprt->ops->release_xprt(xprt, task); 1009 xprt->ops->release_xprt(xprt, task);
@@ -999,10 +1016,19 @@ void xprt_release(struct rpc_task *task)
999 mod_timer(&xprt->timer, 1016 mod_timer(&xprt->timer,
1000 xprt->last_used + xprt->idle_timeout); 1017 xprt->last_used + xprt->idle_timeout);
1001 spin_unlock_bh(&xprt->transport_lock); 1018 spin_unlock_bh(&xprt->transport_lock);
1002 xprt->ops->buf_free(req->rq_buffer); 1019 if (!bc_prealloc(req))
1020 xprt->ops->buf_free(req->rq_buffer);
1003 task->tk_rqstp = NULL; 1021 task->tk_rqstp = NULL;
1004 if (req->rq_release_snd_buf) 1022 if (req->rq_release_snd_buf)
1005 req->rq_release_snd_buf(req); 1023 req->rq_release_snd_buf(req);
1024
1025 /*
1026 * Early exit if this is a backchannel preallocated request.
1027 * There is no need to have it added to the RPC slot list.
1028 */
1029 if (is_bc_request)
1030 return;
1031
1006 memset(req, 0, sizeof(*req)); /* mark unused */ 1032 memset(req, 0, sizeof(*req)); /* mark unused */
1007 1033
1008 dprintk("RPC: %5u release request %p\n", task->tk_pid, req); 1034 dprintk("RPC: %5u release request %p\n", task->tk_pid, req);