author     Kent Overstreet <koverstreet@google.com>        2013-05-07 19:18:41 -0400
committer  Linus Torvalds <torvalds@linux-foundation.org>  2013-05-07 21:38:28 -0400
commit     36f5588905c10a8c4568a210d601fe8c3c27e0f0 (patch)
tree       f9d313abfd454b27c38df609b41cd0e4a071e0a9
parent     11599ebac4a249ab3c8b9a535c21db7a51458c0a (diff)
aio: refcounting cleanup
The usage of ctx->dead was fubar - it makes no sense to explicitly check it
all over the place, especially when we're already using RCU.

Now, ctx->dead only indicates whether we've dropped the initial refcount. The
new teardown sequence is:

  set ctx->dead
  hlist_del_rcu();
  synchronize_rcu();

Now we know no system calls can take a new ref, and it's safe to drop the
initial ref:

  put_ioctx();

We also need to ensure there are no more outstanding kiocbs. This was done
incorrectly - it was being done in kill_ctx(), and before dropping the initial
refcount. At this point, other syscalls may still be submitting kiocbs!

Now, we cancel and wait for outstanding kiocbs in free_ioctx(), after
kioctx->users has dropped to 0 and we know no more iocbs could be submitted.

[akpm@linux-foundation.org: coding-style fixes]
Signed-off-by: Kent Overstreet <koverstreet@google.com>
Cc: Zach Brown <zab@redhat.com>
Cc: Felipe Balbi <balbi@ti.com>
Cc: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
Cc: Mark Fasheh <mfasheh@suse.com>
Cc: Joel Becker <jlbec@evilplan.org>
Cc: Rusty Russell <rusty@rustcorp.com.au>
Cc: Jens Axboe <axboe@kernel.dk>
Cc: Asai Thambi S P <asamymuthupa@micron.com>
Cc: Selvan Mani <smani@micron.com>
Cc: Sam Bradshaw <sbradshaw@micron.com>
Cc: Jeff Moyer <jmoyer@redhat.com>
Cc: Al Viro <viro@zeniv.linux.org.uk>
Cc: Benjamin LaHaise <bcrl@kvack.org>
Reviewed-by: "Theodore Ts'o" <tytso@mit.edu>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
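Read as a sequence, the argument in the message above is: once the kioctx has
been unhashed and an RCU grace period has passed, no lookup can take a new
reference, so dropping the initial reference is safe, and a plain atomic_inc()
in the lookup path becomes sufficient. What follows is a minimal, kernel-style
C sketch of that pattern only - the struct and function names (my_ctx,
my_ctx_kill, ...) are invented for illustration, and this is not the fs/aio.c
code in the patch below:

/*
 * Illustrative sketch only - not the fs/aio.c code from this patch.
 * It shows the teardown ordering described in the commit message:
 * mark dead once, unhash under RCU, wait a grace period, then drop
 * the initial reference.
 */
#include <linux/atomic.h>
#include <linux/rculist.h>
#include <linux/rcupdate.h>
#include <linux/slab.h>

struct my_ctx {
	atomic_t		users;	/* starts at 1: the "initial ref" */
	atomic_t		dead;	/* set exactly once, at teardown */
	struct hlist_node	list;	/* hashed so lookups can find it */
};

/* Lookup side: caller must hold rcu_read_lock() while walking the hash. */
static struct my_ctx *my_ctx_get(struct my_ctx *ctx)
{
	/* plain atomic_inc() is enough - see the comment in my_ctx_put() */
	atomic_inc(&ctx->users);
	return ctx;
}

static void my_ctx_put(struct my_ctx *ctx)
{
	/*
	 * Freeing directly is safe in this sketch because the initial ref is
	 * only dropped after synchronize_rcu(): every RCU reader that could
	 * have found the ctx in the hash has already finished, and thus has
	 * already taken its reference if it wanted one. The real patch
	 * additionally defers the final free with call_rcu(), because
	 * aio_complete() touches the kioctx after decrementing reqs_active.
	 */
	if (atomic_dec_and_test(&ctx->users))
		kfree(ctx);
}

static void my_ctx_kill(struct my_ctx *ctx)
{
	/* atomic_xchg() guarantees only the first caller does the teardown */
	if (!atomic_xchg(&ctx->dead, 1)) {
		hlist_del_rcu(&ctx->list);	/* no new lookups can find it... */
		synchronize_rcu();		/* ...and in-flight lookups are done */
		my_ctx_put(ctx);		/* now drop the initial ref */
	}
}

The atomic_xchg() on ->dead is what lets the two teardown callers in the patch
(io_destroy() and exit_aio()) race safely: whichever flips the flag first does
the unhash and the ref drop, the other does nothing.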
-rw-r--r--  fs/aio.c  272
1 file changed, 119 insertions, 153 deletions
diff --git a/fs/aio.c b/fs/aio.c
index f877417f3c42..96f55bf207ed 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -79,7 +79,7 @@ static inline unsigned aio_ring_avail(struct aio_ring_info *info,
 
 struct kioctx {
 	atomic_t		users;
-	int			dead;
+	atomic_t		dead;
 
 	/* This needs improving */
 	unsigned long		user_id;
@@ -98,6 +98,7 @@ struct kioctx {
 	struct aio_ring_info	ring_info;
 
 	struct rcu_head		rcu_head;
+	struct work_struct	rcu_work;
 };
 
 /*------ sysctl variables----*/
@@ -237,44 +238,6 @@ static int aio_setup_ring(struct kioctx *ctx)
 	kunmap_atomic((void *)((unsigned long)__event & PAGE_MASK)); \
 } while(0)
 
-static void ctx_rcu_free(struct rcu_head *head)
-{
-	struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
-	kmem_cache_free(kioctx_cachep, ctx);
-}
-
-/* __put_ioctx
- * Called when the last user of an aio context has gone away,
- * and the struct needs to be freed.
- */
-static void __put_ioctx(struct kioctx *ctx)
-{
-	unsigned nr_events = ctx->max_reqs;
-	BUG_ON(atomic_read(&ctx->reqs_active));
-
-	aio_free_ring(ctx);
-	if (nr_events) {
-		spin_lock(&aio_nr_lock);
-		BUG_ON(aio_nr - nr_events > aio_nr);
-		aio_nr -= nr_events;
-		spin_unlock(&aio_nr_lock);
-	}
-	pr_debug("freeing %p\n", ctx);
-	call_rcu(&ctx->rcu_head, ctx_rcu_free);
-}
-
-static inline int try_get_ioctx(struct kioctx *kioctx)
-{
-	return atomic_inc_not_zero(&kioctx->users);
-}
-
-static inline void put_ioctx(struct kioctx *kioctx)
-{
-	BUG_ON(atomic_read(&kioctx->users) <= 0);
-	if (unlikely(atomic_dec_and_test(&kioctx->users)))
-		__put_ioctx(kioctx);
-}
-
 static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
 			struct io_event *res)
 {
@@ -298,6 +261,61 @@ static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
 	return ret;
 }
 
+static void free_ioctx_rcu(struct rcu_head *head)
+{
+	struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
+	kmem_cache_free(kioctx_cachep, ctx);
+}
+
+/*
+ * When this function runs, the kioctx has been removed from the "hash table"
+ * and ctx->users has dropped to 0, so we know no more kiocbs can be submitted -
+ * now it's safe to cancel any that need to be.
+ */
+static void free_ioctx(struct kioctx *ctx)
+{
+	struct io_event res;
+	struct kiocb *req;
+
+	spin_lock_irq(&ctx->ctx_lock);
+
+	while (!list_empty(&ctx->active_reqs)) {
+		req = list_first_entry(&ctx->active_reqs,
+				       struct kiocb, ki_list);
+
+		list_del_init(&req->ki_list);
+		kiocb_cancel(ctx, req, &res);
+	}
+
+	spin_unlock_irq(&ctx->ctx_lock);
+
+	wait_event(ctx->wait, !atomic_read(&ctx->reqs_active));
+
+	aio_free_ring(ctx);
+
+	spin_lock(&aio_nr_lock);
+	BUG_ON(aio_nr - ctx->max_reqs > aio_nr);
+	aio_nr -= ctx->max_reqs;
+	spin_unlock(&aio_nr_lock);
+
+	pr_debug("freeing %p\n", ctx);
+
+	/*
+	 * Here the call_rcu() is between the wait_event() for reqs_active to
+	 * hit 0, and freeing the ioctx.
+	 *
+	 * aio_complete() decrements reqs_active, but it has to touch the ioctx
+	 * after to issue a wakeup so we use rcu.
+	 */
+	call_rcu(&ctx->rcu_head, free_ioctx_rcu);
+}
+
+static void put_ioctx(struct kioctx *ctx)
+{
+	if (unlikely(atomic_dec_and_test(&ctx->users)))
+		free_ioctx(ctx);
+}
+
 /* ioctx_alloc
  * Allocates and initializes an ioctx. Returns an ERR_PTR if it failed.
  */
@@ -324,6 +342,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 	ctx->max_reqs = nr_events;
 
 	atomic_set(&ctx->users, 2);
+	atomic_set(&ctx->dead, 0);
 	spin_lock_init(&ctx->ctx_lock);
 	spin_lock_init(&ctx->ring_info.ring_lock);
 	init_waitqueue_head(&ctx->wait);
@@ -361,44 +380,43 @@ out_freectx:
 	return ERR_PTR(err);
 }
 
-/* kill_ctx
- * Cancels all outstanding aio requests on an aio context. Used
- * when the processes owning a context have all exited to encourage
- * the rapid destruction of the kioctx.
- */
-static void kill_ctx(struct kioctx *ctx)
+static void kill_ioctx_work(struct work_struct *work)
 {
-	struct task_struct *tsk = current;
-	DECLARE_WAITQUEUE(wait, tsk);
-	struct io_event res;
-	struct kiocb *req;
+	struct kioctx *ctx = container_of(work, struct kioctx, rcu_work);
 
-	spin_lock_irq(&ctx->ctx_lock);
-	ctx->dead = 1;
-	while (!list_empty(&ctx->active_reqs)) {
-		req = list_first_entry(&ctx->active_reqs,
-				       struct kiocb, ki_list);
+	wake_up_all(&ctx->wait);
+	put_ioctx(ctx);
+}
 
-		list_del_init(&req->ki_list);
-		kiocb_cancel(ctx, req, &res);
-	}
+static void kill_ioctx_rcu(struct rcu_head *head)
+{
+	struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
 
-	if (!atomic_read(&ctx->reqs_active))
-		goto out;
+	INIT_WORK(&ctx->rcu_work, kill_ioctx_work);
+	schedule_work(&ctx->rcu_work);
+}
 
-	add_wait_queue(&ctx->wait, &wait);
-	set_task_state(tsk, TASK_UNINTERRUPTIBLE);
-	while (atomic_read(&ctx->reqs_active)) {
-		spin_unlock_irq(&ctx->ctx_lock);
-		io_schedule();
-		set_task_state(tsk, TASK_UNINTERRUPTIBLE);
-		spin_lock_irq(&ctx->ctx_lock);
-	}
-	__set_task_state(tsk, TASK_RUNNING);
-	remove_wait_queue(&ctx->wait, &wait);
+/* kill_ioctx
+ * Cancels all outstanding aio requests on an aio context. Used
+ * when the processes owning a context have all exited to encourage
+ * the rapid destruction of the kioctx.
+ */
+static void kill_ioctx(struct kioctx *ctx)
+{
+	if (!atomic_xchg(&ctx->dead, 1)) {
+		hlist_del_rcu(&ctx->list);
+		/* Between hlist_del_rcu() and dropping the initial ref */
+		synchronize_rcu();
 
-out:
-	spin_unlock_irq(&ctx->ctx_lock);
+		/*
+		 * We can't punt to workqueue here because put_ioctx() ->
+		 * free_ioctx() will unmap the ringbuffer, and that has to be
+		 * done in the original process's context. kill_ioctx_rcu/work()
+		 * exist for exit_aio(), as in that path free_ioctx() won't do
+		 * the unmap.
+		 */
+		kill_ioctx_work(&ctx->rcu_work);
+	}
 }
 
 /* wait_on_sync_kiocb:
@@ -417,27 +435,25 @@ ssize_t wait_on_sync_kiocb(struct kiocb *iocb)
 }
 EXPORT_SYMBOL(wait_on_sync_kiocb);
 
-/* exit_aio: called when the last user of mm goes away. At this point,
- * there is no way for any new requests to be submited or any of the
- * io_* syscalls to be called on the context. However, there may be
- * outstanding requests which hold references to the context; as they
- * go away, they will call put_ioctx and release any pinned memory
- * associated with the request (held via struct page * references).
+/*
+ * exit_aio: called when the last user of mm goes away. At this point, there is
+ * no way for any new requests to be submited or any of the io_* syscalls to be
+ * called on the context.
+ *
+ * There may be outstanding kiocbs, but free_ioctx() will explicitly wait on
+ * them.
  */
 void exit_aio(struct mm_struct *mm)
 {
 	struct kioctx *ctx;
+	struct hlist_node *n;
 
-	while (!hlist_empty(&mm->ioctx_list)) {
-		ctx = hlist_entry(mm->ioctx_list.first, struct kioctx, list);
-		hlist_del_rcu(&ctx->list);
-
-		kill_ctx(ctx);
-
+	hlist_for_each_entry_safe(ctx, n, &mm->ioctx_list, list) {
 		if (1 != atomic_read(&ctx->users))
 			printk(KERN_DEBUG
 				"exit_aio:ioctx still alive: %d %d %d\n",
-				atomic_read(&ctx->users), ctx->dead,
+				atomic_read(&ctx->users),
+				atomic_read(&ctx->dead),
 				atomic_read(&ctx->reqs_active));
 		/*
 		 * We don't need to bother with munmap() here -
@@ -448,7 +464,11 @@ void exit_aio(struct mm_struct *mm)
 		 * place that uses ->mmap_size, so it's safe.
 		 */
 		ctx->ring_info.mmap_size = 0;
-		put_ioctx(ctx);
+
+		if (!atomic_xchg(&ctx->dead, 1)) {
+			hlist_del_rcu(&ctx->list);
+			call_rcu(&ctx->rcu_head, kill_ioctx_rcu);
+		}
 	}
 }
 
@@ -514,8 +534,6 @@ static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch)
 		kmem_cache_free(kiocb_cachep, req);
 		atomic_dec(&ctx->reqs_active);
 	}
-	if (unlikely(!atomic_read(&ctx->reqs_active) && ctx->dead))
-		wake_up_all(&ctx->wait);
 	spin_unlock_irq(&ctx->ctx_lock);
 }
 
@@ -612,13 +630,8 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id)
 	rcu_read_lock();
 
 	hlist_for_each_entry_rcu(ctx, &mm->ioctx_list, list) {
-		/*
-		 * RCU protects us against accessing freed memory but
-		 * we have to be careful not to get a reference when the
-		 * reference count already dropped to 0 (ctx->dead test
-		 * is unreliable because of races).
-		 */
-		if (ctx->user_id == ctx_id && !ctx->dead && try_get_ioctx(ctx)){
+		if (ctx->user_id == ctx_id) {
+			atomic_inc(&ctx->users);
 			ret = ctx;
 			break;
 		}
@@ -657,12 +670,15 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
 
 	info = &ctx->ring_info;
 
-	/* add a completion event to the ring buffer.
-	 * must be done holding ctx->ctx_lock to prevent
-	 * other code from messing with the tail
-	 * pointer since we might be called from irq
-	 * context.
+	/*
+	 * Add a completion event to the ring buffer. Must be done holding
+	 * ctx->ctx_lock to prevent other code from messing with the tail
+	 * pointer since we might be called from irq context.
+	 *
+	 * Take rcu_read_lock() in case the kioctx is being destroyed, as we
+	 * need to issue a wakeup after decrementing reqs_active.
 	 */
+	rcu_read_lock();
 	spin_lock_irqsave(&ctx->ctx_lock, flags);
 
 	list_del(&iocb->ki_list); /* remove from active_reqs */
@@ -728,6 +744,7 @@ put_rq:
 	wake_up(&ctx->wait);
 
 	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+	rcu_read_unlock();
 }
 EXPORT_SYMBOL(aio_complete);
 
@@ -871,7 +888,7 @@ static int read_events(struct kioctx *ctx,
 				break;
 			if (min_nr <= i)
 				break;
-			if (unlikely(ctx->dead)) {
+			if (unlikely(atomic_read(&ctx->dead))) {
 				ret = -EINVAL;
 				break;
 			}
@@ -914,35 +931,6 @@ out:
 	return i ? i : ret;
 }
 
-/* Take an ioctx and remove it from the list of ioctx's. Protects
- * against races with itself via ->dead.
- */
-static void io_destroy(struct kioctx *ioctx)
-{
-	struct mm_struct *mm = current->mm;
-	int was_dead;
-
-	/* delete the entry from the list is someone else hasn't already */
-	spin_lock(&mm->ioctx_lock);
-	was_dead = ioctx->dead;
-	ioctx->dead = 1;
-	hlist_del_rcu(&ioctx->list);
-	spin_unlock(&mm->ioctx_lock);
-
-	pr_debug("(%p)\n", ioctx);
-	if (likely(!was_dead))
-		put_ioctx(ioctx);	/* twice for the list */
-
-	kill_ctx(ioctx);
-
-	/*
-	 * Wake up any waiters. The setting of ctx->dead must be seen
-	 * by other CPUs at this point. Right now, we rely on the
-	 * locking done by the above calls to ensure this consistency.
-	 */
-	wake_up_all(&ioctx->wait);
-}
-
 /* sys_io_setup:
  * Create an aio_context capable of receiving at least nr_events.
  * ctxp must not point to an aio_context that already exists, and
@@ -978,7 +966,7 @@ SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp)
 	if (!IS_ERR(ioctx)) {
 		ret = put_user(ioctx->user_id, ctxp);
 		if (ret)
-			io_destroy(ioctx);
+			kill_ioctx(ioctx);
 		put_ioctx(ioctx);
 	}
 
@@ -996,7 +984,7 @@ SYSCALL_DEFINE1(io_destroy, aio_context_t, ctx)
 {
 	struct kioctx *ioctx = lookup_ioctx(ctx);
 	if (likely(NULL != ioctx)) {
-		io_destroy(ioctx);
+		kill_ioctx(ioctx);
 		put_ioctx(ioctx);
 		return 0;
 	}
@@ -1303,25 +1291,6 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 	if (ret)
 		goto out_put_req;
 
-	spin_lock_irq(&ctx->ctx_lock);
-	/*
-	 * We could have raced with io_destroy() and are currently holding a
-	 * reference to ctx which should be destroyed. We cannot submit IO
-	 * since ctx gets freed as soon as io_submit() puts its reference. The
-	 * check here is reliable: io_destroy() sets ctx->dead before waiting
-	 * for outstanding IO and the barrier between these two is realized by
-	 * unlock of mm->ioctx_lock and lock of ctx->ctx_lock. Analogously we
-	 * increment ctx->reqs_active before checking for ctx->dead and the
-	 * barrier is realized by unlock and lock of ctx->ctx_lock. Thus if we
-	 * don't see ctx->dead set here, io_destroy() waits for our IO to
-	 * finish.
-	 */
-	if (ctx->dead)
-		ret = -EINVAL;
-	spin_unlock_irq(&ctx->ctx_lock);
-	if (ret)
-		goto out_put_req;
-
 	if (unlikely(kiocbIsCancelled(req)))
 		ret = -EINTR;
 	else
@@ -1348,9 +1317,6 @@ out_put_req:
 	spin_unlock_irq(&ctx->ctx_lock);
 
 	atomic_dec(&ctx->reqs_active);
-	if (unlikely(!atomic_read(&ctx->reqs_active) && ctx->dead))
-		wake_up_all(&ctx->wait);
-
 	aio_put_req(req);	/* drop extra ref to req */
 	aio_put_req(req);	/* drop i/o ref to req */
 	return ret;