author     Trond Myklebust <Trond.Myklebust@netapp.com>   2013-04-14 10:49:37 -0400
committer  Trond Myklebust <Trond.Myklebust@netapp.com>   2013-04-14 12:26:02 -0400
commit     ba60eb25ff6be6f8e60488cdfd454e5c612bce60
tree       ee5ebe0a500e5926cf52af808096d8c56eae8657 /net/sunrpc
parent     b570a975ed276335dc7d148658c1f880ac0a507f
SUNRPC: Fix a livelock problem in the xprt->backlog queue
This patch ensures that we throttle new RPC requests if there are requests
already waiting in the xprt->backlog queue. The reason for doing this is to
fix livelock issues that can occur when an existing (high priority) task is
waiting in the backlog queue, gets woken up by xprt_free_slot(), but a new
task then steals the slot.

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
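[Editor's note] For readers who want to see the mechanism in isolation before reading the diff, here is a stand-alone, user-space sketch of the pattern the patch introduces: a congestion flag that is checked once locklessly and re-checked under the reserve lock, a backlog that new requests are parked on while the flag is set, and a wake-up path that clears the flag only once the backlog has drained. It uses pthreads as a stand-in for rpc_sleep_on()/rpc_wake_up_next() and a plain bool in place of the atomic XPRT_CONGESTED bit; it is an illustration of the technique only, not the kernel code below (in particular, the kernel queues tasks asynchronously instead of blocking the caller).

#include <pthread.h>
#include <stdbool.h>

/* Stand-ins for the transport's reserve_lock, backlog queue and
 * XPRT_CONGESTED bit.  Illustration only -- not kernel code. */
struct fake_xprt {
	pthread_mutex_t reserve_lock;
	pthread_cond_t  backlog;	/* waiters parked here */
	int		nwaiters;
	bool		congested;	/* mirrors XPRT_CONGESTED */
};

/* Mirrors xprt_throttle_congested(): a *new* request is parked behind the
 * backlog if older requests are already waiting.  Returns true if the
 * caller had to wait and should retry its slot allocation afterwards.
 * (The kernel uses atomic test_bit(); the racy first read here is
 * tolerable only because it is re-checked under the lock.) */
static bool throttle_congested(struct fake_xprt *xprt)
{
	bool waited = false;

	if (!xprt->congested)			/* cheap first check */
		return false;
	pthread_mutex_lock(&xprt->reserve_lock);
	if (xprt->congested) {			/* re-check under the lock */
		xprt->nwaiters++;
		pthread_cond_wait(&xprt->backlog, &xprt->reserve_lock);
		xprt->nwaiters--;
		waited = true;
	}
	pthread_mutex_unlock(&xprt->reserve_lock);
	return waited;
}

/* Mirrors xprt_add_backlog(): a request that found no free slot raises the
 * congested flag before sleeping, so later arrivals are throttled.
 * Caller holds reserve_lock, as xprt_alloc_slot() does in the kernel. */
static void add_backlog(struct fake_xprt *xprt)
{
	xprt->congested = true;
	xprt->nwaiters++;
	pthread_cond_wait(&xprt->backlog, &xprt->reserve_lock);
	xprt->nwaiters--;
}

/* Mirrors xprt_wake_up_backlog(): wake one waiter when a slot is freed;
 * only clear the flag once nobody is left waiting.
 * Caller holds reserve_lock, as xprt_free_slot() does in the kernel. */
static void wake_up_backlog(struct fake_xprt *xprt)
{
	if (xprt->nwaiters > 0)
		pthread_cond_signal(&xprt->backlog);
	else
		xprt->congested = false;
}

The point the patch relies on is the last helper: the congested flag is cleared only when no waiter is left to wake, so a request that arrives while the backlog is non-empty can never overtake a task that is already queued.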
Diffstat (limited to 'net/sunrpc')
 net/sunrpc/clnt.c | 17
 net/sunrpc/xprt.c | 61
 2 files changed, 74 insertions(+), 4 deletions(-)
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index b95a0a2d5eea..a80ee9b80dcf 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1306,6 +1306,8 @@ call_reserve(struct rpc_task *task)
 	xprt_reserve(task);
 }
 
+static void call_retry_reserve(struct rpc_task *task);
+
 /*
  * 1b. Grok the result of xprt_reserve()
  */
@@ -1347,7 +1349,7 @@ call_reserveresult(struct rpc_task *task)
 	case -ENOMEM:
 		rpc_delay(task, HZ >> 2);
 	case -EAGAIN:	/* woken up; retry */
-		task->tk_action = call_reserve;
+		task->tk_action = call_retry_reserve;
 		return;
 	case -EIO:	/* probably a shutdown */
 		break;
@@ -1360,6 +1362,19 @@ call_reserveresult(struct rpc_task *task)
 }
 
 /*
+ * 1c. Retry reserving an RPC call slot
+ */
+static void
+call_retry_reserve(struct rpc_task *task)
+{
+	dprint_status(task);
+
+	task->tk_status = 0;
+	task->tk_action = call_reserveresult;
+	xprt_retry_reserve(task);
+}
+
+/*
  * 2. Bind and/or refresh the credentials
  */
 static void
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index b7478d5e7ffd..745fca3cfd36 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -948,6 +948,34 @@ void xprt_transmit(struct rpc_task *task)
 	spin_unlock_bh(&xprt->transport_lock);
 }
 
+static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+	set_bit(XPRT_CONGESTED, &xprt->state);
+	rpc_sleep_on(&xprt->backlog, task, NULL);
+}
+
+static void xprt_wake_up_backlog(struct rpc_xprt *xprt)
+{
+	if (rpc_wake_up_next(&xprt->backlog) == NULL)
+		clear_bit(XPRT_CONGESTED, &xprt->state);
+}
+
+static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+	bool ret = false;
+
+	if (!test_bit(XPRT_CONGESTED, &xprt->state))
+		goto out;
+	spin_lock(&xprt->reserve_lock);
+	if (test_bit(XPRT_CONGESTED, &xprt->state)) {
+		rpc_sleep_on(&xprt->backlog, task, NULL);
+		ret = true;
+	}
+	spin_unlock(&xprt->reserve_lock);
+out:
+	return ret;
+}
+
 static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt, gfp_t gfp_flags)
 {
 	struct rpc_rqst *req = ERR_PTR(-EAGAIN);
@@ -992,7 +1020,7 @@ void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
 		task->tk_status = -ENOMEM;
 		break;
 	case -EAGAIN:
-		rpc_sleep_on(&xprt->backlog, task, NULL);
+		xprt_add_backlog(xprt, task);
 		dprintk("RPC: waiting for request slot\n");
 	default:
 		task->tk_status = -EAGAIN;
@@ -1028,7 +1056,7 @@ static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
 		memset(req, 0, sizeof(*req));	/* mark unused */
 		list_add(&req->rq_list, &xprt->free);
 	}
-	rpc_wake_up_next(&xprt->backlog);
+	xprt_wake_up_backlog(xprt);
 	spin_unlock(&xprt->reserve_lock);
 }
 
@@ -1092,7 +1120,8 @@ EXPORT_SYMBOL_GPL(xprt_free);
  * xprt_reserve - allocate an RPC request slot
  * @task: RPC task requesting a slot allocation
  *
- * If no more slots are available, place the task on the transport's
+ * If the transport is marked as being congested, or if no more
+ * slots are available, place the task on the transport's
  * backlog queue.
  */
 void xprt_reserve(struct rpc_task *task)
@@ -1107,6 +1136,32 @@ void xprt_reserve(struct rpc_task *task)
 	task->tk_status = -EAGAIN;
 	rcu_read_lock();
 	xprt = rcu_dereference(task->tk_client->cl_xprt);
+	if (!xprt_throttle_congested(xprt, task))
+		xprt->ops->alloc_slot(xprt, task);
+	rcu_read_unlock();
+}
+
+/**
+ * xprt_retry_reserve - allocate an RPC request slot
+ * @task: RPC task requesting a slot allocation
+ *
+ * If no more slots are available, place the task on the transport's
+ * backlog queue.
+ * Note that the only difference with xprt_reserve is that we now
+ * ignore the value of the XPRT_CONGESTED flag.
+ */
+void xprt_retry_reserve(struct rpc_task *task)
+{
+	struct rpc_xprt *xprt;
+
+	task->tk_status = 0;
+	if (task->tk_rqstp != NULL)
+		return;
+
+	task->tk_timeout = 0;
+	task->tk_status = -EAGAIN;
+	rcu_read_lock();
+	xprt = rcu_dereference(task->tk_client->cl_xprt);
 	xprt->ops->alloc_slot(xprt, task);
 	rcu_read_unlock();
 }
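[Editor's note] A closing note for readers tracing the fix across the two files: the clnt.c change is what keeps a task that has just been woken from the backlog out of the new throttle. A condensed view of the call routing after this patch (names as in the hunks above; an illustration, not additional kernel code):

/*
 * First attempt:  call_reserve()        -> xprt_reserve()
 *                 honours XPRT_CONGESTED, so a new request is parked on
 *                 xprt->backlog whenever older requests are already waiting.
 *
 * Retry path:     call_reserveresult()  -> (-ENOMEM / -EAGAIN)
 *                 call_retry_reserve()  -> xprt_retry_reserve()
 *                 skips the XPRT_CONGESTED test, so a task that was just
 *                 woken by xprt_free_slot() cannot be parked again behind
 *                 requests that arrived after it.
 */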