aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--include/linux/sunrpc/svc.h1
-rw-r--r--net/sunrpc/svc_xprt.c100
2 files changed, 31 insertions, 70 deletions
diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
index 38f561b2dda3..23c4d6496aac 100644
--- a/include/linux/sunrpc/svc.h
+++ b/include/linux/sunrpc/svc.h
@@ -46,6 +46,7 @@ struct svc_pool {
46 struct svc_pool_stats sp_stats; /* statistics on pool operation */ 46 struct svc_pool_stats sp_stats; /* statistics on pool operation */
47#define SP_TASK_PENDING (0) /* still work to do even if no 47#define SP_TASK_PENDING (0) /* still work to do even if no
48 * xprt is queued. */ 48 * xprt is queued. */
49#define SP_CONGESTED (1)
49 unsigned long sp_flags; 50 unsigned long sp_flags;
50} ____cacheline_aligned_in_smp; 51} ____cacheline_aligned_in_smp;
51 52
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index 18e87791350f..80112c45aad1 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -380,7 +380,6 @@ void svc_xprt_do_enqueue(struct svc_xprt *xprt)
380 struct svc_pool *pool; 380 struct svc_pool *pool;
381 struct svc_rqst *rqstp = NULL; 381 struct svc_rqst *rqstp = NULL;
382 int cpu; 382 int cpu;
383 bool queued = false;
384 383
385 if (!svc_xprt_has_something_to_do(xprt)) 384 if (!svc_xprt_has_something_to_do(xprt))
386 goto out; 385 goto out;
@@ -401,58 +400,25 @@ void svc_xprt_do_enqueue(struct svc_xprt *xprt)
401 400
402 atomic_long_inc(&pool->sp_stats.packets); 401 atomic_long_inc(&pool->sp_stats.packets);
403 402
404redo_search: 403 dprintk("svc: transport %p put into queue\n", xprt);
404 spin_lock_bh(&pool->sp_lock);
405 list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
406 pool->sp_stats.sockets_queued++;
407 spin_unlock_bh(&pool->sp_lock);
408
405 /* find a thread for this xprt */ 409 /* find a thread for this xprt */
406 rcu_read_lock(); 410 rcu_read_lock();
407 list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) { 411 list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) {
408 /* Do a lockless check first */ 412 if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags))
409 if (test_bit(RQ_BUSY, &rqstp->rq_flags))
410 continue; 413 continue;
411
412 /*
413 * Once the xprt has been queued, it can only be dequeued by
414 * the task that intends to service it. All we can do at that
415 * point is to try to wake this thread back up so that it can
416 * do so.
417 */
418 if (!queued) {
419 spin_lock_bh(&rqstp->rq_lock);
420 if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags)) {
421 /* already busy, move on... */
422 spin_unlock_bh(&rqstp->rq_lock);
423 continue;
424 }
425
426 /* this one will do */
427 rqstp->rq_xprt = xprt;
428 svc_xprt_get(xprt);
429 spin_unlock_bh(&rqstp->rq_lock);
430 }
431 rcu_read_unlock();
432
433 atomic_long_inc(&pool->sp_stats.threads_woken); 414 atomic_long_inc(&pool->sp_stats.threads_woken);
434 wake_up_process(rqstp->rq_task); 415 wake_up_process(rqstp->rq_task);
435 put_cpu(); 416 goto out_unlock;
436 goto out;
437 }
438 rcu_read_unlock();
439
440 /*
441 * We didn't find an idle thread to use, so we need to queue the xprt.
442 * Do so and then search again. If we find one, we can't hook this one
443 * up to it directly but we can wake the thread up in the hopes that it
444 * will pick it up once it searches for a xprt to service.
445 */
446 if (!queued) {
447 queued = true;
448 dprintk("svc: transport %p put into queue\n", xprt);
449 spin_lock_bh(&pool->sp_lock);
450 list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
451 pool->sp_stats.sockets_queued++;
452 spin_unlock_bh(&pool->sp_lock);
453 goto redo_search;
454 } 417 }
418 set_bit(SP_CONGESTED, &pool->sp_flags);
455 rqstp = NULL; 419 rqstp = NULL;
420out_unlock:
421 rcu_read_unlock();
456 put_cpu(); 422 put_cpu();
457out: 423out:
458 trace_svc_xprt_do_enqueue(xprt, rqstp); 424 trace_svc_xprt_do_enqueue(xprt, rqstp);
@@ -721,38 +687,25 @@ rqst_should_sleep(struct svc_rqst *rqstp)
721 687
722static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout) 688static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
723{ 689{
724 struct svc_xprt *xprt;
725 struct svc_pool *pool = rqstp->rq_pool; 690 struct svc_pool *pool = rqstp->rq_pool;
726 long time_left = 0; 691 long time_left = 0;
727 692
728 /* rq_xprt should be clear on entry */ 693 /* rq_xprt should be clear on entry */
729 WARN_ON_ONCE(rqstp->rq_xprt); 694 WARN_ON_ONCE(rqstp->rq_xprt);
730 695
731 /* Normally we will wait up to 5 seconds for any required 696 rqstp->rq_xprt = svc_xprt_dequeue(pool);
732 * cache information to be provided. 697 if (rqstp->rq_xprt)
733 */ 698 goto out_found;
734 rqstp->rq_chandle.thread_wait = 5*HZ;
735
736 xprt = svc_xprt_dequeue(pool);
737 if (xprt) {
738 rqstp->rq_xprt = xprt;
739
740 /* As there is a shortage of threads and this request
741 * had to be queued, don't allow the thread to wait so
742 * long for cache updates.
743 */
744 rqstp->rq_chandle.thread_wait = 1*HZ;
745 clear_bit(SP_TASK_PENDING, &pool->sp_flags);
746 return xprt;
747 }
748 699
749 /* 700 /*
750 * We have to be able to interrupt this wait 701 * We have to be able to interrupt this wait
751 * to bring down the daemons ... 702 * to bring down the daemons ...
752 */ 703 */
753 set_current_state(TASK_INTERRUPTIBLE); 704 set_current_state(TASK_INTERRUPTIBLE);
705 smp_mb__before_atomic();
706 clear_bit(SP_CONGESTED, &pool->sp_flags);
754 clear_bit(RQ_BUSY, &rqstp->rq_flags); 707 clear_bit(RQ_BUSY, &rqstp->rq_flags);
755 smp_mb(); 708 smp_mb__after_atomic();
756 709
757 if (likely(rqst_should_sleep(rqstp))) 710 if (likely(rqst_should_sleep(rqstp)))
758 time_left = schedule_timeout(timeout); 711 time_left = schedule_timeout(timeout);
@@ -761,13 +714,11 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
761 714
762 try_to_freeze(); 715 try_to_freeze();
763 716
764 spin_lock_bh(&rqstp->rq_lock);
765 set_bit(RQ_BUSY, &rqstp->rq_flags); 717 set_bit(RQ_BUSY, &rqstp->rq_flags);
766 spin_unlock_bh(&rqstp->rq_lock); 718 smp_mb__after_atomic();
767 719 rqstp->rq_xprt = svc_xprt_dequeue(pool);
768 xprt = rqstp->rq_xprt; 720 if (rqstp->rq_xprt)
769 if (xprt != NULL) 721 goto out_found;
770 return xprt;
771 722
772 if (!time_left) 723 if (!time_left)
773 atomic_long_inc(&pool->sp_stats.threads_timedout); 724 atomic_long_inc(&pool->sp_stats.threads_timedout);
@@ -775,6 +726,15 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
775 if (signalled() || kthread_should_stop()) 726 if (signalled() || kthread_should_stop())
776 return ERR_PTR(-EINTR); 727 return ERR_PTR(-EINTR);
777 return ERR_PTR(-EAGAIN); 728 return ERR_PTR(-EAGAIN);
729out_found:
730 /* Normally we will wait up to 5 seconds for any required
731 * cache information to be provided.
732 */
733 if (!test_bit(SP_CONGESTED, &pool->sp_flags))
734 rqstp->rq_chandle.thread_wait = 5*HZ;
735 else
736 rqstp->rq_chandle.thread_wait = 1*HZ;
737 return rqstp->rq_xprt;
778} 738}
779 739
780static void svc_add_new_temp_xprt(struct svc_serv *serv, struct svc_xprt *newxpt) 740static void svc_add_new_temp_xprt(struct svc_serv *serv, struct svc_xprt *newxpt)