aboutsummaryrefslogtreecommitdiffstats
path: root/net/sunrpc/sched.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/sched.c')
-rw-r--r--net/sunrpc/sched.c222
1 files changed, 135 insertions, 87 deletions
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 54e60a657500..7415406aa1ae 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -41,8 +41,6 @@ static mempool_t *rpc_buffer_mempool __read_mostly;
41 41
42static void __rpc_default_timer(struct rpc_task *task); 42static void __rpc_default_timer(struct rpc_task *task);
43static void rpciod_killall(void); 43static void rpciod_killall(void);
44static void rpc_free(struct rpc_task *task);
45
46static void rpc_async_schedule(void *); 44static void rpc_async_schedule(void *);
47 45
48/* 46/*
@@ -264,6 +262,35 @@ void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
264} 262}
265EXPORT_SYMBOL(rpc_init_wait_queue); 263EXPORT_SYMBOL(rpc_init_wait_queue);
266 264
265static int rpc_wait_bit_interruptible(void *word)
266{
267 if (signal_pending(current))
268 return -ERESTARTSYS;
269 schedule();
270 return 0;
271}
272
273/*
274 * Mark an RPC call as having completed by clearing the 'active' bit
275 */
276static inline void rpc_mark_complete_task(struct rpc_task *task)
277{
278 rpc_clear_active(task);
279 wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
280}
281
282/*
283 * Allow callers to wait for completion of an RPC call
284 */
285int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
286{
287 if (action == NULL)
288 action = rpc_wait_bit_interruptible;
289 return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
290 action, TASK_INTERRUPTIBLE);
291}
292EXPORT_SYMBOL(__rpc_wait_for_completion_task);
293
267/* 294/*
268 * Make an RPC task runnable. 295 * Make an RPC task runnable.
269 * 296 *
@@ -299,10 +326,7 @@ static void rpc_make_runnable(struct rpc_task *task)
299static inline void 326static inline void
300rpc_schedule_run(struct rpc_task *task) 327rpc_schedule_run(struct rpc_task *task)
301{ 328{
302 /* Don't run a child twice! */ 329 rpc_set_active(task);
303 if (RPC_IS_ACTIVATED(task))
304 return;
305 task->tk_active = 1;
306 rpc_make_runnable(task); 330 rpc_make_runnable(task);
307} 331}
308 332
@@ -324,8 +348,7 @@ static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
324 } 348 }
325 349
326 /* Mark the task as being activated if so needed */ 350 /* Mark the task as being activated if so needed */
327 if (!RPC_IS_ACTIVATED(task)) 351 rpc_set_active(task);
328 task->tk_active = 1;
329 352
330 __rpc_add_wait_queue(q, task); 353 __rpc_add_wait_queue(q, task);
331 354
@@ -555,36 +578,29 @@ __rpc_atrun(struct rpc_task *task)
555} 578}
556 579
557/* 580/*
558 * Helper that calls task->tk_exit if it exists and then returns 581 * Helper to call task->tk_ops->rpc_call_prepare
559 * true if we should exit __rpc_execute.
560 */ 582 */
561static inline int __rpc_do_exit(struct rpc_task *task) 583static void rpc_prepare_task(struct rpc_task *task)
562{ 584{
563 if (task->tk_exit != NULL) { 585 task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
564 lock_kernel();
565 task->tk_exit(task);
566 unlock_kernel();
567 /* If tk_action is non-null, we should restart the call */
568 if (task->tk_action != NULL) {
569 if (!RPC_ASSASSINATED(task)) {
570 /* Release RPC slot and buffer memory */
571 xprt_release(task);
572 rpc_free(task);
573 return 0;
574 }
575 printk(KERN_ERR "RPC: dead task tried to walk away.\n");
576 }
577 }
578 return 1;
579} 586}
580 587
581static int rpc_wait_bit_interruptible(void *word) 588/*
589 * Helper that calls task->tk_ops->rpc_call_done if it exists
590 */
591void rpc_exit_task(struct rpc_task *task)
582{ 592{
583 if (signal_pending(current)) 593 task->tk_action = NULL;
584 return -ERESTARTSYS; 594 if (task->tk_ops->rpc_call_done != NULL) {
585 schedule(); 595 task->tk_ops->rpc_call_done(task, task->tk_calldata);
586 return 0; 596 if (task->tk_action != NULL) {
597 WARN_ON(RPC_ASSASSINATED(task));
598 /* Always release the RPC slot and buffer memory */
599 xprt_release(task);
600 }
601 }
587} 602}
603EXPORT_SYMBOL(rpc_exit_task);
588 604
589/* 605/*
590 * This is the RPC `scheduler' (or rather, the finite state machine). 606 * This is the RPC `scheduler' (or rather, the finite state machine).
@@ -631,12 +647,11 @@ static int __rpc_execute(struct rpc_task *task)
631 * by someone else. 647 * by someone else.
632 */ 648 */
633 if (!RPC_IS_QUEUED(task)) { 649 if (!RPC_IS_QUEUED(task)) {
634 if (task->tk_action != NULL) { 650 if (task->tk_action == NULL)
635 lock_kernel();
636 task->tk_action(task);
637 unlock_kernel();
638 } else if (__rpc_do_exit(task))
639 break; 651 break;
652 lock_kernel();
653 task->tk_action(task);
654 unlock_kernel();
640 } 655 }
641 656
642 /* 657 /*
@@ -676,9 +691,9 @@ static int __rpc_execute(struct rpc_task *task)
676 dprintk("RPC: %4d sync task resuming\n", task->tk_pid); 691 dprintk("RPC: %4d sync task resuming\n", task->tk_pid);
677 } 692 }
678 693
679 dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status); 694 dprintk("RPC: %4d, return %d, status %d\n", task->tk_pid, status, task->tk_status);
680 status = task->tk_status; 695 /* Wake up anyone who is waiting for task completion */
681 696 rpc_mark_complete_task(task);
682 /* Release all resources associated with the task */ 697 /* Release all resources associated with the task */
683 rpc_release_task(task); 698 rpc_release_task(task);
684 return status; 699 return status;
@@ -696,9 +711,7 @@ static int __rpc_execute(struct rpc_task *task)
696int 711int
697rpc_execute(struct rpc_task *task) 712rpc_execute(struct rpc_task *task)
698{ 713{
699 BUG_ON(task->tk_active); 714 rpc_set_active(task);
700
701 task->tk_active = 1;
702 rpc_set_running(task); 715 rpc_set_running(task);
703 return __rpc_execute(task); 716 return __rpc_execute(task);
704} 717}
@@ -708,17 +721,19 @@ static void rpc_async_schedule(void *arg)
708 __rpc_execute((struct rpc_task *)arg); 721 __rpc_execute((struct rpc_task *)arg);
709} 722}
710 723
711/* 724/**
712 * Allocate memory for RPC purposes. 725 * rpc_malloc - allocate an RPC buffer
726 * @task: RPC task that will use this buffer
727 * @size: requested byte size
713 * 728 *
714 * We try to ensure that some NFS reads and writes can always proceed 729 * We try to ensure that some NFS reads and writes can always proceed
715 * by using a mempool when allocating 'small' buffers. 730 * by using a mempool when allocating 'small' buffers.
716 * In order to avoid memory starvation triggering more writebacks of 731 * In order to avoid memory starvation triggering more writebacks of
717 * NFS requests, we use GFP_NOFS rather than GFP_KERNEL. 732 * NFS requests, we use GFP_NOFS rather than GFP_KERNEL.
718 */ 733 */
719void * 734void * rpc_malloc(struct rpc_task *task, size_t size)
720rpc_malloc(struct rpc_task *task, size_t size)
721{ 735{
736 struct rpc_rqst *req = task->tk_rqstp;
722 gfp_t gfp; 737 gfp_t gfp;
723 738
724 if (task->tk_flags & RPC_TASK_SWAPPER) 739 if (task->tk_flags & RPC_TASK_SWAPPER)
@@ -727,42 +742,52 @@ rpc_malloc(struct rpc_task *task, size_t size)
727 gfp = GFP_NOFS; 742 gfp = GFP_NOFS;
728 743
729 if (size > RPC_BUFFER_MAXSIZE) { 744 if (size > RPC_BUFFER_MAXSIZE) {
730 task->tk_buffer = kmalloc(size, gfp); 745 req->rq_buffer = kmalloc(size, gfp);
731 if (task->tk_buffer) 746 if (req->rq_buffer)
732 task->tk_bufsize = size; 747 req->rq_bufsize = size;
733 } else { 748 } else {
734 task->tk_buffer = mempool_alloc(rpc_buffer_mempool, gfp); 749 req->rq_buffer = mempool_alloc(rpc_buffer_mempool, gfp);
735 if (task->tk_buffer) 750 if (req->rq_buffer)
736 task->tk_bufsize = RPC_BUFFER_MAXSIZE; 751 req->rq_bufsize = RPC_BUFFER_MAXSIZE;
737 } 752 }
738 return task->tk_buffer; 753 return req->rq_buffer;
739} 754}
740 755
741static void 756/**
742rpc_free(struct rpc_task *task) 757 * rpc_free - free buffer allocated via rpc_malloc
758 * @task: RPC task with a buffer to be freed
759 *
760 */
761void rpc_free(struct rpc_task *task)
743{ 762{
744 if (task->tk_buffer) { 763 struct rpc_rqst *req = task->tk_rqstp;
745 if (task->tk_bufsize == RPC_BUFFER_MAXSIZE) 764
746 mempool_free(task->tk_buffer, rpc_buffer_mempool); 765 if (req->rq_buffer) {
766 if (req->rq_bufsize == RPC_BUFFER_MAXSIZE)
767 mempool_free(req->rq_buffer, rpc_buffer_mempool);
747 else 768 else
748 kfree(task->tk_buffer); 769 kfree(req->rq_buffer);
749 task->tk_buffer = NULL; 770 req->rq_buffer = NULL;
750 task->tk_bufsize = 0; 771 req->rq_bufsize = 0;
751 } 772 }
752} 773}
753 774
754/* 775/*
755 * Creation and deletion of RPC task structures 776 * Creation and deletion of RPC task structures
756 */ 777 */
757void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action callback, int flags) 778void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
758{ 779{
759 memset(task, 0, sizeof(*task)); 780 memset(task, 0, sizeof(*task));
760 init_timer(&task->tk_timer); 781 init_timer(&task->tk_timer);
761 task->tk_timer.data = (unsigned long) task; 782 task->tk_timer.data = (unsigned long) task;
762 task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer; 783 task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
784 atomic_set(&task->tk_count, 1);
763 task->tk_client = clnt; 785 task->tk_client = clnt;
764 task->tk_flags = flags; 786 task->tk_flags = flags;
765 task->tk_exit = callback; 787 task->tk_ops = tk_ops;
788 if (tk_ops->rpc_call_prepare != NULL)
789 task->tk_action = rpc_prepare_task;
790 task->tk_calldata = calldata;
766 791
767 /* Initialize retry counters */ 792 /* Initialize retry counters */
768 task->tk_garb_retry = 2; 793 task->tk_garb_retry = 2;
@@ -791,6 +816,8 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action call
791 list_add_tail(&task->tk_task, &all_tasks); 816 list_add_tail(&task->tk_task, &all_tasks);
792 spin_unlock(&rpc_sched_lock); 817 spin_unlock(&rpc_sched_lock);
793 818
819 BUG_ON(task->tk_ops == NULL);
820
794 dprintk("RPC: %4d new task procpid %d\n", task->tk_pid, 821 dprintk("RPC: %4d new task procpid %d\n", task->tk_pid,
795 current->pid); 822 current->pid);
796} 823}
@@ -801,8 +828,7 @@ rpc_alloc_task(void)
801 return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS); 828 return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
802} 829}
803 830
804static void 831static void rpc_free_task(struct rpc_task *task)
805rpc_default_free_task(struct rpc_task *task)
806{ 832{
807 dprintk("RPC: %4d freeing task\n", task->tk_pid); 833 dprintk("RPC: %4d freeing task\n", task->tk_pid);
808 mempool_free(task, rpc_task_mempool); 834 mempool_free(task, rpc_task_mempool);
@@ -813,8 +839,7 @@ rpc_default_free_task(struct rpc_task *task)
813 * clean up after an allocation failure, as the client may 839 * clean up after an allocation failure, as the client may
814 * have specified "oneshot". 840 * have specified "oneshot".
815 */ 841 */
816struct rpc_task * 842struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
817rpc_new_task(struct rpc_clnt *clnt, rpc_action callback, int flags)
818{ 843{
819 struct rpc_task *task; 844 struct rpc_task *task;
820 845
@@ -822,10 +847,7 @@ rpc_new_task(struct rpc_clnt *clnt, rpc_action callback, int flags)
822 if (!task) 847 if (!task)
823 goto cleanup; 848 goto cleanup;
824 849
825 rpc_init_task(task, clnt, callback, flags); 850 rpc_init_task(task, clnt, flags, tk_ops, calldata);
826
827 /* Replace tk_release */
828 task->tk_release = rpc_default_free_task;
829 851
830 dprintk("RPC: %4d allocated task\n", task->tk_pid); 852 dprintk("RPC: %4d allocated task\n", task->tk_pid);
831 task->tk_flags |= RPC_TASK_DYNAMIC; 853 task->tk_flags |= RPC_TASK_DYNAMIC;
@@ -845,11 +867,15 @@ cleanup:
845 867
846void rpc_release_task(struct rpc_task *task) 868void rpc_release_task(struct rpc_task *task)
847{ 869{
848 dprintk("RPC: %4d release task\n", task->tk_pid); 870 const struct rpc_call_ops *tk_ops = task->tk_ops;
871 void *calldata = task->tk_calldata;
849 872
850#ifdef RPC_DEBUG 873#ifdef RPC_DEBUG
851 BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID); 874 BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
852#endif 875#endif
876 if (!atomic_dec_and_test(&task->tk_count))
877 return;
878 dprintk("RPC: %4d release task\n", task->tk_pid);
853 879
854 /* Remove from global task list */ 880 /* Remove from global task list */
855 spin_lock(&rpc_sched_lock); 881 spin_lock(&rpc_sched_lock);
@@ -857,7 +883,6 @@ void rpc_release_task(struct rpc_task *task)
857 spin_unlock(&rpc_sched_lock); 883 spin_unlock(&rpc_sched_lock);
858 884
859 BUG_ON (RPC_IS_QUEUED(task)); 885 BUG_ON (RPC_IS_QUEUED(task));
860 task->tk_active = 0;
861 886
862 /* Synchronously delete any running timer */ 887 /* Synchronously delete any running timer */
863 rpc_delete_timer(task); 888 rpc_delete_timer(task);
@@ -867,7 +892,6 @@ void rpc_release_task(struct rpc_task *task)
867 xprt_release(task); 892 xprt_release(task);
868 if (task->tk_msg.rpc_cred) 893 if (task->tk_msg.rpc_cred)
869 rpcauth_unbindcred(task); 894 rpcauth_unbindcred(task);
870 rpc_free(task);
871 if (task->tk_client) { 895 if (task->tk_client) {
872 rpc_release_client(task->tk_client); 896 rpc_release_client(task->tk_client);
873 task->tk_client = NULL; 897 task->tk_client = NULL;
@@ -876,11 +900,34 @@ void rpc_release_task(struct rpc_task *task)
876#ifdef RPC_DEBUG 900#ifdef RPC_DEBUG
877 task->tk_magic = 0; 901 task->tk_magic = 0;
878#endif 902#endif
879 if (task->tk_release) 903 if (task->tk_flags & RPC_TASK_DYNAMIC)
880 task->tk_release(task); 904 rpc_free_task(task);
905 if (tk_ops->rpc_release)
906 tk_ops->rpc_release(calldata);
881} 907}
882 908
883/** 909/**
910 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
911 * @clnt - pointer to RPC client
912 * @flags - RPC flags
913 * @ops - RPC call ops
914 * @data - user call data
915 */
916struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
917 const struct rpc_call_ops *ops,
918 void *data)
919{
920 struct rpc_task *task;
921 task = rpc_new_task(clnt, flags, ops, data);
922 if (task == NULL)
923 return ERR_PTR(-ENOMEM);
924 atomic_inc(&task->tk_count);
925 rpc_execute(task);
926 return task;
927}
928EXPORT_SYMBOL(rpc_run_task);
929
930/**
884 * rpc_find_parent - find the parent of a child task. 931 * rpc_find_parent - find the parent of a child task.
885 * @child: child task 932 * @child: child task
886 * 933 *
@@ -890,12 +937,11 @@ void rpc_release_task(struct rpc_task *task)
890 * 937 *
891 * Caller must hold childq.lock 938 * Caller must hold childq.lock
892 */ 939 */
893static inline struct rpc_task *rpc_find_parent(struct rpc_task *child) 940static inline struct rpc_task *rpc_find_parent(struct rpc_task *child, struct rpc_task *parent)
894{ 941{
895 struct rpc_task *task, *parent; 942 struct rpc_task *task;
896 struct list_head *le; 943 struct list_head *le;
897 944
898 parent = (struct rpc_task *) child->tk_calldata;
899 task_for_each(task, le, &childq.tasks[0]) 945 task_for_each(task, le, &childq.tasks[0])
900 if (task == parent) 946 if (task == parent)
901 return parent; 947 return parent;
@@ -903,18 +949,22 @@ static inline struct rpc_task *rpc_find_parent(struct rpc_task *child)
903 return NULL; 949 return NULL;
904} 950}
905 951
906static void rpc_child_exit(struct rpc_task *child) 952static void rpc_child_exit(struct rpc_task *child, void *calldata)
907{ 953{
908 struct rpc_task *parent; 954 struct rpc_task *parent;
909 955
910 spin_lock_bh(&childq.lock); 956 spin_lock_bh(&childq.lock);
911 if ((parent = rpc_find_parent(child)) != NULL) { 957 if ((parent = rpc_find_parent(child, calldata)) != NULL) {
912 parent->tk_status = child->tk_status; 958 parent->tk_status = child->tk_status;
913 __rpc_wake_up_task(parent); 959 __rpc_wake_up_task(parent);
914 } 960 }
915 spin_unlock_bh(&childq.lock); 961 spin_unlock_bh(&childq.lock);
916} 962}
917 963
964static const struct rpc_call_ops rpc_child_ops = {
965 .rpc_call_done = rpc_child_exit,
966};
967
918/* 968/*
919 * Note: rpc_new_task releases the client after a failure. 969 * Note: rpc_new_task releases the client after a failure.
920 */ 970 */
@@ -923,11 +973,9 @@ rpc_new_child(struct rpc_clnt *clnt, struct rpc_task *parent)
923{ 973{
924 struct rpc_task *task; 974 struct rpc_task *task;
925 975
926 task = rpc_new_task(clnt, NULL, RPC_TASK_ASYNC | RPC_TASK_CHILD); 976 task = rpc_new_task(clnt, RPC_TASK_ASYNC | RPC_TASK_CHILD, &rpc_child_ops, parent);
927 if (!task) 977 if (!task)
928 goto fail; 978 goto fail;
929 task->tk_exit = rpc_child_exit;
930 task->tk_calldata = parent;
931 return task; 979 return task;
932 980
933fail: 981fail:
@@ -1063,7 +1111,7 @@ void rpc_show_tasks(void)
1063 return; 1111 return;
1064 } 1112 }
1065 printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout " 1113 printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
1066 "-rpcwait -action- --exit--\n"); 1114 "-rpcwait -action- ---ops--\n");
1067 alltask_for_each(t, le, &all_tasks) { 1115 alltask_for_each(t, le, &all_tasks) {
1068 const char *rpc_waitq = "none"; 1116 const char *rpc_waitq = "none";
1069 1117
@@ -1078,7 +1126,7 @@ void rpc_show_tasks(void)
1078 (t->tk_client ? t->tk_client->cl_prog : 0), 1126 (t->tk_client ? t->tk_client->cl_prog : 0),
1079 t->tk_rqstp, t->tk_timeout, 1127 t->tk_rqstp, t->tk_timeout,
1080 rpc_waitq, 1128 rpc_waitq,
1081 t->tk_action, t->tk_exit); 1129 t->tk_action, t->tk_ops);
1082 } 1130 }
1083 spin_unlock(&rpc_sched_lock); 1131 spin_unlock(&rpc_sched_lock);
1084} 1132}