author	Trond Myklebust <Trond.Myklebust@netapp.com>	2006-01-03 03:55:06 -0500
committer	Trond Myklebust <Trond.Myklebust@netapp.com>	2006-01-06 14:58:40 -0500
commit	44c288732fdbd7e38460d156a40d29590bf93bce
tree	d4239fe37529b4799e85443f803db754ef66f874
parent	4ce70ada1ff1d0b80916ec9ec5764ce44a50a54f
NFSv4: stateful NFSv4 RPC call interface
The NFSv4 model requires us to complete all RPC calls that might
establish state on the server whether or not the user wants to
interrupt it. We may also need to schedule new work (including
new RPC calls) in order to cancel the new state.

The asynchronous RPC model will allow us to ensure that RPC calls
always complete, but in order to allow for "synchronous" RPC, we
want to add the ability to wait for completion. The waits are, of
course, interruptible.

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
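As an illustration only (not part of the patch), a caller-side sketch of how
the new interface is meant to be used; "my_ops" and "my_data" are hypothetical
placeholders, and the error convention follows the ERR_PTR-returning
rpc_run_task() added below:

	struct rpc_task *task;
	int status;

	/* Start the call; rpc_run_task() holds an extra tk_count
	 * reference for us, so the task cannot vanish while we wait. */
	task = rpc_run_task(clnt, RPC_TASK_ASYNC, &my_ops, my_data);
	if (IS_ERR(task))
		return PTR_ERR(task);

	/* Interruptible wait for RPC_TASK_ACTIVE to be cleared. If a
	 * signal arrives, the RPC itself still runs to completion. */
	status = rpc_wait_for_completion_task(task);
	if (status == 0)
		status = task->tk_status;

	/* Drop our reference; the task is freed once tk_count hits zero. */
	rpc_release_task(task);
	return status;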
 fs/nfs/direct.c              |  1
 include/linux/sunrpc/sched.h | 21
 net/sunrpc/sched.c           | 78
 3 files changed, 78 insertions(+), 22 deletions(-)
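The completion signalling in the diff below rests on the kernel's generic
bit-wait primitives: the completing side must clear RPC_TASK_ACTIVE with
memory barriers on both sides (clear_bit() alone is not a barrier, and the
waiter polls the bit locklessly) before calling wake_up_bit(), while waiters
sleep via wait_on_bit() with an action callback that decides whether signals
abort the sleep. A condensed sketch of the idiom, not verbatim from the patch:

	/* Completer: publish prior stores, clear the flag, wake waiters */
	smp_mb__before_clear_bit();
	clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
	smp_mb__after_clear_bit();
	wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);

	/* Waiter: sleep until the bit clears; rpc_wait_bit_interruptible()
	 * returns -ERESTARTSYS if a signal is pending, aborting the wait */
	status = wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
			rpc_wait_bit_interruptible, TASK_INTERRUPTIBLE);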
diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index a834423942c7..ae2be0744191 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -268,7 +268,6 @@ static void nfs_direct_read_schedule(struct nfs_direct_req *dreq,
 	NFS_PROTO(inode)->read_setup(data);
 
 	data->task.tk_cookie = (unsigned long) inode;
-	data->task.tk_calldata = data;
 	data->complete = nfs_direct_read_result;
 
 	lock_kernel();
diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index ac1326fc3e1a..94b0afa4ab05 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -42,6 +42,7 @@ struct rpc_task {
 #ifdef RPC_DEBUG
 	unsigned long		tk_magic;	/* 0xf00baa */
 #endif
+	atomic_t		tk_count;	/* Reference count */
 	struct list_head	tk_task;	/* global list of tasks */
 	struct rpc_clnt *	tk_client;	/* RPC client */
 	struct rpc_rqst *	tk_rqstp;	/* RPC request */
@@ -78,7 +79,6 @@ struct rpc_task {
 	struct timer_list	tk_timer;	/* kernel timer */
 	unsigned long		tk_timeout;	/* timeout for rpc_sleep() */
 	unsigned short		tk_flags;	/* misc flags */
-	unsigned char		tk_active   : 1;/* Task has been activated */
 	unsigned char		tk_priority : 2;/* Task priority */
 	unsigned long		tk_runstate;	/* Task run status */
 	struct workqueue_struct	*tk_workqueue;	/* Normally rpciod, but could
@@ -136,7 +136,6 @@ struct rpc_call_ops {
 #define RPC_IS_SWAPPER(t)	((t)->tk_flags & RPC_TASK_SWAPPER)
 #define RPC_DO_ROOTOVERRIDE(t)	((t)->tk_flags & RPC_TASK_ROOTCREDS)
 #define RPC_ASSASSINATED(t)	((t)->tk_flags & RPC_TASK_KILLED)
-#define RPC_IS_ACTIVATED(t)	((t)->tk_active)
 #define RPC_DO_CALLBACK(t)	((t)->tk_callback != NULL)
 #define RPC_IS_SOFT(t)		((t)->tk_flags & RPC_TASK_SOFT)
 #define RPC_TASK_UNINTERRUPTIBLE(t)	((t)->tk_flags & RPC_TASK_NOINTR)
@@ -145,6 +144,7 @@ struct rpc_call_ops {
 #define RPC_TASK_QUEUED		1
 #define RPC_TASK_WAKEUP		2
 #define RPC_TASK_HAS_TIMER	3
+#define RPC_TASK_ACTIVE		4
 
 #define RPC_IS_RUNNING(t)	(test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate))
 #define rpc_set_running(t)	(set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate))
@@ -175,6 +175,15 @@ struct rpc_call_ops {
 		smp_mb__after_clear_bit(); \
 	} while (0)
 
+#define RPC_IS_ACTIVATED(t)	(test_bit(RPC_TASK_ACTIVE, &(t)->tk_runstate))
+#define rpc_set_active(t)	(set_bit(RPC_TASK_ACTIVE, &(t)->tk_runstate))
+#define rpc_clear_active(t)	\
+	do { \
+		smp_mb__before_clear_bit(); \
+		clear_bit(RPC_TASK_ACTIVE, &(t)->tk_runstate); \
+		smp_mb__after_clear_bit(); \
+	} while(0)
+
 /*
  * Task priorities.
  * Note: if you change these, you must also change
@@ -237,6 +246,8 @@ struct rpc_wait_queue {
  */
 struct rpc_task *rpc_new_task(struct rpc_clnt *, int flags,
 				const struct rpc_call_ops *ops, void *data);
+struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
+				const struct rpc_call_ops *ops, void *data);
 struct rpc_task *rpc_new_child(struct rpc_clnt *, struct rpc_task *parent);
 void		rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt,
 				int flags, const struct rpc_call_ops *ops,
@@ -260,6 +271,7 @@ void * rpc_malloc(struct rpc_task *, size_t);
 int		rpciod_up(void);
 void		rpciod_down(void);
 void		rpciod_wake_up(void);
+int		__rpc_wait_for_completion_task(struct rpc_task *task, int (*)(void *));
 #ifdef RPC_DEBUG
 void		rpc_show_tasks(void);
 #endif
@@ -272,6 +284,11 @@ static inline void rpc_exit(struct rpc_task *task, int status)
 	task->tk_action = rpc_exit_task;
 }
 
+static inline int rpc_wait_for_completion_task(struct rpc_task *task)
+{
+	return __rpc_wait_for_completion_task(task, NULL);
+}
+
 #ifdef RPC_DEBUG
 static inline const char * rpc_qname(struct rpc_wait_queue *q)
 {
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 2d74a1672028..82d158dad16d 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -264,6 +264,35 @@ void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
 }
 EXPORT_SYMBOL(rpc_init_wait_queue);
 
+static int rpc_wait_bit_interruptible(void *word)
+{
+	if (signal_pending(current))
+		return -ERESTARTSYS;
+	schedule();
+	return 0;
+}
+
+/*
+ * Mark an RPC call as having completed by clearing the 'active' bit
+ */
+static inline void rpc_mark_complete_task(struct rpc_task *task)
+{
+	rpc_clear_active(task);
+	wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
+}
+
+/*
+ * Allow callers to wait for completion of an RPC call
+ */
+int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
+{
+	if (action == NULL)
+		action = rpc_wait_bit_interruptible;
+	return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
+			action, TASK_INTERRUPTIBLE);
+}
+EXPORT_SYMBOL(__rpc_wait_for_completion_task);
+
 /*
  * Make an RPC task runnable.
  *
@@ -299,10 +328,7 @@ static void rpc_make_runnable(struct rpc_task *task)
 static inline void
 rpc_schedule_run(struct rpc_task *task)
 {
-	/* Don't run a child twice! */
-	if (RPC_IS_ACTIVATED(task))
-		return;
-	task->tk_active = 1;
+	rpc_set_active(task);
 	rpc_make_runnable(task);
 }
 
@@ -324,8 +350,7 @@ static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
 	}
 
 	/* Mark the task as being activated if so needed */
-	if (!RPC_IS_ACTIVATED(task))
-		task->tk_active = 1;
+	rpc_set_active(task);
 
 	__rpc_add_wait_queue(q, task);
 
@@ -580,14 +605,6 @@ void rpc_exit_task(struct rpc_task *task)
 }
 EXPORT_SYMBOL(rpc_exit_task);
 
-static int rpc_wait_bit_interruptible(void *word)
-{
-	if (signal_pending(current))
-		return -ERESTARTSYS;
-	schedule();
-	return 0;
-}
-
 /*
  * This is the RPC `scheduler' (or rather, the finite state machine).
  */
@@ -680,6 +697,8 @@ static int __rpc_execute(struct rpc_task *task)
 	dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status);
 	status = task->tk_status;
 
+	/* Wake up anyone who is waiting for task completion */
+	rpc_mark_complete_task(task);
 	/* Release all resources associated with the task */
 	rpc_release_task(task);
 	return status;
@@ -697,9 +716,7 @@ static int __rpc_execute(struct rpc_task *task)
 int
 rpc_execute(struct rpc_task *task)
 {
-	BUG_ON(task->tk_active);
-
-	task->tk_active = 1;
+	rpc_set_active(task);
 	rpc_set_running(task);
 	return __rpc_execute(task);
 }
@@ -761,6 +778,7 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, cons
 	init_timer(&task->tk_timer);
 	task->tk_timer.data = (unsigned long) task;
 	task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
+	atomic_set(&task->tk_count, 1);
 	task->tk_client = clnt;
 	task->tk_flags = flags;
 	task->tk_ops = tk_ops;
@@ -848,11 +866,13 @@ void rpc_release_task(struct rpc_task *task)
 {
 	const struct rpc_call_ops *tk_ops = task->tk_ops;
 	void *calldata = task->tk_calldata;
-	dprintk("RPC: %4d release task\n", task->tk_pid);
 
 #ifdef RPC_DEBUG
 	BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
 #endif
+	if (!atomic_dec_and_test(&task->tk_count))
+		return;
+	dprintk("RPC: %4d release task\n", task->tk_pid);
 
 	/* Remove from global task list */
 	spin_lock(&rpc_sched_lock);
@@ -860,7 +880,6 @@ void rpc_release_task(struct rpc_task *task)
 	spin_unlock(&rpc_sched_lock);
 
 	BUG_ON (RPC_IS_QUEUED(task));
-	task->tk_active = 0;
 
 	/* Synchronously delete any running timer */
 	rpc_delete_timer(task);
@@ -886,6 +905,27 @@ void rpc_release_task(struct rpc_task *task)
 }
 
 /**
+ * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
+ * @clnt - pointer to RPC client
+ * @flags - RPC flags
+ * @ops - RPC call ops
+ * @data - user call data
+ */
+struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
+					const struct rpc_call_ops *ops,
+					void *data)
+{
+	struct rpc_task *task;
+	task = rpc_new_task(clnt, flags, ops, data);
+	if (task == NULL)
+		return ERR_PTR(-ENOMEM);
+	atomic_inc(&task->tk_count);
+	rpc_execute(task);
+	return task;
+}
+EXPORT_SYMBOL(rpc_run_task);
+
+/**
  * rpc_find_parent - find the parent of a child task.
  * @child: child task
  *