author		Kent Overstreet <koverstreet@google.com>	2013-05-07 19:18:49 -0400
committer	Linus Torvalds <torvalds@linux-foundation.org>	2013-05-07 21:38:29 -0400
commit		0460fef2a9215680f7f85415b57731b7e0fdf673 (patch)
tree		a97456b0a9c6c6f66c4f22eac11d037be4c4d290
parent		21b40200cfe961b1428a529c63c33b1f1e1b4738 (diff)
aio: use cancellation list lazily
Cancelling kiocbs requires adding them to a per-kioctx linked list, which is
one of the few things we need to take the kioctx lock for in the fast path.
But most kiocbs can't be cancelled - so if we just do this lazily, we can
avoid quite a bit of locking overhead.

While we're at it, instead of using a flag bit, switch to using ki_cancel
itself to indicate that a kiocb has been cancelled/completed. This lets us
get rid of ki_flags entirely.

[akpm@linux-foundation.org: remove buggy BUG()]
Signed-off-by: Kent Overstreet <koverstreet@google.com>
Cc: Zach Brown <zab@redhat.com>
Cc: Felipe Balbi <balbi@ti.com>
Cc: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
Cc: Mark Fasheh <mfasheh@suse.com>
Cc: Joel Becker <jlbec@evilplan.org>
Cc: Rusty Russell <rusty@rustcorp.com.au>
Cc: Jens Axboe <axboe@kernel.dk>
Cc: Asai Thambi S P <asamymuthupa@micron.com>
Cc: Selvan Mani <smani@micron.com>
Cc: Sam Bradshaw <sbradshaw@micron.com>
Cc: Jeff Moyer <jmoyer@redhat.com>
Cc: Al Viro <viro@zeniv.linux.org.uk>
Cc: Benjamin LaHaise <bcrl@kvack.org>
Reviewed-by: "Theodore Ts'o" <tytso@mit.edu>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
-rw-r--r--	drivers/usb/gadget/inode.c	3
-rw-r--r--	fs/aio.c			106
-rw-r--r--	include/linux/aio.h		27
3 files changed, 81 insertions, 55 deletions
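The idea the commit message describes - putting a kiocb on the per-ioctx cancellation list only once a cancel callback is actually installed - can be sketched in a few lines of user-space C before reading the diff. This is an illustrative model only, not code from the patch; the names and the pthread mutex standing in for ctx->ctx_lock are invented for the example.

/*
 * Illustrative user-space model of the "lazy cancellation list" idea.
 * Not kernel code: the mutex plays the role of ctx->ctx_lock and the
 * structures are reduced to the fields the example needs.
 */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct req {
	struct req *next_active;	/* NULL until a cancel fn is registered */
	int (*cancel)(struct req *);
	int on_list;
};

struct ioctx {
	pthread_mutex_t lock;		/* stands in for ctx->ctx_lock */
	struct req *active_reqs;	/* only cancellable requests are linked */
};

/* Common submission path: the request is not tracked, so no lock is taken. */
static struct req *submit(struct ioctx *ctx)
{
	(void)ctx;
	return calloc(1, sizeof(struct req));	/* zeroed, like __GFP_ZERO */
}

/* Rare path, analogous to kiocb_set_cancel_fn(): take the lock and link the
 * request into the active list the first time a cancel callback is set. */
static void set_cancel_fn(struct ioctx *ctx, struct req *r,
			  int (*cancel)(struct req *))
{
	pthread_mutex_lock(&ctx->lock);
	if (!r->on_list) {
		r->next_active = ctx->active_reqs;
		ctx->active_reqs = r;
		r->on_list = 1;
	}
	r->cancel = cancel;
	pthread_mutex_unlock(&ctx->lock);
}

static int dummy_cancel(struct req *r) { (void)r; return 0; }

int main(void)
{
	struct ioctx ctx = { .lock = PTHREAD_MUTEX_INITIALIZER };
	struct req *a = submit(&ctx);	/* ordinary request: never locked */
	struct req *b = submit(&ctx);	/* cancellable request */

	set_cancel_fn(&ctx, b, dummy_cancel);
	printf("active list holds only the cancellable request: %d\n",
	       ctx.active_reqs == b && !a->on_list);
	free(a);
	free(b);
	return 0;
}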
diff --git a/drivers/usb/gadget/inode.c b/drivers/usb/gadget/inode.c
index 994e7433e87a..570c005062ab 100644
--- a/drivers/usb/gadget/inode.c
+++ b/drivers/usb/gadget/inode.c
@@ -533,7 +533,6 @@ static int ep_aio_cancel(struct kiocb *iocb, struct io_event *e)
 	local_irq_disable();
 	epdata = priv->epdata;
 	// spin_lock(&epdata->dev->lock);
-	kiocbSetCancelled(iocb);
 	if (likely(epdata && epdata->ep && priv->req))
 		value = usb_ep_dequeue (epdata->ep, priv->req);
 	else
@@ -663,7 +662,7 @@ fail:
 		goto fail;
 	}
 
-	iocb->ki_cancel = ep_aio_cancel;
+	kiocb_set_cancel_fn(iocb, ep_aio_cancel);
 	get_ep(epdata);
 	priv->epdata = epdata;
 	priv->actual = 0;
diff --git a/fs/aio.c b/fs/aio.c
index aea060d8c1e8..3428e9ae2f1d 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -97,6 +97,8 @@ struct kioctx {
 
 	struct aio_ring_info	ring_info;
 
+	spinlock_t		completion_lock;
+
 	struct rcu_head		rcu_head;
 	struct work_struct	rcu_work;
 };
@@ -220,25 +222,51 @@ static int aio_setup_ring(struct kioctx *ctx)
 #define AIO_EVENTS_FIRST_PAGE	((PAGE_SIZE - sizeof(struct aio_ring)) / sizeof(struct io_event))
 #define AIO_EVENTS_OFFSET	(AIO_EVENTS_PER_PAGE - AIO_EVENTS_FIRST_PAGE)
 
+void kiocb_set_cancel_fn(struct kiocb *req, kiocb_cancel_fn *cancel)
+{
+	struct kioctx *ctx = req->ki_ctx;
+	unsigned long flags;
+
+	spin_lock_irqsave(&ctx->ctx_lock, flags);
+
+	if (!req->ki_list.next)
+		list_add(&req->ki_list, &ctx->active_reqs);
+
+	req->ki_cancel = cancel;
+
+	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+}
+EXPORT_SYMBOL(kiocb_set_cancel_fn);
+
 static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
 			struct io_event *res)
 {
-	int (*cancel)(struct kiocb *, struct io_event *);
+	kiocb_cancel_fn *old, *cancel;
 	int ret = -EINVAL;
 
-	cancel = kiocb->ki_cancel;
-	kiocbSetCancelled(kiocb);
-	if (cancel) {
-		atomic_inc(&kiocb->ki_users);
-		spin_unlock_irq(&ctx->ctx_lock);
+	/*
+	 * Don't want to set kiocb->ki_cancel = KIOCB_CANCELLED unless it
+	 * actually has a cancel function, hence the cmpxchg()
+	 */
+
+	cancel = ACCESS_ONCE(kiocb->ki_cancel);
+	do {
+		if (!cancel || cancel == KIOCB_CANCELLED)
+			return ret;
 
-		memset(res, 0, sizeof(*res));
-		res->obj = (u64)(unsigned long)kiocb->ki_obj.user;
-		res->data = kiocb->ki_user_data;
-		ret = cancel(kiocb, res);
+		old = cancel;
+		cancel = cmpxchg(&kiocb->ki_cancel, old, KIOCB_CANCELLED);
+	} while (cancel != old);
 
-		spin_lock_irq(&ctx->ctx_lock);
-	}
+	atomic_inc(&kiocb->ki_users);
+	spin_unlock_irq(&ctx->ctx_lock);
+
+	memset(res, 0, sizeof(*res));
+	res->obj = (u64)(unsigned long)kiocb->ki_obj.user;
+	res->data = kiocb->ki_user_data;
+	ret = cancel(kiocb, res);
+
+	spin_lock_irq(&ctx->ctx_lock);
 
 	return ret;
 }
@@ -326,6 +354,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 	atomic_set(&ctx->users, 2);
 	atomic_set(&ctx->dead, 0);
 	spin_lock_init(&ctx->ctx_lock);
+	spin_lock_init(&ctx->completion_lock);
 	mutex_init(&ctx->ring_info.ring_lock);
 	init_waitqueue_head(&ctx->wait);
 
@@ -468,20 +497,12 @@ static struct kiocb *__aio_get_req(struct kioctx *ctx)
 {
 	struct kiocb *req = NULL;
 
-	req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL);
+	req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
 	if (unlikely(!req))
 		return NULL;
 
-	req->ki_flags = 0;
 	atomic_set(&req->ki_users, 2);
-	req->ki_key = 0;
 	req->ki_ctx = ctx;
-	req->ki_cancel = NULL;
-	req->ki_retry = NULL;
-	req->ki_dtor = NULL;
-	req->private = NULL;
-	req->ki_iovec = NULL;
-	req->ki_eventfd = NULL;
 
 	return req;
 }
@@ -512,7 +533,6 @@ static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch)
 	spin_lock_irq(&ctx->ctx_lock);
 	list_for_each_entry_safe(req, n, &batch->head, ki_batch) {
 		list_del(&req->ki_batch);
-		list_del(&req->ki_list);
 		kmem_cache_free(kiocb_cachep, req);
 		atomic_dec(&ctx->reqs_active);
 	}
@@ -559,10 +579,7 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
 	}
 
 	batch->count -= allocated;
-	list_for_each_entry(req, &batch->head, ki_batch) {
-		list_add(&req->ki_list, &ctx->active_reqs);
-		atomic_inc(&ctx->reqs_active);
-	}
+	atomic_add(allocated, &ctx->reqs_active);
 
 	kunmap_atomic(ring);
 	spin_unlock_irq(&ctx->ctx_lock);
@@ -653,25 +670,34 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
 	info = &ctx->ring_info;
 
 	/*
-	 * Add a completion event to the ring buffer. Must be done holding
-	 * ctx->ctx_lock to prevent other code from messing with the tail
-	 * pointer since we might be called from irq context.
-	 *
 	 * Take rcu_read_lock() in case the kioctx is being destroyed, as we
 	 * need to issue a wakeup after decrementing reqs_active.
 	 */
 	rcu_read_lock();
-	spin_lock_irqsave(&ctx->ctx_lock, flags);
 
-	list_del(&iocb->ki_list); /* remove from active_reqs */
+	if (iocb->ki_list.next) {
+		unsigned long flags;
+
+		spin_lock_irqsave(&ctx->ctx_lock, flags);
+		list_del(&iocb->ki_list);
+		spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+	}
 
 	/*
 	 * cancelled requests don't get events, userland was given one
 	 * when the event got cancelled.
 	 */
-	if (kiocbIsCancelled(iocb))
+	if (unlikely(xchg(&iocb->ki_cancel,
+			  KIOCB_CANCELLED) == KIOCB_CANCELLED))
 		goto put_rq;
 
+	/*
+	 * Add a completion event to the ring buffer. Must be done holding
+	 * ctx->ctx_lock to prevent other code from messing with the tail
+	 * pointer since we might be called from irq context.
+	 */
+	spin_lock_irqsave(&ctx->completion_lock, flags);
+
 	tail = info->tail;
 	pos = tail + AIO_EVENTS_OFFSET;
 
@@ -705,6 +731,8 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
 	kunmap_atomic(ring);
 	flush_dcache_page(info->ring_pages[0]);
 
+	spin_unlock_irqrestore(&ctx->completion_lock, flags);
+
 	pr_debug("added to ring %p at [%u]\n", iocb, tail);
 
 	/*
@@ -731,7 +759,6 @@ put_rq:
 	if (waitqueue_active(&ctx->wait))
 		wake_up(&ctx->wait);
 
-	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
 	rcu_read_unlock();
 }
 EXPORT_SYMBOL(aio_complete);
@@ -1216,15 +1243,10 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 	req->ki_opcode = iocb->aio_lio_opcode;
 
 	ret = aio_setup_iocb(req, compat);
-
 	if (ret)
 		goto out_put_req;
 
-	if (unlikely(kiocbIsCancelled(req)))
-		ret = -EINTR;
-	else
-		ret = req->ki_retry(req);
-
+	ret = req->ki_retry(req);
 	if (ret != -EIOCBQUEUED) {
 		/*
 		 * There's no easy way to restart the syscall since other AIO's
@@ -1241,10 +1263,6 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 	return 0;
 
 out_put_req:
-	spin_lock_irq(&ctx->ctx_lock);
-	list_del(&req->ki_list);
-	spin_unlock_irq(&ctx->ctx_lock);
-
 	atomic_dec(&ctx->reqs_active);
 	aio_put_req(req);	/* drop extra ref to req */
 	aio_put_req(req);	/* drop i/o ref to req */
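The fs/aio.c changes above hinge on ki_cancel acting as a small state machine: NULL (nothing to cancel), a real callback, or KIOCB_CANCELLED, with the transition to KIOCB_CANCELLED made only via the xchg() in aio_complete() or the cmpxchg() loop in kiocb_cancel(). A rough user-space model of that handshake, using C11 atomics in place of the kernel primitives (all names here are invented for the example; the pointer/integer casts are a shortcut the real kernel code does not need):

/*
 * User-space model of the ki_cancel handshake.  Illustrative only;
 * heavily simplified from fs/aio.c.
 */
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define CANCELLED ((uintptr_t)-1)	/* stands in for KIOCB_CANCELLED */

struct req {
	_Atomic uintptr_t cancel;	/* 0, a callback pointer, or CANCELLED */
};

static int my_cancel_cb(struct req *r) { (void)r; return 0; }

/* Completion side: unconditionally mark the request CANCELLED; if it already
 * was, someone cancelled it and the completion event must be suppressed. */
static int complete(struct req *r)
{
	uintptr_t prev = atomic_exchange(&r->cancel, CANCELLED);

	if (prev == CANCELLED)
		return 0;	/* cancelled earlier: don't deliver an event */
	return 1;		/* deliver the completion event */
}

/* Cancellation side: claim the request only if it has a real callback, using
 * a compare-and-swap loop so NULL is never overwritten and a concurrent
 * complete() cannot be raced. */
static int cancel(struct req *r)
{
	uintptr_t cur = atomic_load(&r->cancel);

	do {
		if (cur == 0 || cur == CANCELLED)
			return -1;	/* nothing to cancel / already done */
	} while (!atomic_compare_exchange_weak(&r->cancel, &cur, CANCELLED));

	return ((int (*)(struct req *))cur)(r);	/* run the claimed callback */
}

int main(void)
{
	struct req r = { .cancel = (uintptr_t)my_cancel_cb };

	printf("cancel returns: %d\n", cancel(&r));		/* claims and runs the callback */
	printf("complete delivers event: %d\n", complete(&r));	/* 0: event suppressed */
	return 0;
}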
diff --git a/include/linux/aio.h b/include/linux/aio.h
index 1e728f0086f8..d2a00038ec77 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -10,17 +10,24 @@
 #include <linux/atomic.h>
 
 struct kioctx;
+struct kiocb;
 
 #define KIOCB_SYNC_KEY		(~0U)
 
-/* ki_flags bits */
-#define KIF_CANCELLED		2
-
-#define kiocbSetCancelled(iocb)	set_bit(KIF_CANCELLED, &(iocb)->ki_flags)
-
-#define kiocbClearCancelled(iocb)	clear_bit(KIF_CANCELLED, &(iocb)->ki_flags)
+/*
+ * We use ki_cancel == KIOCB_CANCELLED to indicate that a kiocb has been either
+ * cancelled or completed (this makes a certain amount of sense because
+ * successful cancellation - io_cancel() - does deliver the completion to
+ * userspace).
+ *
+ * And since most things don't implement kiocb cancellation and we'd really like
+ * kiocb completion to be lockless when possible, we use ki_cancel to
+ * synchronize cancellation and completion - we only set it to KIOCB_CANCELLED
+ * with xchg() or cmpxchg(), see batch_complete_aio() and kiocb_cancel().
+ */
+#define KIOCB_CANCELLED		((void *) (~0ULL))
 
-#define kiocbIsCancelled(iocb)	test_bit(KIF_CANCELLED, &(iocb)->ki_flags)
+typedef int (kiocb_cancel_fn)(struct kiocb *, struct io_event *);
 
 /* is there a better place to document function pointer methods? */
 /**
@@ -48,13 +55,12 @@ struct kioctx;
  * calls may result in undefined behaviour.
  */
 struct kiocb {
-	unsigned long		ki_flags;
 	atomic_t		ki_users;
 	unsigned		ki_key;		/* id of this request */
 
 	struct file		*ki_filp;
 	struct kioctx		*ki_ctx;	/* may be NULL for sync ops */
-	int			(*ki_cancel)(struct kiocb *, struct io_event *);
+	kiocb_cancel_fn		*ki_cancel;
 	ssize_t			(*ki_retry)(struct kiocb *);
 	void			(*ki_dtor)(struct kiocb *);
 
@@ -112,6 +118,7 @@ struct mm_struct;
 extern void exit_aio(struct mm_struct *mm);
 extern long do_io_submit(aio_context_t ctx_id, long nr,
 			 struct iocb __user *__user *iocbpp, bool compat);
+void kiocb_set_cancel_fn(struct kiocb *req, kiocb_cancel_fn *cancel);
 #else
 static inline ssize_t wait_on_sync_kiocb(struct kiocb *iocb) { return 0; }
 static inline void aio_put_req(struct kiocb *iocb) { }
@@ -121,6 +128,8 @@ static inline void exit_aio(struct mm_struct *mm) { }
 static inline long do_io_submit(aio_context_t ctx_id, long nr,
 				struct iocb __user * __user *iocbpp,
 				bool compat) { return 0; }
+static inline void kiocb_set_cancel_fn(struct kiocb *req,
+				       kiocb_cancel_fn *cancel) { }
 #endif /* CONFIG_AIO */
 
 static inline struct kiocb *list_kiocb(struct list_head *h)
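Finally, a hypothetical driver-side sketch of the API surface declared above, kiocb_cancel_fn and kiocb_set_cancel_fn(), loosely modelled on the gadgetfs change earlier in the patch. The my_* names and the per-request bookkeeping are invented; this is schematic, not code from the patch:

/* Hypothetical example only: a driver supplies a kiocb_cancel_fn and
 * registers it lazily, so only requests it can really cancel ever reach
 * ctx->active_reqs. */
#include <linux/aio.h>
#include <linux/errno.h>
#include <linux/types.h>

struct my_req {
	bool hw_queued;		/* invented per-request state */
};

static int my_aio_cancel(struct kiocb *iocb, struct io_event *e)
{
	struct my_req *r = iocb->private;

	/* kiocb_cancel() has already claimed the kiocb via cmpxchg() and
	 * filled in *e; the callback only unhooks the hardware request. */
	if (!r->hw_queued)
		return -EINVAL;
	r->hw_queued = false;
	return 0;
}

static void my_submit(struct kiocb *iocb, struct my_req *r)
{
	iocb->private = r;
	r->hw_queued = true;
	/* Lazy registration: the kiocb joins ctx->active_reqs only here. */
	kiocb_set_cancel_fn(iocb, my_aio_cancel);
}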