author    Trond Myklebust <Trond.Myklebust@netapp.com>    2006-01-03 03:55:06 -0500
committer Trond Myklebust <Trond.Myklebust@netapp.com>    2006-01-06 14:58:40 -0500
commit    44c288732fdbd7e38460d156a40d29590bf93bce (patch)
tree      d4239fe37529b4799e85443f803db754ef66f874 /net/sunrpc/sched.c
parent    4ce70ada1ff1d0b80916ec9ec5764ce44a50a54f (diff)
NFSv4: stateful NFSv4 RPC call interface
The NFSv4 model requires us to complete all RPC calls that might establish state on the server, whether or not the user wants to interrupt them. We may also need to schedule new work (including new RPC calls) in order to cancel the new state.

The asynchronous RPC model will allow us to ensure that RPC calls always complete, but in order to allow for "synchronous" RPC, we want to add the ability to wait for completion. The waits are, of course, interruptible.

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
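As an illustration of the calling pattern this enables (a sketch, not part of the patch: nfs4_example_ops and calldata are invented names, and rpc_wait_for_completion_task() is assumed to be a sched.h helper that wraps __rpc_wait_for_completion_task(task, NULL)), a "synchronous" caller now looks roughly like this:

#include <linux/err.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/sched.h>

static const struct rpc_call_ops nfs4_example_ops; /* hypothetical; callbacks omitted */

/* Sketch: start an RPC asynchronously, then wait interruptibly for it. */
static int example_sync_over_async(struct rpc_clnt *clnt, void *calldata)
{
	struct rpc_task *task;
	int status;

	/* The task now runs to completion even if this process is
	 * signalled, so any state it establishes on the server is
	 * always accounted for. */
	task = rpc_run_task(clnt, RPC_TASK_ASYNC, &nfs4_example_ops, calldata);
	if (IS_ERR(task))
		return PTR_ERR(task);

	/* Interruptible wait on RPC_TASK_ACTIVE: a signal aborts only
	 * the wait (-ERESTARTSYS), never the RPC call itself. */
	status = rpc_wait_for_completion_task(task);
	if (status == 0)
		status = task->tk_status;

	/* Drop the extra reference rpc_run_task() took on tk_count. */
	rpc_release_task(task);
	return status;
}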
Diffstat (limited to 'net/sunrpc/sched.c')
-rw-r--r--  net/sunrpc/sched.c  78
1 file changed, 59 insertions(+), 19 deletions(-)
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 2d74a1672028..82d158dad16d 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -264,6 +264,35 @@ void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
 }
 EXPORT_SYMBOL(rpc_init_wait_queue);
 
+static int rpc_wait_bit_interruptible(void *word)
+{
+	if (signal_pending(current))
+		return -ERESTARTSYS;
+	schedule();
+	return 0;
+}
+
+/*
+ * Mark an RPC call as having completed by clearing the 'active' bit
+ */
+static inline void rpc_mark_complete_task(struct rpc_task *task)
+{
+	rpc_clear_active(task);
+	wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
+}
+
+/*
+ * Allow callers to wait for completion of an RPC call
+ */
+int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
+{
+	if (action == NULL)
+		action = rpc_wait_bit_interruptible;
+	return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
+			action, TASK_INTERRUPTIBLE);
+}
+EXPORT_SYMBOL(__rpc_wait_for_completion_task);
+
 /*
  * Make an RPC task runnable.
  *
@@ -299,10 +328,7 @@ static void rpc_make_runnable(struct rpc_task *task)
 static inline void
 rpc_schedule_run(struct rpc_task *task)
 {
-	/* Don't run a child twice! */
-	if (RPC_IS_ACTIVATED(task))
-		return;
-	task->tk_active = 1;
+	rpc_set_active(task);
 	rpc_make_runnable(task);
 }
 
@@ -324,8 +350,7 @@ static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
 	}
 
 	/* Mark the task as being activated if so needed */
-	if (!RPC_IS_ACTIVATED(task))
-		task->tk_active = 1;
+	rpc_set_active(task);
 
 	__rpc_add_wait_queue(q, task);
 
@@ -580,14 +605,6 @@ void rpc_exit_task(struct rpc_task *task)
 }
 EXPORT_SYMBOL(rpc_exit_task);
 
-static int rpc_wait_bit_interruptible(void *word)
-{
-	if (signal_pending(current))
-		return -ERESTARTSYS;
-	schedule();
-	return 0;
-}
-
 /*
  * This is the RPC `scheduler' (or rather, the finite state machine).
  */
@@ -680,6 +697,8 @@ static int __rpc_execute(struct rpc_task *task)
 	dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status);
 	status = task->tk_status;
 
+	/* Wake up anyone who is waiting for task completion */
+	rpc_mark_complete_task(task);
 	/* Release all resources associated with the task */
 	rpc_release_task(task);
 	return status;
@@ -697,9 +716,7 @@ static int __rpc_execute(struct rpc_task *task)
 int
 rpc_execute(struct rpc_task *task)
 {
-	BUG_ON(task->tk_active);
-
-	task->tk_active = 1;
+	rpc_set_active(task);
 	rpc_set_running(task);
 	return __rpc_execute(task);
 }
@@ -761,6 +778,7 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, cons
 	init_timer(&task->tk_timer);
 	task->tk_timer.data = (unsigned long) task;
 	task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
+	atomic_set(&task->tk_count, 1);
 	task->tk_client = clnt;
 	task->tk_flags = flags;
 	task->tk_ops = tk_ops;
@@ -848,11 +866,13 @@ void rpc_release_task(struct rpc_task *task)
 {
 	const struct rpc_call_ops *tk_ops = task->tk_ops;
 	void *calldata = task->tk_calldata;
-	dprintk("RPC: %4d release task\n", task->tk_pid);
 
 #ifdef RPC_DEBUG
 	BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
 #endif
+	if (!atomic_dec_and_test(&task->tk_count))
+		return;
+	dprintk("RPC: %4d release task\n", task->tk_pid);
 
 	/* Remove from global task list */
 	spin_lock(&rpc_sched_lock);
@@ -860,7 +880,6 @@ void rpc_release_task(struct rpc_task *task)
 	spin_unlock(&rpc_sched_lock);
 
 	BUG_ON (RPC_IS_QUEUED(task));
-	task->tk_active = 0;
 
 	/* Synchronously delete any running timer */
 	rpc_delete_timer(task);
@@ -886,6 +905,27 @@ void rpc_release_task(struct rpc_task *task)
 }
 
 /**
+ * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
+ * @clnt - pointer to RPC client
+ * @flags - RPC flags
+ * @ops - RPC call ops
+ * @data - user call data
+ */
+struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
+					const struct rpc_call_ops *ops,
+					void *data)
+{
+	struct rpc_task *task;
+	task = rpc_new_task(clnt, flags, ops, data);
+	if (task == NULL)
+		return ERR_PTR(-ENOMEM);
+	atomic_inc(&task->tk_count);
+	rpc_execute(task);
+	return task;
+}
+EXPORT_SYMBOL(rpc_run_task);
+
+/**
  * rpc_find_parent - find the parent of a child task.
  * @child: child task
  *
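The completion machinery above is the kernel's generic bit-waitqueue pattern: the completing side clears a flag bit (with barrier pairing) and calls wake_up_bit(), while waiters block in wait_on_bit() with a caller-chosen sleeping policy. A minimal standalone sketch of that handshake, using the 2.6-era interfaces visible in the diff (the my_work names are invented, and rpc_set_active()/rpc_clear_active() are assumed to be set_bit()/clear_bit() wrappers defined in sched.h):

#include <linux/wait.h>
#include <linux/sched.h>
#include <linux/bitops.h>
#include <linux/errno.h>

#define MY_WORK_ACTIVE 0		/* bit number inside 'state' */

struct my_work {
	unsigned long state;
};

/* Submitter side: mirrors rpc_set_active(). */
static void my_work_start(struct my_work *w)
{
	set_bit(MY_WORK_ACTIVE, &w->state);
}

/* Completer side: mirrors rpc_mark_complete_task(). The bit must be
 * cleared before waking, with barriers so waiters see the store. */
static void my_work_complete(struct my_work *w)
{
	smp_mb__before_clear_bit();
	clear_bit(MY_WORK_ACTIVE, &w->state);
	smp_mb__after_clear_bit();
	wake_up_bit(&w->state, MY_WORK_ACTIVE);
}

/* Waiter side: mirrors rpc_wait_bit_interruptible() and
 * __rpc_wait_for_completion_task(). */
static int my_wait_bit_interruptible(void *word)
{
	if (signal_pending(current))
		return -ERESTARTSYS;
	schedule();
	return 0;
}

static int my_work_wait(struct my_work *w)
{
	return wait_on_bit(&w->state, MY_WORK_ACTIVE,
			my_wait_bit_interruptible, TASK_INTERRUPTIBLE);
}

Keeping the flag inside the task's existing tk_runstate word avoids embedding a struct completion in every rpc_task, and the pluggable action callback is what lets the wait be interruptible even though the RPC call itself always runs to completion.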