aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author: Miklos Szeredi <mszeredi@suse.cz> 2010-05-25 09:06:07 -0400
committer: Miklos Szeredi <mszeredi@suse.cz> 2010-05-25 09:06:07 -0400
commit: c3021629a0d820247ee12b6c5192a1d5380e21c6 (patch)
tree: eceb75197a9871e99b0b696af8047357686e4223
parent: ce534fb052928ce556639d7ecf01cbf4e01321e1 (diff)
fuse: support splice() reading from fuse device
Allow userspace filesystem implementation to use splice() to read from
the fuse device.

The userspace filesystem can now transfer data coming from a WRITE
request to an arbitrary file descriptor (regular file, block device or
socket) without having to go through a userspace buffer.

The semantics of using splice() to read messages are:

 1)  with a single splice() call move the whole message from the fuse
     device to a temporary pipe
 2)  read the header from the pipe and determine the message type
 3a) if message is a WRITE then splice data from pipe to destination
 3b) else read rest of message to userspace buffer

Signed-off-by: Miklos Szeredi <mszeredi@suse.cz>
-rw-r--r-- fs/fuse/dev.c 228
1 files changed, 187 insertions, 41 deletions
diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index b070d3adf9b0..4413f5e7b133 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -515,13 +515,12 @@ struct fuse_copy_state {
515}; 515};
516 516
517static void fuse_copy_init(struct fuse_copy_state *cs, struct fuse_conn *fc, 517static void fuse_copy_init(struct fuse_copy_state *cs, struct fuse_conn *fc,
518 int write, struct fuse_req *req, 518 int write,
519 const struct iovec *iov, unsigned long nr_segs) 519 const struct iovec *iov, unsigned long nr_segs)
520{ 520{
521 memset(cs, 0, sizeof(*cs)); 521 memset(cs, 0, sizeof(*cs));
522 cs->fc = fc; 522 cs->fc = fc;
523 cs->write = write; 523 cs->write = write;
524 cs->req = req;
525 cs->iov = iov; 524 cs->iov = iov;
526 cs->nr_segs = nr_segs; 525 cs->nr_segs = nr_segs;
527} 526}
@@ -532,8 +531,12 @@ static void fuse_copy_finish(struct fuse_copy_state *cs)
532 if (cs->currbuf) { 531 if (cs->currbuf) {
533 struct pipe_buffer *buf = cs->currbuf; 532 struct pipe_buffer *buf = cs->currbuf;
534 533
535 buf->ops->unmap(cs->pipe, buf, cs->mapaddr); 534 if (!cs->write) {
536 535 buf->ops->unmap(cs->pipe, buf, cs->mapaddr);
536 } else {
537 kunmap_atomic(cs->mapaddr, KM_USER0);
538 buf->len = PAGE_SIZE - cs->len;
539 }
537 cs->currbuf = NULL; 540 cs->currbuf = NULL;
538 cs->mapaddr = NULL; 541 cs->mapaddr = NULL;
539 } else if (cs->mapaddr) { 542 } else if (cs->mapaddr) {
@@ -561,17 +564,39 @@ static int fuse_copy_fill(struct fuse_copy_state *cs)
561 if (cs->pipebufs) { 564 if (cs->pipebufs) {
562 struct pipe_buffer *buf = cs->pipebufs; 565 struct pipe_buffer *buf = cs->pipebufs;
563 566
564 err = buf->ops->confirm(cs->pipe, buf); 567 if (!cs->write) {
565 if (err) 568 err = buf->ops->confirm(cs->pipe, buf);
566 return err; 569 if (err)
570 return err;
571
572 BUG_ON(!cs->nr_segs);
573 cs->currbuf = buf;
574 cs->mapaddr = buf->ops->map(cs->pipe, buf, 1);
575 cs->len = buf->len;
576 cs->buf = cs->mapaddr + buf->offset;
577 cs->pipebufs++;
578 cs->nr_segs--;
579 } else {
580 struct page *page;
567 581
568 BUG_ON(!cs->nr_segs); 582 if (cs->nr_segs == cs->pipe->buffers)
569 cs->currbuf = buf; 583 return -EIO;
570 cs->mapaddr = buf->ops->map(cs->pipe, buf, 1); 584
571 cs->len = buf->len; 585 page = alloc_page(GFP_HIGHUSER);
572 cs->buf = cs->mapaddr + buf->offset; 586 if (!page)
573 cs->pipebufs++; 587 return -ENOMEM;
574 cs->nr_segs--; 588
589 buf->page = page;
590 buf->offset = 0;
591 buf->len = 0;
592
593 cs->currbuf = buf;
594 cs->mapaddr = kmap_atomic(page, KM_USER0);
595 cs->buf = cs->mapaddr;
596 cs->len = PAGE_SIZE;
597 cs->pipebufs++;
598 cs->nr_segs++;
599 }
575 } else { 600 } else {
576 if (!cs->seglen) { 601 if (!cs->seglen) {
577 BUG_ON(!cs->nr_segs); 602 BUG_ON(!cs->nr_segs);
@@ -731,6 +756,30 @@ out_fallback:
731 return 1; 756 return 1;
732} 757}
733 758
759static int fuse_ref_page(struct fuse_copy_state *cs, struct page *page,
760 unsigned offset, unsigned count)
761{
762 struct pipe_buffer *buf;
763
764 if (cs->nr_segs == cs->pipe->buffers)
765 return -EIO;
766
767 unlock_request(cs->fc, cs->req);
768 fuse_copy_finish(cs);
769
770 buf = cs->pipebufs;
771 page_cache_get(page);
772 buf->page = page;
773 buf->offset = offset;
774 buf->len = count;
775
776 cs->pipebufs++;
777 cs->nr_segs++;
778 cs->len = 0;
779
780 return 0;
781}
782
734/* 783/*
735 * Copy a page in the request to/from the userspace buffer. Must be 784 * Copy a page in the request to/from the userspace buffer. Must be
736 * done atomically 785 * done atomically
@@ -747,7 +796,9 @@ static int fuse_copy_page(struct fuse_copy_state *cs, struct page **pagep,
747 kunmap_atomic(mapaddr, KM_USER1); 796 kunmap_atomic(mapaddr, KM_USER1);
748 } 797 }
749 while (count) { 798 while (count) {
750 if (!cs->len) { 799 if (cs->write && cs->pipebufs && page) {
800 return fuse_ref_page(cs, page, offset, count);
801 } else if (!cs->len) {
751 if (cs->move_pages && page && 802 if (cs->move_pages && page &&
752 offset == 0 && count == PAGE_SIZE) { 803 offset == 0 && count == PAGE_SIZE) {
753 err = fuse_try_move_page(cs, pagep); 804 err = fuse_try_move_page(cs, pagep);
@@ -862,11 +913,10 @@ __acquires(&fc->lock)
862 * 913 *
863 * Called with fc->lock held, releases it 914 * Called with fc->lock held, releases it
864 */ 915 */
865static int fuse_read_interrupt(struct fuse_conn *fc, struct fuse_req *req, 916static int fuse_read_interrupt(struct fuse_conn *fc, struct fuse_copy_state *cs,
866 const struct iovec *iov, unsigned long nr_segs) 917 size_t nbytes, struct fuse_req *req)
867__releases(&fc->lock) 918__releases(&fc->lock)
868{ 919{
869 struct fuse_copy_state cs;
870 struct fuse_in_header ih; 920 struct fuse_in_header ih;
871 struct fuse_interrupt_in arg; 921 struct fuse_interrupt_in arg;
872 unsigned reqsize = sizeof(ih) + sizeof(arg); 922 unsigned reqsize = sizeof(ih) + sizeof(arg);
@@ -882,14 +932,13 @@ __releases(&fc->lock)
882 arg.unique = req->in.h.unique; 932 arg.unique = req->in.h.unique;
883 933
884 spin_unlock(&fc->lock); 934 spin_unlock(&fc->lock);
885 if (iov_length(iov, nr_segs) < reqsize) 935 if (nbytes < reqsize)
886 return -EINVAL; 936 return -EINVAL;
887 937
888 fuse_copy_init(&cs, fc, 1, NULL, iov, nr_segs); 938 err = fuse_copy_one(cs, &ih, sizeof(ih));
889 err = fuse_copy_one(&cs, &ih, sizeof(ih));
890 if (!err) 939 if (!err)
891 err = fuse_copy_one(&cs, &arg, sizeof(arg)); 940 err = fuse_copy_one(cs, &arg, sizeof(arg));
892 fuse_copy_finish(&cs); 941 fuse_copy_finish(cs);
893 942
894 return err ? err : reqsize; 943 return err ? err : reqsize;
895} 944}
@@ -903,18 +952,13 @@ __releases(&fc->lock)
903 * request_end(). Otherwise add it to the processing list, and set 952 * request_end(). Otherwise add it to the processing list, and set
904 * the 'sent' flag. 953 * the 'sent' flag.
905 */ 954 */
906static ssize_t fuse_dev_read(struct kiocb *iocb, const struct iovec *iov, 955static ssize_t fuse_dev_do_read(struct fuse_conn *fc, struct file *file,
907 unsigned long nr_segs, loff_t pos) 956 struct fuse_copy_state *cs, size_t nbytes)
908{ 957{
909 int err; 958 int err;
910 struct fuse_req *req; 959 struct fuse_req *req;
911 struct fuse_in *in; 960 struct fuse_in *in;
912 struct fuse_copy_state cs;
913 unsigned reqsize; 961 unsigned reqsize;
914 struct file *file = iocb->ki_filp;
915 struct fuse_conn *fc = fuse_get_conn(file);
916 if (!fc)
917 return -EPERM;
918 962
919 restart: 963 restart:
920 spin_lock(&fc->lock); 964 spin_lock(&fc->lock);
@@ -934,7 +978,7 @@ static ssize_t fuse_dev_read(struct kiocb *iocb, const struct iovec *iov,
934 if (!list_empty(&fc->interrupts)) { 978 if (!list_empty(&fc->interrupts)) {
935 req = list_entry(fc->interrupts.next, struct fuse_req, 979 req = list_entry(fc->interrupts.next, struct fuse_req,
936 intr_entry); 980 intr_entry);
937 return fuse_read_interrupt(fc, req, iov, nr_segs); 981 return fuse_read_interrupt(fc, cs, nbytes, req);
938 } 982 }
939 983
940 req = list_entry(fc->pending.next, struct fuse_req, list); 984 req = list_entry(fc->pending.next, struct fuse_req, list);
@@ -944,7 +988,7 @@ static ssize_t fuse_dev_read(struct kiocb *iocb, const struct iovec *iov,
944 in = &req->in; 988 in = &req->in;
945 reqsize = in->h.len; 989 reqsize = in->h.len;
946 /* If request is too large, reply with an error and restart the read */ 990 /* If request is too large, reply with an error and restart the read */
947 if (iov_length(iov, nr_segs) < reqsize) { 991 if (nbytes < reqsize) {
948 req->out.h.error = -EIO; 992 req->out.h.error = -EIO;
949 /* SETXATTR is special, since it may contain too large data */ 993 /* SETXATTR is special, since it may contain too large data */
950 if (in->h.opcode == FUSE_SETXATTR) 994 if (in->h.opcode == FUSE_SETXATTR)
@@ -953,12 +997,12 @@ static ssize_t fuse_dev_read(struct kiocb *iocb, const struct iovec *iov,
953 goto restart; 997 goto restart;
954 } 998 }
955 spin_unlock(&fc->lock); 999 spin_unlock(&fc->lock);
956 fuse_copy_init(&cs, fc, 1, req, iov, nr_segs); 1000 cs->req = req;
957 err = fuse_copy_one(&cs, &in->h, sizeof(in->h)); 1001 err = fuse_copy_one(cs, &in->h, sizeof(in->h));
958 if (!err) 1002 if (!err)
959 err = fuse_copy_args(&cs, in->numargs, in->argpages, 1003 err = fuse_copy_args(cs, in->numargs, in->argpages,
960 (struct fuse_arg *) in->args, 0); 1004 (struct fuse_arg *) in->args, 0);
961 fuse_copy_finish(&cs); 1005 fuse_copy_finish(cs);
962 spin_lock(&fc->lock); 1006 spin_lock(&fc->lock);
963 req->locked = 0; 1007 req->locked = 0;
964 if (req->aborted) { 1008 if (req->aborted) {
@@ -986,6 +1030,110 @@ static ssize_t fuse_dev_read(struct kiocb *iocb, const struct iovec *iov,
986 return err; 1030 return err;
987} 1031}
988 1032
1033static ssize_t fuse_dev_read(struct kiocb *iocb, const struct iovec *iov,
1034 unsigned long nr_segs, loff_t pos)
1035{
1036 struct fuse_copy_state cs;
1037 struct file *file = iocb->ki_filp;
1038 struct fuse_conn *fc = fuse_get_conn(file);
1039 if (!fc)
1040 return -EPERM;
1041
1042 fuse_copy_init(&cs, fc, 1, iov, nr_segs);
1043
1044 return fuse_dev_do_read(fc, file, &cs, iov_length(iov, nr_segs));
1045}
1046
1047static int fuse_dev_pipe_buf_steal(struct pipe_inode_info *pipe,
1048 struct pipe_buffer *buf)
1049{
1050 return 1;
1051}
1052
1053static const struct pipe_buf_operations fuse_dev_pipe_buf_ops = {
1054 .can_merge = 0,
1055 .map = generic_pipe_buf_map,
1056 .unmap = generic_pipe_buf_unmap,
1057 .confirm = generic_pipe_buf_confirm,
1058 .release = generic_pipe_buf_release,
1059 .steal = fuse_dev_pipe_buf_steal,
1060 .get = generic_pipe_buf_get,
1061};
1062
1063static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
1064 struct pipe_inode_info *pipe,
1065 size_t len, unsigned int flags)
1066{
1067 int ret;
1068 int page_nr = 0;
1069 int do_wakeup = 0;
1070 struct pipe_buffer *bufs;
1071 struct fuse_copy_state cs;
1072 struct fuse_conn *fc = fuse_get_conn(in);
1073 if (!fc)
1074 return -EPERM;
1075
1076 bufs = kmalloc(pipe->buffers * sizeof (struct pipe_buffer), GFP_KERNEL);
1077 if (!bufs)
1078 return -ENOMEM;
1079
1080 fuse_copy_init(&cs, fc, 1, NULL, 0);
1081 cs.pipebufs = bufs;
1082 cs.pipe = pipe;
1083 ret = fuse_dev_do_read(fc, in, &cs, len);
1084 if (ret < 0)
1085 goto out;
1086
1087 ret = 0;
1088 pipe_lock(pipe);
1089
1090 if (!pipe->readers) {
1091 send_sig(SIGPIPE, current, 0);
1092 if (!ret)
1093 ret = -EPIPE;
1094 goto out_unlock;
1095 }
1096
1097 if (pipe->nrbufs + cs.nr_segs > pipe->buffers) {
1098 ret = -EIO;
1099 goto out_unlock;
1100 }
1101
1102 while (page_nr < cs.nr_segs) {
1103 int newbuf = (pipe->curbuf + pipe->nrbufs) & (pipe->buffers - 1);
1104 struct pipe_buffer *buf = pipe->bufs + newbuf;
1105
1106 buf->page = bufs[page_nr].page;
1107 buf->offset = bufs[page_nr].offset;
1108 buf->len = bufs[page_nr].len;
1109 buf->ops = &fuse_dev_pipe_buf_ops;
1110
1111 pipe->nrbufs++;
1112 page_nr++;
1113 ret += buf->len;
1114
1115 if (pipe->inode)
1116 do_wakeup = 1;
1117 }
1118
1119out_unlock:
1120 pipe_unlock(pipe);
1121
1122 if (do_wakeup) {
1123 smp_mb();
1124 if (waitqueue_active(&pipe->wait))
1125 wake_up_interruptible(&pipe->wait);
1126 kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
1127 }
1128
1129out:
1130 for (; page_nr < cs.nr_segs; page_nr++)
1131 page_cache_release(bufs[page_nr].page);
1132
1133 kfree(bufs);
1134 return ret;
1135}
1136
989static int fuse_notify_poll(struct fuse_conn *fc, unsigned int size, 1137static int fuse_notify_poll(struct fuse_conn *fc, unsigned int size,
990 struct fuse_copy_state *cs) 1138 struct fuse_copy_state *cs)
991{ 1139{
@@ -1246,7 +1394,7 @@ static ssize_t fuse_dev_write(struct kiocb *iocb, const struct iovec *iov,
1246 if (!fc) 1394 if (!fc)
1247 return -EPERM; 1395 return -EPERM;
1248 1396
1249 fuse_copy_init(&cs, fc, 0, NULL, iov, nr_segs); 1397 fuse_copy_init(&cs, fc, 0, iov, nr_segs);
1250 1398
1251 return fuse_dev_do_write(fc, &cs, iov_length(iov, nr_segs)); 1399 return fuse_dev_do_write(fc, &cs, iov_length(iov, nr_segs));
1252} 1400}
@@ -1311,11 +1459,8 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
1311 } 1459 }
1312 pipe_unlock(pipe); 1460 pipe_unlock(pipe);
1313 1461
1314 memset(&cs, 0, sizeof(struct fuse_copy_state)); 1462 fuse_copy_init(&cs, fc, 0, NULL, nbuf);
1315 cs.fc = fc;
1316 cs.write = 0;
1317 cs.pipebufs = bufs; 1463 cs.pipebufs = bufs;
1318 cs.nr_segs = nbuf;
1319 cs.pipe = pipe; 1464 cs.pipe = pipe;
1320 1465
1321 if (flags & SPLICE_F_MOVE) 1466 if (flags & SPLICE_F_MOVE)
@@ -1473,6 +1618,7 @@ const struct file_operations fuse_dev_operations = {
1473 .llseek = no_llseek, 1618 .llseek = no_llseek,
1474 .read = do_sync_read, 1619 .read = do_sync_read,
1475 .aio_read = fuse_dev_read, 1620 .aio_read = fuse_dev_read,
1621 .splice_read = fuse_dev_splice_read,
1476 .write = do_sync_write, 1622 .write = do_sync_write,
1477 .aio_write = fuse_dev_write, 1623 .aio_write = fuse_dev_write,
1478 .splice_write = fuse_dev_splice_write, 1624 .splice_write = fuse_dev_splice_write,