aboutsummaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
authorJens Axboe <jens.axboe@oracle.com>2008-12-09 02:11:22 -0500
committerJens Axboe <jens.axboe@oracle.com>2008-12-29 02:29:50 -0500
commitabf137dd7712132ee56d5b3143c2ff61a72a5faa (patch)
tree8334f03c598343bb93340f081fcde5ba659b440b /fs
parent392ddc32982a5c661dd90dd49a3cb37f1c68b782 (diff)
aio: make the lookup_ioctx() lockless
The mm->ioctx_list is currently protected by a reader-writer lock, so we always grab that lock on the read side for doing ioctx lookups. As the workload is extremely reader biased, turn this into an rcu hlist so we can make lookup_ioctx() lockless. Get rid of the rwlock and use a spinlock for providing update side exclusion. There's usually only 1 entry on this list, so it doesn't make sense to look into fancier data structures. Reviewed-by: Jeff Moyer <jmoyer@redhat.com> Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
Diffstat (limited to 'fs')
-rw-r--r--fs/aio.c100
1 files changed, 56 insertions, 44 deletions
diff --git a/fs/aio.c b/fs/aio.c
index f658441d5666..d6f89d3c15e8 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -191,6 +191,20 @@ static int aio_setup_ring(struct kioctx *ctx)
191 kunmap_atomic((void *)((unsigned long)__event & PAGE_MASK), km); \ 191 kunmap_atomic((void *)((unsigned long)__event & PAGE_MASK), km); \
192} while(0) 192} while(0)
193 193
194static void ctx_rcu_free(struct rcu_head *head)
195{
196 struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
197 unsigned nr_events = ctx->max_reqs;
198
199 kmem_cache_free(kioctx_cachep, ctx);
200
201 if (nr_events) {
202 spin_lock(&aio_nr_lock);
203 BUG_ON(aio_nr - nr_events > aio_nr);
204 aio_nr -= nr_events;
205 spin_unlock(&aio_nr_lock);
206 }
207}
194 208
195/* __put_ioctx 209/* __put_ioctx
196 * Called when the last user of an aio context has gone away, 210 * Called when the last user of an aio context has gone away,
@@ -198,8 +212,6 @@ static int aio_setup_ring(struct kioctx *ctx)
198 */ 212 */
199static void __put_ioctx(struct kioctx *ctx) 213static void __put_ioctx(struct kioctx *ctx)
200{ 214{
201 unsigned nr_events = ctx->max_reqs;
202
203 BUG_ON(ctx->reqs_active); 215 BUG_ON(ctx->reqs_active);
204 216
205 cancel_delayed_work(&ctx->wq); 217 cancel_delayed_work(&ctx->wq);
@@ -208,14 +220,7 @@ static void __put_ioctx(struct kioctx *ctx)
208 mmdrop(ctx->mm); 220 mmdrop(ctx->mm);
209 ctx->mm = NULL; 221 ctx->mm = NULL;
210 pr_debug("__put_ioctx: freeing %p\n", ctx); 222 pr_debug("__put_ioctx: freeing %p\n", ctx);
211 kmem_cache_free(kioctx_cachep, ctx); 223 call_rcu(&ctx->rcu_head, ctx_rcu_free);
212
213 if (nr_events) {
214 spin_lock(&aio_nr_lock);
215 BUG_ON(aio_nr - nr_events > aio_nr);
216 aio_nr -= nr_events;
217 spin_unlock(&aio_nr_lock);
218 }
219} 224}
220 225
221#define get_ioctx(kioctx) do { \ 226#define get_ioctx(kioctx) do { \
@@ -235,6 +240,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
235{ 240{
236 struct mm_struct *mm; 241 struct mm_struct *mm;
237 struct kioctx *ctx; 242 struct kioctx *ctx;
243 int did_sync = 0;
238 244
239 /* Prevent overflows */ 245 /* Prevent overflows */
240 if ((nr_events > (0x10000000U / sizeof(struct io_event))) || 246 if ((nr_events > (0x10000000U / sizeof(struct io_event))) ||
@@ -267,21 +273,30 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
267 goto out_freectx; 273 goto out_freectx;
268 274
269 /* limit the number of system wide aios */ 275 /* limit the number of system wide aios */
270 spin_lock(&aio_nr_lock); 276 do {
271 if (aio_nr + ctx->max_reqs > aio_max_nr || 277 spin_lock_bh(&aio_nr_lock);
272 aio_nr + ctx->max_reqs < aio_nr) 278 if (aio_nr + nr_events > aio_max_nr ||
273 ctx->max_reqs = 0; 279 aio_nr + nr_events < aio_nr)
274 else 280 ctx->max_reqs = 0;
275 aio_nr += ctx->max_reqs; 281 else
276 spin_unlock(&aio_nr_lock); 282 aio_nr += ctx->max_reqs;
283 spin_unlock_bh(&aio_nr_lock);
284 if (ctx->max_reqs || did_sync)
285 break;
286
287 /* wait for rcu callbacks to have completed before giving up */
288 synchronize_rcu();
289 did_sync = 1;
290 ctx->max_reqs = nr_events;
291 } while (1);
292
277 if (ctx->max_reqs == 0) 293 if (ctx->max_reqs == 0)
278 goto out_cleanup; 294 goto out_cleanup;
279 295
280 /* now link into global list. */ 296 /* now link into global list. */
281 write_lock(&mm->ioctx_list_lock); 297 spin_lock(&mm->ioctx_lock);
282 ctx->next = mm->ioctx_list; 298 hlist_add_head_rcu(&ctx->list, &mm->ioctx_list);
283 mm->ioctx_list = ctx; 299 spin_unlock(&mm->ioctx_lock);
284 write_unlock(&mm->ioctx_list_lock);
285 300
286 dprintk("aio: allocated ioctx %p[%ld]: mm=%p mask=0x%x\n", 301 dprintk("aio: allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
287 ctx, ctx->user_id, current->mm, ctx->ring_info.nr); 302 ctx, ctx->user_id, current->mm, ctx->ring_info.nr);
@@ -375,11 +390,12 @@ ssize_t wait_on_sync_kiocb(struct kiocb *iocb)
375 */ 390 */
376void exit_aio(struct mm_struct *mm) 391void exit_aio(struct mm_struct *mm)
377{ 392{
378 struct kioctx *ctx = mm->ioctx_list; 393 struct kioctx *ctx;
379 mm->ioctx_list = NULL; 394
380 while (ctx) { 395 while (!hlist_empty(&mm->ioctx_list)) {
381 struct kioctx *next = ctx->next; 396 ctx = hlist_entry(mm->ioctx_list.first, struct kioctx, list);
382 ctx->next = NULL; 397 hlist_del_rcu(&ctx->list);
398
383 aio_cancel_all(ctx); 399 aio_cancel_all(ctx);
384 400
385 wait_for_all_aios(ctx); 401 wait_for_all_aios(ctx);
@@ -394,7 +410,6 @@ void exit_aio(struct mm_struct *mm)
394 atomic_read(&ctx->users), ctx->dead, 410 atomic_read(&ctx->users), ctx->dead,
395 ctx->reqs_active); 411 ctx->reqs_active);
396 put_ioctx(ctx); 412 put_ioctx(ctx);
397 ctx = next;
398 } 413 }
399} 414}
400 415
@@ -555,19 +570,21 @@ int aio_put_req(struct kiocb *req)
555 570
556static struct kioctx *lookup_ioctx(unsigned long ctx_id) 571static struct kioctx *lookup_ioctx(unsigned long ctx_id)
557{ 572{
558 struct kioctx *ioctx; 573 struct mm_struct *mm = current->mm;
559 struct mm_struct *mm; 574 struct kioctx *ctx = NULL;
575 struct hlist_node *n;
560 576
561 mm = current->mm; 577 rcu_read_lock();
562 read_lock(&mm->ioctx_list_lock); 578
563 for (ioctx = mm->ioctx_list; ioctx; ioctx = ioctx->next) 579 hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list) {
564 if (likely(ioctx->user_id == ctx_id && !ioctx->dead)) { 580 if (ctx->user_id == ctx_id && !ctx->dead) {
565 get_ioctx(ioctx); 581 get_ioctx(ctx);
566 break; 582 break;
567 } 583 }
568 read_unlock(&mm->ioctx_list_lock); 584 }
569 585
570 return ioctx; 586 rcu_read_unlock();
587 return ctx;
571} 588}
572 589
573/* 590/*
@@ -1215,19 +1232,14 @@ out:
1215static void io_destroy(struct kioctx *ioctx) 1232static void io_destroy(struct kioctx *ioctx)
1216{ 1233{
1217 struct mm_struct *mm = current->mm; 1234 struct mm_struct *mm = current->mm;
1218 struct kioctx **tmp;
1219 int was_dead; 1235 int was_dead;
1220 1236
1221 /* delete the entry from the list is someone else hasn't already */ 1237 /* delete the entry from the list is someone else hasn't already */
1222 write_lock(&mm->ioctx_list_lock); 1238 spin_lock(&mm->ioctx_lock);
1223 was_dead = ioctx->dead; 1239 was_dead = ioctx->dead;
1224 ioctx->dead = 1; 1240 ioctx->dead = 1;
1225 for (tmp = &mm->ioctx_list; *tmp && *tmp != ioctx; 1241 hlist_del_rcu(&ioctx->list);
1226 tmp = &(*tmp)->next) 1242 spin_unlock(&mm->ioctx_lock);
1227 ;
1228 if (*tmp)
1229 *tmp = ioctx->next;
1230 write_unlock(&mm->ioctx_list_lock);
1231 1243
1232 dprintk("aio_release(%p)\n", ioctx); 1244 dprintk("aio_release(%p)\n", ioctx);
1233 if (likely(!was_dead)) 1245 if (likely(!was_dead))