From 584aa810b6240d88c28113a90c5029449814a3b5 Mon Sep 17 00:00:00 2001 From: Fred Isaman Date: Fri, 20 Apr 2012 14:47:51 -0400 Subject: NFS: rewrite directio read to use async coalesce code This also has the advantage that it allows directio to use pnfs. Signed-off-by: Fred Isaman Signed-off-by: Trond Myklebust --- fs/nfs/direct.c | 255 +++++++++++++++++++++++++++----------------------------- 1 file changed, 123 insertions(+), 132 deletions(-) (limited to 'fs/nfs/direct.c') diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c index 22a40c408449..4ba9a2c839bb 100644 --- a/fs/nfs/direct.c +++ b/fs/nfs/direct.c @@ -124,22 +124,6 @@ ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_ return -EINVAL; } -static void nfs_direct_dirty_pages(struct page **pages, unsigned int pgbase, size_t count) -{ - unsigned int npages; - unsigned int i; - - if (count == 0) - return; - pages += (pgbase >> PAGE_SHIFT); - npages = (count + (pgbase & ~PAGE_MASK) + PAGE_SIZE - 1) >> PAGE_SHIFT; - for (i = 0; i < npages; i++) { - struct page *page = pages[i]; - if (!PageCompound(page)) - set_page_dirty(page); - } -} - static void nfs_direct_release_pages(struct page **pages, unsigned int npages) { unsigned int i; @@ -226,58 +210,92 @@ static void nfs_direct_complete(struct nfs_direct_req *dreq) nfs_direct_req_release(dreq); } -/* - * We must hold a reference to all the pages in this direct read request - * until the RPCs complete. This could be long *after* we are woken up in - * nfs_direct_wait (for instance, if someone hits ^C on a slow server). - */ -static void nfs_direct_read_result(struct rpc_task *task, void *calldata) +void nfs_direct_readpage_release(struct nfs_page *req) { - struct nfs_read_data *data = calldata; - - nfs_readpage_result(task, data); + dprintk("NFS: direct read done (%s/%lld %d@%lld)\n", + req->wb_context->dentry->d_inode->i_sb->s_id, + (long long)NFS_FILEID(req->wb_context->dentry->d_inode), + req->wb_bytes, + (long long)req_offset(req)); + nfs_release_request(req); } -static void nfs_direct_read_release(void *calldata) +static void nfs_direct_read_completion(struct nfs_pgio_header *hdr) { + unsigned long bytes = 0; + struct nfs_direct_req *dreq = hdr->dreq; - struct nfs_read_data *data = calldata; - struct nfs_direct_req *dreq = (struct nfs_direct_req *)data->header->req; - int status = data->task.tk_status; + if (test_bit(NFS_IOHDR_REDO, &hdr->flags)) + goto out_put; spin_lock(&dreq->lock); - if (unlikely(status < 0)) { - dreq->error = status; - spin_unlock(&dreq->lock); + if (test_bit(NFS_IOHDR_ERROR, &hdr->flags) && (hdr->good_bytes == 0)) + dreq->error = hdr->error; + else + dreq->count += hdr->good_bytes; + spin_unlock(&dreq->lock); + + if (!test_bit(NFS_IOHDR_ERROR, &hdr->flags)) { + while (!list_empty(&hdr->pages)) { + struct nfs_page *req = nfs_list_entry(hdr->pages.next); + struct page *page = req->wb_page; + + if (test_bit(NFS_IOHDR_EOF, &hdr->flags)) { + if (bytes > hdr->good_bytes) + zero_user(page, 0, PAGE_SIZE); + else if (hdr->good_bytes - bytes < PAGE_SIZE) + zero_user_segment(page, + hdr->good_bytes & ~PAGE_MASK, + PAGE_SIZE); + } + bytes += req->wb_bytes; + nfs_list_remove_request(req); + nfs_direct_readpage_release(req); + if (!PageCompound(page)) + set_page_dirty(page); + page_cache_release(page); + } } else { - dreq->count += data->res.count; - spin_unlock(&dreq->lock); - nfs_direct_dirty_pages(data->pages.pagevec, - data->args.pgbase, - data->res.count); + while (!list_empty(&hdr->pages)) { + struct nfs_page *req = nfs_list_entry(hdr->pages.next); + + if (bytes < hdr->good_bytes) + if (!PageCompound(req->wb_page)) + set_page_dirty(req->wb_page); + bytes += req->wb_bytes; + page_cache_release(req->wb_page); + nfs_list_remove_request(req); + nfs_direct_readpage_release(req); + } } - nfs_direct_release_pages(data->pages.pagevec, data->pages.npages); - +out_put: if (put_dreq(dreq)) nfs_direct_complete(dreq); - nfs_readdata_release(data); + hdr->release(hdr); } -static const struct rpc_call_ops nfs_read_direct_ops = { - .rpc_call_prepare = nfs_read_prepare, - .rpc_call_done = nfs_direct_read_result, - .rpc_release = nfs_direct_read_release, -}; - -static void nfs_direct_readhdr_release(struct nfs_read_header *rhdr) +static void nfs_sync_pgio_error(struct list_head *head) { - struct nfs_read_data *data = &rhdr->rpc_data; + struct nfs_page *req; - if (data->pages.pagevec != data->pages.page_array) - kfree(data->pages.pagevec); - nfs_readhdr_free(&rhdr->header); + while (!list_empty(head)) { + req = nfs_list_entry(head->next); + nfs_list_remove_request(req); + nfs_release_request(req); + } } +static void nfs_direct_pgio_init(struct nfs_pgio_header *hdr) +{ + get_dreq(hdr->dreq); +} + +static const struct nfs_pgio_completion_ops nfs_direct_read_completion_ops = { + .error_cleanup = nfs_sync_pgio_error, + .init_hdr = nfs_direct_pgio_init, + .completion = nfs_direct_read_completion, +}; + /* * For each rsize'd chunk of the user's buffer, dispatch an NFS READ * operation. If nfs_readdata_alloc() or get_user_pages() fails, @@ -285,118 +303,85 @@ static void nfs_direct_readhdr_release(struct nfs_read_header *rhdr) * handled automatically by nfs_direct_read_result(). Otherwise, if * no requests have been sent, just return an error. */ -static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq, +static ssize_t nfs_direct_read_schedule_segment(struct nfs_pageio_descriptor *desc, const struct iovec *iov, loff_t pos) { + struct nfs_direct_req *dreq = desc->pg_dreq; struct nfs_open_context *ctx = dreq->ctx; struct inode *inode = ctx->dentry->d_inode; unsigned long user_addr = (unsigned long)iov->iov_base; size_t count = iov->iov_len; size_t rsize = NFS_SERVER(inode)->rsize; - struct rpc_task *task; - struct rpc_message msg = { - .rpc_cred = ctx->cred, - }; - struct rpc_task_setup task_setup_data = { - .rpc_client = NFS_CLIENT(inode), - .rpc_message = &msg, - .callback_ops = &nfs_read_direct_ops, - .workqueue = nfsiod_workqueue, - .flags = RPC_TASK_ASYNC, - }; unsigned int pgbase; int result; ssize_t started = 0; + struct page **pagevec = NULL; + unsigned int npages; do { - struct nfs_read_header *rhdr; - struct nfs_read_data *data; - struct nfs_page_array *pages; size_t bytes; + int i; pgbase = user_addr & ~PAGE_MASK; - bytes = min(rsize,count); + bytes = min(max(rsize, PAGE_SIZE), count); result = -ENOMEM; - rhdr = nfs_readhdr_alloc(); - if (unlikely(!rhdr)) - break; - data = nfs_readdata_alloc(&rhdr->header, nfs_page_array_len(pgbase, bytes)); - if (!data) { - nfs_readhdr_free(&rhdr->header); + npages = nfs_page_array_len(pgbase, bytes); + if (!pagevec) + pagevec = kmalloc(npages * sizeof(struct page *), + GFP_KERNEL); + if (!pagevec) break; - } - data->header = &rhdr->header; - atomic_inc(&data->header->refcnt); - pages = &data->pages; - down_read(¤t->mm->mmap_sem); result = get_user_pages(current, current->mm, user_addr, - pages->npages, 1, 0, pages->pagevec, NULL); + npages, 1, 0, pagevec, NULL); up_read(¤t->mm->mmap_sem); - if (result < 0) { - nfs_direct_readhdr_release(rhdr); + if (result < 0) break; - } - if ((unsigned)result < pages->npages) { + if ((unsigned)result < npages) { bytes = result * PAGE_SIZE; if (bytes <= pgbase) { - nfs_direct_release_pages(pages->pagevec, result); - nfs_direct_readhdr_release(rhdr); + nfs_direct_release_pages(pagevec, result); break; } bytes -= pgbase; - pages->npages = result; + npages = result; } - get_dreq(dreq); - - rhdr->header.req = (struct nfs_page *) dreq; - rhdr->header.inode = inode; - rhdr->header.cred = msg.rpc_cred; - data->args.fh = NFS_FH(inode); - data->args.context = get_nfs_open_context(ctx); - data->args.lock_context = dreq->l_ctx; - data->args.offset = pos; - data->args.pgbase = pgbase; - data->args.pages = pages->pagevec; - data->args.count = bytes; - data->res.fattr = &data->fattr; - data->res.eof = 0; - data->res.count = bytes; - nfs_fattr_init(&data->fattr); - msg.rpc_argp = &data->args; - msg.rpc_resp = &data->res; - - task_setup_data.task = &data->task; - task_setup_data.callback_data = data; - NFS_PROTO(inode)->read_setup(data, &msg); - - task = rpc_run_task(&task_setup_data); - if (IS_ERR(task)) - break; - - dprintk("NFS: %5u initiated direct read call " - "(req %s/%Ld, %zu bytes @ offset %Lu)\n", - task->tk_pid, - inode->i_sb->s_id, - (long long)NFS_FILEID(inode), - bytes, - (unsigned long long)data->args.offset); - rpc_put_task(task); - - started += bytes; - user_addr += bytes; - pos += bytes; - /* FIXME: Remove this unnecessary math from final patch */ - pgbase += bytes; - pgbase &= ~PAGE_MASK; - BUG_ON(pgbase != (user_addr & ~PAGE_MASK)); - - count -= bytes; + for (i = 0; i < npages; i++) { + struct nfs_page *req; + unsigned int req_len = min(bytes, PAGE_SIZE - pgbase); + /* XXX do we need to do the eof zeroing found in async_filler? */ + req = nfs_create_request(dreq->ctx, dreq->inode, + pagevec[i], + pgbase, req_len); + if (IS_ERR(req)) { + nfs_direct_release_pages(pagevec + i, + npages - i); + result = PTR_ERR(req); + break; + } + req->wb_index = pos >> PAGE_SHIFT; + req->wb_offset = pos & ~PAGE_MASK; + if (!nfs_pageio_add_request(desc, req)) { + result = desc->pg_error; + nfs_release_request(req); + nfs_direct_release_pages(pagevec + i, + npages - i); + break; + } + pgbase = 0; + bytes -= req_len; + started += req_len; + user_addr += req_len; + pos += req_len; + count -= req_len; + } } while (count != 0); + kfree(pagevec); + if (started) return started; return result < 0 ? (ssize_t) result : -EFAULT; @@ -407,15 +392,19 @@ static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq, unsigned long nr_segs, loff_t pos) { + struct nfs_pageio_descriptor desc; ssize_t result = -EINVAL; size_t requested_bytes = 0; unsigned long seg; + nfs_pageio_init_read(&desc, dreq->inode, + &nfs_direct_read_completion_ops); get_dreq(dreq); + desc.pg_dreq = dreq; for (seg = 0; seg < nr_segs; seg++) { const struct iovec *vec = &iov[seg]; - result = nfs_direct_read_schedule_segment(dreq, vec, pos); + result = nfs_direct_read_schedule_segment(&desc, vec, pos); if (result < 0) break; requested_bytes += result; @@ -424,6 +413,8 @@ static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq, pos += vec->iov_len; } + nfs_pageio_complete(&desc); + /* * If no bytes were started, return the error, and let the * generic layer handle the completion. -- cgit v1.2.2