path: root/fs/aio.c
author		Kent Overstreet <koverstreet@google.com>	2013-05-07 19:18:49 -0400
committer	Linus Torvalds <torvalds@linux-foundation.org>	2013-05-07 21:38:29 -0400
commit		0460fef2a9215680f7f85415b57731b7e0fdf673 (patch)
tree		a97456b0a9c6c6f66c4f22eac11d037be4c4d290 /fs/aio.c
parent		21b40200cfe961b1428a529c63c33b1f1e1b4738 (diff)
aio: use cancellation list lazily
Cancelling kiocbs requires adding them to a per-kioctx linked list, which is one of the few things we need to take the kioctx lock for in the fast path. But most kiocbs can't be cancelled, so if we just do this lazily we can avoid quite a bit of locking overhead.

While we're at it, instead of using a flag bit, switch to using ki_cancel itself to indicate that a kiocb has been cancelled/completed. This lets us get rid of ki_flags entirely.

[akpm@linux-foundation.org: remove buggy BUG()]
Signed-off-by: Kent Overstreet <koverstreet@google.com>
Cc: Zach Brown <zab@redhat.com>
Cc: Felipe Balbi <balbi@ti.com>
Cc: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
Cc: Mark Fasheh <mfasheh@suse.com>
Cc: Joel Becker <jlbec@evilplan.org>
Cc: Rusty Russell <rusty@rustcorp.com.au>
Cc: Jens Axboe <axboe@kernel.dk>
Cc: Asai Thambi S P <asamymuthupa@micron.com>
Cc: Selvan Mani <smani@micron.com>
Cc: Sam Bradshaw <sbradshaw@micron.com>
Cc: Jeff Moyer <jmoyer@redhat.com>
Cc: Al Viro <viro@zeniv.linux.org.uk>
Cc: Benjamin LaHaise <bcrl@kvack.org>
Reviewed-by: "Theodore Ts'o" <tytso@mit.edu>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
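The heart of the change is claiming ki_cancel with a compare-and-swap so that exactly one of cancel/complete wins. As a rough user-space sketch of that idea (not kernel code: fake_kiocb, try_cancel and the cancelled_stub sentinel are invented here, standing in for struct kiocb, kiocb_cancel() and KIOCB_CANCELLED), the pattern looks like this:

/* Minimal user-space sketch of "claim the cancel callback with a
 * compare-and-swap"; names are illustrative, not from fs/aio.c. */
#include <stdatomic.h>
#include <stdio.h>

typedef int (*cancel_fn)(void *req);

static int cancelled_stub(void *req) { (void)req; return 0; }
#define CANCELLED cancelled_stub        /* sentinel, plays the role of KIOCB_CANCELLED */

struct fake_kiocb {
	_Atomic cancel_fn ki_cancel;    /* NULL until a cancel function is installed */
};

static int my_cancel(void *req)
{
	printf("cancelling %p\n", req);
	return 0;
}

/* Atomically swap the installed cancel function for the sentinel; only the
 * caller that wins the swap runs the callback, everyone else gets -1. */
static int try_cancel(struct fake_kiocb *req)
{
	cancel_fn cancel = atomic_load(&req->ki_cancel);

	do {
		if (!cancel || cancel == CANCELLED)
			return -1;      /* nothing to cancel, or already claimed */
		/* on CAS failure, cancel is reloaded with the current value and rechecked */
	} while (!atomic_compare_exchange_weak(&req->ki_cancel, &cancel, CANCELLED));

	return cancel(req);             /* we won the race: run the original callback once */
}

int main(void)
{
	struct fake_kiocb req = { .ki_cancel = my_cancel };

	printf("first:  %d\n", try_cancel(&req));   /* runs my_cancel */
	printf("second: %d\n", try_cancel(&req));   /* already claimed -> -1 */
	return 0;
}

In the patch below, aio_complete() plays the same game from the other side: its xchg() to KIOCB_CANCELLED tells it whether a cancel already consumed the request.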
Diffstat (limited to 'fs/aio.c')
-rw-r--r--	fs/aio.c	106
1 file changed, 62 insertions(+), 44 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index aea060d8c1e8..3428e9ae2f1d 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -97,6 +97,8 @@ struct kioctx {
 
 	struct aio_ring_info	ring_info;
 
+	spinlock_t		completion_lock;
+
 	struct rcu_head		rcu_head;
 	struct work_struct	rcu_work;
 };
@@ -220,25 +222,51 @@ static int aio_setup_ring(struct kioctx *ctx)
 #define AIO_EVENTS_FIRST_PAGE	((PAGE_SIZE - sizeof(struct aio_ring)) / sizeof(struct io_event))
 #define AIO_EVENTS_OFFSET	(AIO_EVENTS_PER_PAGE - AIO_EVENTS_FIRST_PAGE)
 
+void kiocb_set_cancel_fn(struct kiocb *req, kiocb_cancel_fn *cancel)
+{
+	struct kioctx *ctx = req->ki_ctx;
+	unsigned long flags;
+
+	spin_lock_irqsave(&ctx->ctx_lock, flags);
+
+	if (!req->ki_list.next)
+		list_add(&req->ki_list, &ctx->active_reqs);
+
+	req->ki_cancel = cancel;
+
+	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+}
+EXPORT_SYMBOL(kiocb_set_cancel_fn);
+
 static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
 			struct io_event *res)
 {
-	int (*cancel)(struct kiocb *, struct io_event *);
+	kiocb_cancel_fn *old, *cancel;
 	int ret = -EINVAL;
 
-	cancel = kiocb->ki_cancel;
-	kiocbSetCancelled(kiocb);
-	if (cancel) {
-		atomic_inc(&kiocb->ki_users);
-		spin_unlock_irq(&ctx->ctx_lock);
+	/*
+	 * Don't want to set kiocb->ki_cancel = KIOCB_CANCELLED unless it
+	 * actually has a cancel function, hence the cmpxchg()
+	 */
+
+	cancel = ACCESS_ONCE(kiocb->ki_cancel);
+	do {
+		if (!cancel || cancel == KIOCB_CANCELLED)
+			return ret;
 
-		memset(res, 0, sizeof(*res));
-		res->obj = (u64)(unsigned long)kiocb->ki_obj.user;
-		res->data = kiocb->ki_user_data;
-		ret = cancel(kiocb, res);
+		old = cancel;
+		cancel = cmpxchg(&kiocb->ki_cancel, old, KIOCB_CANCELLED);
+	} while (cancel != old);
 
-		spin_lock_irq(&ctx->ctx_lock);
-	}
+	atomic_inc(&kiocb->ki_users);
+	spin_unlock_irq(&ctx->ctx_lock);
+
+	memset(res, 0, sizeof(*res));
+	res->obj = (u64)(unsigned long)kiocb->ki_obj.user;
+	res->data = kiocb->ki_user_data;
+	ret = cancel(kiocb, res);
+
+	spin_lock_irq(&ctx->ctx_lock);
 
 	return ret;
 }
@@ -326,6 +354,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 	atomic_set(&ctx->users, 2);
 	atomic_set(&ctx->dead, 0);
 	spin_lock_init(&ctx->ctx_lock);
+	spin_lock_init(&ctx->completion_lock);
 	mutex_init(&ctx->ring_info.ring_lock);
 	init_waitqueue_head(&ctx->wait);
 
@@ -468,20 +497,12 @@ static struct kiocb *__aio_get_req(struct kioctx *ctx)
 {
 	struct kiocb *req = NULL;
 
-	req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL);
+	req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
 	if (unlikely(!req))
 		return NULL;
 
-	req->ki_flags = 0;
 	atomic_set(&req->ki_users, 2);
-	req->ki_key = 0;
 	req->ki_ctx = ctx;
-	req->ki_cancel = NULL;
-	req->ki_retry = NULL;
-	req->ki_dtor = NULL;
-	req->private = NULL;
-	req->ki_iovec = NULL;
-	req->ki_eventfd = NULL;
 
 	return req;
 }
@@ -512,7 +533,6 @@ static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch)
 	spin_lock_irq(&ctx->ctx_lock);
 	list_for_each_entry_safe(req, n, &batch->head, ki_batch) {
 		list_del(&req->ki_batch);
-		list_del(&req->ki_list);
 		kmem_cache_free(kiocb_cachep, req);
 		atomic_dec(&ctx->reqs_active);
 	}
@@ -559,10 +579,7 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
 	}
 
 	batch->count -= allocated;
-	list_for_each_entry(req, &batch->head, ki_batch) {
-		list_add(&req->ki_list, &ctx->active_reqs);
-		atomic_inc(&ctx->reqs_active);
-	}
+	atomic_add(allocated, &ctx->reqs_active);
 
 	kunmap_atomic(ring);
 	spin_unlock_irq(&ctx->ctx_lock);
@@ -653,25 +670,34 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
 	info = &ctx->ring_info;
 
 	/*
-	 * Add a completion event to the ring buffer. Must be done holding
-	 * ctx->ctx_lock to prevent other code from messing with the tail
-	 * pointer since we might be called from irq context.
-	 *
 	 * Take rcu_read_lock() in case the kioctx is being destroyed, as we
 	 * need to issue a wakeup after decrementing reqs_active.
 	 */
 	rcu_read_lock();
-	spin_lock_irqsave(&ctx->ctx_lock, flags);
 
-	list_del(&iocb->ki_list);	/* remove from active_reqs */
+	if (iocb->ki_list.next) {
+		unsigned long flags;
+
+		spin_lock_irqsave(&ctx->ctx_lock, flags);
+		list_del(&iocb->ki_list);
+		spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+	}
 
 	/*
 	 * cancelled requests don't get events, userland was given one
 	 * when the event got cancelled.
 	 */
-	if (kiocbIsCancelled(iocb))
+	if (unlikely(xchg(&iocb->ki_cancel,
+			  KIOCB_CANCELLED) == KIOCB_CANCELLED))
 		goto put_rq;
 
+	/*
+	 * Add a completion event to the ring buffer. Must be done holding
+	 * ctx->ctx_lock to prevent other code from messing with the tail
+	 * pointer since we might be called from irq context.
+	 */
+	spin_lock_irqsave(&ctx->completion_lock, flags);
+
 	tail = info->tail;
 	pos = tail + AIO_EVENTS_OFFSET;
 
@@ -705,6 +731,8 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
 	kunmap_atomic(ring);
 	flush_dcache_page(info->ring_pages[0]);
 
+	spin_unlock_irqrestore(&ctx->completion_lock, flags);
+
 	pr_debug("added to ring %p at [%u]\n", iocb, tail);
 
 	/*
@@ -731,7 +759,6 @@ put_rq:
 	if (waitqueue_active(&ctx->wait))
 		wake_up(&ctx->wait);
 
-	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
 	rcu_read_unlock();
 }
 EXPORT_SYMBOL(aio_complete);
@@ -1216,15 +1243,10 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 	req->ki_opcode = iocb->aio_lio_opcode;
 
 	ret = aio_setup_iocb(req, compat);
-
 	if (ret)
 		goto out_put_req;
 
-	if (unlikely(kiocbIsCancelled(req)))
-		ret = -EINTR;
-	else
-		ret = req->ki_retry(req);
-
+	ret = req->ki_retry(req);
 	if (ret != -EIOCBQUEUED) {
 		/*
 		 * There's no easy way to restart the syscall since other AIO's
@@ -1241,10 +1263,6 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 	return 0;
 
 out_put_req:
-	spin_lock_irq(&ctx->ctx_lock);
-	list_del(&req->ki_list);
-	spin_unlock_irq(&ctx->ctx_lock);
-
 	atomic_dec(&ctx->reqs_active);
 	aio_put_req(req);	/* drop extra ref to req */
 	aio_put_req(req);	/* drop i/o ref to req */