summaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorTrond Myklebust <trond.myklebust@primarydata.com>2016-05-27 12:59:33 -0400
committerTrond Myklebust <trond.myklebust@primarydata.com>2016-06-13 12:35:51 -0400
commitf1dc237c60a5fdecc83062a28a702193f881cb19 (patch)
treef4740ff8ad0c003333e78b0563ced90cd5f21077 /net
parent40a5f1b19bacb2de7a051be952dee85e38c9e5f5 (diff)
SUNRPC: Reduce latency when send queue is congested
Use the low latency transport workqueue to process the task that is next in line on the xprt->sending queue. Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
Diffstat (limited to 'net')
-rw-r--r--net/sunrpc/sched.c43
-rw-r--r--net/sunrpc/xprt.c6
2 files changed, 37 insertions, 12 deletions
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index a9f786247ffb..9ae588511aaf 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -330,7 +330,8 @@ EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
330 * lockless RPC_IS_QUEUED() test) before we've had a chance to test 330 * lockless RPC_IS_QUEUED() test) before we've had a chance to test
331 * the RPC_TASK_RUNNING flag. 331 * the RPC_TASK_RUNNING flag.
332 */ 332 */
333static void rpc_make_runnable(struct rpc_task *task) 333static void rpc_make_runnable(struct workqueue_struct *wq,
334 struct rpc_task *task)
334{ 335{
335 bool need_wakeup = !rpc_test_and_set_running(task); 336 bool need_wakeup = !rpc_test_and_set_running(task);
336 337
@@ -339,7 +340,7 @@ static void rpc_make_runnable(struct rpc_task *task)
339 return; 340 return;
340 if (RPC_IS_ASYNC(task)) { 341 if (RPC_IS_ASYNC(task)) {
341 INIT_WORK(&task->u.tk_work, rpc_async_schedule); 342 INIT_WORK(&task->u.tk_work, rpc_async_schedule);
342 queue_work(rpciod_workqueue, &task->u.tk_work); 343 queue_work(wq, &task->u.tk_work);
343 } else 344 } else
344 wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED); 345 wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
345} 346}
@@ -408,13 +409,16 @@ void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
408EXPORT_SYMBOL_GPL(rpc_sleep_on_priority); 409EXPORT_SYMBOL_GPL(rpc_sleep_on_priority);
409 410
410/** 411/**
411 * __rpc_do_wake_up_task - wake up a single rpc_task 412 * __rpc_do_wake_up_task_on_wq - wake up a single rpc_task
413 * @wq: workqueue on which to run task
412 * @queue: wait queue 414 * @queue: wait queue
413 * @task: task to be woken up 415 * @task: task to be woken up
414 * 416 *
415 * Caller must hold queue->lock, and have cleared the task queued flag. 417 * Caller must hold queue->lock, and have cleared the task queued flag.
416 */ 418 */
417static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task) 419static void __rpc_do_wake_up_task_on_wq(struct workqueue_struct *wq,
420 struct rpc_wait_queue *queue,
421 struct rpc_task *task)
418{ 422{
419 dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n", 423 dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
420 task->tk_pid, jiffies); 424 task->tk_pid, jiffies);
@@ -429,7 +433,7 @@ static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task
429 433
430 __rpc_remove_wait_queue(queue, task); 434 __rpc_remove_wait_queue(queue, task);
431 435
432 rpc_make_runnable(task); 436 rpc_make_runnable(wq, task);
433 437
434 dprintk("RPC: __rpc_wake_up_task done\n"); 438 dprintk("RPC: __rpc_wake_up_task done\n");
435} 439}
@@ -437,16 +441,25 @@ static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task
437/* 441/*
438 * Wake up a queued task while the queue lock is being held 442 * Wake up a queued task while the queue lock is being held
439 */ 443 */
440static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task) 444static void rpc_wake_up_task_on_wq_queue_locked(struct workqueue_struct *wq,
445 struct rpc_wait_queue *queue, struct rpc_task *task)
441{ 446{
442 if (RPC_IS_QUEUED(task)) { 447 if (RPC_IS_QUEUED(task)) {
443 smp_rmb(); 448 smp_rmb();
444 if (task->tk_waitqueue == queue) 449 if (task->tk_waitqueue == queue)
445 __rpc_do_wake_up_task(queue, task); 450 __rpc_do_wake_up_task_on_wq(wq, queue, task);
446 } 451 }
447} 452}
448 453
449/* 454/*
455 * Wake up a queued task while the queue lock is being held
456 */
457static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
458{
459 rpc_wake_up_task_on_wq_queue_locked(rpciod_workqueue, queue, task);
460}
461
462/*
450 * Wake up a task on a specific queue 463 * Wake up a task on a specific queue
451 */ 464 */
452void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task) 465void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
@@ -519,7 +532,8 @@ static struct rpc_task *__rpc_find_next_queued(struct rpc_wait_queue *queue)
519/* 532/*
520 * Wake up the first task on the wait queue. 533 * Wake up the first task on the wait queue.
521 */ 534 */
522struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue, 535struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
536 struct rpc_wait_queue *queue,
523 bool (*func)(struct rpc_task *, void *), void *data) 537 bool (*func)(struct rpc_task *, void *), void *data)
524{ 538{
525 struct rpc_task *task = NULL; 539 struct rpc_task *task = NULL;
@@ -530,7 +544,7 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
530 task = __rpc_find_next_queued(queue); 544 task = __rpc_find_next_queued(queue);
531 if (task != NULL) { 545 if (task != NULL) {
532 if (func(task, data)) 546 if (func(task, data))
533 rpc_wake_up_task_queue_locked(queue, task); 547 rpc_wake_up_task_on_wq_queue_locked(wq, queue, task);
534 else 548 else
535 task = NULL; 549 task = NULL;
536 } 550 }
@@ -538,6 +552,15 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
538 552
539 return task; 553 return task;
540} 554}
555
556/*
557 * Wake up the first task on the wait queue.
558 */
559struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
560 bool (*func)(struct rpc_task *, void *), void *data)
561{
562 return rpc_wake_up_first_on_wq(rpciod_workqueue, queue, func, data);
563}
541EXPORT_SYMBOL_GPL(rpc_wake_up_first); 564EXPORT_SYMBOL_GPL(rpc_wake_up_first);
542 565
543static bool rpc_wake_up_next_func(struct rpc_task *task, void *data) 566static bool rpc_wake_up_next_func(struct rpc_task *task, void *data)
@@ -815,7 +838,7 @@ void rpc_execute(struct rpc_task *task)
815 bool is_async = RPC_IS_ASYNC(task); 838 bool is_async = RPC_IS_ASYNC(task);
816 839
817 rpc_set_active(task); 840 rpc_set_active(task);
818 rpc_make_runnable(task); 841 rpc_make_runnable(rpciod_workqueue, task);
819 if (!is_async) 842 if (!is_async)
820 __rpc_execute(task); 843 __rpc_execute(task);
821} 844}
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 71df082b84a9..8313960cac52 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -295,7 +295,8 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt)
295 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 295 if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
296 return; 296 return;
297 297
298 if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_func, xprt)) 298 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
299 __xprt_lock_write_func, xprt))
299 return; 300 return;
300 xprt_clear_locked(xprt); 301 xprt_clear_locked(xprt);
301} 302}
@@ -324,7 +325,8 @@ static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
324 return; 325 return;
325 if (RPCXPRT_CONGESTED(xprt)) 326 if (RPCXPRT_CONGESTED(xprt))
326 goto out_unlock; 327 goto out_unlock;
327 if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_cong_func, xprt)) 328 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
329 __xprt_lock_write_cong_func, xprt))
328 return; 330 return;
329out_unlock: 331out_unlock:
330 xprt_clear_locked(xprt); 332 xprt_clear_locked(xprt);