aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorRicardo Labiaga <Ricardo.Labiaga@netapp.com>2009-04-01 09:23:02 -0400
committerBenny Halevy <bhalevy@panasas.com>2009-06-17 16:06:16 -0400
commit44b98efdd0a205bdca2cb63493350d06ff6804b1 (patch)
treed375e3a8a6084672f15e9b12bec082613448340e /net
parent4a8d70bfef01f8e6b27785e2625e88e9a80924a5 (diff)
nfs41: New xs_tcp_read_data()
Handles RPC replies and backchannel callbacks. Traditionally the NFS client has expected only RPC replies on its open connections. With NFSv4.1, callbacks can arrive over an existing open connection. This patch refactors the old xs_tcp_read_request() into an RPC reply handler: xs_tcp_read_reply(), a new backchannel callback handler: xs_tcp_read_callback(), and a common routine to read the data off the transport: xs_tcp_read_common(). The new xs_tcp_read_callback() queues callback requests onto a queue where the callback service (a separate thread) is listening for the processing. This patch incorporates work and suggestions from Rahul Iyer (iyer@netapp.com) and Benny Halevy (bhalevy@panasas.com). xs_tcp_read_callback() drops the connection when the number of expected callbacks is exceeded. Use xprt_force_disconnect(), ensuring tasks on the pending queue are awaken on disconnect. [nfs41: Keep track of RPC call/reply direction with a flag] [nfs41: Preallocate rpc_rqst receive buffer for handling callbacks] Signed-off-by: Ricardo Labiaga <ricardo.labiaga@netapp.com> Signed-off-by: Benny Halevy <bhalevy@panasas.com> [nfs41: sunrpc: xs_tcp_read_callback() should use xprt_force_disconnect()] Signed-off-by: Ricardo Labiaga <Ricardo.Labiaga@netapp.com> Signed-off-by: Benny Halevy <bhalevy@panasas.com> [Moves embedded #ifdefs into #ifdef function blocks] Signed-off-by: Ricardo Labiaga <Ricardo.Labiaga@netapp.com> Signed-off-by: Benny Halevy <bhalevy@panasas.com>
Diffstat (limited to 'net')
-rw-r--r--net/sunrpc/xprtsock.c144
1 files changed, 126 insertions, 18 deletions
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index a48df1449ece..e3e3a57116fb 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -34,6 +34,9 @@
34#include <linux/sunrpc/sched.h> 34#include <linux/sunrpc/sched.h>
35#include <linux/sunrpc/xprtsock.h> 35#include <linux/sunrpc/xprtsock.h>
36#include <linux/file.h> 36#include <linux/file.h>
37#ifdef CONFIG_NFS_V4_1
38#include <linux/sunrpc/bc_xprt.h>
39#endif
37 40
38#include <net/sock.h> 41#include <net/sock.h>
39#include <net/checksum.h> 42#include <net/checksum.h>
@@ -1044,25 +1047,16 @@ static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
1044 xs_tcp_check_fraghdr(transport); 1047 xs_tcp_check_fraghdr(transport);
1045} 1048}
1046 1049
1047static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_reader *desc) 1050static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
1051 struct xdr_skb_reader *desc,
1052 struct rpc_rqst *req)
1048{ 1053{
1049 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 1054 struct sock_xprt *transport =
1050 struct rpc_rqst *req; 1055 container_of(xprt, struct sock_xprt, xprt);
1051 struct xdr_buf *rcvbuf; 1056 struct xdr_buf *rcvbuf;
1052 size_t len; 1057 size_t len;
1053 ssize_t r; 1058 ssize_t r;
1054 1059
1055 /* Find and lock the request corresponding to this xid */
1056 spin_lock(&xprt->transport_lock);
1057 req = xprt_lookup_rqst(xprt, transport->tcp_xid);
1058 if (!req) {
1059 transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1060 dprintk("RPC: XID %08x request not found!\n",
1061 ntohl(transport->tcp_xid));
1062 spin_unlock(&xprt->transport_lock);
1063 return;
1064 }
1065
1066 rcvbuf = &req->rq_private_buf; 1060 rcvbuf = &req->rq_private_buf;
1067 1061
1068 if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) { 1062 if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
@@ -1114,7 +1108,7 @@ static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_rea
1114 "tcp_offset = %u, tcp_reclen = %u\n", 1108 "tcp_offset = %u, tcp_reclen = %u\n",
1115 xprt, transport->tcp_copied, 1109 xprt, transport->tcp_copied,
1116 transport->tcp_offset, transport->tcp_reclen); 1110 transport->tcp_offset, transport->tcp_reclen);
1117 goto out; 1111 return;
1118 } 1112 }
1119 1113
1120 dprintk("RPC: XID %08x read %Zd bytes\n", 1114 dprintk("RPC: XID %08x read %Zd bytes\n",
@@ -1130,11 +1124,125 @@ static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_rea
1130 transport->tcp_flags &= ~TCP_RCV_COPY_DATA; 1124 transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1131 } 1125 }
1132 1126
1133out: 1127 return;
1128}
1129
1130/*
1131 * Finds the request corresponding to the RPC xid and invokes the common
1132 * tcp read code to read the data.
1133 */
1134static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
1135 struct xdr_skb_reader *desc)
1136{
1137 struct sock_xprt *transport =
1138 container_of(xprt, struct sock_xprt, xprt);
1139 struct rpc_rqst *req;
1140
1141 dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid));
1142
1143 /* Find and lock the request corresponding to this xid */
1144 spin_lock(&xprt->transport_lock);
1145 req = xprt_lookup_rqst(xprt, transport->tcp_xid);
1146 if (!req) {
1147 dprintk("RPC: XID %08x request not found!\n",
1148 ntohl(transport->tcp_xid));
1149 spin_unlock(&xprt->transport_lock);
1150 return -1;
1151 }
1152
1153 xs_tcp_read_common(xprt, desc, req);
1154
1134 if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) 1155 if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1135 xprt_complete_rqst(req->rq_task, transport->tcp_copied); 1156 xprt_complete_rqst(req->rq_task, transport->tcp_copied);
1157
1136 spin_unlock(&xprt->transport_lock); 1158 spin_unlock(&xprt->transport_lock);
1137 xs_tcp_check_fraghdr(transport); 1159 return 0;
1160}
1161
1162#if defined(CONFIG_NFS_V4_1)
1163/*
1164 * Obtains an rpc_rqst previously allocated and invokes the common
1165 * tcp read code to read the data. The result is placed in the callback
1166 * queue.
1167 * If we're unable to obtain the rpc_rqst we schedule the closing of the
1168 * connection and return -1.
1169 */
1170static inline int xs_tcp_read_callback(struct rpc_xprt *xprt,
1171 struct xdr_skb_reader *desc)
1172{
1173 struct sock_xprt *transport =
1174 container_of(xprt, struct sock_xprt, xprt);
1175 struct rpc_rqst *req;
1176
1177 req = xprt_alloc_bc_request(xprt);
1178 if (req == NULL) {
1179 printk(KERN_WARNING "Callback slot table overflowed\n");
1180 xprt_force_disconnect(xprt);
1181 return -1;
1182 }
1183
1184 req->rq_xid = transport->tcp_xid;
1185 dprintk("RPC: read callback XID %08x\n", ntohl(req->rq_xid));
1186 xs_tcp_read_common(xprt, desc, req);
1187
1188 if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) {
1189 struct svc_serv *bc_serv = xprt->bc_serv;
1190
1191 /*
1192 * Add callback request to callback list. The callback
1193 * service sleeps on the sv_cb_waitq waiting for new
1194 * requests. Wake it up after adding enqueing the
1195 * request.
1196 */
1197 dprintk("RPC: add callback request to list\n");
1198 spin_lock(&bc_serv->sv_cb_lock);
1199 list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
1200 spin_unlock(&bc_serv->sv_cb_lock);
1201 wake_up(&bc_serv->sv_cb_waitq);
1202 }
1203
1204 req->rq_private_buf.len = transport->tcp_copied;
1205
1206 return 0;
1207}
1208
1209static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1210 struct xdr_skb_reader *desc)
1211{
1212 struct sock_xprt *transport =
1213 container_of(xprt, struct sock_xprt, xprt);
1214
1215 return (transport->tcp_flags & TCP_RPC_REPLY) ?
1216 xs_tcp_read_reply(xprt, desc) :
1217 xs_tcp_read_callback(xprt, desc);
1218}
1219#else
1220static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1221 struct xdr_skb_reader *desc)
1222{
1223 return xs_tcp_read_reply(xprt, desc);
1224}
1225#endif /* CONFIG_NFS_V4_1 */
1226
1227/*
1228 * Read data off the transport. This can be either an RPC_CALL or an
1229 * RPC_REPLY. Relay the processing to helper functions.
1230 */
1231static void xs_tcp_read_data(struct rpc_xprt *xprt,
1232 struct xdr_skb_reader *desc)
1233{
1234 struct sock_xprt *transport =
1235 container_of(xprt, struct sock_xprt, xprt);
1236
1237 if (_xs_tcp_read_data(xprt, desc) == 0)
1238 xs_tcp_check_fraghdr(transport);
1239 else {
1240 /*
1241 * The transport_lock protects the request handling.
1242 * There's no need to hold it to update the tcp_flags.
1243 */
1244 transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1245 }
1138} 1246}
1139 1247
1140static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc) 1248static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
@@ -1181,7 +1289,7 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
1181 } 1289 }
1182 /* Read in the request data */ 1290 /* Read in the request data */
1183 if (transport->tcp_flags & TCP_RCV_COPY_DATA) { 1291 if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
1184 xs_tcp_read_request(xprt, &desc); 1292 xs_tcp_read_data(xprt, &desc);
1185 continue; 1293 continue;
1186 } 1294 }
1187 /* Skip over any trailing bytes on short reads */ 1295 /* Skip over any trailing bytes on short reads */