author	Dave Chinner <david@fromorbit.com>	2012-04-23 03:54:32 -0400
committer	Ben Myers <bpm@sgi.com>	2012-05-14 17:20:34 -0400
commit	4c2d542f2e786537db33b613d5199dc6d69a96da (patch)
tree	eeca27ca63e519981e8d4f2ab1bcf8230f5e598e
parent	04913fdd91f342e537005ef1233f98068b925a7f (diff)
xfs: Do background CIL flushes via a workqueue
Doing background CIL flushes adds significant latency to whatever async transaction triggers it. To avoid blocking async transactions on things like waiting for log buffer IO to complete, move the CIL push off into a workqueue. By moving the push work into a workqueue, we remove all the latency that the commit adds from the foreground transaction commit path. This also means that single threaded workloads won't do the CIL push processing, leaving them more CPU to do more async transactions.

To do this, we need to keep track of the sequence number we have pushed work for. This avoids having many transaction commits attempting to schedule work for the same sequence, and ensures that we only ever have one push (background or forced) in progress at a time. It also means that we don't need to take the CIL lock in write mode to check for potential background push races, which reduces lock contention.

To avoid potential issues with "smart" IO schedulers, don't use the workqueue for log force triggered flushes. Instead, do them directly so that the log IO is done by the process issuing the log force and so doesn't get stuck in IO elevator queue idling, which would incorrectly delay the log IO from the workqueue.

Signed-off-by: Dave Chinner <dchinner@redhat.com>
Reviewed-by: Mark Tinguely <tinguely@sgi.com>
Signed-off-by: Ben Myers <bpm@sgi.com>
-rw-r--r--	fs/xfs/xfs_log_cil.c	244
-rw-r--r--	fs/xfs/xfs_log_priv.h	2
-rw-r--r--	fs/xfs/xfs_mount.h	1
-rw-r--r--	fs/xfs/xfs_super.c	7
4 files changed, 160 insertions(+), 94 deletions(-)
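
Before the diff itself, here is a small, self-contained userspace sketch of the "one push per sequence" bookkeeping the commit message describes: many racing transaction commits may notice the CIL needs pushing, but only the first one to advance the push sequence actually queues work. This is only an illustration of the idea, not the kernel code; the names current_sequence, push_seq and maybe_queue_push() are invented for the example, and the real logic lives in xlog_cil_push_background() in the patch below.

/*
 * Toy illustration of the push_seq bookkeeping: only the first caller to
 * advance push_seq to the current sequence "queues" a push; later callers
 * see the push is already pending and do nothing.
 */
#include <stdbool.h>
#include <stdio.h>

static unsigned long current_sequence = 5;	/* sequence the CIL is accumulating */
static unsigned long push_seq;			/* highest sequence a push was queued for */

static bool maybe_queue_push(void)
{
	/* in the kernel this check runs under cil->xc_cil_lock */
	if (push_seq < current_sequence) {
		push_seq = current_sequence;
		return true;		/* stands in for queue_work() */
	}
	return false;			/* a push for this sequence is already queued */
}

int main(void)
{
	/* three racing transaction commits; only the first queues work */
	for (int i = 0; i < 3; i++)
		printf("commit %d: %s\n", i,
		       maybe_queue_push() ? "queued push" : "push already pending");
	return 0;
}
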
diff --git a/fs/xfs/xfs_log_cil.c b/fs/xfs/xfs_log_cil.c
index d4fadbe8ac90..5fc7ec53d650 100644
--- a/fs/xfs/xfs_log_cil.c
+++ b/fs/xfs/xfs_log_cil.c
@@ -32,58 +32,6 @@
 #include "xfs_discard.h"
 
 /*
- * Perform initial CIL structure initialisation.
- */
-int
-xlog_cil_init(
-	struct log	*log)
-{
-	struct xfs_cil	*cil;
-	struct xfs_cil_ctx *ctx;
-
-	cil = kmem_zalloc(sizeof(*cil), KM_SLEEP|KM_MAYFAIL);
-	if (!cil)
-		return ENOMEM;
-
-	ctx = kmem_zalloc(sizeof(*ctx), KM_SLEEP|KM_MAYFAIL);
-	if (!ctx) {
-		kmem_free(cil);
-		return ENOMEM;
-	}
-
-	INIT_LIST_HEAD(&cil->xc_cil);
-	INIT_LIST_HEAD(&cil->xc_committing);
-	spin_lock_init(&cil->xc_cil_lock);
-	init_rwsem(&cil->xc_ctx_lock);
-	init_waitqueue_head(&cil->xc_commit_wait);
-
-	INIT_LIST_HEAD(&ctx->committing);
-	INIT_LIST_HEAD(&ctx->busy_extents);
-	ctx->sequence = 1;
-	ctx->cil = cil;
-	cil->xc_ctx = ctx;
-	cil->xc_current_sequence = ctx->sequence;
-
-	cil->xc_log = log;
-	log->l_cilp = cil;
-	return 0;
-}
-
-void
-xlog_cil_destroy(
-	struct log	*log)
-{
-	if (log->l_cilp->xc_ctx) {
-		if (log->l_cilp->xc_ctx->ticket)
-			xfs_log_ticket_put(log->l_cilp->xc_ctx->ticket);
-		kmem_free(log->l_cilp->xc_ctx);
-	}
-
-	ASSERT(list_empty(&log->l_cilp->xc_cil));
-	kmem_free(log->l_cilp);
-}
-
-/*
  * Allocate a new ticket. Failing to get a new ticket makes it really hard to
  * recover, so we don't allow failure here. Also, we allocate in a context that
  * we don't want to be issuing transactions from, so we need to tell the
@@ -426,8 +374,7 @@ xlog_cil_committed(
  */
 STATIC int
 xlog_cil_push(
-	struct log	*log,
-	xfs_lsn_t	push_seq)
+	struct log	*log)
 {
 	struct xfs_cil		*cil = log->l_cilp;
 	struct xfs_log_vec	*lv;
@@ -443,39 +390,36 @@ xlog_cil_push(
 	struct xfs_log_iovec	lhdr;
 	struct xfs_log_vec	lvhdr = { NULL };
 	xfs_lsn_t		commit_lsn;
+	xfs_lsn_t		push_seq;
 
 	if (!cil)
 		return 0;
 
-	ASSERT(!push_seq || push_seq <= cil->xc_ctx->sequence);
-
 	new_ctx = kmem_zalloc(sizeof(*new_ctx), KM_SLEEP|KM_NOFS);
 	new_ctx->ticket = xlog_cil_ticket_alloc(log);
 
-	/*
-	 * Lock out transaction commit, but don't block for background pushes
-	 * unless we are well over the CIL space limit. See the definition of
-	 * XLOG_CIL_HARD_SPACE_LIMIT() for the full explanation of the logic
-	 * used here.
-	 */
-	if (!down_write_trylock(&cil->xc_ctx_lock)) {
-		if (!push_seq &&
-		    cil->xc_ctx->space_used < XLOG_CIL_HARD_SPACE_LIMIT(log))
-			goto out_free_ticket;
-		down_write(&cil->xc_ctx_lock);
-	}
+	down_write(&cil->xc_ctx_lock);
 	ctx = cil->xc_ctx;
 
-	/* check if we've anything to push */
-	if (list_empty(&cil->xc_cil))
-		goto out_skip;
+	spin_lock(&cil->xc_cil_lock);
+	push_seq = cil->xc_push_seq;
+	ASSERT(push_seq <= ctx->sequence);
 
-	/* check for spurious background flush */
-	if (!push_seq && cil->xc_ctx->space_used < XLOG_CIL_SPACE_LIMIT(log))
+	/*
+	 * Check if we've anything to push. If there is nothing, then we don't
+	 * move on to a new sequence number and so we have to be able to push
+	 * this sequence again later.
+	 */
+	if (list_empty(&cil->xc_cil)) {
+		cil->xc_push_seq = 0;
+		spin_unlock(&cil->xc_cil_lock);
 		goto out_skip;
+	}
+	spin_unlock(&cil->xc_cil_lock);
+
 
 	/* check for a previously pushed seqeunce */
-	if (push_seq && push_seq < cil->xc_ctx->sequence)
+	if (push_seq < cil->xc_ctx->sequence)
 		goto out_skip;
 
 	/*
@@ -629,7 +573,6 @@ restart:
 
 out_skip:
 	up_write(&cil->xc_ctx_lock);
-out_free_ticket:
 	xfs_log_ticket_put(new_ctx->ticket);
 	kmem_free(new_ctx);
 	return 0;
@@ -641,6 +584,82 @@ out_abort:
 	return XFS_ERROR(EIO);
 }
 
+static void
+xlog_cil_push_work(
+	struct work_struct	*work)
+{
+	struct xfs_cil		*cil = container_of(work, struct xfs_cil,
+							xc_push_work);
+	xlog_cil_push(cil->xc_log);
+}
+
+/*
+ * We need to push CIL every so often so we don't cache more than we can fit in
+ * the log. The limit really is that a checkpoint can't be more than half the
+ * log (the current checkpoint is not allowed to overwrite the previous
+ * checkpoint), but commit latency and memory usage limit this to a smaller
+ * size.
+ */
+static void
+xlog_cil_push_background(
+	struct log	*log)
+{
+	struct xfs_cil	*cil = log->l_cilp;
+
+	/*
+	 * The cil won't be empty because we are called while holding the
+	 * context lock so whatever we added to the CIL will still be there
+	 */
+	ASSERT(!list_empty(&cil->xc_cil));
+
+	/*
+	 * don't do a background push if we haven't used up all the
+	 * space available yet.
+	 */
+	if (cil->xc_ctx->space_used < XLOG_CIL_SPACE_LIMIT(log))
+		return;
+
+	spin_lock(&cil->xc_cil_lock);
+	if (cil->xc_push_seq < cil->xc_current_sequence) {
+		cil->xc_push_seq = cil->xc_current_sequence;
+		queue_work(log->l_mp->m_cil_workqueue, &cil->xc_push_work);
+	}
+	spin_unlock(&cil->xc_cil_lock);
+
+}
+
+static void
+xlog_cil_push_foreground(
+	struct log	*log,
+	xfs_lsn_t	push_seq)
+{
+	struct xfs_cil	*cil = log->l_cilp;
+
+	if (!cil)
+		return;
+
+	ASSERT(push_seq && push_seq <= cil->xc_current_sequence);
+
+	/* start on any pending background push to minimise wait time on it */
+	flush_work(&cil->xc_push_work);
+
+	/*
+	 * If the CIL is empty or we've already pushed the sequence then
+	 * there's no work we need to do.
+	 */
+	spin_lock(&cil->xc_cil_lock);
+	if (list_empty(&cil->xc_cil) || push_seq <= cil->xc_push_seq) {
+		spin_unlock(&cil->xc_cil_lock);
+		return;
+	}
+
+	cil->xc_push_seq = push_seq;
+	spin_unlock(&cil->xc_cil_lock);
+
+	/* do the push now */
+	xlog_cil_push(log);
+}
+
 /*
  * Commit a transaction with the given vector to the Committed Item List.
  *
@@ -667,7 +686,6 @@ xfs_log_commit_cil(
 {
 	struct log		*log = mp->m_log;
 	int			log_flags = 0;
-	int			push = 0;
 	struct xfs_log_vec	*log_vector;
 
 	if (flags & XFS_TRANS_RELEASE_LOG_RES)
@@ -719,21 +737,9 @@ xfs_log_commit_cil(
 	 */
 	xfs_trans_free_items(tp, *commit_lsn, 0);
 
-	/* check for background commit before unlock */
-	if (log->l_cilp->xc_ctx->space_used > XLOG_CIL_SPACE_LIMIT(log))
-		push = 1;
+	xlog_cil_push_background(log);
 
 	up_read(&log->l_cilp->xc_ctx_lock);
-
-	/*
-	 * We need to push CIL every so often so we don't cache more than we
-	 * can fit in the log. The limit really is that a checkpoint can't be
-	 * more than half the log (the current checkpoint is not allowed to
-	 * overwrite the previous checkpoint), but commit latency and memory
-	 * usage limit this to a smaller size in most cases.
-	 */
-	if (push)
-		xlog_cil_push(log, 0);
 	return 0;
 }
 
@@ -746,9 +752,6 @@ xfs_log_commit_cil(
  *
  * We return the current commit lsn to allow the callers to determine if a
  * iclog flush is necessary following this call.
- *
- * XXX: Initially, just push the CIL unconditionally and return whatever
- * commit lsn is there. It'll be empty, so this is broken for now.
  */
 xfs_lsn_t
 xlog_cil_force_lsn(
@@ -766,8 +769,7 @@ xlog_cil_force_lsn(
 	 * xlog_cil_push() handles racing pushes for the same sequence,
 	 * so no need to deal with it here.
 	 */
-	if (sequence == cil->xc_current_sequence)
-		xlog_cil_push(log, sequence);
+	xlog_cil_push_foreground(log, sequence);
 
 	/*
 	 * See if we can find a previous sequence still committing.
@@ -826,3 +828,57 @@ xfs_log_item_in_current_chkpt(
 		return false;
 	return true;
 }
+
+/*
+ * Perform initial CIL structure initialisation.
+ */
+int
+xlog_cil_init(
+	struct log	*log)
+{
+	struct xfs_cil	*cil;
+	struct xfs_cil_ctx *ctx;
+
+	cil = kmem_zalloc(sizeof(*cil), KM_SLEEP|KM_MAYFAIL);
+	if (!cil)
+		return ENOMEM;
+
+	ctx = kmem_zalloc(sizeof(*ctx), KM_SLEEP|KM_MAYFAIL);
+	if (!ctx) {
+		kmem_free(cil);
+		return ENOMEM;
+	}
+
+	INIT_WORK(&cil->xc_push_work, xlog_cil_push_work);
+	INIT_LIST_HEAD(&cil->xc_cil);
+	INIT_LIST_HEAD(&cil->xc_committing);
+	spin_lock_init(&cil->xc_cil_lock);
+	init_rwsem(&cil->xc_ctx_lock);
+	init_waitqueue_head(&cil->xc_commit_wait);
+
+	INIT_LIST_HEAD(&ctx->committing);
+	INIT_LIST_HEAD(&ctx->busy_extents);
+	ctx->sequence = 1;
+	ctx->cil = cil;
+	cil->xc_ctx = ctx;
+	cil->xc_current_sequence = ctx->sequence;
+
+	cil->xc_log = log;
+	log->l_cilp = cil;
+	return 0;
+}
+
+void
+xlog_cil_destroy(
+	struct log	*log)
+{
+	if (log->l_cilp->xc_ctx) {
+		if (log->l_cilp->xc_ctx->ticket)
+			xfs_log_ticket_put(log->l_cilp->xc_ctx->ticket);
+		kmem_free(log->l_cilp->xc_ctx);
+	}
+
+	ASSERT(list_empty(&log->l_cilp->xc_cil));
+	kmem_free(log->l_cilp);
+}
+
diff --git a/fs/xfs/xfs_log_priv.h b/fs/xfs/xfs_log_priv.h
index 2152900b79d4..735ff1ee53da 100644
--- a/fs/xfs/xfs_log_priv.h
+++ b/fs/xfs/xfs_log_priv.h
@@ -417,6 +417,8 @@ struct xfs_cil {
 	struct list_head	xc_committing;
 	wait_queue_head_t	xc_commit_wait;
 	xfs_lsn_t		xc_current_sequence;
+	struct work_struct	xc_push_work;
+	xfs_lsn_t		xc_push_seq;
 };
 
 /*
diff --git a/fs/xfs/xfs_mount.h b/fs/xfs/xfs_mount.h
index 19fd5eda92b8..8b89c5ac72d9 100644
--- a/fs/xfs/xfs_mount.h
+++ b/fs/xfs/xfs_mount.h
@@ -214,6 +214,7 @@ typedef struct xfs_mount {
 
 	struct workqueue_struct	*m_data_workqueue;
 	struct workqueue_struct	*m_unwritten_workqueue;
+	struct workqueue_struct	*m_cil_workqueue;
 } xfs_mount_t;
 
 /*
diff --git a/fs/xfs/xfs_super.c b/fs/xfs/xfs_super.c
index fa07b7731cf2..49197e24d8db 100644
--- a/fs/xfs/xfs_super.c
+++ b/fs/xfs/xfs_super.c
@@ -773,8 +773,14 @@ xfs_init_mount_workqueues(
 	if (!mp->m_unwritten_workqueue)
 		goto out_destroy_data_iodone_queue;
 
+	mp->m_cil_workqueue = alloc_workqueue("xfs-cil/%s",
+			WQ_MEM_RECLAIM, 0, mp->m_fsname);
+	if (!mp->m_cil_workqueue)
+		goto out_destroy_unwritten;
 	return 0;
 
+out_destroy_unwritten:
+	destroy_workqueue(mp->m_unwritten_workqueue);
 out_destroy_data_iodone_queue:
 	destroy_workqueue(mp->m_data_workqueue);
 out:
@@ -785,6 +791,7 @@ STATIC void
 xfs_destroy_mount_workqueues(
 	struct xfs_mount	*mp)
 {
+	destroy_workqueue(mp->m_cil_workqueue);
 	destroy_workqueue(mp->m_data_workqueue);
 	destroy_workqueue(mp->m_unwritten_workqueue);
 }