author    Ed Cashin <ecashin@coraid.com>	2013-07-03 18:09:05 -0400
committer Linus Torvalds <torvalds@linux-foundation.org>	2013-07-03 19:08:05 -0400
commit    8030d34397e066deecb5ee9d17387fa767b12de2 (patch)
tree      b66a88cd47d47c46295ced5f5cd2ac073be19116 /drivers/block
parent    c378f70adbc1bbecd9e6db145019f14b2f688c7c (diff)
aoe: perform I/O completions in parallel
Some users have a large AoE target while others like to use many AoE targets at the same time. In the latter case, there is an opportunity to greatly improve aggregate throughput by allowing different threads to complete the I/O associated with each target. For 36 targets, 4 KiB read throughput roughly doubles, for example, with these changes in place.

Signed-off-by: Ed Cashin <ecashin@coraid.com>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
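In essence, the patch replaces the single I/O-completion queue and kthread with one queue and lazily spawned kthread per CPU, and dispatches each response to a queue selected by the target's minor number modulo the number of CPUs, falling back to thread 0 until the chosen worker has been started. The following stand-alone user-space sketch (not part of the patch; the names NCPUS, worker_active, and pick_queue are illustrative only) models that dispatch rule:

#include <stdio.h>

#define NCPUS 4			/* stand-in for num_online_cpus() at module init */

/* Only worker 0 exists at startup; others are spawned on demand. */
static int worker_active[NCPUS] = { 1, 0, 0, 0 };

/* Map a target's minor number to a completion queue, as ktcomplete() does. */
static int pick_queue(int aoeminor)
{
	int id = aoeminor % NCPUS;

	if (!worker_active[id])
		id = 0;	/* delegate to the main thread, which will spawn worker id */
	return id;
}

int main(void)
{
	int minors[] = { 0, 1, 2, 5, 9 };
	unsigned int i;

	for (i = 0; i < sizeof(minors) / sizeof(minors[0]); i++)
		printf("aoeminor %d -> completion queue %d\n",
		       minors[i], pick_queue(minors[i]));
	return 0;
}

Because each target always hashes to the same queue, completions for a given target stay ordered on one thread while different targets can be completed in parallel.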
Diffstat (limited to 'drivers/block')
-rw-r--r--	drivers/block/aoe/aoe.h		7
-rw-r--r--	drivers/block/aoe/aoecmd.c	152
-rw-r--r--	drivers/block/aoe/aoedev.c	1
-rw-r--r--	drivers/block/aoe/aoenet.c	5
4 files changed, 134 insertions(+), 31 deletions(-)
diff --git a/drivers/block/aoe/aoe.h b/drivers/block/aoe/aoe.h
index 175649468c95..c9698529df51 100644
--- a/drivers/block/aoe/aoe.h
+++ b/drivers/block/aoe/aoe.h
@@ -196,9 +196,11 @@ struct ktstate {
 	struct completion rendez;
 	struct task_struct *task;
 	wait_queue_head_t *waitq;
-	int (*fn) (void);
-	char *name;
+	int (*fn) (int);
+	char name[12];
 	spinlock_t *lock;
+	int id;
+	int active;
 };
 
 int aoeblk_init(void);
@@ -222,6 +224,7 @@ int aoecmd_init(void);
 struct sk_buff *aoecmd_ata_id(struct aoedev *);
 void aoe_freetframe(struct frame *);
 void aoe_flush_iocq(void);
+void aoe_flush_iocq_by_index(int);
 void aoe_end_request(struct aoedev *, struct request *, int);
 int aoe_ktstart(struct ktstate *k);
 void aoe_ktstop(struct ktstate *k);
diff --git a/drivers/block/aoe/aoecmd.c b/drivers/block/aoe/aoecmd.c
index b75c7db16559..19955dd72eee 100644
--- a/drivers/block/aoe/aoecmd.c
+++ b/drivers/block/aoe/aoecmd.c
@@ -35,14 +35,27 @@ module_param(aoe_maxout, int, 0644);
 MODULE_PARM_DESC(aoe_maxout,
 	"Only aoe_maxout outstanding packets for every MAC on eX.Y.");
 
-static wait_queue_head_t ktiowq;
-static struct ktstate kts;
+/* The number of online cpus during module initialization gives us a
+ * convenient heuristic cap on the parallelism used for ktio threads
+ * doing I/O completion. It is not important that the cap equal the
+ * actual number of running CPUs at any given time, but because of CPU
+ * hotplug, we take care to use ncpus instead of using
+ * num_online_cpus() after module initialization.
+ */
+static int ncpus;
+
+/* mutex lock used for synchronization while thread spawning */
+static DEFINE_MUTEX(ktio_spawn_lock);
+
+static wait_queue_head_t *ktiowq;
+static struct ktstate *kts;
 
 /* io completion queue */
-static struct {
+struct iocq_ktio {
 	struct list_head head;
 	spinlock_t lock;
-} iocq;
+};
+static struct iocq_ktio *iocq;
 
 static struct page *empty_page;
 
@@ -1278,23 +1291,36 @@ out:
  * Returns true iff responses needing processing remain.
  */
 static int
-ktio(void)
+ktio(int id)
 {
 	struct frame *f;
 	struct list_head *pos;
 	int i;
+	int actual_id;
 
 	for (i = 0; ; ++i) {
 		if (i == MAXIOC)
 			return 1;
-		if (list_empty(&iocq.head))
+		if (list_empty(&iocq[id].head))
 			return 0;
-		pos = iocq.head.next;
+		pos = iocq[id].head.next;
 		list_del(pos);
-		spin_unlock_irq(&iocq.lock);
 		f = list_entry(pos, struct frame, head);
+		spin_unlock_irq(&iocq[id].lock);
 		ktiocomplete(f);
-		spin_lock_irq(&iocq.lock);
+
+		/* Figure out if extra threads are required. */
+		actual_id = f->t->d->aoeminor % ncpus;
+
+		if (!kts[actual_id].active) {
+			BUG_ON(id != 0);
+			mutex_lock(&ktio_spawn_lock);
+			if (!kts[actual_id].active
+				&& aoe_ktstart(&kts[actual_id]) == 0)
+				kts[actual_id].active = 1;
+			mutex_unlock(&ktio_spawn_lock);
+		}
+		spin_lock_irq(&iocq[id].lock);
 	}
 }
 
@@ -1311,7 +1337,7 @@ kthread(void *vp)
 	complete(&k->rendez);	/* tell spawner we're running */
 	do {
 		spin_lock_irq(k->lock);
-		more = k->fn();
+		more = k->fn(k->id);
 		if (!more) {
 			add_wait_queue(k->waitq, &wait);
 			__set_current_state(TASK_INTERRUPTIBLE);
@@ -1353,13 +1379,24 @@ aoe_ktstart(struct ktstate *k)
 static void
 ktcomplete(struct frame *f, struct sk_buff *skb)
 {
+	int id;
 	ulong flags;
 
 	f->r_skb = skb;
-	spin_lock_irqsave(&iocq.lock, flags);
-	list_add_tail(&f->head, &iocq.head);
-	spin_unlock_irqrestore(&iocq.lock, flags);
-	wake_up(&ktiowq);
+	id = f->t->d->aoeminor % ncpus;
+	spin_lock_irqsave(&iocq[id].lock, flags);
+	if (!kts[id].active) {
+		spin_unlock_irqrestore(&iocq[id].lock, flags);
+		/* The thread with id has not been spawned yet,
+		 * so delegate the work to the main thread and
+		 * try spawning a new thread.
+		 */
+		id = 0;
+		spin_lock_irqsave(&iocq[id].lock, flags);
+	}
+	list_add_tail(&f->head, &iocq[id].head);
+	spin_unlock_irqrestore(&iocq[id].lock, flags);
+	wake_up(&ktiowq[id]);
 }
 
 struct sk_buff *
@@ -1706,6 +1743,17 @@ aoe_failbuf(struct aoedev *d, struct buf *buf)
 void
 aoe_flush_iocq(void)
 {
+	int i;
+
+	for (i = 0; i < ncpus; i++) {
+		if (kts[i].active)
+			aoe_flush_iocq_by_index(i);
+	}
+}
+
+void
+aoe_flush_iocq_by_index(int id)
+{
 	struct frame *f;
 	struct aoedev *d;
 	LIST_HEAD(flist);
@@ -1713,9 +1761,9 @@ aoe_flush_iocq(void)
 	struct sk_buff *skb;
 	ulong flags;
 
-	spin_lock_irqsave(&iocq.lock, flags);
-	list_splice_init(&iocq.head, &flist);
-	spin_unlock_irqrestore(&iocq.lock, flags);
+	spin_lock_irqsave(&iocq[id].lock, flags);
+	list_splice_init(&iocq[id].head, &flist);
+	spin_unlock_irqrestore(&iocq[id].lock, flags);
 	while (!list_empty(&flist)) {
 		pos = flist.next;
 		list_del(pos);
@@ -1738,6 +1786,8 @@ int __init
 aoecmd_init(void)
 {
 	void *p;
+	int i;
+	int ret;
 
 	/* get_zeroed_page returns page with ref count 1 */
 	p = (void *) get_zeroed_page(GFP_KERNEL | __GFP_REPEAT);
@@ -1745,22 +1795,72 @@ aoecmd_init(void)
 		return -ENOMEM;
 	empty_page = virt_to_page(p);
 
-	INIT_LIST_HEAD(&iocq.head);
-	spin_lock_init(&iocq.lock);
-	init_waitqueue_head(&ktiowq);
-	kts.name = "aoe_ktio";
-	kts.fn = ktio;
-	kts.waitq = &ktiowq;
-	kts.lock = &iocq.lock;
-	return aoe_ktstart(&kts);
+	ncpus = num_online_cpus();
+
+	iocq = kcalloc(ncpus, sizeof(struct iocq_ktio), GFP_KERNEL);
+	if (!iocq)
+		return -ENOMEM;
+
+	kts = kcalloc(ncpus, sizeof(struct ktstate), GFP_KERNEL);
+	if (!kts) {
+		ret = -ENOMEM;
+		goto kts_fail;
+	}
+
+	ktiowq = kcalloc(ncpus, sizeof(wait_queue_head_t), GFP_KERNEL);
+	if (!ktiowq) {
+		ret = -ENOMEM;
+		goto ktiowq_fail;
+	}
+
+	mutex_init(&ktio_spawn_lock);
+
+	for (i = 0; i < ncpus; i++) {
+		INIT_LIST_HEAD(&iocq[i].head);
+		spin_lock_init(&iocq[i].lock);
+		init_waitqueue_head(&ktiowq[i]);
+		snprintf(kts[i].name, sizeof(kts[i].name), "aoe_ktio%d", i);
+		kts[i].fn = ktio;
+		kts[i].waitq = &ktiowq[i];
+		kts[i].lock = &iocq[i].lock;
+		kts[i].id = i;
+		kts[i].active = 0;
+	}
+	kts[0].active = 1;
+	if (aoe_ktstart(&kts[0])) {
+		ret = -ENOMEM;
+		goto ktstart_fail;
+	}
+	return 0;
+
+ktstart_fail:
+	kfree(ktiowq);
+ktiowq_fail:
+	kfree(kts);
+kts_fail:
+	kfree(iocq);
+
+	return ret;
 }
 
 void
 aoecmd_exit(void)
 {
-	aoe_ktstop(&kts);
+	int i;
+
+	for (i = 0; i < ncpus; i++)
+		if (kts[i].active)
+			aoe_ktstop(&kts[i]);
+
 	aoe_flush_iocq();
 
+	/* Free up the iocq and thread specific configuration
+	 * allocated during startup.
+	 */
+	kfree(iocq);
+	kfree(kts);
+	kfree(ktiowq);
+
 	free_page((unsigned long) page_address(empty_page));
 	empty_page = NULL;
 }
diff --git a/drivers/block/aoe/aoedev.c b/drivers/block/aoe/aoedev.c
index 98f2965778b9..92201b6334c2 100644
--- a/drivers/block/aoe/aoedev.c
+++ b/drivers/block/aoe/aoedev.c
@@ -518,7 +518,6 @@ void
 aoedev_exit(void)
 {
 	flush_scheduled_work();
-	aoe_flush_iocq();
 	flush(NULL, 0, EXITING);
 }
 
diff --git a/drivers/block/aoe/aoenet.c b/drivers/block/aoe/aoenet.c
index 71d3ea8d3006..4af5f06c467b 100644
--- a/drivers/block/aoe/aoenet.c
+++ b/drivers/block/aoe/aoenet.c
@@ -52,7 +52,7 @@ static struct sk_buff_head skbtxq;
 
 /* enters with txlock held */
 static int
-tx(void) __must_hold(&txlock)
+tx(int id) __must_hold(&txlock)
 {
 	struct sk_buff *skb;
 	struct net_device *ifp;
@@ -205,7 +205,8 @@ aoenet_init(void)
 	kts.lock = &txlock;
 	kts.fn = tx;
 	kts.waitq = &txwq;
-	kts.name = "aoe_tx";
+	kts.id = 0;
+	snprintf(kts.name, sizeof(kts.name), "aoe_tx%d", kts.id);
 	if (aoe_ktstart(&kts))
 		return -EAGAIN;
 	dev_add_pack(&aoe_pt);