Diffstat (limited to 'net/sunrpc/sched.c')
-rw-r--r--	net/sunrpc/sched.c	101
1 file changed, 67 insertions(+), 34 deletions(-)
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 6357fcb00c7e..bfa31714581f 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -98,6 +98,23 @@ __rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
 	list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
 }
 
+static void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
+{
+	queue->priority = priority;
+}
+
+static void rpc_set_waitqueue_owner(struct rpc_wait_queue *queue, pid_t pid)
+{
+	queue->owner = pid;
+	queue->nr = RPC_BATCH_COUNT;
+}
+
+static void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
+{
+	rpc_set_waitqueue_priority(queue, queue->maxpriority);
+	rpc_set_waitqueue_owner(queue, 0);
+}
+
 /*
  * Add new request to a priority queue.
  */
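These helpers move up the file ahead of their first user, losing the inline keyword and the per-priority count seeding on the way (see the hunk at -178 below). A minimal userspace model of what they leave behind, with hypothetical names and an assumed RPC_BATCH_COUNT of 16, is:

	#include <assert.h>
	#include <sys/types.h>

	#define RPC_BATCH_COUNT 16		/* assumed value of the kernel constant */

	struct wq_model {			/* hypothetical stand-in for rpc_wait_queue */
		int priority;			/* priority level currently being served */
		int maxpriority;		/* highest level this queue supports */
		pid_t owner;			/* owner whose tasks are being batched */
		unsigned char nr;		/* batch allowance left for that owner */
	};

	static void set_priority(struct wq_model *q, int priority)
	{
		q->priority = priority;
	}

	static void set_owner(struct wq_model *q, pid_t pid)
	{
		q->owner = pid;
		q->nr = RPC_BATCH_COUNT;	/* a fresh owner gets a fresh batch */
	}

	static void reset(struct wq_model *q)
	{
		set_priority(q, q->maxpriority);
		set_owner(q, 0);
	}

	int main(void)
	{
		struct wq_model q = { .maxpriority = 3 };
		reset(&q);			/* back to the top level, no owner */
		assert(q.priority == 3 && q.owner == 0 && q.nr == RPC_BATCH_COUNT);
		return 0;
	}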
@@ -109,9 +126,11 @@ static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue,
 	struct rpc_task *t;
 
 	INIT_LIST_HEAD(&task->u.tk_wait.links);
-	q = &queue->tasks[queue_priority];
 	if (unlikely(queue_priority > queue->maxpriority))
-		q = &queue->tasks[queue->maxpriority];
+		queue_priority = queue->maxpriority;
+	if (queue_priority > queue->priority)
+		rpc_set_waitqueue_priority(queue, queue_priority);
+	q = &queue->tasks[queue_priority];
 	list_for_each_entry(t, q, u.tk_wait.list) {
 		if (t->tk_owner == task->tk_owner) {
 			list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
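The rewritten enqueue path clamps an out-of-range priority first, then lets a high-priority arrival pull the queue's serving level up to meet it, so the new task is found on the very next dequeue pass. Extending the sketch above (same hypothetical names):

	static void enqueue(struct wq_model *q, int queue_priority)
	{
		if (queue_priority > q->maxpriority)	/* clamp bogus input... */
			queue_priority = q->maxpriority;
		if (queue_priority > q->priority)	/* ...then raise the serving */
			set_priority(q, queue_priority);/* level to meet the newcomer */
		/* ...then link the task onto the list for that level... */
	}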
@@ -133,7 +152,9 @@ static void __rpc_add_wait_queue(struct rpc_wait_queue *queue,
 		struct rpc_task *task,
 		unsigned char queue_priority)
 {
-	BUG_ON (RPC_IS_QUEUED(task));
+	WARN_ON_ONCE(RPC_IS_QUEUED(task));
+	if (RPC_IS_QUEUED(task))
+		return;
 
 	if (RPC_IS_PRIORITY(queue))
 		__rpc_add_wait_queue_priority(queue, task, queue_priority);
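This is the first instance of a pattern the patch applies throughout: a fatal BUG_ON() becomes a WARN_ON_ONCE() plus a graceful bail-out, trading a guaranteed crash for a one-time stack trace and a refused operation. In schematic form:

	/* before: any double-enqueue brings the whole machine down */
	BUG_ON(RPC_IS_QUEUED(task));

	/* after: log the first occurrence with a backtrace, then refuse */
	WARN_ON_ONCE(RPC_IS_QUEUED(task));
	if (RPC_IS_QUEUED(task))
		return;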
@@ -178,24 +199,6 @@ static void __rpc_remove_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
 			task->tk_pid, queue, rpc_qname(queue));
 }
 
-static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
-{
-	queue->priority = priority;
-	queue->count = 1 << (priority * 2);
-}
-
-static inline void rpc_set_waitqueue_owner(struct rpc_wait_queue *queue, pid_t pid)
-{
-	queue->owner = pid;
-	queue->nr = RPC_BATCH_COUNT;
-}
-
-static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
-{
-	rpc_set_waitqueue_priority(queue, queue->maxpriority);
-	rpc_set_waitqueue_owner(queue, 0);
-}
-
 static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, unsigned char nr_queues)
 {
 	int i;
@@ -334,7 +337,7 @@ static void __rpc_sleep_on_priority(struct rpc_wait_queue *q,
 
 	__rpc_add_wait_queue(q, task, queue_priority);
 
-	BUG_ON(task->tk_callback != NULL);
+	WARN_ON_ONCE(task->tk_callback != NULL);
 	task->tk_callback = action;
 	__rpc_add_timer(q, task);
 }
@@ -343,7 +346,12 @@ void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
 				rpc_action action)
 {
 	/* We shouldn't ever put an inactive task to sleep */
-	BUG_ON(!RPC_IS_ACTIVATED(task));
+	WARN_ON_ONCE(!RPC_IS_ACTIVATED(task));
+	if (!RPC_IS_ACTIVATED(task)) {
+		task->tk_status = -EIO;
+		rpc_put_task_async(task);
+		return;
+	}
 
 	/*
 	 * Protect the queue operations.
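Here the bail-out has to do more than return: an un-activated task can never be woken, so it is failed with -EIO and handed to rpc_put_task_async() for release. Using the asynchronous put defers the final cleanup to a workqueue, which is the safe choice if the caller may itself be running in rpciod context (an assumption on my part; the patch does not spell out the motivation):

	/* Sketch of the new failure path, with the reasoning as comments: */
	if (!RPC_IS_ACTIVATED(task)) {
		task->tk_status = -EIO;		/* fail: this task can never run */
		rpc_put_task_async(task);	/* cleanup runs later in a workqueue,
						 * so a caller on rpciod cannot
						 * deadlock waiting on itself */
		return;
	}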
@@ -358,7 +366,12 @@ void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
 		rpc_action action, int priority)
 {
 	/* We shouldn't ever put an inactive task to sleep */
-	BUG_ON(!RPC_IS_ACTIVATED(task));
+	WARN_ON_ONCE(!RPC_IS_ACTIVATED(task));
+	if (!RPC_IS_ACTIVATED(task)) {
+		task->tk_status = -EIO;
+		rpc_put_task_async(task);
+		return;
+	}
 
 	/*
 	 * Protect the queue operations.
@@ -367,6 +380,7 @@ void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
 	__rpc_sleep_on_priority(q, task, action, priority - RPC_PRIORITY_LOW);
 	spin_unlock_bh(&q->lock);
 }
+EXPORT_SYMBOL_GPL(rpc_sleep_on_priority);
 
 /**
  * __rpc_do_wake_up_task - wake up a single rpc_task
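The new export makes the priority-aware variant callable from GPL modules rather than from sched.c alone. A hypothetical modular caller (a NULL action means no wake-up callback is installed):

	static void example_sleep_high(struct rpc_wait_queue *q,
				       struct rpc_task *task)
	{
		/* hypothetical caller, enabled by EXPORT_SYMBOL_GPL above */
		rpc_sleep_on_priority(q, task, NULL, RPC_PRIORITY_HIGH);
	}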
@@ -451,8 +465,7 @@ static struct rpc_task *__rpc_find_next_queued_priority(struct rpc_wait_queue *queue)
 		/*
 		 * Check if we need to switch queues.
 		 */
-		if (--queue->count)
-			goto new_owner;
+		goto new_owner;
 	}
 
 	/*
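Together with the dropped queue->count = 1 << (priority * 2) seeding above, this removes the per-priority budget that used to force a switch to a lower level after a fixed number of dequeues. What remains is the per-owner batch allowance (queue->nr, refilled from RPC_BATCH_COUNT whenever the owner changes); the serving level now drops only when the current list empties, and rises when __rpc_add_wait_queue_priority() bumps queue->priority. A model of the surviving throttle, continuing the earlier sketch:

	static int keep_serving_owner(struct wq_model *q, pid_t task_owner)
	{
		/* same owner with batch budget left: keep batching its tasks */
		if (q->owner == task_owner && --q->nr != 0)
			return 1;
		return 0;	/* otherwise rotate to a new owner (set_owner()
				 * will refill q->nr to RPC_BATCH_COUNT) */
	}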
@@ -697,7 +710,9 @@ static void __rpc_execute(struct rpc_task *task)
 	dprintk("RPC: %5u __rpc_execute flags=0x%x\n",
 			task->tk_pid, task->tk_flags);
 
-	BUG_ON(RPC_IS_QUEUED(task));
+	WARN_ON_ONCE(RPC_IS_QUEUED(task));
+	if (RPC_IS_QUEUED(task))
+		return;
 
 	for (;;) {
 		void (*do_action)(struct rpc_task *);
@@ -919,16 +934,35 @@ struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data)
 	return task;
 }
 
+/*
+ * rpc_free_task - release rpc task and perform cleanups
+ *
+ * Note that we free up the rpc_task _after_ rpc_release_calldata()
+ * in order to work around a workqueue dependency issue.
+ *
+ * Tejun Heo states:
+ * "Workqueue currently considers two work items to be the same if they're
+ * on the same address and won't execute them concurrently - ie. it
+ * makes a work item which is queued again while being executed wait
+ * for the previous execution to complete.
+ *
+ * If a work function frees the work item, and then waits for an event
+ * which should be performed by another work item and *that* work item
+ * recycles the freed work item, it can create a false dependency loop.
+ * There really is no reliable way to detect this short of verifying
+ * every memory free."
+ *
+ */
 static void rpc_free_task(struct rpc_task *task)
 {
-	const struct rpc_call_ops *tk_ops = task->tk_ops;
-	void *calldata = task->tk_calldata;
+	unsigned short tk_flags = task->tk_flags;
+
+	rpc_release_calldata(task->tk_ops, task->tk_calldata);
 
-	if (task->tk_flags & RPC_TASK_DYNAMIC) {
+	if (tk_flags & RPC_TASK_DYNAMIC) {
 		dprintk("RPC: %5u freeing task\n", task->tk_pid);
 		mempool_free(task, rpc_task_mempool);
 	}
-	rpc_release_calldata(tk_ops, calldata);
 }
 
 static void rpc_async_release(struct work_struct *work)
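Two details of the reordering are worth noting. The rpc_task embeds the work_struct that rpciod executes (u.tk_work), so freeing it before rpc_release_calldata() left a window in which the memory could be recycled as another work item, which is exactly the false dependency Tejun Heo describes. And tk_flags is sampled into a local before the calldata release because a non-dynamic task may be embedded in the very structure that rpc_release_calldata() frees (my reading of the change, not stated in the patch). The defensive pattern, in general form:

	/* Sample what you will still need from an object *before* the call
	 * that may invalidate it; decide from the saved copy afterwards. */
	unsigned short tk_flags = task->tk_flags;

	rpc_release_calldata(task->tk_ops, task->tk_calldata);
	/* 'task' may now point into freed memory unless it was mempool-born */

	if (tk_flags & RPC_TASK_DYNAMIC)
		mempool_free(task, rpc_task_mempool);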
@@ -938,8 +972,7 @@ static void rpc_async_release(struct work_struct *work)
 
 static void rpc_release_resources_task(struct rpc_task *task)
 {
-	if (task->tk_rqstp)
-		xprt_release(task);
+	xprt_release(task);
 	if (task->tk_msg.rpc_cred) {
 		put_rpccred(task->tk_msg.rpc_cred);
 		task->tk_msg.rpc_cred = NULL;
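Dropping the tk_rqstp guard is only safe if the callee tolerates a task with no transport request bound, so the NULL check has effectively moved into xprt_release(). A sketch of the expected callee shape (not the verbatim xprt.c code):

	void xprt_release(struct rpc_task *task)
	{
		struct rpc_rqst *req = task->tk_rqstp;

		if (req == NULL)	/* no slot bound: nothing to tear down */
			return;
		/* ...release the slot, congestion window, buffers, ... */
	}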
@@ -981,7 +1014,7 @@ static void rpc_release_task(struct rpc_task *task)
 {
 	dprintk("RPC: %5u release task\n", task->tk_pid);
 
-	BUG_ON (RPC_IS_QUEUED(task));
+	WARN_ON_ONCE(RPC_IS_QUEUED(task));
 
 	rpc_release_resources_task(task);
 