author    Jeff Layton <jlayton@primarydata.com>       2014-11-21 14:19:30 -0500
committer J. Bruce Fields <bfields@redhat.com>        2014-12-09 11:22:22 -0500
commit    b1691bc03d4eddb959234409167bef9be9e62d74 (patch)
tree      2ccfca41ffcffc69a3bacc0bb1e9e5754595a0e6 /net
parent    403c7b44441d60aba7f8a134c31279ffa60ea769 (diff)
sunrpc: convert to lockless lookup of queued server threads
Testing has shown that the pool->sp_lock can be a bottleneck on a busy server. Every time data is received on a socket, the server must take that lock in order to dequeue a thread from the sp_threads list.

Address this problem by eliminating the sp_threads list (which contains threads that are currently idle) and replacing it with a RQ_BUSY flag in svc_rqst. This allows us to walk the sp_all_threads list under the rcu_read_lock and find a suitable thread for the xprt by doing a test_and_set_bit.

Note that we do still have a potential atomicity problem with this approach. We don't want svc_xprt_do_enqueue to set the rqst->rq_xprt pointer unless a test_and_set_bit of RQ_BUSY returned zero (which indicates that the thread was idle). But, by the time we check that, the bit could have been flipped by a waking thread.

To address this, we acquire a new per-rqst spinlock (rq_lock) and take that before doing the test_and_set_bit. If that returns false, then we can set rq_xprt and drop the spinlock. Then, when the thread wakes up, it must set the bit under the same spinlock and can trust that if it was already set then the rq_xprt is also properly set.

With this scheme, the case where we have an idle thread no longer needs to take the highly contended pool->sp_lock at all, and that removes the bottleneck.

That still leaves one issue: what of the case where we walk the whole sp_all_threads list and don't find an idle thread? Because the search is lockless, it's possible for the queueing to race with a thread that is going to sleep. To address that, we queue the xprt and then search again. If we find an idle thread at that point, we can't attach the xprt to it directly, since that might race with a different thread waking up and finding it. All we can do is wake the idle thread back up and let it attempt to find the now-queued xprt.

Signed-off-by: Jeff Layton <jlayton@primarydata.com>
Tested-by: Chris Worley <chris.worley@primarydata.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
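To make the handshake described above easier to follow, here is a minimal user-space sketch of the same idea, for illustration only. The struct rqst, its busy flag and lock, and the try_hand_off()/server_thread() helpers are invented stand-ins for svc_rqst, RQ_BUSY, rq_lock, svc_xprt_do_enqueue and an nfsd thread; C11 atomics and pthreads stand in for the kernel's bitops, spinlocks and wake_up_process(). It is not the kernel code itself.

/* lockless_handoff.c - illustrative sketch only, not kernel code */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

struct rqst {
        atomic_bool busy;        /* stands in for the RQ_BUSY bit   */
        pthread_mutex_t lock;    /* stands in for rq_lock           */
        pthread_cond_t wake;     /* stands in for wake_up_process() */
        void *xprt;              /* stands in for rq_xprt           */
};

/* Enqueue side: try to hand an xprt to this thread; fails if it is busy. */
static bool try_hand_off(struct rqst *r, void *xprt)
{
        /* lockless pre-check, like test_bit(RQ_BUSY) during the RCU walk */
        if (atomic_load(&r->busy))
                return false;

        pthread_mutex_lock(&r->lock);
        /* like test_and_set_bit(RQ_BUSY): claim the thread only if it was idle */
        if (atomic_exchange(&r->busy, true)) {
                pthread_mutex_unlock(&r->lock);  /* lost the race: already busy */
                return false;
        }
        r->xprt = xprt;                  /* safe: we won the claim under the lock */
        pthread_cond_signal(&r->wake);
        pthread_mutex_unlock(&r->lock);
        return true;
}

/* Thread side: go idle, sleep, then re-mark busy and pick up the work. */
static void *server_thread(void *arg)
{
        struct rqst *r = arg;

        pthread_mutex_lock(&r->lock);
        atomic_store(&r->busy, false);   /* like clear_bit(RQ_BUSY) before sleeping */
        while (r->xprt == NULL)
                pthread_cond_wait(&r->wake, &r->lock);
        atomic_store(&r->busy, true);    /* like set_bit(RQ_BUSY) under rq_lock on wakeup */
        printf("server thread woken with xprt %p\n", r->xprt);
        pthread_mutex_unlock(&r->lock);
        return NULL;
}

int main(void)
{
        struct rqst r = { .busy = true };       /* "busy" until the thread goes idle */
        pthread_t tid;
        int dummy_xprt;

        pthread_mutex_init(&r.lock, NULL);
        pthread_cond_init(&r.wake, NULL);
        pthread_create(&tid, NULL, server_thread, &r);

        while (!try_hand_off(&r, &dummy_xprt))  /* retry until the thread is idle */
                ;
        pthread_join(tid, NULL);
        return 0;
}

As in the patch, the enqueue side does a cheap lockless check first and only takes the per-request lock to claim a thread that looks idle; the thread re-asserts the busy flag under the same lock when it wakes, so a set flag always implies that the work pointer has been filled in.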
Diffstat (limited to 'net')
-rw-r--r--  net/sunrpc/svc.c         7
-rw-r--r--  net/sunrpc/svc_xprt.c  221
2 files changed, 128 insertions, 100 deletions
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index b90d1bca4349..91eaef1844c8 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -476,7 +476,6 @@ __svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
                                 i, serv->sv_name);
 
                 pool->sp_id = i;
-                INIT_LIST_HEAD(&pool->sp_threads);
                 INIT_LIST_HEAD(&pool->sp_sockets);
                 INIT_LIST_HEAD(&pool->sp_all_threads);
                 spin_lock_init(&pool->sp_lock);
@@ -614,12 +613,14 @@ svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
                 goto out_enomem;
 
         serv->sv_nrthreads++;
+        __set_bit(RQ_BUSY, &rqstp->rq_flags);
+        spin_lock_init(&rqstp->rq_lock);
+        rqstp->rq_server = serv;
+        rqstp->rq_pool = pool;
         spin_lock_bh(&pool->sp_lock);
         pool->sp_nrthreads++;
         list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads);
         spin_unlock_bh(&pool->sp_lock);
-        rqstp->rq_server = serv;
-        rqstp->rq_pool = pool;
 
         rqstp->rq_argp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
         if (!rqstp->rq_argp)
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index 579ff2249562..ed90d955f733 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -310,25 +310,6 @@ char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
 }
 EXPORT_SYMBOL_GPL(svc_print_addr);
 
-/*
- * Queue up an idle server thread. Must have pool->sp_lock held.
- * Note: this is really a stack rather than a queue, so that we only
- * use as many different threads as we need, and the rest don't pollute
- * the cache.
- */
-static void svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
-{
-        list_add(&rqstp->rq_list, &pool->sp_threads);
-}
-
-/*
- * Dequeue an nfsd thread. Must have pool->sp_lock held.
- */
-static void svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
-{
-        list_del(&rqstp->rq_list);
-}
-
 static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt)
 {
         if (xprt->xpt_flags & ((1<<XPT_CONN)|(1<<XPT_CLOSE)))
@@ -343,6 +324,7 @@ static void svc_xprt_do_enqueue(struct svc_xprt *xprt)
         struct svc_pool *pool;
         struct svc_rqst *rqstp;
         int cpu;
+        bool queued = false;
 
         if (!svc_xprt_has_something_to_do(xprt))
                 return;
@@ -360,37 +342,60 @@ static void svc_xprt_do_enqueue(struct svc_xprt *xprt)
 
         cpu = get_cpu();
         pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
-        spin_lock_bh(&pool->sp_lock);
 
         atomic_long_inc(&pool->sp_stats.packets);
 
-        if (!list_empty(&pool->sp_threads)) {
-                rqstp = list_entry(pool->sp_threads.next,
-                                   struct svc_rqst,
-                                   rq_list);
-                dprintk("svc: transport %p served by daemon %p\n",
-                        xprt, rqstp);
-                svc_thread_dequeue(pool, rqstp);
-                if (rqstp->rq_xprt)
-                        printk(KERN_ERR
-                                "svc_xprt_enqueue: server %p, rq_xprt=%p!\n",
-                                rqstp, rqstp->rq_xprt);
-                /* Note the order of the following 3 lines:
-                 * We want to assign xprt to rqstp->rq_xprt only _after_
-                 * we've woken up the process, so that we don't race with
-                 * the lockless check in svc_get_next_xprt().
+redo_search:
+        /* find a thread for this xprt */
+        rcu_read_lock();
+        list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) {
+                /* Do a lockless check first */
+                if (test_bit(RQ_BUSY, &rqstp->rq_flags))
+                        continue;
+
+                /*
+                 * Once the xprt has been queued, it can only be dequeued by
+                 * the task that intends to service it. All we can do at that
+                 * point is to try to wake this thread back up so that it can
+                 * do so.
                  */
-                svc_xprt_get(xprt);
-                wake_up_process(rqstp->rq_task);
-                rqstp->rq_xprt = xprt;
+                if (!queued) {
+                        spin_lock_bh(&rqstp->rq_lock);
+                        if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags)) {
+                                /* already busy, move on... */
+                                spin_unlock_bh(&rqstp->rq_lock);
+                                continue;
+                        }
+
+                        /* this one will do */
+                        rqstp->rq_xprt = xprt;
+                        svc_xprt_get(xprt);
+                        spin_unlock_bh(&rqstp->rq_lock);
+                }
+                rcu_read_unlock();
+
                 atomic_long_inc(&pool->sp_stats.threads_woken);
-        } else {
+                wake_up_process(rqstp->rq_task);
+                put_cpu();
+                return;
+        }
+        rcu_read_unlock();
+
+        /*
+         * We didn't find an idle thread to use, so we need to queue the xprt.
+         * Do so and then search again. If we find one, we can't hook this one
+         * up to it directly but we can wake the thread up in the hopes that it
+         * will pick it up once it searches for a xprt to service.
+         */
+        if (!queued) {
+                queued = true;
                 dprintk("svc: transport %p put into queue\n", xprt);
+                spin_lock_bh(&pool->sp_lock);
                 list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
                 pool->sp_stats.sockets_queued++;
+                spin_unlock_bh(&pool->sp_lock);
+                goto redo_search;
         }
-
-        spin_unlock_bh(&pool->sp_lock);
         put_cpu();
 }
 
@@ -408,21 +413,26 @@ void svc_xprt_enqueue(struct svc_xprt *xprt)
 EXPORT_SYMBOL_GPL(svc_xprt_enqueue);
 
 /*
- * Dequeue the first transport. Must be called with the pool->sp_lock held.
+ * Dequeue the first transport, if there is one.
  */
 static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
 {
-        struct svc_xprt *xprt;
+        struct svc_xprt *xprt = NULL;
 
         if (list_empty(&pool->sp_sockets))
                 return NULL;
 
-        xprt = list_entry(pool->sp_sockets.next,
-                          struct svc_xprt, xpt_ready);
-        list_del_init(&xprt->xpt_ready);
+        spin_lock_bh(&pool->sp_lock);
+        if (likely(!list_empty(&pool->sp_sockets))) {
+                xprt = list_first_entry(&pool->sp_sockets,
+                                        struct svc_xprt, xpt_ready);
+                list_del_init(&xprt->xpt_ready);
+                svc_xprt_get(xprt);
 
-        dprintk("svc: transport %p dequeued, inuse=%d\n",
-                xprt, atomic_read(&xprt->xpt_ref.refcount));
+                dprintk("svc: transport %p dequeued, inuse=%d\n",
+                        xprt, atomic_read(&xprt->xpt_ref.refcount));
+        }
+        spin_unlock_bh(&pool->sp_lock);
 
         return xprt;
 }
@@ -497,16 +507,21 @@ void svc_wake_up(struct svc_serv *serv)
 
         pool = &serv->sv_pools[0];
 
-        spin_lock_bh(&pool->sp_lock);
-        if (!list_empty(&pool->sp_threads)) {
-                rqstp = list_entry(pool->sp_threads.next,
-                                   struct svc_rqst,
-                                   rq_list);
+        rcu_read_lock();
+        list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) {
+                /* skip any that aren't queued */
+                if (test_bit(RQ_BUSY, &rqstp->rq_flags))
+                        continue;
+                rcu_read_unlock();
                 dprintk("svc: daemon %p woken up.\n", rqstp);
                 wake_up_process(rqstp->rq_task);
-        } else
-                set_bit(SP_TASK_PENDING, &pool->sp_flags);
-        spin_unlock_bh(&pool->sp_lock);
+                return;
+        }
+        rcu_read_unlock();
+
+        /* No free entries available */
+        set_bit(SP_TASK_PENDING, &pool->sp_flags);
+        smp_wmb();
 }
 EXPORT_SYMBOL_GPL(svc_wake_up);
 
@@ -617,22 +632,47 @@ static int svc_alloc_arg(struct svc_rqst *rqstp)
         return 0;
 }
 
+static bool
+rqst_should_sleep(struct svc_rqst *rqstp)
+{
+        struct svc_pool *pool = rqstp->rq_pool;
+
+        /* did someone call svc_wake_up? */
+        if (test_and_clear_bit(SP_TASK_PENDING, &pool->sp_flags))
+                return false;
+
+        /* was a socket queued? */
+        if (!list_empty(&pool->sp_sockets))
+                return false;
+
+        /* are we shutting down? */
+        if (signalled() || kthread_should_stop())
+                return false;
+
+        /* are we freezing? */
+        if (freezing(current))
+                return false;
+
+        return true;
+}
+
 static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
 {
         struct svc_xprt *xprt;
         struct svc_pool *pool = rqstp->rq_pool;
         long time_left = 0;
 
+        /* rq_xprt should be clear on entry */
+        WARN_ON_ONCE(rqstp->rq_xprt);
+
         /* Normally we will wait up to 5 seconds for any required
          * cache information to be provided.
          */
         rqstp->rq_chandle.thread_wait = 5*HZ;
 
-        spin_lock_bh(&pool->sp_lock);
         xprt = svc_xprt_dequeue(pool);
         if (xprt) {
                 rqstp->rq_xprt = xprt;
-                svc_xprt_get(xprt);
 
                 /* As there is a shortage of threads and this request
                  * had to be queued, don't allow the thread to wait so
@@ -640,51 +680,38 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
                  */
                 rqstp->rq_chandle.thread_wait = 1*HZ;
                 clear_bit(SP_TASK_PENDING, &pool->sp_flags);
-        } else {
-                if (test_and_clear_bit(SP_TASK_PENDING, &pool->sp_flags)) {
-                        xprt = ERR_PTR(-EAGAIN);
-                        goto out;
-                }
-                /*
-                 * We have to be able to interrupt this wait
-                 * to bring down the daemons ...
-                 */
-                set_current_state(TASK_INTERRUPTIBLE);
+                return xprt;
+        }
 
-                /* No data pending. Go to sleep */
-                svc_thread_enqueue(pool, rqstp);
-                spin_unlock_bh(&pool->sp_lock);
+        /*
+         * We have to be able to interrupt this wait
+         * to bring down the daemons ...
+         */
+        set_current_state(TASK_INTERRUPTIBLE);
+        clear_bit(RQ_BUSY, &rqstp->rq_flags);
+        smp_mb();
+
+        if (likely(rqst_should_sleep(rqstp)))
+                time_left = schedule_timeout(timeout);
+        else
+                __set_current_state(TASK_RUNNING);
 
-                if (!(signalled() || kthread_should_stop())) {
-                        time_left = schedule_timeout(timeout);
-                        __set_current_state(TASK_RUNNING);
+        try_to_freeze();
 
-                        try_to_freeze();
+        spin_lock_bh(&rqstp->rq_lock);
+        set_bit(RQ_BUSY, &rqstp->rq_flags);
+        spin_unlock_bh(&rqstp->rq_lock);
 
-                        xprt = rqstp->rq_xprt;
-                        if (xprt != NULL)
-                                return xprt;
-                } else
-                        __set_current_state(TASK_RUNNING);
+        xprt = rqstp->rq_xprt;
+        if (xprt != NULL)
+                return xprt;
 
-                spin_lock_bh(&pool->sp_lock);
-                if (!time_left)
-                        atomic_long_inc(&pool->sp_stats.threads_timedout);
+        if (!time_left)
+                atomic_long_inc(&pool->sp_stats.threads_timedout);
 
-                xprt = rqstp->rq_xprt;
-                if (!xprt) {
-                        svc_thread_dequeue(pool, rqstp);
-                        spin_unlock_bh(&pool->sp_lock);
-                        dprintk("svc: server %p, no data yet\n", rqstp);
-                        if (signalled() || kthread_should_stop())
-                                return ERR_PTR(-EINTR);
-                        else
-                                return ERR_PTR(-EAGAIN);
-                }
-        }
-out:
-        spin_unlock_bh(&pool->sp_lock);
-        return xprt;
+        if (signalled() || kthread_should_stop())
+                return ERR_PTR(-EINTR);
+        return ERR_PTR(-EAGAIN);
 }
 
 static void svc_add_new_temp_xprt(struct svc_serv *serv, struct svc_xprt *newxpt)