about | summary | refs | log | tree | commit | diff | stats
path: root/fs/nfs/direct.c
diff options
context:
space:
mode:
author: Fred Isaman <iisaman@netapp.com> 2012-04-20 14:47:51 -0400
committer: Trond Myklebust <Trond.Myklebust@netapp.com> 2012-04-27 14:10:38 -0400
commit: 584aa810b6240d88c28113a90c5029449814a3b5 (patch)
tree: 694b0942747c9ee7b8f53f21cb81ddc32cc07bbb /fs/nfs/direct.c
parent: 1825a0d08f22463e5a8f4b1636473efd057a3479 (diff)
NFS: rewrite directio read to use async coalesce code
This also has the advantage that it allows directio to use pnfs.

Signed-off-by: Fred Isaman <iisaman@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
Diffstat (limited to 'fs/nfs/direct.c')
-rw-r--r-- fs/nfs/direct.c 255
1 files changed, 123 insertions, 132 deletions
diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index 22a40c408449..4ba9a2c839bb 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -124,22 +124,6 @@ ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_
124 return -EINVAL; 124 return -EINVAL;
125} 125}
126 126
127static void nfs_direct_dirty_pages(struct page **pages, unsigned int pgbase, size_t count)
128{
129 unsigned int npages;
130 unsigned int i;
131
132 if (count == 0)
133 return;
134 pages += (pgbase >> PAGE_SHIFT);
135 npages = (count + (pgbase & ~PAGE_MASK) + PAGE_SIZE - 1) >> PAGE_SHIFT;
136 for (i = 0; i < npages; i++) {
137 struct page *page = pages[i];
138 if (!PageCompound(page))
139 set_page_dirty(page);
140 }
141}
142
143static void nfs_direct_release_pages(struct page **pages, unsigned int npages) 127static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
144{ 128{
145 unsigned int i; 129 unsigned int i;
@@ -226,58 +210,92 @@ static void nfs_direct_complete(struct nfs_direct_req *dreq)
226 nfs_direct_req_release(dreq); 210 nfs_direct_req_release(dreq);
227} 211}
228 212
229/* 213void nfs_direct_readpage_release(struct nfs_page *req)
230 * We must hold a reference to all the pages in this direct read request
231 * until the RPCs complete. This could be long *after* we are woken up in
232 * nfs_direct_wait (for instance, if someone hits ^C on a slow server).
233 */
234static void nfs_direct_read_result(struct rpc_task *task, void *calldata)
235{ 214{
236 struct nfs_read_data *data = calldata; 215 dprintk("NFS: direct read done (%s/%lld %d@%lld)\n",
237 216 req->wb_context->dentry->d_inode->i_sb->s_id,
238 nfs_readpage_result(task, data); 217 (long long)NFS_FILEID(req->wb_context->dentry->d_inode),
218 req->wb_bytes,
219 (long long)req_offset(req));
220 nfs_release_request(req);
239} 221}
240 222
241static void nfs_direct_read_release(void *calldata) 223static void nfs_direct_read_completion(struct nfs_pgio_header *hdr)
242{ 224{
225 unsigned long bytes = 0;
226 struct nfs_direct_req *dreq = hdr->dreq;
243 227
244 struct nfs_read_data *data = calldata; 228 if (test_bit(NFS_IOHDR_REDO, &hdr->flags))
245 struct nfs_direct_req *dreq = (struct nfs_direct_req *)data->header->req; 229 goto out_put;
246 int status = data->task.tk_status;
247 230
248 spin_lock(&dreq->lock); 231 spin_lock(&dreq->lock);
249 if (unlikely(status < 0)) { 232 if (test_bit(NFS_IOHDR_ERROR, &hdr->flags) && (hdr->good_bytes == 0))
250 dreq->error = status; 233 dreq->error = hdr->error;
251 spin_unlock(&dreq->lock); 234 else
235 dreq->count += hdr->good_bytes;
236 spin_unlock(&dreq->lock);
237
238 if (!test_bit(NFS_IOHDR_ERROR, &hdr->flags)) {
239 while (!list_empty(&hdr->pages)) {
240 struct nfs_page *req = nfs_list_entry(hdr->pages.next);
241 struct page *page = req->wb_page;
242
243 if (test_bit(NFS_IOHDR_EOF, &hdr->flags)) {
244 if (bytes > hdr->good_bytes)
245 zero_user(page, 0, PAGE_SIZE);
246 else if (hdr->good_bytes - bytes < PAGE_SIZE)
247 zero_user_segment(page,
248 hdr->good_bytes & ~PAGE_MASK,
249 PAGE_SIZE);
250 }
251 bytes += req->wb_bytes;
252 nfs_list_remove_request(req);
253 nfs_direct_readpage_release(req);
254 if (!PageCompound(page))
255 set_page_dirty(page);
256 page_cache_release(page);
257 }
252 } else { 258 } else {
253 dreq->count += data->res.count; 259 while (!list_empty(&hdr->pages)) {
254 spin_unlock(&dreq->lock); 260 struct nfs_page *req = nfs_list_entry(hdr->pages.next);
255 nfs_direct_dirty_pages(data->pages.pagevec, 261
256 data->args.pgbase, 262 if (bytes < hdr->good_bytes)
257 data->res.count); 263 if (!PageCompound(req->wb_page))
264 set_page_dirty(req->wb_page);
265 bytes += req->wb_bytes;
266 page_cache_release(req->wb_page);
267 nfs_list_remove_request(req);
268 nfs_direct_readpage_release(req);
269 }
258 } 270 }
259 nfs_direct_release_pages(data->pages.pagevec, data->pages.npages); 271out_put:
260
261 if (put_dreq(dreq)) 272 if (put_dreq(dreq))
262 nfs_direct_complete(dreq); 273 nfs_direct_complete(dreq);
263 nfs_readdata_release(data); 274 hdr->release(hdr);
264} 275}
265 276
266static const struct rpc_call_ops nfs_read_direct_ops = { 277static void nfs_sync_pgio_error(struct list_head *head)
267 .rpc_call_prepare = nfs_read_prepare,
268 .rpc_call_done = nfs_direct_read_result,
269 .rpc_release = nfs_direct_read_release,
270};
271
272static void nfs_direct_readhdr_release(struct nfs_read_header *rhdr)
273{ 278{
274 struct nfs_read_data *data = &rhdr->rpc_data; 279 struct nfs_page *req;
275 280
276 if (data->pages.pagevec != data->pages.page_array) 281 while (!list_empty(head)) {
277 kfree(data->pages.pagevec); 282 req = nfs_list_entry(head->next);
278 nfs_readhdr_free(&rhdr->header); 283 nfs_list_remove_request(req);
284 nfs_release_request(req);
285 }
279} 286}
280 287
288static void nfs_direct_pgio_init(struct nfs_pgio_header *hdr)
289{
290 get_dreq(hdr->dreq);
291}
292
293static const struct nfs_pgio_completion_ops nfs_direct_read_completion_ops = {
294 .error_cleanup = nfs_sync_pgio_error,
295 .init_hdr = nfs_direct_pgio_init,
296 .completion = nfs_direct_read_completion,
297};
298
281/* 299/*
282 * For each rsize'd chunk of the user's buffer, dispatch an NFS READ 300 * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
283 * operation. If nfs_readdata_alloc() or get_user_pages() fails, 301 * operation. If nfs_readdata_alloc() or get_user_pages() fails,
@@ -285,118 +303,85 @@ static void nfs_direct_readhdr_release(struct nfs_read_header *rhdr)
285 * handled automatically by nfs_direct_read_result(). Otherwise, if 303 * handled automatically by nfs_direct_read_result(). Otherwise, if
286 * no requests have been sent, just return an error. 304 * no requests have been sent, just return an error.
287 */ 305 */
288static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq, 306static ssize_t nfs_direct_read_schedule_segment(struct nfs_pageio_descriptor *desc,
289 const struct iovec *iov, 307 const struct iovec *iov,
290 loff_t pos) 308 loff_t pos)
291{ 309{
310 struct nfs_direct_req *dreq = desc->pg_dreq;
292 struct nfs_open_context *ctx = dreq->ctx; 311 struct nfs_open_context *ctx = dreq->ctx;
293 struct inode *inode = ctx->dentry->d_inode; 312 struct inode *inode = ctx->dentry->d_inode;
294 unsigned long user_addr = (unsigned long)iov->iov_base; 313 unsigned long user_addr = (unsigned long)iov->iov_base;
295 size_t count = iov->iov_len; 314 size_t count = iov->iov_len;
296 size_t rsize = NFS_SERVER(inode)->rsize; 315 size_t rsize = NFS_SERVER(inode)->rsize;
297 struct rpc_task *task;
298 struct rpc_message msg = {
299 .rpc_cred = ctx->cred,
300 };
301 struct rpc_task_setup task_setup_data = {
302 .rpc_client = NFS_CLIENT(inode),
303 .rpc_message = &msg,
304 .callback_ops = &nfs_read_direct_ops,
305 .workqueue = nfsiod_workqueue,
306 .flags = RPC_TASK_ASYNC,
307 };
308 unsigned int pgbase; 316 unsigned int pgbase;
309 int result; 317 int result;
310 ssize_t started = 0; 318 ssize_t started = 0;
319 struct page **pagevec = NULL;
320 unsigned int npages;
311 321
312 do { 322 do {
313 struct nfs_read_header *rhdr;
314 struct nfs_read_data *data;
315 struct nfs_page_array *pages;
316 size_t bytes; 323 size_t bytes;
324 int i;
317 325
318 pgbase = user_addr & ~PAGE_MASK; 326 pgbase = user_addr & ~PAGE_MASK;
319 bytes = min(rsize,count); 327 bytes = min(max(rsize, PAGE_SIZE), count);
320 328
321 result = -ENOMEM; 329 result = -ENOMEM;
322 rhdr = nfs_readhdr_alloc(); 330 npages = nfs_page_array_len(pgbase, bytes);
323 if (unlikely(!rhdr)) 331 if (!pagevec)
324 break; 332 pagevec = kmalloc(npages * sizeof(struct page *),
325 data = nfs_readdata_alloc(&rhdr->header, nfs_page_array_len(pgbase, bytes)); 333 GFP_KERNEL);
326 if (!data) { 334 if (!pagevec)
327 nfs_readhdr_free(&rhdr->header);
328 break; 335 break;
329 }
330 data->header = &rhdr->header;
331 atomic_inc(&data->header->refcnt);
332 pages = &data->pages;
333
334 down_read(&current->mm->mmap_sem); 336 down_read(&current->mm->mmap_sem);
335 result = get_user_pages(current, current->mm, user_addr, 337 result = get_user_pages(current, current->mm, user_addr,
336 pages->npages, 1, 0, pages->pagevec, NULL); 338 npages, 1, 0, pagevec, NULL);
337 up_read(&current->mm->mmap_sem); 339 up_read(&current->mm->mmap_sem);
338 if (result < 0) { 340 if (result < 0)
339 nfs_direct_readhdr_release(rhdr);
340 break; 341 break;
341 } 342 if ((unsigned)result < npages) {
342 if ((unsigned)result < pages->npages) {
343 bytes = result * PAGE_SIZE; 343 bytes = result * PAGE_SIZE;
344 if (bytes <= pgbase) { 344 if (bytes <= pgbase) {
345 nfs_direct_release_pages(pages->pagevec, result); 345 nfs_direct_release_pages(pagevec, result);
346 nfs_direct_readhdr_release(rhdr);
347 break; 346 break;
348 } 347 }
349 bytes -= pgbase; 348 bytes -= pgbase;
350 pages->npages = result; 349 npages = result;
351 } 350 }
352 351
353 get_dreq(dreq); 352 for (i = 0; i < npages; i++) {
354 353 struct nfs_page *req;
355 rhdr->header.req = (struct nfs_page *) dreq; 354 unsigned int req_len = min(bytes, PAGE_SIZE - pgbase);
356 rhdr->header.inode = inode; 355 /* XXX do we need to do the eof zeroing found in async_filler? */
357 rhdr->header.cred = msg.rpc_cred; 356 req = nfs_create_request(dreq->ctx, dreq->inode,
358 data->args.fh = NFS_FH(inode); 357 pagevec[i],
359 data->args.context = get_nfs_open_context(ctx); 358 pgbase, req_len);
360 data->args.lock_context = dreq->l_ctx; 359 if (IS_ERR(req)) {
361 data->args.offset = pos; 360 nfs_direct_release_pages(pagevec + i,
362 data->args.pgbase = pgbase; 361 npages - i);
363 data->args.pages = pages->pagevec; 362 result = PTR_ERR(req);
364 data->args.count = bytes; 363 break;
365 data->res.fattr = &data->fattr; 364 }
366 data->res.eof = 0; 365 req->wb_index = pos >> PAGE_SHIFT;
367 data->res.count = bytes; 366 req->wb_offset = pos & ~PAGE_MASK;
368 nfs_fattr_init(&data->fattr); 367 if (!nfs_pageio_add_request(desc, req)) {
369 msg.rpc_argp = &data->args; 368 result = desc->pg_error;
370 msg.rpc_resp = &data->res; 369 nfs_release_request(req);
371 370 nfs_direct_release_pages(pagevec + i,
372 task_setup_data.task = &data->task; 371 npages - i);
373 task_setup_data.callback_data = data; 372 break;
374 NFS_PROTO(inode)->read_setup(data, &msg); 373 }
375 374 pgbase = 0;
376 task = rpc_run_task(&task_setup_data); 375 bytes -= req_len;
377 if (IS_ERR(task)) 376 started += req_len;
378 break; 377 user_addr += req_len;
379 378 pos += req_len;
380 dprintk("NFS: %5u initiated direct read call " 379 count -= req_len;
381 "(req %s/%Ld, %zu bytes @ offset %Lu)\n", 380 }
382 task->tk_pid,
383 inode->i_sb->s_id,
384 (long long)NFS_FILEID(inode),
385 bytes,
386 (unsigned long long)data->args.offset);
387 rpc_put_task(task);
388
389 started += bytes;
390 user_addr += bytes;
391 pos += bytes;
392 /* FIXME: Remove this unnecessary math from final patch */
393 pgbase += bytes;
394 pgbase &= ~PAGE_MASK;
395 BUG_ON(pgbase != (user_addr & ~PAGE_MASK));
396
397 count -= bytes;
398 } while (count != 0); 381 } while (count != 0);
399 382
383 kfree(pagevec);
384
400 if (started) 385 if (started)
401 return started; 386 return started;
402 return result < 0 ? (ssize_t) result : -EFAULT; 387 return result < 0 ? (ssize_t) result : -EFAULT;
@@ -407,15 +392,19 @@ static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
407 unsigned long nr_segs, 392 unsigned long nr_segs,
408 loff_t pos) 393 loff_t pos)
409{ 394{
395 struct nfs_pageio_descriptor desc;
410 ssize_t result = -EINVAL; 396 ssize_t result = -EINVAL;
411 size_t requested_bytes = 0; 397 size_t requested_bytes = 0;
412 unsigned long seg; 398 unsigned long seg;
413 399
400 nfs_pageio_init_read(&desc, dreq->inode,
401 &nfs_direct_read_completion_ops);
414 get_dreq(dreq); 402 get_dreq(dreq);
403 desc.pg_dreq = dreq;
415 404
416 for (seg = 0; seg < nr_segs; seg++) { 405 for (seg = 0; seg < nr_segs; seg++) {
417 const struct iovec *vec = &iov[seg]; 406 const struct iovec *vec = &iov[seg];
418 result = nfs_direct_read_schedule_segment(dreq, vec, pos); 407 result = nfs_direct_read_schedule_segment(&desc, vec, pos);
419 if (result < 0) 408 if (result < 0)
420 break; 409 break;
421 requested_bytes += result; 410 requested_bytes += result;
@@ -424,6 +413,8 @@ static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
424 pos += vec->iov_len; 413 pos += vec->iov_len;
425 } 414 }
426 415
416 nfs_pageio_complete(&desc);
417
427 /* 418 /*
428 * If no bytes were started, return the error, and let the 419 * If no bytes were started, return the error, and let the
429 * generic layer handle the completion. 420 * generic layer handle the completion.