aboutsummaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
authorChuck Lever <cel@netapp.com>2006-03-20 13:44:32 -0500
committerTrond Myklebust <Trond.Myklebust@netapp.com>2006-03-20 13:44:32 -0500
commit462d5b3296b56289efec426499a83faad4c08d9e (patch)
treeeb4f9a0418e1190958603360cf9bcea0cbb67727 /fs
parent63ab46abc70b01cb0711301f5ddb08c1c0bb9b1c (diff)
NFS: make direct write path generate write requests concurrently
Duplicate infrastructure from direct read path that will allow write path to generate multiple write requests concurrently. This will enable us to add support for aio in this path. Temporarily we will lose the ability to do UNSTABLE writes followed by a COMMIT in the direct write path. However, all applications I am aware of that use NFS O_DIRECT currently write in relatively small chunks, so this should not be inconvenient in any way. Test plan: Millions of fsx-odirect ops. OraSim. Signed-off-by: Chuck Lever <cel@netapp.com> Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
Diffstat (limited to 'fs')
-rw-r--r--fs/nfs/direct.c240
-rw-r--r--fs/nfs/write.c3
2 files changed, 160 insertions, 83 deletions
diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index 4df21ce28e17..dea3239cdded 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -384,106 +384,185 @@ static ssize_t nfs_direct_read(struct kiocb *iocb, unsigned long user_addr, size
384 return result; 384 return result;
385} 385}
386 386
387static ssize_t nfs_direct_write_seg(struct inode *inode, struct nfs_open_context *ctx, unsigned long user_addr, size_t count, loff_t file_offset, struct page **pages, int nr_pages) 387static struct nfs_direct_req *nfs_direct_write_alloc(size_t nbytes, size_t wsize)
388{ 388{
389 const unsigned int wsize = NFS_SERVER(inode)->wsize; 389 struct list_head *list;
390 size_t request; 390 struct nfs_direct_req *dreq;
391 int curpage, need_commit; 391 unsigned int writes = 0;
392 ssize_t result, tot_bytes; 392 unsigned int wpages = (wsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
393 struct nfs_writeverf first_verf;
394 struct nfs_write_data *wdata;
395
396 wdata = nfs_writedata_alloc(NFS_SERVER(inode)->wpages);
397 if (!wdata)
398 return -ENOMEM;
399 393
400 wdata->inode = inode; 394 dreq = nfs_direct_req_alloc();
401 wdata->cred = ctx->cred; 395 if (!dreq)
402 wdata->args.fh = NFS_FH(inode); 396 return NULL;
403 wdata->args.context = ctx; 397
404 wdata->args.stable = NFS_UNSTABLE; 398 list = &dreq->list;
405 if (IS_SYNC(inode) || NFS_PROTO(inode)->version == 2 || count <= wsize) 399 for(;;) {
406 wdata->args.stable = NFS_FILE_SYNC; 400 struct nfs_write_data *data = nfs_writedata_alloc(wpages);
407 wdata->res.fattr = &wdata->fattr; 401
408 wdata->res.verf = &wdata->verf; 402 if (unlikely(!data)) {
403 while (!list_empty(list)) {
404 data = list_entry(list->next,
405 struct nfs_write_data, pages);
406 list_del(&data->pages);
407 nfs_writedata_free(data);
408 }
409 kref_put(&dreq->kref, nfs_direct_req_release);
410 return NULL;
411 }
412
413 INIT_LIST_HEAD(&data->pages);
414 list_add(&data->pages, list);
415
416 data->req = (struct nfs_page *) dreq;
417 writes++;
418 if (nbytes <= wsize)
419 break;
420 nbytes -= wsize;
421 }
422 kref_get(&dreq->kref);
423 atomic_set(&dreq->complete, writes);
424 return dreq;
425}
426
427/*
428 * Collects and returns the final error value/byte-count.
429 */
430static ssize_t nfs_direct_write_wait(struct nfs_direct_req *dreq, int intr)
431{
432 int result = 0;
433
434 if (intr) {
435 result = wait_event_interruptible(dreq->wait,
436 (atomic_read(&dreq->complete) == 0));
437 } else {
438 wait_event(dreq->wait, (atomic_read(&dreq->complete) == 0));
439 }
440
441 if (!result)
442 result = atomic_read(&dreq->error);
443 if (!result)
444 result = atomic_read(&dreq->count);
445
446 kref_put(&dreq->kref, nfs_direct_req_release);
447 return (ssize_t) result;
448}
449
450static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
451{
452 struct nfs_write_data *data = calldata;
453 struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
454 int status = task->tk_status;
455
456 if (nfs_writeback_done(task, data) != 0)
457 return;
458 /* If the server fell back to an UNSTABLE write, it's an error. */
459 if (unlikely(data->res.verf->committed != NFS_FILE_SYNC))
460 status = -EIO;
461
462 if (likely(status >= 0))
463 atomic_add(data->res.count, &dreq->count);
464 else
465 atomic_set(&dreq->error, status);
466
467 if (unlikely(atomic_dec_and_test(&dreq->complete)))
468 nfs_direct_complete(dreq);
469}
470
471static const struct rpc_call_ops nfs_write_direct_ops = {
472 .rpc_call_done = nfs_direct_write_result,
473 .rpc_release = nfs_writedata_release,
474};
475
476/*
477 * For each nfs_write_data struct that was allocated on the list, dispatch
478 * an NFS WRITE operation
479 *
480 * XXX: For now, support only FILE_SYNC writes. Later we may add
481 * support for UNSTABLE + COMMIT.
482 */
483static void nfs_direct_write_schedule(struct nfs_direct_req *dreq, struct inode *inode, struct nfs_open_context *ctx, unsigned long user_addr, size_t count, loff_t file_offset)
484{
485 struct list_head *list = &dreq->list;
486 struct page **pages = dreq->pages;
487 size_t wsize = NFS_SERVER(inode)->wsize;
488 unsigned int curpage, pgbase;
409 489
410 nfs_begin_data_update(inode);
411retry:
412 need_commit = 0;
413 tot_bytes = 0;
414 curpage = 0; 490 curpage = 0;
415 request = count; 491 pgbase = user_addr & ~PAGE_MASK;
416 wdata->args.pgbase = user_addr & ~PAGE_MASK;
417 wdata->args.offset = file_offset;
418 do { 492 do {
419 wdata->args.count = request; 493 struct nfs_write_data *data;
420 if (wdata->args.count > wsize) 494 size_t bytes;
421 wdata->args.count = wsize; 495
422 wdata->args.pages = &pages[curpage]; 496 bytes = wsize;
497 if (count < wsize)
498 bytes = count;
499
500 data = list_entry(list->next, struct nfs_write_data, pages);
501 list_del_init(&data->pages);
502
503 data->inode = inode;
504 data->cred = ctx->cred;
505 data->args.fh = NFS_FH(inode);
506 data->args.context = ctx;
507 data->args.offset = file_offset;
508 data->args.pgbase = pgbase;
509 data->args.pages = &pages[curpage];
510 data->args.count = bytes;
511 data->res.fattr = &data->fattr;
512 data->res.count = bytes;
513
514 rpc_init_task(&data->task, NFS_CLIENT(inode), RPC_TASK_ASYNC,
515 &nfs_write_direct_ops, data);
516 NFS_PROTO(inode)->write_setup(data, FLUSH_STABLE);
423 517
424 dprintk("NFS: direct write: c=%u o=%Ld ua=%lu, pb=%u, cp=%u\n", 518 data->task.tk_priority = RPC_PRIORITY_NORMAL;
425 wdata->args.count, (long long) wdata->args.offset, 519 data->task.tk_cookie = (unsigned long) inode;
426 user_addr + tot_bytes, wdata->args.pgbase, curpage);
427 520
428 lock_kernel(); 521 lock_kernel();
429 result = NFS_PROTO(inode)->write(wdata); 522 rpc_execute(&data->task);
430 unlock_kernel(); 523 unlock_kernel();
431 524
432 if (result <= 0) { 525 dfprintk(VFS, "NFS: %4d initiated direct write call (req %s/%Ld, %u bytes @ offset %Lu)\n",
433 if (tot_bytes > 0) 526 data->task.tk_pid,
434 break; 527 inode->i_sb->s_id,
435 goto out; 528 (long long)NFS_FILEID(inode),
436 } 529 bytes,
530 (unsigned long long)data->args.offset);
437 531
438 if (tot_bytes == 0) 532 file_offset += bytes;
439 memcpy(&first_verf.verifier, &wdata->verf.verifier, 533 pgbase += bytes;
440 sizeof(first_verf.verifier)); 534 curpage += pgbase >> PAGE_SHIFT;
441 if (wdata->verf.committed != NFS_FILE_SYNC) { 535 pgbase &= ~PAGE_MASK;
442 need_commit = 1;
443 if (memcmp(&first_verf.verifier, &wdata->verf.verifier,
444 sizeof(first_verf.verifier)))
445 goto sync_retry;
446 }
447 536
448 tot_bytes += result; 537 count -= bytes;
538 } while (count != 0);
539}
449 540
450 /* in case of a short write: stop now, let the app recover */ 541static ssize_t nfs_direct_write_seg(struct inode *inode, struct nfs_open_context *ctx, unsigned long user_addr, size_t count, loff_t file_offset, struct page **pages, int nr_pages)
451 if (result < wdata->args.count) 542{
452 break; 543 ssize_t result;
544 sigset_t oldset;
545 struct rpc_clnt *clnt = NFS_CLIENT(inode);
546 struct nfs_direct_req *dreq;
453 547
454 wdata->args.offset += result; 548 dreq = nfs_direct_write_alloc(count, NFS_SERVER(inode)->wsize);
455 wdata->args.pgbase += result; 549 if (!dreq)
456 curpage += wdata->args.pgbase >> PAGE_SHIFT; 550 return -ENOMEM;
457 wdata->args.pgbase &= ~PAGE_MASK;
458 request -= result;
459 } while (request != 0);
460 551
461 /* 552 dreq->pages = pages;
462 * Commit data written so far, even in the event of an error 553 dreq->npages = nr_pages;
463 */
464 if (need_commit) {
465 wdata->args.count = tot_bytes;
466 wdata->args.offset = file_offset;
467 554
468 lock_kernel(); 555 nfs_begin_data_update(inode);
469 result = NFS_PROTO(inode)->commit(wdata);
470 unlock_kernel();
471 556
472 if (result < 0 || memcmp(&first_verf.verifier, 557 rpc_clnt_sigmask(clnt, &oldset);
473 &wdata->verf.verifier, 558 nfs_direct_write_schedule(dreq, inode, ctx, user_addr, count,
474 sizeof(first_verf.verifier)) != 0) 559 file_offset);
475 goto sync_retry; 560 result = nfs_direct_write_wait(dreq, clnt->cl_intr);
476 } 561 rpc_clnt_sigunmask(clnt, &oldset);
477 result = tot_bytes;
478 562
479out:
480 nfs_end_data_update(inode); 563 nfs_end_data_update(inode);
481 nfs_writedata_free(wdata);
482 return result;
483 564
484sync_retry: 565 return result;
485 wdata->args.stable = NFS_FILE_SYNC;
486 goto retry;
487} 566}
488 567
489/* 568/*
@@ -515,7 +594,6 @@ static ssize_t nfs_direct_write(struct inode *inode, struct nfs_open_context *ct
515 nfs_add_stats(inode, NFSIOS_DIRECTWRITTENBYTES, size); 594 nfs_add_stats(inode, NFSIOS_DIRECTWRITTENBYTES, size);
516 result = nfs_direct_write_seg(inode, ctx, user_addr, size, 595 result = nfs_direct_write_seg(inode, ctx, user_addr, size,
517 file_offset, pages, page_count); 596 file_offset, pages, page_count);
518 nfs_free_user_pages(pages, page_count, 0);
519 597
520 if (result <= 0) { 598 if (result <= 0) {
521 if (tot_bytes > 0) 599 if (tot_bytes > 0)
diff --git a/fs/nfs/write.c b/fs/nfs/write.c
index 5912274ff1a1..875f5b060533 100644
--- a/fs/nfs/write.c
+++ b/fs/nfs/write.c
@@ -77,7 +77,6 @@ static struct nfs_page * nfs_update_request(struct nfs_open_context*,
77 struct inode *, 77 struct inode *,
78 struct page *, 78 struct page *,
79 unsigned int, unsigned int); 79 unsigned int, unsigned int);
80static int nfs_writeback_done(struct rpc_task *, struct nfs_write_data *);
81static int nfs_wait_on_write_congestion(struct address_space *, int); 80static int nfs_wait_on_write_congestion(struct address_space *, int);
82static int nfs_wait_on_requests(struct inode *, unsigned long, unsigned int); 81static int nfs_wait_on_requests(struct inode *, unsigned long, unsigned int);
83static int nfs_flush_inode(struct inode *inode, unsigned long idx_start, 82static int nfs_flush_inode(struct inode *inode, unsigned long idx_start,
@@ -1183,7 +1182,7 @@ static const struct rpc_call_ops nfs_write_full_ops = {
1183/* 1182/*
1184 * This function is called when the WRITE call is complete. 1183 * This function is called when the WRITE call is complete.
1185 */ 1184 */
1186static int nfs_writeback_done(struct rpc_task *task, struct nfs_write_data *data) 1185int nfs_writeback_done(struct rpc_task *task, struct nfs_write_data *data)
1187{ 1186{
1188 struct nfs_writeargs *argp = &data->args; 1187 struct nfs_writeargs *argp = &data->args;
1189 struct nfs_writeres *resp = &data->res; 1188 struct nfs_writeres *resp = &data->res;