aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMiklos Szeredi <miklos@szeredi.hu>2009-05-07 09:37:35 -0400
committerJens Axboe <jens.axboe@oracle.com>2009-05-11 08:13:09 -0400
commit7c77f0b3f9208c339a4b40737bb2cb0f0319bb8d (patch)
treecf45bc6afaa4bab28275b5aefc903f0112545965
parentb1f744937f1be3e6d3009382a755679133cf782d (diff)
splice: implement pipe to pipe splicing
Allow splice(2) to work when both the input and the output are pipes. Based on the implementation of the tee(2) syscall, but instead of duplicating the buffer references, move the buffers from the input pipe to the output pipe. Moving the whole buffer only succeeds if the full length of the buffer is spliced. Otherwise duplicate the buffer, just like tee(2), set the length of the output buffer and advance the offset on the input buffer. Since splice is operating on two pipes, special care needs to be taken with locking to prevent an ABBA deadlock. Again this is done similarly to the tee(2) syscall, first preparing the input and output pipes so there's data to consume and space for that data, and then doing the move operation while holding both locks. If other processes are doing I/O on the same pipes in parallel with the splice, then by the time both inodes are locked there might be no buffers left to move, or no space to move them to. In this case retry the whole operation, including the preparation phase. This could lead to starvation, but I'm not sure if that's serious enough to worry about. Signed-off-by: Miklos Szeredi <mszeredi@suse.cz> Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
-rw-r--r--fs/splice.c162
1 files changed, 151 insertions, 11 deletions
diff --git a/fs/splice.c b/fs/splice.c
index 666953d59a35..e405cf552f5c 100644
--- a/fs/splice.c
+++ b/fs/splice.c
@@ -1112,6 +1112,9 @@ long do_splice_direct(struct file *in, loff_t *ppos, struct file *out,
1112 return ret; 1112 return ret;
1113} 1113}
1114 1114
1115static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
1116 struct pipe_inode_info *opipe,
1117 size_t len, unsigned int flags);
1115/* 1118/*
1116 * After the inode slimming patch, i_pipe/i_bdev/i_cdev share the same 1119 * After the inode slimming patch, i_pipe/i_bdev/i_cdev share the same
1117 * location, so checking ->i_pipe is not enough to verify that this is a 1120 * location, so checking ->i_pipe is not enough to verify that this is a
@@ -1132,12 +1135,32 @@ static long do_splice(struct file *in, loff_t __user *off_in,
1132 struct file *out, loff_t __user *off_out, 1135 struct file *out, loff_t __user *off_out,
1133 size_t len, unsigned int flags) 1136 size_t len, unsigned int flags)
1134{ 1137{
1135 struct pipe_inode_info *pipe; 1138 struct pipe_inode_info *ipipe;
1139 struct pipe_inode_info *opipe;
1136 loff_t offset, *off; 1140 loff_t offset, *off;
1137 long ret; 1141 long ret;
1138 1142
1139 pipe = pipe_info(in->f_path.dentry->d_inode); 1143 ipipe = pipe_info(in->f_path.dentry->d_inode);
1140 if (pipe) { 1144 opipe = pipe_info(out->f_path.dentry->d_inode);
1145
1146 if (ipipe && opipe) {
1147 if (off_in || off_out)
1148 return -ESPIPE;
1149
1150 if (!(in->f_mode & FMODE_READ))
1151 return -EBADF;
1152
1153 if (!(out->f_mode & FMODE_WRITE))
1154 return -EBADF;
1155
1156 /* Splicing to self would be fun, but... */
1157 if (ipipe == opipe)
1158 return -EINVAL;
1159
1160 return splice_pipe_to_pipe(ipipe, opipe, len, flags);
1161 }
1162
1163 if (ipipe) {
1141 if (off_in) 1164 if (off_in)
1142 return -ESPIPE; 1165 return -ESPIPE;
1143 if (off_out) { 1166 if (off_out) {
@@ -1149,7 +1172,7 @@ static long do_splice(struct file *in, loff_t __user *off_in,
1149 } else 1172 } else
1150 off = &out->f_pos; 1173 off = &out->f_pos;
1151 1174
1152 ret = do_splice_from(pipe, out, off, len, flags); 1175 ret = do_splice_from(ipipe, out, off, len, flags);
1153 1176
1154 if (off_out && copy_to_user(off_out, off, sizeof(loff_t))) 1177 if (off_out && copy_to_user(off_out, off, sizeof(loff_t)))
1155 ret = -EFAULT; 1178 ret = -EFAULT;
@@ -1157,8 +1180,7 @@ static long do_splice(struct file *in, loff_t __user *off_in,
1157 return ret; 1180 return ret;
1158 } 1181 }
1159 1182
1160 pipe = pipe_info(out->f_path.dentry->d_inode); 1183 if (opipe) {
1161 if (pipe) {
1162 if (off_out) 1184 if (off_out)
1163 return -ESPIPE; 1185 return -ESPIPE;
1164 if (off_in) { 1186 if (off_in) {
@@ -1170,7 +1192,7 @@ static long do_splice(struct file *in, loff_t __user *off_in,
1170 } else 1192 } else
1171 off = &in->f_pos; 1193 off = &in->f_pos;
1172 1194
1173 ret = do_splice_to(in, off, pipe, len, flags); 1195 ret = do_splice_to(in, off, opipe, len, flags);
1174 1196
1175 if (off_in && copy_to_user(off_in, off, sizeof(loff_t))) 1197 if (off_in && copy_to_user(off_in, off, sizeof(loff_t)))
1176 ret = -EFAULT; 1198 ret = -EFAULT;
@@ -1511,7 +1533,7 @@ SYSCALL_DEFINE6(splice, int, fd_in, loff_t __user *, off_in,
1511 * Make sure there's data to read. Wait for input if we can, otherwise 1533 * Make sure there's data to read. Wait for input if we can, otherwise
1512 * return an appropriate error. 1534 * return an appropriate error.
1513 */ 1535 */
1514static int link_ipipe_prep(struct pipe_inode_info *pipe, unsigned int flags) 1536static int ipipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
1515{ 1537{
1516 int ret; 1538 int ret;
1517 1539
@@ -1549,7 +1571,7 @@ static int link_ipipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
1549 * Make sure there's writeable room. Wait for room if we can, otherwise 1571 * Make sure there's writeable room. Wait for room if we can, otherwise
1550 * return an appropriate error. 1572 * return an appropriate error.
1551 */ 1573 */
1552static int link_opipe_prep(struct pipe_inode_info *pipe, unsigned int flags) 1574static int opipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
1553{ 1575{
1554 int ret; 1576 int ret;
1555 1577
@@ -1587,6 +1609,124 @@ static int link_opipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
1587} 1609}
1588 1610
1589/* 1611/*
1612 * Splice contents of ipipe to opipe.
1613 */
1614static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
1615 struct pipe_inode_info *opipe,
1616 size_t len, unsigned int flags)
1617{
1618 struct pipe_buffer *ibuf, *obuf;
1619 int ret = 0, nbuf;
1620 bool input_wakeup = false;
1621
1622
1623retry:
1624 ret = ipipe_prep(ipipe, flags);
1625 if (ret)
1626 return ret;
1627
1628 ret = opipe_prep(opipe, flags);
1629 if (ret)
1630 return ret;
1631
1632 /*
1633 * Potential ABBA deadlock, work around it by ordering lock
1634 * grabbing by pipe info address. Otherwise two different processes
1635 * could deadlock (one doing splice from A -> B, the other from B -> A).
1636 */
1637 pipe_double_lock(ipipe, opipe);
1638
1639 do {
1640 if (!opipe->readers) {
1641 send_sig(SIGPIPE, current, 0);
1642 if (!ret)
1643 ret = -EPIPE;
1644 break;
1645 }
1646
1647 if (!ipipe->nrbufs && !ipipe->writers)
1648 break;
1649
1650 /*
1651 * Cannot make any progress, because either the input
1652 * pipe is empty or the output pipe is full.
1653 */
1654 if (!ipipe->nrbufs || opipe->nrbufs >= PIPE_BUFFERS) {
1655 /* Already processed some buffers, break */
1656 if (ret)
1657 break;
1658
1659 if (flags & SPLICE_F_NONBLOCK) {
1660 ret = -EAGAIN;
1661 break;
1662 }
1663
1664 /*
1665 * We raced with another reader/writer and haven't
1666 * managed to process any buffers. A zero return
1667 * value means EOF, so retry instead.
1668 */
1669 pipe_unlock(ipipe);
1670 pipe_unlock(opipe);
1671 goto retry;
1672 }
1673
1674 ibuf = ipipe->bufs + ipipe->curbuf;
1675 nbuf = (opipe->curbuf + opipe->nrbufs) % PIPE_BUFFERS;
1676 obuf = opipe->bufs + nbuf;
1677
1678 if (len >= ibuf->len) {
1679 /*
1680 * Simply move the whole buffer from ipipe to opipe
1681 */
1682 *obuf = *ibuf;
1683 ibuf->ops = NULL;
1684 opipe->nrbufs++;
1685 ipipe->curbuf = (ipipe->curbuf + 1) % PIPE_BUFFERS;
1686 ipipe->nrbufs--;
1687 input_wakeup = true;
1688 } else {
1689 /*
1690 * Get a reference to this pipe buffer,
1691 * so we can copy the contents over.
1692 */
1693 ibuf->ops->get(ipipe, ibuf);
1694 *obuf = *ibuf;
1695
1696 /*
1697 * Don't inherit the gift flag, we need to
1698 * prevent multiple steals of this page.
1699 */
1700 obuf->flags &= ~PIPE_BUF_FLAG_GIFT;
1701
1702 obuf->len = len;
1703 opipe->nrbufs++;
1704 ibuf->offset += obuf->len;
1705 ibuf->len -= obuf->len;
1706 }
1707 ret += obuf->len;
1708 len -= obuf->len;
1709 } while (len);
1710
1711 pipe_unlock(ipipe);
1712 pipe_unlock(opipe);
1713
1714 /*
1715 * If we put data in the output pipe, wakeup any potential readers.
1716 */
1717 if (ret > 0) {
1718 smp_mb();
1719 if (waitqueue_active(&opipe->wait))
1720 wake_up_interruptible(&opipe->wait);
1721 kill_fasync(&opipe->fasync_readers, SIGIO, POLL_IN);
1722 }
1723 if (input_wakeup)
1724 wakeup_pipe_writers(ipipe);
1725
1726 return ret;
1727}
1728
1729/*
1590 * Link contents of ipipe to opipe. 1730 * Link contents of ipipe to opipe.
1591 */ 1731 */
1592static int link_pipe(struct pipe_inode_info *ipipe, 1732static int link_pipe(struct pipe_inode_info *ipipe,
@@ -1690,9 +1830,9 @@ static long do_tee(struct file *in, struct file *out, size_t len,
1690 * Keep going, unless we encounter an error. The ipipe/opipe 1830 * Keep going, unless we encounter an error. The ipipe/opipe
1691 * ordering doesn't really matter. 1831 * ordering doesn't really matter.
1692 */ 1832 */
1693 ret = link_ipipe_prep(ipipe, flags); 1833 ret = ipipe_prep(ipipe, flags);
1694 if (!ret) { 1834 if (!ret) {
1695 ret = link_opipe_prep(opipe, flags); 1835 ret = opipe_prep(opipe, flags);
1696 if (!ret) 1836 if (!ret)
1697 ret = link_pipe(ipipe, opipe, len, flags); 1837 ret = link_pipe(ipipe, opipe, len, flags);
1698 } 1838 }