aboutsummaryrefslogtreecommitdiffstats
path: root/net/sunrpc/sched.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/sched.c')
-rw-r--r--net/sunrpc/sched.c110
1 files changed, 72 insertions, 38 deletions
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 243fc09b164e..6b43ee7221d5 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -252,23 +252,37 @@ static void rpc_set_active(struct rpc_task *task)
252 252
253/* 253/*
254 * Mark an RPC call as having completed by clearing the 'active' bit 254 * Mark an RPC call as having completed by clearing the 'active' bit
255 * and then waking up all tasks that were sleeping.
255 */ 256 */
256static void rpc_mark_complete_task(struct rpc_task *task) 257static int rpc_complete_task(struct rpc_task *task)
257{ 258{
258 smp_mb__before_clear_bit(); 259 void *m = &task->tk_runstate;
260 wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
261 struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
262 unsigned long flags;
263 int ret;
264
265 spin_lock_irqsave(&wq->lock, flags);
259 clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate); 266 clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
260 smp_mb__after_clear_bit(); 267 ret = atomic_dec_and_test(&task->tk_count);
261 wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE); 268 if (waitqueue_active(wq))
269 __wake_up_locked_key(wq, TASK_NORMAL, &k);
270 spin_unlock_irqrestore(&wq->lock, flags);
271 return ret;
262} 272}
263 273
264/* 274/*
265 * Allow callers to wait for completion of an RPC call 275 * Allow callers to wait for completion of an RPC call
276 *
277 * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
278 * to enforce taking of the wq->lock and hence avoid races with
279 * rpc_complete_task().
266 */ 280 */
267int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *)) 281int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
268{ 282{
269 if (action == NULL) 283 if (action == NULL)
270 action = rpc_wait_bit_killable; 284 action = rpc_wait_bit_killable;
271 return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE, 285 return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
272 action, TASK_KILLABLE); 286 action, TASK_KILLABLE);
273} 287}
274EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task); 288EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
@@ -285,15 +299,8 @@ static void rpc_make_runnable(struct rpc_task *task)
285 if (rpc_test_and_set_running(task)) 299 if (rpc_test_and_set_running(task))
286 return; 300 return;
287 if (RPC_IS_ASYNC(task)) { 301 if (RPC_IS_ASYNC(task)) {
288 int status;
289
290 INIT_WORK(&task->u.tk_work, rpc_async_schedule); 302 INIT_WORK(&task->u.tk_work, rpc_async_schedule);
291 status = queue_work(rpciod_workqueue, &task->u.tk_work); 303 queue_work(rpciod_workqueue, &task->u.tk_work);
292 if (status < 0) {
293 printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
294 task->tk_status = status;
295 return;
296 }
297 } else 304 } else
298 wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED); 305 wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
299} 306}
@@ -623,14 +630,12 @@ static void __rpc_execute(struct rpc_task *task)
623 save_callback = task->tk_callback; 630 save_callback = task->tk_callback;
624 task->tk_callback = NULL; 631 task->tk_callback = NULL;
625 save_callback(task); 632 save_callback(task);
626 } 633 } else {
627 634 /*
628 /* 635 * Perform the next FSM step.
629 * Perform the next FSM step. 636 * tk_action may be NULL when the task has been killed
630 * tk_action may be NULL when the task has been killed 637 * by someone else.
631 * by someone else. 638 */
632 */
633 if (!RPC_IS_QUEUED(task)) {
634 if (task->tk_action == NULL) 639 if (task->tk_action == NULL)
635 break; 640 break;
636 task->tk_action(task); 641 task->tk_action(task);
@@ -829,12 +834,6 @@ struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data)
829 } 834 }
830 835
831 rpc_init_task(task, setup_data); 836 rpc_init_task(task, setup_data);
832 if (task->tk_status < 0) {
833 int err = task->tk_status;
834 rpc_put_task(task);
835 return ERR_PTR(err);
836 }
837
838 task->tk_flags |= flags; 837 task->tk_flags |= flags;
839 dprintk("RPC: allocated task %p\n", task); 838 dprintk("RPC: allocated task %p\n", task);
840 return task; 839 return task;
@@ -857,34 +856,69 @@ static void rpc_async_release(struct work_struct *work)
857 rpc_free_task(container_of(work, struct rpc_task, u.tk_work)); 856 rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
858} 857}
859 858
860void rpc_put_task(struct rpc_task *task) 859static void rpc_release_resources_task(struct rpc_task *task)
861{ 860{
862 if (!atomic_dec_and_test(&task->tk_count))
863 return;
864 /* Release resources */
865 if (task->tk_rqstp) 861 if (task->tk_rqstp)
866 xprt_release(task); 862 xprt_release(task);
867 if (task->tk_msg.rpc_cred) 863 if (task->tk_msg.rpc_cred) {
868 put_rpccred(task->tk_msg.rpc_cred); 864 put_rpccred(task->tk_msg.rpc_cred);
865 task->tk_msg.rpc_cred = NULL;
866 }
869 rpc_task_release_client(task); 867 rpc_task_release_client(task);
870 if (task->tk_workqueue != NULL) { 868}
869
870static void rpc_final_put_task(struct rpc_task *task,
871 struct workqueue_struct *q)
872{
873 if (q != NULL) {
871 INIT_WORK(&task->u.tk_work, rpc_async_release); 874 INIT_WORK(&task->u.tk_work, rpc_async_release);
872 queue_work(task->tk_workqueue, &task->u.tk_work); 875 queue_work(q, &task->u.tk_work);
873 } else 876 } else
874 rpc_free_task(task); 877 rpc_free_task(task);
875} 878}
879
880static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q)
881{
882 if (atomic_dec_and_test(&task->tk_count)) {
883 rpc_release_resources_task(task);
884 rpc_final_put_task(task, q);
885 }
886}
887
888void rpc_put_task(struct rpc_task *task)
889{
890 rpc_do_put_task(task, NULL);
891}
876EXPORT_SYMBOL_GPL(rpc_put_task); 892EXPORT_SYMBOL_GPL(rpc_put_task);
877 893
894void rpc_put_task_async(struct rpc_task *task)
895{
896 rpc_do_put_task(task, task->tk_workqueue);
897}
898EXPORT_SYMBOL_GPL(rpc_put_task_async);
899
878static void rpc_release_task(struct rpc_task *task) 900static void rpc_release_task(struct rpc_task *task)
879{ 901{
880 dprintk("RPC: %5u release task\n", task->tk_pid); 902 dprintk("RPC: %5u release task\n", task->tk_pid);
881 903
882 BUG_ON (RPC_IS_QUEUED(task)); 904 BUG_ON (RPC_IS_QUEUED(task));
883 905
884 /* Wake up anyone who is waiting for task completion */ 906 rpc_release_resources_task(task);
885 rpc_mark_complete_task(task);
886 907
887 rpc_put_task(task); 908 /*
909 * Note: at this point we have been removed from rpc_clnt->cl_tasks,
910 * so it should be safe to use task->tk_count as a test for whether
911 * or not any other processes still hold references to our rpc_task.
912 */
913 if (atomic_read(&task->tk_count) != 1 + !RPC_IS_ASYNC(task)) {
914 /* Wake up anyone who may be waiting for task completion */
915 if (!rpc_complete_task(task))
916 return;
917 } else {
918 if (!atomic_dec_and_test(&task->tk_count))
919 return;
920 }
921 rpc_final_put_task(task, task->tk_workqueue);
888} 922}
889 923
890int rpciod_up(void) 924int rpciod_up(void)
@@ -908,7 +942,7 @@ static int rpciod_start(void)
908 * Create the rpciod thread and wait for it to start. 942 * Create the rpciod thread and wait for it to start.
909 */ 943 */
910 dprintk("RPC: creating workqueue rpciod\n"); 944 dprintk("RPC: creating workqueue rpciod\n");
911 wq = alloc_workqueue("rpciod", WQ_RESCUER, 0); 945 wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0);
912 rpciod_workqueue = wq; 946 rpciod_workqueue = wq;
913 return rpciod_workqueue != NULL; 947 return rpciod_workqueue != NULL;
914} 948}