aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTom Zanussi <zanussi@us.ibm.com>2007-06-04 03:12:05 -0400
committerJens Axboe <jens.axboe@oracle.com>2007-07-10 02:04:14 -0400
commitebf9909343392c929d9943c04f421cd42e03b530 (patch)
treefc484001e9af14cf5ca3b868b89de9c4f50719ad
parentcf8208d0eabd1d5d2625ec02a175a294c3f30d36 (diff)
splice: relay support
Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
-rw-r--r--kernel/relay.c241
1 files changed, 191 insertions, 50 deletions
diff --git a/kernel/relay.c b/kernel/relay.c
index 95db8c79fe8f..d1d1920f2800 100644
--- a/kernel/relay.c
+++ b/kernel/relay.c
@@ -21,6 +21,7 @@
21#include <linux/vmalloc.h> 21#include <linux/vmalloc.h>
22#include <linux/mm.h> 22#include <linux/mm.h>
23#include <linux/cpu.h> 23#include <linux/cpu.h>
24#include <linux/pipe_fs_i.h>
24 25
25/* list of open channels, for cpu hotplug */ 26/* list of open channels, for cpu hotplug */
26static DEFINE_MUTEX(relay_channels_mutex); 27static DEFINE_MUTEX(relay_channels_mutex);
@@ -121,6 +122,7 @@ static void *relay_alloc_buf(struct rchan_buf *buf, size_t *size)
121 buf->page_array[i] = alloc_page(GFP_KERNEL); 122 buf->page_array[i] = alloc_page(GFP_KERNEL);
122 if (unlikely(!buf->page_array[i])) 123 if (unlikely(!buf->page_array[i]))
123 goto depopulate; 124 goto depopulate;
125 set_page_private(buf->page_array[i], (unsigned long)buf);
124 } 126 }
125 mem = vmap(buf->page_array, n_pages, VM_MAP, PAGE_KERNEL); 127 mem = vmap(buf->page_array, n_pages, VM_MAP, PAGE_KERNEL);
126 if (!mem) 128 if (!mem)
@@ -970,43 +972,6 @@ static int subbuf_read_actor(size_t read_start,
970 return ret; 972 return ret;
971} 973}
972 974
973/*
974 * subbuf_send_actor - send up to one subbuf's worth of data
975 */
976static int subbuf_send_actor(size_t read_start,
977 struct rchan_buf *buf,
978 size_t avail,
979 read_descriptor_t *desc,
980 read_actor_t actor)
981{
982 unsigned long pidx, poff;
983 unsigned int subbuf_pages;
984 int ret = 0;
985
986 subbuf_pages = buf->chan->alloc_size >> PAGE_SHIFT;
987 pidx = (read_start / PAGE_SIZE) % subbuf_pages;
988 poff = read_start & ~PAGE_MASK;
989 while (avail) {
990 struct page *p = buf->page_array[pidx];
991 unsigned int len;
992
993 len = PAGE_SIZE - poff;
994 if (len > avail)
995 len = avail;
996
997 len = actor(desc, p, poff, len);
998 if (desc->error)
999 break;
1000
1001 avail -= len;
1002 ret += len;
1003 poff = 0;
1004 pidx = (pidx + 1) % subbuf_pages;
1005 }
1006
1007 return ret;
1008}
1009
1010typedef int (*subbuf_actor_t) (size_t read_start, 975typedef int (*subbuf_actor_t) (size_t read_start,
1011 struct rchan_buf *buf, 976 struct rchan_buf *buf,
1012 size_t avail, 977 size_t avail,
@@ -1067,19 +1032,195 @@ static ssize_t relay_file_read(struct file *filp,
1067 NULL, &desc); 1032 NULL, &desc);
1068} 1033}
1069 1034
1070static ssize_t relay_file_sendfile(struct file *filp, 1035static void relay_pipe_buf_release(struct pipe_inode_info *pipe,
1071 loff_t *ppos, 1036 struct pipe_buffer *buf)
1072 size_t count,
1073 read_actor_t actor,
1074 void *target)
1075{ 1037{
1076 read_descriptor_t desc; 1038 struct rchan_buf *rbuf;
1077 desc.written = 0; 1039
1078 desc.count = count; 1040 rbuf = (struct rchan_buf *)page_private(buf->page);
1079 desc.arg.data = target; 1041
1080 desc.error = 0; 1042 rbuf->bytes_consumed += PAGE_SIZE;
1081 return relay_file_read_subbufs(filp, ppos, subbuf_send_actor, 1043
1082 actor, &desc); 1044 if (rbuf->bytes_consumed == rbuf->chan->subbuf_size) {
1045 relay_subbufs_consumed(rbuf->chan, rbuf->cpu, 1);
1046 rbuf->bytes_consumed = 0;
1047 }
1048}
1049
1050static struct pipe_buf_operations relay_pipe_buf_ops = {
1051 .can_merge = 0,
1052 .map = generic_pipe_buf_map,
1053 .unmap = generic_pipe_buf_unmap,
1054 .pin = generic_pipe_buf_pin,
1055 .release = relay_pipe_buf_release,
1056 .steal = generic_pipe_buf_steal,
1057 .get = generic_pipe_buf_get,
1058};
1059
1060/**
1061 * subbuf_splice_actor - splice up to one subbuf's worth of data
1062 */
1063static int subbuf_splice_actor(struct file *in,
1064 loff_t *ppos,
1065 struct pipe_inode_info *pipe,
1066 size_t len,
1067 unsigned int flags,
1068 int *nonpad_ret)
1069{
1070 unsigned int pidx, poff;
1071 unsigned int subbuf_pages;
1072 int ret = 0;
1073 int do_wakeup = 0;
1074 struct rchan_buf *rbuf = in->private_data;
1075 unsigned int subbuf_size = rbuf->chan->subbuf_size;
1076 size_t read_start = ((size_t)*ppos) % rbuf->chan->alloc_size;
1077 size_t avail = subbuf_size - read_start % subbuf_size;
1078 size_t read_subbuf = read_start / subbuf_size;
1079 size_t padding = rbuf->padding[read_subbuf];
1080 size_t nonpad_end = read_subbuf * subbuf_size + subbuf_size - padding;
1081
1082 if (rbuf->subbufs_produced == rbuf->subbufs_consumed)
1083 return 0;
1084
1085 if (len > avail)
1086 len = avail;
1087
1088 if (pipe->inode)
1089 mutex_lock(&pipe->inode->i_mutex);
1090
1091 subbuf_pages = rbuf->chan->alloc_size >> PAGE_SHIFT;
1092 pidx = (read_start / PAGE_SIZE) % subbuf_pages;
1093 poff = read_start & ~PAGE_MASK;
1094
1095 for (;;) {
1096 unsigned int this_len;
1097 unsigned int this_end;
1098 int newbuf = (pipe->curbuf + pipe->nrbufs) & (PIPE_BUFFERS - 1);
1099 struct pipe_buffer *buf = pipe->bufs + newbuf;
1100
1101 if (!pipe->readers) {
1102 send_sig(SIGPIPE, current, 0);
1103 if (!ret)
1104 ret = -EPIPE;
1105 break;
1106 }
1107
1108 if (pipe->nrbufs < PIPE_BUFFERS) {
1109 this_len = PAGE_SIZE - poff;
1110 if (this_len > avail)
1111 this_len = avail;
1112
1113 buf->page = rbuf->page_array[pidx];
1114 buf->offset = poff;
1115 this_end = read_start + ret + this_len;
1116 if (this_end > nonpad_end) {
1117 if (read_start + ret >= nonpad_end)
1118 buf->len = 0;
1119 else
1120 buf->len = nonpad_end - (read_start + ret);
1121 } else
1122 buf->len = this_len;
1123
1124 *nonpad_ret += buf->len;
1125
1126 buf->ops = &relay_pipe_buf_ops;
1127 pipe->nrbufs++;
1128
1129 avail -= this_len;
1130 ret += this_len;
1131 poff = 0;
1132 pidx = (pidx + 1) % subbuf_pages;
1133
1134 if (pipe->inode)
1135 do_wakeup = 1;
1136
1137 if (!avail)
1138 break;
1139
1140 if (pipe->nrbufs < PIPE_BUFFERS)
1141 continue;
1142
1143 break;
1144 }
1145
1146 if (flags & SPLICE_F_NONBLOCK) {
1147 if (!ret)
1148 ret = -EAGAIN;
1149 break;
1150 }
1151
1152 if (signal_pending(current)) {
1153 if (!ret)
1154 ret = -ERESTARTSYS;
1155 break;
1156 }
1157
1158 if (do_wakeup) {
1159 smp_mb();
1160 if (waitqueue_active(&pipe->wait))
1161 wake_up_interruptible_sync(&pipe->wait);
1162 kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
1163 do_wakeup = 0;
1164 }
1165
1166 pipe->waiting_writers++;
1167 pipe_wait(pipe);
1168 pipe->waiting_writers--;
1169 }
1170
1171 if (pipe->inode)
1172 mutex_unlock(&pipe->inode->i_mutex);
1173
1174 if (do_wakeup) {
1175 smp_mb();
1176 if (waitqueue_active(&pipe->wait))
1177 wake_up_interruptible(&pipe->wait);
1178 kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
1179 }
1180
1181 return ret;
1182}
1183
1184static ssize_t relay_file_splice_read(struct file *in,
1185 loff_t *ppos,
1186 struct pipe_inode_info *pipe,
1187 size_t len,
1188 unsigned int flags)
1189{
1190 ssize_t spliced;
1191 int ret;
1192 int nonpad_ret = 0;
1193
1194 ret = 0;
1195 spliced = 0;
1196
1197 while (len) {
1198 ret = subbuf_splice_actor(in, ppos, pipe, len, flags, &nonpad_ret);
1199 if (ret < 0)
1200 break;
1201 else if (!ret) {
1202 break;
1203 if (spliced)
1204 break;
1205 if (flags & SPLICE_F_NONBLOCK) {
1206 ret = -EAGAIN;
1207 break;
1208 }
1209 }
1210
1211 *ppos += ret;
1212 if (ret > len)
1213 len = 0;
1214 else
1215 len -= ret;
1216 spliced += nonpad_ret;
1217 nonpad_ret = 0;
1218 }
1219
1220 if (spliced)
1221 return spliced;
1222
1223 return ret;
1083} 1224}
1084 1225
1085const struct file_operations relay_file_operations = { 1226const struct file_operations relay_file_operations = {
@@ -1089,7 +1230,7 @@ const struct file_operations relay_file_operations = {
1089 .read = relay_file_read, 1230 .read = relay_file_read,
1090 .llseek = no_llseek, 1231 .llseek = no_llseek,
1091 .release = relay_file_release, 1232 .release = relay_file_release,
1092 .sendfile = relay_file_sendfile, 1233 .splice_read = relay_file_splice_read,
1093}; 1234};
1094EXPORT_SYMBOL_GPL(relay_file_operations); 1235EXPORT_SYMBOL_GPL(relay_file_operations);
1095 1236