aboutsummaryrefslogtreecommitdiffstats
path: root/net/sunrpc/sched.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/sched.c')
-rw-r--r--net/sunrpc/sched.c109
1 files changed, 67 insertions, 42 deletions
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 4b22910b4461..4c669121e607 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -45,7 +45,7 @@ static void rpc_release_task(struct rpc_task *task);
45/* 45/*
46 * RPC tasks sit here while waiting for conditions to improve. 46 * RPC tasks sit here while waiting for conditions to improve.
47 */ 47 */
48static RPC_WAITQ(delay_queue, "delayq"); 48static struct rpc_wait_queue delay_queue;
49 49
50/* 50/*
51 * rpciod-related stuff 51 * rpciod-related stuff
@@ -135,7 +135,7 @@ static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct r
135 if (unlikely(task->tk_priority > queue->maxpriority)) 135 if (unlikely(task->tk_priority > queue->maxpriority))
136 q = &queue->tasks[queue->maxpriority]; 136 q = &queue->tasks[queue->maxpriority];
137 list_for_each_entry(t, q, u.tk_wait.list) { 137 list_for_each_entry(t, q, u.tk_wait.list) {
138 if (t->tk_cookie == task->tk_cookie) { 138 if (t->tk_owner == task->tk_owner) {
139 list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links); 139 list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
140 return; 140 return;
141 } 141 }
@@ -208,26 +208,26 @@ static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int
208 queue->count = 1 << (priority * 2); 208 queue->count = 1 << (priority * 2);
209} 209}
210 210
211static inline void rpc_set_waitqueue_cookie(struct rpc_wait_queue *queue, unsigned long cookie) 211static inline void rpc_set_waitqueue_owner(struct rpc_wait_queue *queue, pid_t pid)
212{ 212{
213 queue->cookie = cookie; 213 queue->owner = pid;
214 queue->nr = RPC_BATCH_COUNT; 214 queue->nr = RPC_BATCH_COUNT;
215} 215}
216 216
217static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue) 217static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
218{ 218{
219 rpc_set_waitqueue_priority(queue, queue->maxpriority); 219 rpc_set_waitqueue_priority(queue, queue->maxpriority);
220 rpc_set_waitqueue_cookie(queue, 0); 220 rpc_set_waitqueue_owner(queue, 0);
221} 221}
222 222
223static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, int maxprio) 223static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, unsigned char nr_queues)
224{ 224{
225 int i; 225 int i;
226 226
227 spin_lock_init(&queue->lock); 227 spin_lock_init(&queue->lock);
228 for (i = 0; i < ARRAY_SIZE(queue->tasks); i++) 228 for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
229 INIT_LIST_HEAD(&queue->tasks[i]); 229 INIT_LIST_HEAD(&queue->tasks[i]);
230 queue->maxpriority = maxprio; 230 queue->maxpriority = nr_queues - 1;
231 rpc_reset_waitqueue_priority(queue); 231 rpc_reset_waitqueue_priority(queue);
232#ifdef RPC_DEBUG 232#ifdef RPC_DEBUG
233 queue->name = qname; 233 queue->name = qname;
@@ -236,14 +236,14 @@ static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const c
236 236
237void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname) 237void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
238{ 238{
239 __rpc_init_priority_wait_queue(queue, qname, RPC_PRIORITY_HIGH); 239 __rpc_init_priority_wait_queue(queue, qname, RPC_NR_PRIORITY);
240} 240}
241 241
242void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname) 242void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
243{ 243{
244 __rpc_init_priority_wait_queue(queue, qname, 0); 244 __rpc_init_priority_wait_queue(queue, qname, 1);
245} 245}
246EXPORT_SYMBOL(rpc_init_wait_queue); 246EXPORT_SYMBOL_GPL(rpc_init_wait_queue);
247 247
248static int rpc_wait_bit_killable(void *word) 248static int rpc_wait_bit_killable(void *word)
249{ 249{
@@ -303,7 +303,7 @@ int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
303 return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE, 303 return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
304 action, TASK_KILLABLE); 304 action, TASK_KILLABLE);
305} 305}
306EXPORT_SYMBOL(__rpc_wait_for_completion_task); 306EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
307 307
308/* 308/*
309 * Make an RPC task runnable. 309 * Make an RPC task runnable.
@@ -373,6 +373,7 @@ void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
373 __rpc_sleep_on(q, task, action, timer); 373 __rpc_sleep_on(q, task, action, timer);
374 spin_unlock_bh(&q->lock); 374 spin_unlock_bh(&q->lock);
375} 375}
376EXPORT_SYMBOL_GPL(rpc_sleep_on);
376 377
377/** 378/**
378 * __rpc_do_wake_up_task - wake up a single rpc_task 379 * __rpc_do_wake_up_task - wake up a single rpc_task
@@ -444,6 +445,7 @@ void rpc_wake_up_task(struct rpc_task *task)
444 } 445 }
445 rcu_read_unlock_bh(); 446 rcu_read_unlock_bh();
446} 447}
448EXPORT_SYMBOL_GPL(rpc_wake_up_task);
447 449
448/* 450/*
449 * Wake up the next task on a priority queue. 451 * Wake up the next task on a priority queue.
@@ -454,12 +456,12 @@ static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queu
454 struct rpc_task *task; 456 struct rpc_task *task;
455 457
456 /* 458 /*
457 * Service a batch of tasks from a single cookie. 459 * Service a batch of tasks from a single owner.
458 */ 460 */
459 q = &queue->tasks[queue->priority]; 461 q = &queue->tasks[queue->priority];
460 if (!list_empty(q)) { 462 if (!list_empty(q)) {
461 task = list_entry(q->next, struct rpc_task, u.tk_wait.list); 463 task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
462 if (queue->cookie == task->tk_cookie) { 464 if (queue->owner == task->tk_owner) {
463 if (--queue->nr) 465 if (--queue->nr)
464 goto out; 466 goto out;
465 list_move_tail(&task->u.tk_wait.list, q); 467 list_move_tail(&task->u.tk_wait.list, q);
@@ -468,7 +470,7 @@ static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queu
468 * Check if we need to switch queues. 470 * Check if we need to switch queues.
469 */ 471 */
470 if (--queue->count) 472 if (--queue->count)
471 goto new_cookie; 473 goto new_owner;
472 } 474 }
473 475
474 /* 476 /*
@@ -490,8 +492,8 @@ static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queu
490 492
491new_queue: 493new_queue:
492 rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0])); 494 rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
493new_cookie: 495new_owner:
494 rpc_set_waitqueue_cookie(queue, task->tk_cookie); 496 rpc_set_waitqueue_owner(queue, task->tk_owner);
495out: 497out:
496 __rpc_wake_up_task(task); 498 __rpc_wake_up_task(task);
497 return task; 499 return task;
@@ -519,6 +521,7 @@ struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
519 521
520 return task; 522 return task;
521} 523}
524EXPORT_SYMBOL_GPL(rpc_wake_up_next);
522 525
523/** 526/**
524 * rpc_wake_up - wake up all rpc_tasks 527 * rpc_wake_up - wake up all rpc_tasks
@@ -544,6 +547,7 @@ void rpc_wake_up(struct rpc_wait_queue *queue)
544 spin_unlock(&queue->lock); 547 spin_unlock(&queue->lock);
545 rcu_read_unlock_bh(); 548 rcu_read_unlock_bh();
546} 549}
550EXPORT_SYMBOL_GPL(rpc_wake_up);
547 551
548/** 552/**
549 * rpc_wake_up_status - wake up all rpc_tasks and set their status value. 553 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
@@ -572,6 +576,7 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
572 spin_unlock(&queue->lock); 576 spin_unlock(&queue->lock);
573 rcu_read_unlock_bh(); 577 rcu_read_unlock_bh();
574} 578}
579EXPORT_SYMBOL_GPL(rpc_wake_up_status);
575 580
576static void __rpc_atrun(struct rpc_task *task) 581static void __rpc_atrun(struct rpc_task *task)
577{ 582{
@@ -586,6 +591,7 @@ void rpc_delay(struct rpc_task *task, unsigned long delay)
586 task->tk_timeout = delay; 591 task->tk_timeout = delay;
587 rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun); 592 rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
588} 593}
594EXPORT_SYMBOL_GPL(rpc_delay);
589 595
590/* 596/*
591 * Helper to call task->tk_ops->rpc_call_prepare 597 * Helper to call task->tk_ops->rpc_call_prepare
@@ -614,7 +620,7 @@ void rpc_exit_task(struct rpc_task *task)
614 } 620 }
615 } 621 }
616} 622}
617EXPORT_SYMBOL(rpc_exit_task); 623EXPORT_SYMBOL_GPL(rpc_exit_task);
618 624
619void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata) 625void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
620{ 626{
@@ -807,38 +813,47 @@ EXPORT_SYMBOL_GPL(rpc_free);
807/* 813/*
808 * Creation and deletion of RPC task structures 814 * Creation and deletion of RPC task structures
809 */ 815 */
810void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata) 816static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data)
811{ 817{
812 memset(task, 0, sizeof(*task)); 818 memset(task, 0, sizeof(*task));
813 init_timer(&task->tk_timer); 819 setup_timer(&task->tk_timer, (void (*)(unsigned long))rpc_run_timer,
814 task->tk_timer.data = (unsigned long) task; 820 (unsigned long)task);
815 task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
816 atomic_set(&task->tk_count, 1); 821 atomic_set(&task->tk_count, 1);
817 task->tk_client = clnt; 822 task->tk_flags = task_setup_data->flags;
818 task->tk_flags = flags; 823 task->tk_ops = task_setup_data->callback_ops;
819 task->tk_ops = tk_ops; 824 task->tk_calldata = task_setup_data->callback_data;
820 if (tk_ops->rpc_call_prepare != NULL)
821 task->tk_action = rpc_prepare_task;
822 task->tk_calldata = calldata;
823 INIT_LIST_HEAD(&task->tk_task); 825 INIT_LIST_HEAD(&task->tk_task);
824 826
825 /* Initialize retry counters */ 827 /* Initialize retry counters */
826 task->tk_garb_retry = 2; 828 task->tk_garb_retry = 2;
827 task->tk_cred_retry = 2; 829 task->tk_cred_retry = 2;
828 830
829 task->tk_priority = RPC_PRIORITY_NORMAL; 831 task->tk_priority = task_setup_data->priority - RPC_PRIORITY_LOW;
830 task->tk_cookie = (unsigned long)current; 832 task->tk_owner = current->tgid;
831 833
832 /* Initialize workqueue for async tasks */ 834 /* Initialize workqueue for async tasks */
833 task->tk_workqueue = rpciod_workqueue; 835 task->tk_workqueue = rpciod_workqueue;
834 836
835 if (clnt) { 837 task->tk_client = task_setup_data->rpc_client;
836 kref_get(&clnt->cl_kref); 838 if (task->tk_client != NULL) {
837 if (clnt->cl_softrtry) 839 kref_get(&task->tk_client->cl_kref);
840 if (task->tk_client->cl_softrtry)
838 task->tk_flags |= RPC_TASK_SOFT; 841 task->tk_flags |= RPC_TASK_SOFT;
839 } 842 }
840 843
841 BUG_ON(task->tk_ops == NULL); 844 if (task->tk_ops->rpc_call_prepare != NULL)
845 task->tk_action = rpc_prepare_task;
846
847 if (task_setup_data->rpc_message != NULL) {
848 memcpy(&task->tk_msg, task_setup_data->rpc_message, sizeof(task->tk_msg));
849 /* Bind the user cred */
850 if (task->tk_msg.rpc_cred != NULL)
851 rpcauth_holdcred(task);
852 else
853 rpcauth_bindcred(task);
854 if (task->tk_action == NULL)
855 rpc_call_start(task);
856 }
842 857
843 /* starting timestamp */ 858 /* starting timestamp */
844 task->tk_start = jiffies; 859 task->tk_start = jiffies;
@@ -863,18 +878,22 @@ static void rpc_free_task(struct rcu_head *rcu)
863/* 878/*
864 * Create a new task for the specified client. 879 * Create a new task for the specified client.
865 */ 880 */
866struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata) 881struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data)
867{ 882{
868 struct rpc_task *task; 883 struct rpc_task *task = setup_data->task;
869 884 unsigned short flags = 0;
870 task = rpc_alloc_task(); 885
871 if (!task) 886 if (task == NULL) {
872 goto out; 887 task = rpc_alloc_task();
888 if (task == NULL)
889 goto out;
890 flags = RPC_TASK_DYNAMIC;
891 }
873 892
874 rpc_init_task(task, clnt, flags, tk_ops, calldata); 893 rpc_init_task(task, setup_data);
875 894
895 task->tk_flags |= flags;
876 dprintk("RPC: allocated task %p\n", task); 896 dprintk("RPC: allocated task %p\n", task);
877 task->tk_flags |= RPC_TASK_DYNAMIC;
878out: 897out:
879 return task; 898 return task;
880} 899}
@@ -900,7 +919,7 @@ void rpc_put_task(struct rpc_task *task)
900 call_rcu_bh(&task->u.tk_rcu, rpc_free_task); 919 call_rcu_bh(&task->u.tk_rcu, rpc_free_task);
901 rpc_release_calldata(tk_ops, calldata); 920 rpc_release_calldata(tk_ops, calldata);
902} 921}
903EXPORT_SYMBOL(rpc_put_task); 922EXPORT_SYMBOL_GPL(rpc_put_task);
904 923
905static void rpc_release_task(struct rpc_task *task) 924static void rpc_release_task(struct rpc_task *task)
906{ 925{
@@ -957,6 +976,7 @@ void rpc_killall_tasks(struct rpc_clnt *clnt)
957 } 976 }
958 spin_unlock(&clnt->cl_lock); 977 spin_unlock(&clnt->cl_lock);
959} 978}
979EXPORT_SYMBOL_GPL(rpc_killall_tasks);
960 980
961int rpciod_up(void) 981int rpciod_up(void)
962{ 982{
@@ -1036,6 +1056,11 @@ rpc_init_mempool(void)
1036 goto err_nomem; 1056 goto err_nomem;
1037 if (!rpciod_start()) 1057 if (!rpciod_start())
1038 goto err_nomem; 1058 goto err_nomem;
1059 /*
1060 * The following is not strictly a mempool initialisation,
1061 * but there is no harm in doing it here
1062 */
1063 rpc_init_wait_queue(&delay_queue, "delayq");
1039 return 0; 1064 return 0;
1040err_nomem: 1065err_nomem:
1041 rpc_destroy_mempool(); 1066 rpc_destroy_mempool();