aboutsummaryrefslogtreecommitdiffstats
path: root/net/sunrpc/sched.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/sched.c')
-rw-r--r--net/sunrpc/sched.c209
1 files changed, 60 insertions, 149 deletions
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 944d75396fb3..2ac43c41c3a9 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -25,7 +25,6 @@
25#ifdef RPC_DEBUG 25#ifdef RPC_DEBUG
26#define RPCDBG_FACILITY RPCDBG_SCHED 26#define RPCDBG_FACILITY RPCDBG_SCHED
27#define RPC_TASK_MAGIC_ID 0xf00baa 27#define RPC_TASK_MAGIC_ID 0xf00baa
28static int rpc_task_id;
29#endif 28#endif
30 29
31/* 30/*
@@ -40,7 +39,6 @@ static mempool_t *rpc_task_mempool __read_mostly;
40static mempool_t *rpc_buffer_mempool __read_mostly; 39static mempool_t *rpc_buffer_mempool __read_mostly;
41 40
42static void __rpc_default_timer(struct rpc_task *task); 41static void __rpc_default_timer(struct rpc_task *task);
43static void rpciod_killall(void);
44static void rpc_async_schedule(struct work_struct *); 42static void rpc_async_schedule(struct work_struct *);
45static void rpc_release_task(struct rpc_task *task); 43static void rpc_release_task(struct rpc_task *task);
46 44
@@ -50,23 +48,13 @@ static void rpc_release_task(struct rpc_task *task);
50static RPC_WAITQ(delay_queue, "delayq"); 48static RPC_WAITQ(delay_queue, "delayq");
51 49
52/* 50/*
53 * All RPC tasks are linked into this list
54 */
55static LIST_HEAD(all_tasks);
56
57/*
58 * rpciod-related stuff 51 * rpciod-related stuff
59 */ 52 */
60static DEFINE_MUTEX(rpciod_mutex); 53static DEFINE_MUTEX(rpciod_mutex);
61static unsigned int rpciod_users; 54static atomic_t rpciod_users = ATOMIC_INIT(0);
62struct workqueue_struct *rpciod_workqueue; 55struct workqueue_struct *rpciod_workqueue;
63 56
64/* 57/*
65 * Spinlock for other critical sections of code.
66 */
67static DEFINE_SPINLOCK(rpc_sched_lock);
68
69/*
70 * Disable the timer for a given RPC task. Should be called with 58 * Disable the timer for a given RPC task. Should be called with
71 * queue->lock and bh_disabled in order to avoid races within 59 * queue->lock and bh_disabled in order to avoid races within
72 * rpc_run_timer(). 60 * rpc_run_timer().
@@ -267,18 +255,33 @@ static int rpc_wait_bit_interruptible(void *word)
267 return 0; 255 return 0;
268} 256}
269 257
258#ifdef RPC_DEBUG
259static void rpc_task_set_debuginfo(struct rpc_task *task)
260{
261 static atomic_t rpc_pid;
262
263 task->tk_magic = RPC_TASK_MAGIC_ID;
264 task->tk_pid = atomic_inc_return(&rpc_pid);
265}
266#else
267static inline void rpc_task_set_debuginfo(struct rpc_task *task)
268{
269}
270#endif
271
270static void rpc_set_active(struct rpc_task *task) 272static void rpc_set_active(struct rpc_task *task)
271{ 273{
274 struct rpc_clnt *clnt;
272 if (test_and_set_bit(RPC_TASK_ACTIVE, &task->tk_runstate) != 0) 275 if (test_and_set_bit(RPC_TASK_ACTIVE, &task->tk_runstate) != 0)
273 return; 276 return;
274 spin_lock(&rpc_sched_lock); 277 rpc_task_set_debuginfo(task);
275#ifdef RPC_DEBUG
276 task->tk_magic = RPC_TASK_MAGIC_ID;
277 task->tk_pid = rpc_task_id++;
278#endif
279 /* Add to global list of all tasks */ 278 /* Add to global list of all tasks */
280 list_add_tail(&task->tk_task, &all_tasks); 279 clnt = task->tk_client;
281 spin_unlock(&rpc_sched_lock); 280 if (clnt != NULL) {
281 spin_lock(&clnt->cl_lock);
282 list_add_tail(&task->tk_task, &clnt->cl_tasks);
283 spin_unlock(&clnt->cl_lock);
284 }
282} 285}
283 286
284/* 287/*
@@ -818,6 +821,7 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, cons
818 if (tk_ops->rpc_call_prepare != NULL) 821 if (tk_ops->rpc_call_prepare != NULL)
819 task->tk_action = rpc_prepare_task; 822 task->tk_action = rpc_prepare_task;
820 task->tk_calldata = calldata; 823 task->tk_calldata = calldata;
824 INIT_LIST_HEAD(&task->tk_task);
821 825
822 /* Initialize retry counters */ 826 /* Initialize retry counters */
823 task->tk_garb_retry = 2; 827 task->tk_garb_retry = 2;
@@ -830,7 +834,7 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, cons
830 task->tk_workqueue = rpciod_workqueue; 834 task->tk_workqueue = rpciod_workqueue;
831 835
832 if (clnt) { 836 if (clnt) {
833 atomic_inc(&clnt->cl_users); 837 kref_get(&clnt->cl_kref);
834 if (clnt->cl_softrtry) 838 if (clnt->cl_softrtry)
835 task->tk_flags |= RPC_TASK_SOFT; 839 task->tk_flags |= RPC_TASK_SOFT;
836 if (!clnt->cl_intr) 840 if (!clnt->cl_intr)
@@ -860,9 +864,7 @@ static void rpc_free_task(struct rcu_head *rcu)
860} 864}
861 865
862/* 866/*
863 * Create a new task for the specified client. We have to 867 * Create a new task for the specified client.
864 * clean up after an allocation failure, as the client may
865 * have specified "oneshot".
866 */ 868 */
867struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata) 869struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
868{ 870{
@@ -870,7 +872,7 @@ struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc
870 872
871 task = rpc_alloc_task(); 873 task = rpc_alloc_task();
872 if (!task) 874 if (!task)
873 goto cleanup; 875 goto out;
874 876
875 rpc_init_task(task, clnt, flags, tk_ops, calldata); 877 rpc_init_task(task, clnt, flags, tk_ops, calldata);
876 878
@@ -878,16 +880,6 @@ struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc
878 task->tk_flags |= RPC_TASK_DYNAMIC; 880 task->tk_flags |= RPC_TASK_DYNAMIC;
879out: 881out:
880 return task; 882 return task;
881
882cleanup:
883 /* Check whether to release the client */
884 if (clnt) {
885 printk("rpc_new_task: failed, users=%d, oneshot=%d\n",
886 atomic_read(&clnt->cl_users), clnt->cl_oneshot);
887 atomic_inc(&clnt->cl_users); /* pretend we were used ... */
888 rpc_release_client(clnt);
889 }
890 goto out;
891} 883}
892 884
893 885
@@ -920,11 +912,13 @@ static void rpc_release_task(struct rpc_task *task)
920#endif 912#endif
921 dprintk("RPC: %5u release task\n", task->tk_pid); 913 dprintk("RPC: %5u release task\n", task->tk_pid);
922 914
923 /* Remove from global task list */ 915 if (!list_empty(&task->tk_task)) {
924 spin_lock(&rpc_sched_lock); 916 struct rpc_clnt *clnt = task->tk_client;
925 list_del(&task->tk_task); 917 /* Remove from client task list */
926 spin_unlock(&rpc_sched_lock); 918 spin_lock(&clnt->cl_lock);
927 919 list_del(&task->tk_task);
920 spin_unlock(&clnt->cl_lock);
921 }
928 BUG_ON (RPC_IS_QUEUED(task)); 922 BUG_ON (RPC_IS_QUEUED(task));
929 923
930 /* Synchronously delete any running timer */ 924 /* Synchronously delete any running timer */
@@ -939,29 +933,6 @@ static void rpc_release_task(struct rpc_task *task)
939 rpc_put_task(task); 933 rpc_put_task(task);
940} 934}
941 935
942/**
943 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
944 * @clnt: pointer to RPC client
945 * @flags: RPC flags
946 * @ops: RPC call ops
947 * @data: user call data
948 */
949struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
950 const struct rpc_call_ops *ops,
951 void *data)
952{
953 struct rpc_task *task;
954 task = rpc_new_task(clnt, flags, ops, data);
955 if (task == NULL) {
956 rpc_release_calldata(ops, data);
957 return ERR_PTR(-ENOMEM);
958 }
959 atomic_inc(&task->tk_count);
960 rpc_execute(task);
961 return task;
962}
963EXPORT_SYMBOL(rpc_run_task);
964
965/* 936/*
966 * Kill all tasks for the given client. 937 * Kill all tasks for the given client.
967 * XXX: kill their descendants as well? 938 * XXX: kill their descendants as well?
@@ -969,44 +940,25 @@ EXPORT_SYMBOL(rpc_run_task);
969void rpc_killall_tasks(struct rpc_clnt *clnt) 940void rpc_killall_tasks(struct rpc_clnt *clnt)
970{ 941{
971 struct rpc_task *rovr; 942 struct rpc_task *rovr;
972 struct list_head *le;
973 943
974 dprintk("RPC: killing all tasks for client %p\n", clnt);
975 944
945 if (list_empty(&clnt->cl_tasks))
946 return;
947 dprintk("RPC: killing all tasks for client %p\n", clnt);
976 /* 948 /*
977 * Spin lock all_tasks to prevent changes... 949 * Spin lock all_tasks to prevent changes...
978 */ 950 */
979 spin_lock(&rpc_sched_lock); 951 spin_lock(&clnt->cl_lock);
980 alltask_for_each(rovr, le, &all_tasks) { 952 list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
981 if (! RPC_IS_ACTIVATED(rovr)) 953 if (! RPC_IS_ACTIVATED(rovr))
982 continue; 954 continue;
983 if (!clnt || rovr->tk_client == clnt) { 955 if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
984 rovr->tk_flags |= RPC_TASK_KILLED; 956 rovr->tk_flags |= RPC_TASK_KILLED;
985 rpc_exit(rovr, -EIO); 957 rpc_exit(rovr, -EIO);
986 rpc_wake_up_task(rovr); 958 rpc_wake_up_task(rovr);
987 } 959 }
988 } 960 }
989 spin_unlock(&rpc_sched_lock); 961 spin_unlock(&clnt->cl_lock);
990}
991
992static void rpciod_killall(void)
993{
994 unsigned long flags;
995
996 while (!list_empty(&all_tasks)) {
997 clear_thread_flag(TIF_SIGPENDING);
998 rpc_killall_tasks(NULL);
999 flush_workqueue(rpciod_workqueue);
1000 if (!list_empty(&all_tasks)) {
1001 dprintk("RPC: rpciod_killall: waiting for tasks "
1002 "to exit\n");
1003 yield();
1004 }
1005 }
1006
1007 spin_lock_irqsave(&current->sighand->siglock, flags);
1008 recalc_sigpending();
1009 spin_unlock_irqrestore(&current->sighand->siglock, flags);
1010} 962}
1011 963
1012/* 964/*
@@ -1018,28 +970,27 @@ rpciod_up(void)
1018 struct workqueue_struct *wq; 970 struct workqueue_struct *wq;
1019 int error = 0; 971 int error = 0;
1020 972
973 if (atomic_inc_not_zero(&rpciod_users))
974 return 0;
975
1021 mutex_lock(&rpciod_mutex); 976 mutex_lock(&rpciod_mutex);
1022 dprintk("RPC: rpciod_up: users %u\n", rpciod_users); 977
1023 rpciod_users++; 978 /* Guard against races with rpciod_down() */
1024 if (rpciod_workqueue) 979 if (rpciod_workqueue != NULL)
1025 goto out; 980 goto out_ok;
1026 /*
1027 * If there's no pid, we should be the first user.
1028 */
1029 if (rpciod_users > 1)
1030 printk(KERN_WARNING "rpciod_up: no workqueue, %u users??\n", rpciod_users);
1031 /* 981 /*
1032 * Create the rpciod thread and wait for it to start. 982 * Create the rpciod thread and wait for it to start.
1033 */ 983 */
984 dprintk("RPC: creating workqueue rpciod\n");
1034 error = -ENOMEM; 985 error = -ENOMEM;
1035 wq = create_workqueue("rpciod"); 986 wq = create_workqueue("rpciod");
1036 if (wq == NULL) { 987 if (wq == NULL)
1037 printk(KERN_WARNING "rpciod_up: create workqueue failed, error=%d\n", error);
1038 rpciod_users--;
1039 goto out; 988 goto out;
1040 } 989
1041 rpciod_workqueue = wq; 990 rpciod_workqueue = wq;
1042 error = 0; 991 error = 0;
992out_ok:
993 atomic_inc(&rpciod_users);
1043out: 994out:
1044 mutex_unlock(&rpciod_mutex); 995 mutex_unlock(&rpciod_mutex);
1045 return error; 996 return error;
@@ -1048,59 +999,19 @@ out:
1048void 999void
1049rpciod_down(void) 1000rpciod_down(void)
1050{ 1001{
1002 if (!atomic_dec_and_test(&rpciod_users))
1003 return;
1004
1051 mutex_lock(&rpciod_mutex); 1005 mutex_lock(&rpciod_mutex);
1052 dprintk("RPC: rpciod_down sema %u\n", rpciod_users); 1006 dprintk("RPC: destroying workqueue rpciod\n");
1053 if (rpciod_users) {
1054 if (--rpciod_users)
1055 goto out;
1056 } else
1057 printk(KERN_WARNING "rpciod_down: no users??\n");
1058 1007
1059 if (!rpciod_workqueue) { 1008 if (atomic_read(&rpciod_users) == 0 && rpciod_workqueue != NULL) {
1060 dprintk("RPC: rpciod_down: Nothing to do!\n"); 1009 destroy_workqueue(rpciod_workqueue);
1061 goto out; 1010 rpciod_workqueue = NULL;
1062 } 1011 }
1063 rpciod_killall();
1064
1065 destroy_workqueue(rpciod_workqueue);
1066 rpciod_workqueue = NULL;
1067 out:
1068 mutex_unlock(&rpciod_mutex); 1012 mutex_unlock(&rpciod_mutex);
1069} 1013}
1070 1014
1071#ifdef RPC_DEBUG
1072void rpc_show_tasks(void)
1073{
1074 struct list_head *le;
1075 struct rpc_task *t;
1076
1077 spin_lock(&rpc_sched_lock);
1078 if (list_empty(&all_tasks)) {
1079 spin_unlock(&rpc_sched_lock);
1080 return;
1081 }
1082 printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
1083 "-rpcwait -action- ---ops--\n");
1084 alltask_for_each(t, le, &all_tasks) {
1085 const char *rpc_waitq = "none";
1086
1087 if (RPC_IS_QUEUED(t))
1088 rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);
1089
1090 printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n",
1091 t->tk_pid,
1092 (t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
1093 t->tk_flags, t->tk_status,
1094 t->tk_client,
1095 (t->tk_client ? t->tk_client->cl_prog : 0),
1096 t->tk_rqstp, t->tk_timeout,
1097 rpc_waitq,
1098 t->tk_action, t->tk_ops);
1099 }
1100 spin_unlock(&rpc_sched_lock);
1101}
1102#endif
1103
1104void 1015void
1105rpc_destroy_mempool(void) 1016rpc_destroy_mempool(void)
1106{ 1017{