aboutsummaryrefslogtreecommitdiffstats
path: root/fs/aio.c
diff options
context:
space:
mode:
authorKent Overstreet <koverstreet@google.com>2013-05-07 19:18:39 -0400
committerLinus Torvalds <torvalds@linux-foundation.org>2013-05-07 21:38:28 -0400
commit11599ebac4a249ab3c8b9a535c21db7a51458c0a (patch)
treec5927106704034f4dbe6c10ae2c2c6bd60df42d8 /fs/aio.c
parent1d98ebfccc15aeea87a7c48d50d7343e1ce8daae (diff)
aio: make aio_put_req() lockless
Freeing a kiocb needed to touch the kioctx for three things: * Pull it off the reqs_active list * Decrementing reqs_active * Issuing a wakeup, if the kioctx was in the process of being freed. This patch moves these to aio_complete(), for a couple reasons: * aio_complete() already has to issue the wakeup, so if we drop the kioctx refcount before aio_complete does its wakeup we don't have to do it twice. * aio_complete currently has to take the kioctx lock, so it makes sense for it to pull the kiocb off the reqs_active list too. * A later patch is going to change reqs_active to include unreaped completions - this will mean allocating a kiocb doesn't have to look at the ringbuffer. So taking the decrement of reqs_active out of kiocb_free() is useful prep work for that patch. This doesn't really affect cancellation, since existing (usb) code that implements a cancel function still calls aio_complete() - we just have to make sure that aio_complete does the necessary teardown for cancelled kiocbs. It does affect code paths where we free kiocbs that were never submitted; they need to decrement reqs_active and pull the kiocb off the reqs_active list. This occurs in two places: kiocb_batch_free(), which is going away in a later patch, and the error path in io_submit_one. [akpm@linux-foundation.org: coding-style fixes] Signed-off-by: Kent Overstreet <koverstreet@google.com> Cc: Zach Brown <zab@redhat.com> Cc: Felipe Balbi <balbi@ti.com> Cc: Greg Kroah-Hartman <gregkh@linuxfoundation.org> Cc: Mark Fasheh <mfasheh@suse.com> Cc: Joel Becker <jlbec@evilplan.org> Cc: Rusty Russell <rusty@rustcorp.com.au> Cc: Jens Axboe <axboe@kernel.dk> Cc: Asai Thambi S P <asamymuthupa@micron.com> Cc: Selvan Mani <smani@micron.com> Cc: Sam Bradshaw <sbradshaw@micron.com> Acked-by: Jeff Moyer <jmoyer@redhat.com> Cc: Al Viro <viro@zeniv.linux.org.uk> Cc: Benjamin LaHaise <bcrl@kvack.org> Reviewed-by: "Theodore Ts'o" <tytso@mit.edu> Signed-off-by: Andrew Morton <akpm@linux-foundation.org> Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
Diffstat (limited to 'fs/aio.c')
-rw-r--r--fs/aio.c86
1 files changed, 34 insertions, 52 deletions
diff --git a/fs/aio.c b/fs/aio.c
index ffa8b8d2cb8e..f877417f3c42 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -89,7 +89,7 @@ struct kioctx {
89 89
90 spinlock_t ctx_lock; 90 spinlock_t ctx_lock;
91 91
92 int reqs_active; 92 atomic_t reqs_active;
93 struct list_head active_reqs; /* used for cancellation */ 93 struct list_head active_reqs; /* used for cancellation */
94 94
95 /* sys_io_setup currently limits this to an unsigned int */ 95 /* sys_io_setup currently limits this to an unsigned int */
@@ -250,7 +250,7 @@ static void ctx_rcu_free(struct rcu_head *head)
250static void __put_ioctx(struct kioctx *ctx) 250static void __put_ioctx(struct kioctx *ctx)
251{ 251{
252 unsigned nr_events = ctx->max_reqs; 252 unsigned nr_events = ctx->max_reqs;
253 BUG_ON(ctx->reqs_active); 253 BUG_ON(atomic_read(&ctx->reqs_active));
254 254
255 aio_free_ring(ctx); 255 aio_free_ring(ctx);
256 if (nr_events) { 256 if (nr_events) {
@@ -284,7 +284,7 @@ static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
284 cancel = kiocb->ki_cancel; 284 cancel = kiocb->ki_cancel;
285 kiocbSetCancelled(kiocb); 285 kiocbSetCancelled(kiocb);
286 if (cancel) { 286 if (cancel) {
287 kiocb->ki_users++; 287 atomic_inc(&kiocb->ki_users);
288 spin_unlock_irq(&ctx->ctx_lock); 288 spin_unlock_irq(&ctx->ctx_lock);
289 289
290 memset(res, 0, sizeof(*res)); 290 memset(res, 0, sizeof(*res));
@@ -383,12 +383,12 @@ static void kill_ctx(struct kioctx *ctx)
383 kiocb_cancel(ctx, req, &res); 383 kiocb_cancel(ctx, req, &res);
384 } 384 }
385 385
386 if (!ctx->reqs_active) 386 if (!atomic_read(&ctx->reqs_active))
387 goto out; 387 goto out;
388 388
389 add_wait_queue(&ctx->wait, &wait); 389 add_wait_queue(&ctx->wait, &wait);
390 set_task_state(tsk, TASK_UNINTERRUPTIBLE); 390 set_task_state(tsk, TASK_UNINTERRUPTIBLE);
391 while (ctx->reqs_active) { 391 while (atomic_read(&ctx->reqs_active)) {
392 spin_unlock_irq(&ctx->ctx_lock); 392 spin_unlock_irq(&ctx->ctx_lock);
393 io_schedule(); 393 io_schedule();
394 set_task_state(tsk, TASK_UNINTERRUPTIBLE); 394 set_task_state(tsk, TASK_UNINTERRUPTIBLE);
@@ -406,9 +406,9 @@ out:
406 */ 406 */
407ssize_t wait_on_sync_kiocb(struct kiocb *iocb) 407ssize_t wait_on_sync_kiocb(struct kiocb *iocb)
408{ 408{
409 while (iocb->ki_users) { 409 while (atomic_read(&iocb->ki_users)) {
410 set_current_state(TASK_UNINTERRUPTIBLE); 410 set_current_state(TASK_UNINTERRUPTIBLE);
411 if (!iocb->ki_users) 411 if (!atomic_read(&iocb->ki_users))
412 break; 412 break;
413 io_schedule(); 413 io_schedule();
414 } 414 }
@@ -438,7 +438,7 @@ void exit_aio(struct mm_struct *mm)
438 printk(KERN_DEBUG 438 printk(KERN_DEBUG
439 "exit_aio:ioctx still alive: %d %d %d\n", 439 "exit_aio:ioctx still alive: %d %d %d\n",
440 atomic_read(&ctx->users), ctx->dead, 440 atomic_read(&ctx->users), ctx->dead,
441 ctx->reqs_active); 441 atomic_read(&ctx->reqs_active));
442 /* 442 /*
443 * We don't need to bother with munmap() here - 443 * We don't need to bother with munmap() here -
444 * exit_mmap(mm) is coming and it'll unmap everything. 444 * exit_mmap(mm) is coming and it'll unmap everything.
@@ -453,11 +453,11 @@ void exit_aio(struct mm_struct *mm)
453} 453}
454 454
455/* aio_get_req 455/* aio_get_req
456 * Allocate a slot for an aio request. Increments the users count 456 * Allocate a slot for an aio request. Increments the ki_users count
457 * of the kioctx so that the kioctx stays around until all requests are 457 * of the kioctx so that the kioctx stays around until all requests are
458 * complete. Returns NULL if no requests are free. 458 * complete. Returns NULL if no requests are free.
459 * 459 *
460 * Returns with kiocb->users set to 2. The io submit code path holds 460 * Returns with kiocb->ki_users set to 2. The io submit code path holds
461 * an extra reference while submitting the i/o. 461 * an extra reference while submitting the i/o.
462 * This prevents races between the aio code path referencing the 462 * This prevents races between the aio code path referencing the
463 * req (after submitting it) and aio_complete() freeing the req. 463 * req (after submitting it) and aio_complete() freeing the req.
@@ -471,7 +471,7 @@ static struct kiocb *__aio_get_req(struct kioctx *ctx)
471 return NULL; 471 return NULL;
472 472
473 req->ki_flags = 0; 473 req->ki_flags = 0;
474 req->ki_users = 2; 474 atomic_set(&req->ki_users, 2);
475 req->ki_key = 0; 475 req->ki_key = 0;
476 req->ki_ctx = ctx; 476 req->ki_ctx = ctx;
477 req->ki_cancel = NULL; 477 req->ki_cancel = NULL;
@@ -512,9 +512,9 @@ static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch)
512 list_del(&req->ki_batch); 512 list_del(&req->ki_batch);
513 list_del(&req->ki_list); 513 list_del(&req->ki_list);
514 kmem_cache_free(kiocb_cachep, req); 514 kmem_cache_free(kiocb_cachep, req);
515 ctx->reqs_active--; 515 atomic_dec(&ctx->reqs_active);
516 } 516 }
517 if (unlikely(!ctx->reqs_active && ctx->dead)) 517 if (unlikely(!atomic_read(&ctx->reqs_active) && ctx->dead))
518 wake_up_all(&ctx->wait); 518 wake_up_all(&ctx->wait);
519 spin_unlock_irq(&ctx->ctx_lock); 519 spin_unlock_irq(&ctx->ctx_lock);
520} 520}
@@ -545,7 +545,8 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
545 spin_lock_irq(&ctx->ctx_lock); 545 spin_lock_irq(&ctx->ctx_lock);
546 ring = kmap_atomic(ctx->ring_info.ring_pages[0]); 546 ring = kmap_atomic(ctx->ring_info.ring_pages[0]);
547 547
548 avail = aio_ring_avail(&ctx->ring_info, ring) - ctx->reqs_active; 548 avail = aio_ring_avail(&ctx->ring_info, ring) -
549 atomic_read(&ctx->reqs_active);
549 BUG_ON(avail < 0); 550 BUG_ON(avail < 0);
550 if (avail < allocated) { 551 if (avail < allocated) {
551 /* Trim back the number of requests. */ 552 /* Trim back the number of requests. */
@@ -560,7 +561,7 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
560 batch->count -= allocated; 561 batch->count -= allocated;
561 list_for_each_entry(req, &batch->head, ki_batch) { 562 list_for_each_entry(req, &batch->head, ki_batch) {
562 list_add(&req->ki_list, &ctx->active_reqs); 563 list_add(&req->ki_list, &ctx->active_reqs);
563 ctx->reqs_active++; 564 atomic_inc(&ctx->reqs_active);
564 } 565 }
565 566
566 kunmap_atomic(ring); 567 kunmap_atomic(ring);
@@ -583,10 +584,8 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx,
583 return req; 584 return req;
584} 585}
585 586
586static inline void really_put_req(struct kioctx *ctx, struct kiocb *req) 587static void kiocb_free(struct kiocb *req)
587{ 588{
588 assert_spin_locked(&ctx->ctx_lock);
589
590 if (req->ki_filp) 589 if (req->ki_filp)
591 fput(req->ki_filp); 590 fput(req->ki_filp);
592 if (req->ki_eventfd != NULL) 591 if (req->ki_eventfd != NULL)
@@ -596,40 +595,12 @@ static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
596 if (req->ki_iovec != &req->ki_inline_vec) 595 if (req->ki_iovec != &req->ki_inline_vec)
597 kfree(req->ki_iovec); 596 kfree(req->ki_iovec);
598 kmem_cache_free(kiocb_cachep, req); 597 kmem_cache_free(kiocb_cachep, req);
599 ctx->reqs_active--;
600
601 if (unlikely(!ctx->reqs_active && ctx->dead))
602 wake_up_all(&ctx->wait);
603} 598}
604 599
605/* __aio_put_req
606 * Returns true if this put was the last user of the request.
607 */
608static void __aio_put_req(struct kioctx *ctx, struct kiocb *req)
609{
610 assert_spin_locked(&ctx->ctx_lock);
611
612 req->ki_users--;
613 BUG_ON(req->ki_users < 0);
614 if (likely(req->ki_users))
615 return;
616 list_del(&req->ki_list); /* remove from active_reqs */
617 req->ki_cancel = NULL;
618 req->ki_retry = NULL;
619
620 really_put_req(ctx, req);
621}
622
623/* aio_put_req
624 * Returns true if this put was the last user of the kiocb,
625 * false if the request is still in use.
626 */
627void aio_put_req(struct kiocb *req) 600void aio_put_req(struct kiocb *req)
628{ 601{
629 struct kioctx *ctx = req->ki_ctx; 602 if (atomic_dec_and_test(&req->ki_users))
630 spin_lock_irq(&ctx->ctx_lock); 603 kiocb_free(req);
631 __aio_put_req(ctx, req);
632 spin_unlock_irq(&ctx->ctx_lock);
633} 604}
634EXPORT_SYMBOL(aio_put_req); 605EXPORT_SYMBOL(aio_put_req);
635 606
@@ -677,9 +648,9 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
677 * - the sync task helpfully left a reference to itself in the iocb 648 * - the sync task helpfully left a reference to itself in the iocb
678 */ 649 */
679 if (is_sync_kiocb(iocb)) { 650 if (is_sync_kiocb(iocb)) {
680 BUG_ON(iocb->ki_users != 1); 651 BUG_ON(atomic_read(&iocb->ki_users) != 1);
681 iocb->ki_user_data = res; 652 iocb->ki_user_data = res;
682 iocb->ki_users = 0; 653 atomic_set(&iocb->ki_users, 0);
683 wake_up_process(iocb->ki_obj.tsk); 654 wake_up_process(iocb->ki_obj.tsk);
684 return; 655 return;
685 } 656 }
@@ -694,6 +665,8 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
694 */ 665 */
695 spin_lock_irqsave(&ctx->ctx_lock, flags); 666 spin_lock_irqsave(&ctx->ctx_lock, flags);
696 667
668 list_del(&iocb->ki_list); /* remove from active_reqs */
669
697 /* 670 /*
698 * cancelled requests don't get events, userland was given one 671 * cancelled requests don't get events, userland was given one
699 * when the event got cancelled. 672 * when the event got cancelled.
@@ -740,7 +713,8 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
740 713
741put_rq: 714put_rq:
742 /* everything turned out well, dispose of the aiocb. */ 715 /* everything turned out well, dispose of the aiocb. */
743 __aio_put_req(ctx, iocb); 716 aio_put_req(iocb);
717 atomic_dec(&ctx->reqs_active);
744 718
745 /* 719 /*
746 * We have to order our ring_info tail store above and test 720 * We have to order our ring_info tail store above and test
@@ -905,7 +879,7 @@ static int read_events(struct kioctx *ctx,
905 break; 879 break;
906 /* Try to only show up in io wait if there are ops 880 /* Try to only show up in io wait if there are ops
907 * in flight */ 881 * in flight */
908 if (ctx->reqs_active) 882 if (atomic_read(&ctx->reqs_active))
909 io_schedule(); 883 io_schedule();
910 else 884 else
911 schedule(); 885 schedule();
@@ -1369,6 +1343,14 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
1369 return 0; 1343 return 0;
1370 1344
1371out_put_req: 1345out_put_req:
1346 spin_lock_irq(&ctx->ctx_lock);
1347 list_del(&req->ki_list);
1348 spin_unlock_irq(&ctx->ctx_lock);
1349
1350 atomic_dec(&ctx->reqs_active);
1351 if (unlikely(!atomic_read(&ctx->reqs_active) && ctx->dead))
1352 wake_up_all(&ctx->wait);
1353
1372 aio_put_req(req); /* drop extra ref to req */ 1354 aio_put_req(req); /* drop extra ref to req */
1373 aio_put_req(req); /* drop i/o ref to req */ 1355 aio_put_req(req); /* drop i/o ref to req */
1374 return ret; 1356 return ret;