aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTrond Myklebust <Trond.Myklebust@netapp.com>2011-02-21 14:05:41 -0500
committerTrond Myklebust <Trond.Myklebust@netapp.com>2011-03-10 15:04:52 -0500
commitbf294b41cefcb22fc3139e0f42c5b3f06728bd5e (patch)
tree250251c040a2d2e278b5a2ddd03c8d20a27be129
parent214d93b02c4fe93638ad268613c9702a81ed9192 (diff)
SUNRPC: Close a race in __rpc_wait_for_completion_task()
Although they run as rpciod background tasks, under normal operation (i.e. no SIGKILL), functions like nfs_sillyrename(), nfs4_proc_unlck() and nfs4_do_close() want to be fully synchronous. This means that when we exit, we want all references to the rpc_task to be gone, and we want any dentry references etc. held by that task to be released. For this reason these functions call __rpc_wait_for_completion_task(), followed by rpc_put_task() in the expectation that the latter will be releasing the last reference to the rpc_task, and thus ensuring that the callback_ops->rpc_release() has been called synchronously. This patch fixes a race which exists due to the fact that rpciod calls rpc_complete_task() (in order to wake up the callers of __rpc_wait_for_completion_task()) and then subsequently calls rpc_put_task() without ensuring that these two steps are done atomically. In order to avoid adding new spin locks, the patch uses the existing waitqueue spin lock to order the rpc_task reference count releases between the waiting process and rpciod. The common case where nobody is waiting for completion is optimised for by checking if the RPC_TASK_ASYNC flag is cleared and/or if the rpc_task reference count is 1: in those cases we drop trying to grab the spin lock, and immediately free up the rpc_task. Those few processes that need to put the rpc_task from inside an asynchronous context and that do not care about ordering are given a new helper: rpc_put_task_async(). Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
-rw-r--r--fs/nfs/nfs4proc.c4
-rw-r--r--fs/nfs/unlink.c2
-rw-r--r--include/linux/sunrpc/sched.h1
-rw-r--r--kernel/sched.c1
-rw-r--r--net/sunrpc/sched.c75
5 files changed, 66 insertions, 17 deletions
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index 1ff76acc7e98..d1ed67145cf3 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -4150,7 +4150,7 @@ static void nfs4_lock_release(void *calldata)
4150 task = nfs4_do_unlck(&data->fl, data->ctx, data->lsp, 4150 task = nfs4_do_unlck(&data->fl, data->ctx, data->lsp,
4151 data->arg.lock_seqid); 4151 data->arg.lock_seqid);
4152 if (!IS_ERR(task)) 4152 if (!IS_ERR(task))
4153 rpc_put_task(task); 4153 rpc_put_task_async(task);
4154 dprintk("%s: cancelling lock!\n", __func__); 4154 dprintk("%s: cancelling lock!\n", __func__);
4155 } else 4155 } else
4156 nfs_free_seqid(data->arg.lock_seqid); 4156 nfs_free_seqid(data->arg.lock_seqid);
@@ -5227,7 +5227,7 @@ static int nfs41_proc_async_sequence(struct nfs_client *clp, struct rpc_cred *cr
5227 if (IS_ERR(task)) 5227 if (IS_ERR(task))
5228 ret = PTR_ERR(task); 5228 ret = PTR_ERR(task);
5229 else 5229 else
5230 rpc_put_task(task); 5230 rpc_put_task_async(task);
5231 dprintk("<-- %s status=%d\n", __func__, ret); 5231 dprintk("<-- %s status=%d\n", __func__, ret);
5232 return ret; 5232 return ret;
5233} 5233}
diff --git a/fs/nfs/unlink.c b/fs/nfs/unlink.c
index e313a51acdd1..6481d537d69d 100644
--- a/fs/nfs/unlink.c
+++ b/fs/nfs/unlink.c
@@ -180,7 +180,7 @@ static int nfs_do_call_unlink(struct dentry *parent, struct inode *dir, struct n
180 task_setup_data.rpc_client = NFS_CLIENT(dir); 180 task_setup_data.rpc_client = NFS_CLIENT(dir);
181 task = rpc_run_task(&task_setup_data); 181 task = rpc_run_task(&task_setup_data);
182 if (!IS_ERR(task)) 182 if (!IS_ERR(task))
183 rpc_put_task(task); 183 rpc_put_task_async(task);
184 return 1; 184 return 1;
185} 185}
186 186
diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 88513fd8e208..d81db8012c63 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -212,6 +212,7 @@ struct rpc_task *rpc_run_task(const struct rpc_task_setup *);
212struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req, 212struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
213 const struct rpc_call_ops *ops); 213 const struct rpc_call_ops *ops);
214void rpc_put_task(struct rpc_task *); 214void rpc_put_task(struct rpc_task *);
215void rpc_put_task_async(struct rpc_task *);
215void rpc_exit_task(struct rpc_task *); 216void rpc_exit_task(struct rpc_task *);
216void rpc_exit(struct rpc_task *, int); 217void rpc_exit(struct rpc_task *, int);
217void rpc_release_calldata(const struct rpc_call_ops *, void *); 218void rpc_release_calldata(const struct rpc_call_ops *, void *);
diff --git a/kernel/sched.c b/kernel/sched.c
index 18d38e4ec7ba..42eab5a8437d 100644
--- a/kernel/sched.c
+++ b/kernel/sched.c
@@ -4213,6 +4213,7 @@ void __wake_up_locked_key(wait_queue_head_t *q, unsigned int mode, void *key)
4213{ 4213{
4214 __wake_up_common(q, mode, 1, 0, key); 4214 __wake_up_common(q, mode, 1, 0, key);
4215} 4215}
4216EXPORT_SYMBOL_GPL(__wake_up_locked_key);
4216 4217
4217/** 4218/**
4218 * __wake_up_sync_key - wake up threads blocked on a waitqueue. 4219 * __wake_up_sync_key - wake up threads blocked on a waitqueue.
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 243fc09b164e..59e599498e37 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -252,23 +252,37 @@ static void rpc_set_active(struct rpc_task *task)
252 252
253/* 253/*
254 * Mark an RPC call as having completed by clearing the 'active' bit 254 * Mark an RPC call as having completed by clearing the 'active' bit
255 * and then waking up all tasks that were sleeping.
255 */ 256 */
256static void rpc_mark_complete_task(struct rpc_task *task) 257static int rpc_complete_task(struct rpc_task *task)
257{ 258{
258 smp_mb__before_clear_bit(); 259 void *m = &task->tk_runstate;
260 wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
261 struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
262 unsigned long flags;
263 int ret;
264
265 spin_lock_irqsave(&wq->lock, flags);
259 clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate); 266 clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
260 smp_mb__after_clear_bit(); 267 ret = atomic_dec_and_test(&task->tk_count);
261 wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE); 268 if (waitqueue_active(wq))
269 __wake_up_locked_key(wq, TASK_NORMAL, &k);
270 spin_unlock_irqrestore(&wq->lock, flags);
271 return ret;
262} 272}
263 273
264/* 274/*
265 * Allow callers to wait for completion of an RPC call 275 * Allow callers to wait for completion of an RPC call
276 *
277 * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
278 * to enforce taking of the wq->lock and hence avoid races with
279 * rpc_complete_task().
266 */ 280 */
267int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *)) 281int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
268{ 282{
269 if (action == NULL) 283 if (action == NULL)
270 action = rpc_wait_bit_killable; 284 action = rpc_wait_bit_killable;
271 return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE, 285 return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
272 action, TASK_KILLABLE); 286 action, TASK_KILLABLE);
273} 287}
274EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task); 288EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
@@ -857,34 +871,67 @@ static void rpc_async_release(struct work_struct *work)
857 rpc_free_task(container_of(work, struct rpc_task, u.tk_work)); 871 rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
858} 872}
859 873
860void rpc_put_task(struct rpc_task *task) 874static void rpc_release_resources_task(struct rpc_task *task)
861{ 875{
862 if (!atomic_dec_and_test(&task->tk_count))
863 return;
864 /* Release resources */
865 if (task->tk_rqstp) 876 if (task->tk_rqstp)
866 xprt_release(task); 877 xprt_release(task);
867 if (task->tk_msg.rpc_cred) 878 if (task->tk_msg.rpc_cred)
868 put_rpccred(task->tk_msg.rpc_cred); 879 put_rpccred(task->tk_msg.rpc_cred);
869 rpc_task_release_client(task); 880 rpc_task_release_client(task);
870 if (task->tk_workqueue != NULL) { 881}
882
883static void rpc_final_put_task(struct rpc_task *task,
884 struct workqueue_struct *q)
885{
886 if (q != NULL) {
871 INIT_WORK(&task->u.tk_work, rpc_async_release); 887 INIT_WORK(&task->u.tk_work, rpc_async_release);
872 queue_work(task->tk_workqueue, &task->u.tk_work); 888 queue_work(q, &task->u.tk_work);
873 } else 889 } else
874 rpc_free_task(task); 890 rpc_free_task(task);
875} 891}
892
893static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q)
894{
895 if (atomic_dec_and_test(&task->tk_count)) {
896 rpc_release_resources_task(task);
897 rpc_final_put_task(task, q);
898 }
899}
900
901void rpc_put_task(struct rpc_task *task)
902{
903 rpc_do_put_task(task, NULL);
904}
876EXPORT_SYMBOL_GPL(rpc_put_task); 905EXPORT_SYMBOL_GPL(rpc_put_task);
877 906
907void rpc_put_task_async(struct rpc_task *task)
908{
909 rpc_do_put_task(task, task->tk_workqueue);
910}
911EXPORT_SYMBOL_GPL(rpc_put_task_async);
912
878static void rpc_release_task(struct rpc_task *task) 913static void rpc_release_task(struct rpc_task *task)
879{ 914{
880 dprintk("RPC: %5u release task\n", task->tk_pid); 915 dprintk("RPC: %5u release task\n", task->tk_pid);
881 916
882 BUG_ON (RPC_IS_QUEUED(task)); 917 BUG_ON (RPC_IS_QUEUED(task));
883 918
884 /* Wake up anyone who is waiting for task completion */ 919 rpc_release_resources_task(task);
885 rpc_mark_complete_task(task);
886 920
887 rpc_put_task(task); 921 /*
922 * Note: at this point we have been removed from rpc_clnt->cl_tasks,
923 * so it should be safe to use task->tk_count as a test for whether
924 * or not any other processes still hold references to our rpc_task.
925 */
926 if (atomic_read(&task->tk_count) != 1 + !RPC_IS_ASYNC(task)) {
927 /* Wake up anyone who may be waiting for task completion */
928 if (!rpc_complete_task(task))
929 return;
930 } else {
931 if (!atomic_dec_and_test(&task->tk_count))
932 return;
933 }
934 rpc_final_put_task(task, task->tk_workqueue);
888} 935}
889 936
890int rpciod_up(void) 937int rpciod_up(void)