Diffstat (limited to 'net')
-rw-r--r--	net/sunrpc/auth_gss/auth_gss.c	  5
-rw-r--r--	net/sunrpc/clnt.c	  8
-rw-r--r--	net/sunrpc/rpcb_clnt.c	  2
-rw-r--r--	net/sunrpc/sched.c	255
-rw-r--r--	net/sunrpc/xprt.c	 45
-rw-r--r--	net/sunrpc/xprtsock.c	  7
6 files changed, 155 insertions, 167 deletions
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index 6dac38792288..ef6384961808 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -266,6 +266,7 @@ gss_release_msg(struct gss_upcall_msg *gss_msg)
 	BUG_ON(!list_empty(&gss_msg->list));
 	if (gss_msg->ctx != NULL)
 		gss_put_ctx(gss_msg->ctx);
+	rpc_destroy_wait_queue(&gss_msg->rpc_waitqueue);
 	kfree(gss_msg);
 }
 
@@ -408,13 +409,13 @@ gss_refresh_upcall(struct rpc_task *task)
 	}
 	spin_lock(&inode->i_lock);
 	if (gss_cred->gc_upcall != NULL)
-		rpc_sleep_on(&gss_cred->gc_upcall->rpc_waitqueue, task, NULL, NULL);
+		rpc_sleep_on(&gss_cred->gc_upcall->rpc_waitqueue, task, NULL);
 	else if (gss_msg->ctx == NULL && gss_msg->msg.errno >= 0) {
 		task->tk_timeout = 0;
 		gss_cred->gc_upcall = gss_msg;
 		/* gss_upcall_callback will release the reference to gss_upcall_msg */
 		atomic_inc(&gss_msg->count);
-		rpc_sleep_on(&gss_msg->rpc_waitqueue, task, gss_upcall_callback, NULL);
+		rpc_sleep_on(&gss_msg->rpc_waitqueue, task, gss_upcall_callback);
 	} else
 		err = gss_msg->msg.errno;
 	spin_unlock(&inode->i_lock);
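
Note: rpc_sleep_on() loses its fourth (timer callback) argument in this series, and any object that embeds an rpc_wait_queue must now call rpc_destroy_wait_queue() before being freed, so the queue's timer is stopped. A minimal sketch of the resulting calling convention, using a hypothetical foo structure that is not part of this diff:

	struct foo {
		struct rpc_wait_queue waitq;	/* embedded wait queue */
	};

	static void foo_wait(struct foo *foo, struct rpc_task *task)
	{
		task->tk_timeout = 5 * HZ;		/* timeout is now armed by the queue itself */
		rpc_sleep_on(&foo->waitq, task, NULL);	/* queue, task, action: three arguments */
	}

	static void foo_free(struct foo *foo)
	{
		rpc_destroy_wait_queue(&foo->waitq);	/* stop the per-queue timer before freeing */
		kfree(foo);
	}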
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 8c6a7f1a25e9..ea14314331b0 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -548,7 +548,7 @@ EXPORT_SYMBOL_GPL(rpc_run_task);
  * @msg: RPC call parameters
  * @flags: RPC call flags
  */
-int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
+int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
 {
 	struct rpc_task *task;
 	struct rpc_task_setup task_setup_data = {
@@ -579,7 +579,7 @@ EXPORT_SYMBOL_GPL(rpc_call_sync);
  * @data: user call data
  */
 int
-rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
+rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
 	       const struct rpc_call_ops *tk_ops, void *data)
 {
 	struct rpc_task *task;
@@ -1066,7 +1066,7 @@ call_transmit(struct rpc_task *task)
 	if (task->tk_msg.rpc_proc->p_decode != NULL)
 		return;
 	task->tk_action = rpc_exit_task;
-	rpc_wake_up_task(task);
+	rpc_wake_up_queued_task(&task->tk_xprt->pending, task);
 }
 
 /*
@@ -1535,7 +1535,7 @@ void rpc_show_tasks(void)
 		proc = -1;
 
 		if (RPC_IS_QUEUED(t))
-			rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);
+			rpc_waitq = rpc_qname(t->tk_waitqueue);
 
 		printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n",
 			t->tk_pid, proc,
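
Note: rpc_call_sync() and rpc_call_async() now take a const struct rpc_message *, so a caller can pass a read-only message. A minimal caller sketch, assuming a hypothetical foo_procedures[] table that is not part of this diff:

	static const struct rpc_message msg = {
		.rpc_proc = &foo_procedures[FOOPROC_NULL],	/* hypothetical procedure entry */
	};

	/* The const qualifier guarantees the RPC client will not modify msg */
	int err = rpc_call_sync(clnt, &msg, 0);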
diff --git a/net/sunrpc/rpcb_clnt.c b/net/sunrpc/rpcb_clnt.c
index 3164a0871cf0..f480c718b400 100644
--- a/net/sunrpc/rpcb_clnt.c
+++ b/net/sunrpc/rpcb_clnt.c
@@ -298,7 +298,7 @@ void rpcb_getport_async(struct rpc_task *task)
 
 	/* Put self on queue before sending rpcbind request, in case
 	 * rpcb_getport_done completes before we return from rpc_run_task */
-	rpc_sleep_on(&xprt->binding, task, NULL, NULL);
+	rpc_sleep_on(&xprt->binding, task, NULL);
 
 	/* Someone else may have bound if we slept */
 	if (xprt_bound(xprt)) {
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 4c669121e607..cae219c8caeb 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -38,9 +38,9 @@ static struct kmem_cache *rpc_buffer_slabp __read_mostly;
 static mempool_t *rpc_task_mempool __read_mostly;
 static mempool_t *rpc_buffer_mempool __read_mostly;
 
-static void __rpc_default_timer(struct rpc_task *task);
 static void rpc_async_schedule(struct work_struct *);
 static void rpc_release_task(struct rpc_task *task);
+static void __rpc_queue_timer_fn(unsigned long ptr);
 
 /*
  * RPC tasks sit here while waiting for conditions to improve.
@@ -57,41 +57,30 @@ struct workqueue_struct *rpciod_workqueue;
  * queue->lock and bh_disabled in order to avoid races within
  * rpc_run_timer().
  */
-static inline void
-__rpc_disable_timer(struct rpc_task *task)
+static void
+__rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
 {
+	if (task->tk_timeout == 0)
+		return;
 	dprintk("RPC: %5u disabling timer\n", task->tk_pid);
-	task->tk_timeout_fn = NULL;
 	task->tk_timeout = 0;
+	list_del(&task->u.tk_wait.timer_list);
+	if (list_empty(&queue->timer_list.list))
+		del_timer(&queue->timer_list.timer);
 }
 
-/*
- * Run a timeout function.
- * We use the callback in order to allow __rpc_wake_up_task()
- * and friends to disable the timer synchronously on SMP systems
- * without calling del_timer_sync(). The latter could cause a
- * deadlock if called while we're holding spinlocks...
- */
-static void rpc_run_timer(struct rpc_task *task)
+static void
+rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires)
 {
-	void (*callback)(struct rpc_task *);
-
-	callback = task->tk_timeout_fn;
-	task->tk_timeout_fn = NULL;
-	if (callback && RPC_IS_QUEUED(task)) {
-		dprintk("RPC: %5u running timer\n", task->tk_pid);
-		callback(task);
-	}
-	smp_mb__before_clear_bit();
-	clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
-	smp_mb__after_clear_bit();
+	queue->timer_list.expires = expires;
+	mod_timer(&queue->timer_list.timer, expires);
 }
 
 /*
  * Set up a timer for the current task.
  */
-static inline void
-__rpc_add_timer(struct rpc_task *task, rpc_action timer)
+static void
+__rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
 {
 	if (!task->tk_timeout)
 		return;
@@ -99,27 +88,10 @@ __rpc_add_timer(struct rpc_task *task, rpc_action timer)
 	dprintk("RPC: %5u setting alarm for %lu ms\n",
 			task->tk_pid, task->tk_timeout * 1000 / HZ);
 
-	if (timer)
-		task->tk_timeout_fn = timer;
-	else
-		task->tk_timeout_fn = __rpc_default_timer;
-	set_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
-	mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
-}
-
-/*
- * Delete any timer for the current task. Because we use del_timer_sync(),
- * this function should never be called while holding queue->lock.
- */
-static void
-rpc_delete_timer(struct rpc_task *task)
-{
-	if (RPC_IS_QUEUED(task))
-		return;
-	if (test_and_clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate)) {
-		del_singleshot_timer_sync(&task->tk_timer);
-		dprintk("RPC: %5u deleting timer\n", task->tk_pid);
-	}
+	task->u.tk_wait.expires = jiffies + task->tk_timeout;
+	if (list_empty(&queue->timer_list.list) || time_before(task->u.tk_wait.expires, queue->timer_list.expires))
+		rpc_set_queue_timer(queue, task->u.tk_wait.expires);
+	list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
 }
 
 /*
@@ -161,7 +133,7 @@ static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *
 		list_add(&task->u.tk_wait.list, &queue->tasks[0]);
 	else
 		list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
-	task->u.tk_wait.rpc_waitq = queue;
+	task->tk_waitqueue = queue;
 	queue->qlen++;
 	rpc_set_queued(task);
 
@@ -181,22 +153,18 @@ static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
 		list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
 		list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
 	}
-	list_del(&task->u.tk_wait.list);
 }
 
 /*
  * Remove request from queue.
  * Note: must be called with spin lock held.
  */
-static void __rpc_remove_wait_queue(struct rpc_task *task)
+static void __rpc_remove_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
 {
-	struct rpc_wait_queue *queue;
-	queue = task->u.tk_wait.rpc_waitq;
-
+	__rpc_disable_timer(queue, task);
 	if (RPC_IS_PRIORITY(queue))
 		__rpc_remove_wait_queue_priority(task);
-	else
-		list_del(&task->u.tk_wait.list);
+	list_del(&task->u.tk_wait.list);
 	queue->qlen--;
 	dprintk("RPC: %5u removed from queue %p \"%s\"\n",
 			task->tk_pid, queue, rpc_qname(queue));
@@ -229,6 +197,9 @@ static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const c
 		INIT_LIST_HEAD(&queue->tasks[i]);
 	queue->maxpriority = nr_queues - 1;
 	rpc_reset_waitqueue_priority(queue);
+	queue->qlen = 0;
+	setup_timer(&queue->timer_list.timer, __rpc_queue_timer_fn, (unsigned long)queue);
+	INIT_LIST_HEAD(&queue->timer_list.list);
 #ifdef RPC_DEBUG
 	queue->name = qname;
 #endif
@@ -245,6 +216,12 @@ void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
 }
 EXPORT_SYMBOL_GPL(rpc_init_wait_queue);
 
+void rpc_destroy_wait_queue(struct rpc_wait_queue *queue)
+{
+	del_timer_sync(&queue->timer_list.timer);
+}
+EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue);
+
 static int rpc_wait_bit_killable(void *word)
 {
 	if (fatal_signal_pending(current))
@@ -313,7 +290,6 @@ EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
  */
 static void rpc_make_runnable(struct rpc_task *task)
 {
-	BUG_ON(task->tk_timeout_fn);
 	rpc_clear_queued(task);
 	if (rpc_test_and_set_running(task))
 		return;
@@ -326,7 +302,7 @@ static void rpc_make_runnable(struct rpc_task *task)
 		int status;
 
 		INIT_WORK(&task->u.tk_work, rpc_async_schedule);
-		status = queue_work(task->tk_workqueue, &task->u.tk_work);
+		status = queue_work(rpciod_workqueue, &task->u.tk_work);
 		if (status < 0) {
 			printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
 			task->tk_status = status;
@@ -343,7 +319,7 @@ static void rpc_make_runnable(struct rpc_task *task)
  * as it's on a wait queue.
  */
 static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
-			rpc_action action, rpc_action timer)
+			rpc_action action)
 {
 	dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n",
 			task->tk_pid, rpc_qname(q), jiffies);
@@ -357,11 +333,11 @@ static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
 
 	BUG_ON(task->tk_callback != NULL);
 	task->tk_callback = action;
-	__rpc_add_timer(task, timer);
+	__rpc_add_timer(q, task);
 }
 
 void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
-		rpc_action action, rpc_action timer)
+		rpc_action action)
 {
 	/* Mark the task as being activated if so needed */
 	rpc_set_active(task);
@@ -370,18 +346,19 @@ void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
 	 * Protect the queue operations.
 	 */
 	spin_lock_bh(&q->lock);
-	__rpc_sleep_on(q, task, action, timer);
+	__rpc_sleep_on(q, task, action);
 	spin_unlock_bh(&q->lock);
 }
 EXPORT_SYMBOL_GPL(rpc_sleep_on);
 
 /**
  * __rpc_do_wake_up_task - wake up a single rpc_task
+ * @queue: wait queue
  * @task: task to be woken up
  *
  * Caller must hold queue->lock, and have cleared the task queued flag.
  */
-static void __rpc_do_wake_up_task(struct rpc_task *task)
+static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task)
 {
 	dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
 			task->tk_pid, jiffies);
@@ -395,8 +372,7 @@ static void __rpc_do_wake_up_task(struct rpc_task *task)
 		return;
 	}
 
-	__rpc_disable_timer(task);
-	__rpc_remove_wait_queue(task);
+	__rpc_remove_wait_queue(queue, task);
 
 	rpc_make_runnable(task);
 
@@ -404,48 +380,32 @@ static void __rpc_do_wake_up_task(struct rpc_task *task)
 }
 
 /*
- * Wake up the specified task
+ * Wake up a queued task while the queue lock is being held
  */
-static void __rpc_wake_up_task(struct rpc_task *task)
+static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
 {
-	if (rpc_start_wakeup(task)) {
-		if (RPC_IS_QUEUED(task))
-			__rpc_do_wake_up_task(task);
-		rpc_finish_wakeup(task);
-	}
+	if (RPC_IS_QUEUED(task) && task->tk_waitqueue == queue)
+		__rpc_do_wake_up_task(queue, task);
 }
 
 /*
- * Default timeout handler if none specified by user
+ * Wake up a task on a specific queue
  */
-static void
-__rpc_default_timer(struct rpc_task *task)
+void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
 {
-	dprintk("RPC: %5u timeout (default timer)\n", task->tk_pid);
-	task->tk_status = -ETIMEDOUT;
-	rpc_wake_up_task(task);
+	spin_lock_bh(&queue->lock);
+	rpc_wake_up_task_queue_locked(queue, task);
+	spin_unlock_bh(&queue->lock);
 }
+EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);
 
 /*
  * Wake up the specified task
  */
-void rpc_wake_up_task(struct rpc_task *task)
+static void rpc_wake_up_task(struct rpc_task *task)
 {
-	rcu_read_lock_bh();
-	if (rpc_start_wakeup(task)) {
-		if (RPC_IS_QUEUED(task)) {
-			struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq;
-
-			/* Note: we're already in a bh-safe context */
-			spin_lock(&queue->lock);
-			__rpc_do_wake_up_task(task);
-			spin_unlock(&queue->lock);
-		}
-		rpc_finish_wakeup(task);
-	}
-	rcu_read_unlock_bh();
+	rpc_wake_up_queued_task(task->tk_waitqueue, task);
 }
-EXPORT_SYMBOL_GPL(rpc_wake_up_task);
 
 /*
  * Wake up the next task on a priority queue.
@@ -495,7 +455,7 @@ new_queue:
 new_owner:
 	rpc_set_waitqueue_owner(queue, task->tk_owner);
 out:
-	__rpc_wake_up_task(task);
+	rpc_wake_up_task_queue_locked(queue, task);
 	return task;
 }
 
@@ -508,16 +468,14 @@ struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
 
 	dprintk("RPC: wake_up_next(%p \"%s\")\n",
 			queue, rpc_qname(queue));
-	rcu_read_lock_bh();
-	spin_lock(&queue->lock);
+	spin_lock_bh(&queue->lock);
 	if (RPC_IS_PRIORITY(queue))
 		task = __rpc_wake_up_next_priority(queue);
 	else {
 		task_for_first(task, &queue->tasks[0])
-			__rpc_wake_up_task(task);
+			rpc_wake_up_task_queue_locked(queue, task);
 	}
-	spin_unlock(&queue->lock);
-	rcu_read_unlock_bh();
+	spin_unlock_bh(&queue->lock);
 
 	return task;
 }
@@ -534,18 +492,16 @@ void rpc_wake_up(struct rpc_wait_queue *queue)
 	struct rpc_task *task, *next;
 	struct list_head *head;
 
-	rcu_read_lock_bh();
-	spin_lock(&queue->lock);
+	spin_lock_bh(&queue->lock);
 	head = &queue->tasks[queue->maxpriority];
 	for (;;) {
 		list_for_each_entry_safe(task, next, head, u.tk_wait.list)
-			__rpc_wake_up_task(task);
+			rpc_wake_up_task_queue_locked(queue, task);
 		if (head == &queue->tasks[0])
 			break;
 		head--;
 	}
-	spin_unlock(&queue->lock);
-	rcu_read_unlock_bh();
+	spin_unlock_bh(&queue->lock);
 }
 EXPORT_SYMBOL_GPL(rpc_wake_up);
 
@@ -561,26 +517,48 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
 	struct rpc_task *task, *next;
 	struct list_head *head;
 
-	rcu_read_lock_bh();
-	spin_lock(&queue->lock);
+	spin_lock_bh(&queue->lock);
 	head = &queue->tasks[queue->maxpriority];
 	for (;;) {
 		list_for_each_entry_safe(task, next, head, u.tk_wait.list) {
 			task->tk_status = status;
-			__rpc_wake_up_task(task);
+			rpc_wake_up_task_queue_locked(queue, task);
 		}
 		if (head == &queue->tasks[0])
 			break;
 		head--;
 	}
-	spin_unlock(&queue->lock);
-	rcu_read_unlock_bh();
+	spin_unlock_bh(&queue->lock);
 }
 EXPORT_SYMBOL_GPL(rpc_wake_up_status);
 
+static void __rpc_queue_timer_fn(unsigned long ptr)
+{
+	struct rpc_wait_queue *queue = (struct rpc_wait_queue *)ptr;
+	struct rpc_task *task, *n;
+	unsigned long expires, now, timeo;
+
+	spin_lock(&queue->lock);
+	expires = now = jiffies;
+	list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) {
+		timeo = task->u.tk_wait.expires;
+		if (time_after_eq(now, timeo)) {
+			dprintk("RPC: %5u timeout\n", task->tk_pid);
+			task->tk_status = -ETIMEDOUT;
+			rpc_wake_up_task_queue_locked(queue, task);
+			continue;
+		}
+		if (expires == now || time_after(expires, timeo))
+			expires = timeo;
+	}
+	if (!list_empty(&queue->timer_list.list))
+		rpc_set_queue_timer(queue, expires);
+	spin_unlock(&queue->lock);
+}
+
 static void __rpc_atrun(struct rpc_task *task)
 {
-	rpc_wake_up_task(task);
+	task->tk_status = 0;
 }
 
 /*
@@ -589,7 +567,7 @@ static void __rpc_atrun(struct rpc_task *task)
 void rpc_delay(struct rpc_task *task, unsigned long delay)
 {
 	task->tk_timeout = delay;
-	rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
+	rpc_sleep_on(&delay_queue, task, __rpc_atrun);
 }
 EXPORT_SYMBOL_GPL(rpc_delay);
 
@@ -644,10 +622,6 @@ static void __rpc_execute(struct rpc_task *task)
 	BUG_ON(RPC_IS_QUEUED(task));
 
 	for (;;) {
-		/*
-		 * Garbage collection of pending timers...
-		 */
-		rpc_delete_timer(task);
 
 		/*
 		 * Execute any pending callback.
@@ -816,8 +790,6 @@ EXPORT_SYMBOL_GPL(rpc_free);
 static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data)
 {
 	memset(task, 0, sizeof(*task));
-	setup_timer(&task->tk_timer, (void (*)(unsigned long))rpc_run_timer,
-			(unsigned long)task);
 	atomic_set(&task->tk_count, 1);
 	task->tk_flags = task_setup_data->flags;
 	task->tk_ops = task_setup_data->callback_ops;
@@ -832,7 +804,7 @@ static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *ta
 		task->tk_owner = current->tgid;
 
 	/* Initialize workqueue for async tasks */
-	task->tk_workqueue = rpciod_workqueue;
+	task->tk_workqueue = task_setup_data->workqueue;
 
 	task->tk_client = task_setup_data->rpc_client;
 	if (task->tk_client != NULL) {
@@ -868,13 +840,6 @@ rpc_alloc_task(void)
 	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
 }
 
-static void rpc_free_task(struct rcu_head *rcu)
-{
-	struct rpc_task *task = container_of(rcu, struct rpc_task, u.tk_rcu);
-	dprintk("RPC: %5u freeing task\n", task->tk_pid);
-	mempool_free(task, rpc_task_mempool);
-}
-
 /*
  * Create a new task for the specified client.
  */
@@ -898,12 +863,25 @@ out:
 	return task;
 }
 
-
-void rpc_put_task(struct rpc_task *task)
+static void rpc_free_task(struct rpc_task *task)
 {
 	const struct rpc_call_ops *tk_ops = task->tk_ops;
 	void *calldata = task->tk_calldata;
 
+	if (task->tk_flags & RPC_TASK_DYNAMIC) {
+		dprintk("RPC: %5u freeing task\n", task->tk_pid);
+		mempool_free(task, rpc_task_mempool);
+	}
+	rpc_release_calldata(tk_ops, calldata);
+}
+
+static void rpc_async_release(struct work_struct *work)
+{
+	rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
+}
+
+void rpc_put_task(struct rpc_task *task)
+{
 	if (!atomic_dec_and_test(&task->tk_count))
 		return;
 	/* Release resources */
@@ -915,9 +893,11 @@ void rpc_put_task(struct rpc_task *task)
 		rpc_release_client(task->tk_client);
 		task->tk_client = NULL;
 	}
-	if (task->tk_flags & RPC_TASK_DYNAMIC)
-		call_rcu_bh(&task->u.tk_rcu, rpc_free_task);
-	rpc_release_calldata(tk_ops, calldata);
+	if (task->tk_workqueue != NULL) {
+		INIT_WORK(&task->u.tk_work, rpc_async_release);
+		queue_work(task->tk_workqueue, &task->u.tk_work);
+	} else
+		rpc_free_task(task);
 }
 EXPORT_SYMBOL_GPL(rpc_put_task);
 
@@ -937,9 +917,6 @@ static void rpc_release_task(struct rpc_task *task)
 	}
 	BUG_ON (RPC_IS_QUEUED(task));
 
-	/* Synchronously delete any running timer */
-	rpc_delete_timer(task);
-
 #ifdef RPC_DEBUG
 	task->tk_magic = 0;
 #endif
@@ -1029,11 +1006,20 @@ rpc_destroy_mempool(void)
 		kmem_cache_destroy(rpc_task_slabp);
 	if (rpc_buffer_slabp)
 		kmem_cache_destroy(rpc_buffer_slabp);
+	rpc_destroy_wait_queue(&delay_queue);
 }
 
 int
 rpc_init_mempool(void)
 {
+	/*
+	 * The following is not strictly a mempool initialisation,
+	 * but there is no harm in doing it here
+	 */
+	rpc_init_wait_queue(&delay_queue, "delayq");
+	if (!rpciod_start())
+		goto err_nomem;
+
 	rpc_task_slabp = kmem_cache_create("rpc_tasks",
 					     sizeof(struct rpc_task),
 					     0, SLAB_HWCACHE_ALIGN,
@@ -1054,13 +1040,6 @@ rpc_init_mempool(void)
 						  rpc_buffer_slabp);
 	if (!rpc_buffer_mempool)
 		goto err_nomem;
-	if (!rpciod_start())
-		goto err_nomem;
-	/*
-	 * The following is not strictly a mempool initialisation,
-	 * but there is no harm in doing it here
-	 */
-	rpc_init_wait_queue(&delay_queue, "delayq");
 	return 0;
 err_nomem:
 	rpc_destroy_mempool();
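
Note: the net effect of the sched.c changes is that timeouts move from a per-task timer (tk_timer/tk_timeout_fn) to a single timer per wait queue: __rpc_add_timer() links the sleeping task into queue->timer_list and (re)arms the queue timer for the earliest expiry, and __rpc_queue_timer_fn() walks that list under queue->lock, waking every expired task with tk_status = -ETIMEDOUT. A caller therefore arms a timeout simply by setting tk_timeout before sleeping, as the new rpc_delay() above shows:

	void rpc_delay(struct rpc_task *task, unsigned long delay)
	{
		task->tk_timeout = delay;			/* picked up by __rpc_add_timer() */
		rpc_sleep_on(&delay_queue, task, __rpc_atrun);	/* action runs when the task wakes */
	}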
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index d5553b8179f9..85199c647022 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -188,9 +188,9 @@ out_sleep:
 	task->tk_timeout = 0;
 	task->tk_status = -EAGAIN;
 	if (req && req->rq_ntrans)
-		rpc_sleep_on(&xprt->resend, task, NULL, NULL);
+		rpc_sleep_on(&xprt->resend, task, NULL);
 	else
-		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
+		rpc_sleep_on(&xprt->sending, task, NULL);
 	return 0;
 }
 EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
@@ -238,9 +238,9 @@ out_sleep:
 	task->tk_timeout = 0;
 	task->tk_status = -EAGAIN;
 	if (req && req->rq_ntrans)
-		rpc_sleep_on(&xprt->resend, task, NULL, NULL);
+		rpc_sleep_on(&xprt->resend, task, NULL);
 	else
-		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
+		rpc_sleep_on(&xprt->sending, task, NULL);
 	return 0;
 }
 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
@@ -453,7 +453,7 @@ void xprt_wait_for_buffer_space(struct rpc_task *task)
 	struct rpc_xprt *xprt = req->rq_xprt;
 
 	task->tk_timeout = req->rq_timeout;
-	rpc_sleep_on(&xprt->pending, task, NULL, NULL);
+	rpc_sleep_on(&xprt->pending, task, NULL);
 }
 EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);
 
@@ -472,7 +472,7 @@ void xprt_write_space(struct rpc_xprt *xprt)
 	if (xprt->snd_task) {
 		dprintk("RPC: write space: waking waiting task on "
 				"xprt %p\n", xprt);
-		rpc_wake_up_task(xprt->snd_task);
+		rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task);
 	}
 	spin_unlock_bh(&xprt->transport_lock);
 }
@@ -602,8 +602,7 @@ void xprt_force_disconnect(struct rpc_xprt *xprt)
 	/* Try to schedule an autoclose RPC call */
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
 		queue_work(rpciod_workqueue, &xprt->task_cleanup);
-	else if (xprt->snd_task != NULL)
-		rpc_wake_up_task(xprt->snd_task);
+	xprt_wake_pending_tasks(xprt, -ENOTCONN);
 	spin_unlock_bh(&xprt->transport_lock);
 }
 EXPORT_SYMBOL_GPL(xprt_force_disconnect);
@@ -653,7 +652,7 @@ void xprt_connect(struct rpc_task *task)
 		task->tk_rqstp->rq_bytes_sent = 0;
 
 		task->tk_timeout = xprt->connect_timeout;
-		rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL);
+		rpc_sleep_on(&xprt->pending, task, xprt_connect_status);
 		xprt->stat.connect_start = jiffies;
 		xprt->ops->connect(task);
 	}
@@ -749,18 +748,19 @@ EXPORT_SYMBOL_GPL(xprt_update_rtt);
 void xprt_complete_rqst(struct rpc_task *task, int copied)
 {
 	struct rpc_rqst *req = task->tk_rqstp;
+	struct rpc_xprt *xprt = req->rq_xprt;
 
 	dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
 			task->tk_pid, ntohl(req->rq_xid), copied);
 
-	task->tk_xprt->stat.recvs++;
+	xprt->stat.recvs++;
 	task->tk_rtt = (long)jiffies - req->rq_xtime;
 
 	list_del_init(&req->rq_list);
 	/* Ensure all writes are done before we update req->rq_received */
 	smp_wmb();
 	req->rq_received = req->rq_private_buf.len = copied;
-	rpc_wake_up_task(task);
+	rpc_wake_up_queued_task(&xprt->pending, task);
 }
 EXPORT_SYMBOL_GPL(xprt_complete_rqst);
 
@@ -769,17 +769,17 @@ static void xprt_timer(struct rpc_task *task)
 	struct rpc_rqst *req = task->tk_rqstp;
 	struct rpc_xprt *xprt = req->rq_xprt;
 
+	if (task->tk_status != -ETIMEDOUT)
+		return;
 	dprintk("RPC: %5u xprt_timer\n", task->tk_pid);
 
-	spin_lock(&xprt->transport_lock);
+	spin_lock_bh(&xprt->transport_lock);
 	if (!req->rq_received) {
 		if (xprt->ops->timer)
 			xprt->ops->timer(task);
-		task->tk_status = -ETIMEDOUT;
-	}
-	task->tk_timeout = 0;
-	rpc_wake_up_task(task);
-	spin_unlock(&xprt->transport_lock);
+	} else
+		task->tk_status = 0;
+	spin_unlock_bh(&xprt->transport_lock);
 }
 
 /**
@@ -864,7 +864,7 @@ void xprt_transmit(struct rpc_task *task)
 		if (!xprt_connected(xprt))
 			task->tk_status = -ENOTCONN;
 		else if (!req->rq_received)
-			rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
+			rpc_sleep_on(&xprt->pending, task, xprt_timer);
 		spin_unlock_bh(&xprt->transport_lock);
 		return;
 	}
@@ -875,7 +875,7 @@ void xprt_transmit(struct rpc_task *task)
 	 */
 	task->tk_status = status;
 	if (status == -ECONNREFUSED)
-		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
+		rpc_sleep_on(&xprt->sending, task, NULL);
 }
 
 static inline void do_xprt_reserve(struct rpc_task *task)
@@ -895,7 +895,7 @@ static inline void do_xprt_reserve(struct rpc_task *task)
 	dprintk("RPC: waiting for request slot\n");
 	task->tk_status = -EAGAIN;
 	task->tk_timeout = 0;
-	rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
+	rpc_sleep_on(&xprt->backlog, task, NULL);
 }
 
 /**
@@ -1052,6 +1052,11 @@ static void xprt_destroy(struct kref *kref)
 	xprt->shutdown = 1;
 	del_timer_sync(&xprt->timer);
 
+	rpc_destroy_wait_queue(&xprt->binding);
+	rpc_destroy_wait_queue(&xprt->pending);
+	rpc_destroy_wait_queue(&xprt->sending);
+	rpc_destroy_wait_queue(&xprt->resend);
+	rpc_destroy_wait_queue(&xprt->backlog);
 	/*
 	 * Tear down transport state and free the rpc_xprt
 	 */
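
Note: with the per-queue timer, xprt_timer() moves from rpc_sleep_on()'s removed timer slot to the ordinary wake-up action slot. By the time it runs, the queue timer has already set tk_status = -ETIMEDOUT and woken the task, which is why the new version begins by checking for that status rather than setting it. The arming side stays a two-step pattern, as in xprt_transmit() above:

	task->tk_timeout = req->rq_timeout;		/* queue timer fires after this delay */
	rpc_sleep_on(&xprt->pending, task, xprt_timer);	/* xprt_timer() runs when the task wakes */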
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 30e7ac243a90..8bd3b0f73ac0 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1073,6 +1073,7 @@ static void xs_tcp_data_ready(struct sock *sk, int bytes)
 {
 	struct rpc_xprt *xprt;
 	read_descriptor_t rd_desc;
+	int read;
 
 	dprintk("RPC: xs_tcp_data_ready...\n");
 
@@ -1084,8 +1085,10 @@ static void xs_tcp_data_ready(struct sock *sk, int bytes)
 
 	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
 	rd_desc.arg.data = xprt;
-	rd_desc.count = 65536;
-	tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
+	do {
+		rd_desc.count = 65536;
+		read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
+	} while (read > 0);
 out:
 	read_unlock(&sk->sk_callback_lock);
 }
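
Note: tcp_read_sock() consumes rd_desc.count as it feeds data to the recv actor, so a single call with a 64KB budget could leave bytes queued on the socket with no further data_ready callback to drain them. The new loop resets the budget on each pass and keeps reading while the previous pass consumed data (a positive return value), stopping once the socket is drained or the receive path stalls:

	do {
		rd_desc.count = 65536;		/* per-pass read budget */
		read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
	} while (read > 0);			/* 0 or negative: drained, blocked, or error */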