aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@hq.newdream.net>2010-01-08 16:58:34 -0500
committerSage Weil <sage@newdream.net>2010-01-25 15:57:37 -0500
commit2450418c47b7998ad55a73f23707b1e21c371eef (patch)
tree1e17dd88f86c5daa1bfbca1aeea0c909391b5829
parent5b1daecd59f95eb24dc629407ed80369c9929520 (diff)
ceph: allocate middle of message before stating to read
Both front and middle parts of the message are now being allocated at the ceph_alloc_msg(). Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
-rw-r--r--fs/ceph/mds_client.c2
-rw-r--r--fs/ceph/messenger.c142
-rw-r--r--fs/ceph/messenger.h9
-rw-r--r--fs/ceph/mon_client.c25
-rw-r--r--fs/ceph/osd_client.c17
5 files changed, 115 insertions, 80 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 623c67cd484b..93998a0678c4 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -2953,8 +2953,6 @@ const static struct ceph_connection_operations mds_con_ops = {
2953 .get_authorizer = get_authorizer, 2953 .get_authorizer = get_authorizer,
2954 .verify_authorizer_reply = verify_authorizer_reply, 2954 .verify_authorizer_reply = verify_authorizer_reply,
2955 .peer_reset = peer_reset, 2955 .peer_reset = peer_reset,
2956 .alloc_msg = ceph_alloc_msg,
2957 .alloc_middle = ceph_alloc_middle,
2958}; 2956};
2959 2957
2960 2958
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index 1360708d7505..25de15c006b1 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -1279,8 +1279,34 @@ static void process_ack(struct ceph_connection *con)
1279 1279
1280 1280
1281 1281
1282static int read_partial_message_section(struct ceph_connection *con,
1283 struct kvec *section, unsigned int sec_len,
1284 u32 *crc)
1285{
1286 int left;
1287 int ret;
1288
1289 BUG_ON(!section);
1290
1291 while (section->iov_len < sec_len) {
1292 BUG_ON(section->iov_base == NULL);
1293 left = sec_len - section->iov_len;
1294 ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
1295 section->iov_len, left);
1296 if (ret <= 0)
1297 return ret;
1298 section->iov_len += ret;
1299 if (section->iov_len == sec_len)
1300 *crc = crc32c(0, section->iov_base,
1301 section->iov_len);
1302 }
1282 1303
1304 return 1;
1305}
1283 1306
1307static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
1308 struct ceph_msg_header *hdr,
1309 int *skip);
1284/* 1310/*
1285 * read (part of) a message. 1311 * read (part of) a message.
1286 */ 1312 */
@@ -1292,6 +1318,7 @@ static int read_partial_message(struct ceph_connection *con)
1292 int to, want, left; 1318 int to, want, left;
1293 unsigned front_len, middle_len, data_len, data_off; 1319 unsigned front_len, middle_len, data_len, data_off;
1294 int datacrc = con->msgr->nocrc; 1320 int datacrc = con->msgr->nocrc;
1321 int skip;
1295 1322
1296 dout("read_partial_message con %p msg %p\n", con, m); 1323 dout("read_partial_message con %p msg %p\n", con, m);
1297 1324
@@ -1315,7 +1342,6 @@ static int read_partial_message(struct ceph_connection *con)
1315 } 1342 }
1316 } 1343 }
1317 } 1344 }
1318
1319 front_len = le32_to_cpu(con->in_hdr.front_len); 1345 front_len = le32_to_cpu(con->in_hdr.front_len);
1320 if (front_len > CEPH_MSG_MAX_FRONT_LEN) 1346 if (front_len > CEPH_MSG_MAX_FRONT_LEN)
1321 return -EIO; 1347 return -EIO;
@@ -1330,8 +1356,8 @@ static int read_partial_message(struct ceph_connection *con)
1330 if (!con->in_msg) { 1356 if (!con->in_msg) {
1331 dout("got hdr type %d front %d data %d\n", con->in_hdr.type, 1357 dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
1332 con->in_hdr.front_len, con->in_hdr.data_len); 1358 con->in_hdr.front_len, con->in_hdr.data_len);
1333 con->in_msg = con->ops->alloc_msg(con, &con->in_hdr); 1359 con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip);
1334 if (!con->in_msg) { 1360 if (skip) {
1335 /* skip this message */ 1361 /* skip this message */
1336 pr_err("alloc_msg returned NULL, skipping message\n"); 1362 pr_err("alloc_msg returned NULL, skipping message\n");
1337 con->in_base_pos = -front_len - middle_len - data_len - 1363 con->in_base_pos = -front_len - middle_len - data_len -
@@ -1342,56 +1368,28 @@ static int read_partial_message(struct ceph_connection *con)
1342 if (IS_ERR(con->in_msg)) { 1368 if (IS_ERR(con->in_msg)) {
1343 ret = PTR_ERR(con->in_msg); 1369 ret = PTR_ERR(con->in_msg);
1344 con->in_msg = NULL; 1370 con->in_msg = NULL;
1345 con->error_msg = "out of memory for incoming message"; 1371 con->error_msg = "error allocating memory for incoming message";
1346 return ret; 1372 return ret;
1347 } 1373 }
1348 m = con->in_msg; 1374 m = con->in_msg;
1349 m->front.iov_len = 0; /* haven't read it yet */ 1375 m->front.iov_len = 0; /* haven't read it yet */
1376 if (m->middle)
1377 m->middle->vec.iov_len = 0;
1350 memcpy(&m->hdr, &con->in_hdr, sizeof(con->in_hdr)); 1378 memcpy(&m->hdr, &con->in_hdr, sizeof(con->in_hdr));
1351 } 1379 }
1352 1380
1353 /* front */ 1381 /* front */
1354 while (m->front.iov_len < front_len) { 1382 ret = read_partial_message_section(con, &m->front, front_len,
1355 BUG_ON(m->front.iov_base == NULL); 1383 &con->in_front_crc);
1356 left = front_len - m->front.iov_len; 1384 if (ret <= 0)
1357 ret = ceph_tcp_recvmsg(con->sock, (char *)m->front.iov_base + 1385 return ret;
1358 m->front.iov_len, left);
1359 if (ret <= 0)
1360 return ret;
1361 m->front.iov_len += ret;
1362 if (m->front.iov_len == front_len)
1363 con->in_front_crc = crc32c(0, m->front.iov_base,
1364 m->front.iov_len);
1365 }
1366 1386
1367 /* middle */ 1387 /* middle */
1368 while (middle_len > 0 && (!m->middle || 1388 if (m->middle) {
1369 m->middle->vec.iov_len < middle_len)) { 1389 ret = read_partial_message_section(con, &m->middle->vec, middle_len,
1370 if (m->middle == NULL) { 1390 &con->in_middle_crc);
1371 ret = -EOPNOTSUPP;
1372 if (con->ops->alloc_middle)
1373 ret = con->ops->alloc_middle(con, m);
1374 if (ret < 0) {
1375 pr_err("alloc_middle fail skipping payload\n");
1376 con->in_base_pos = -middle_len - data_len
1377 - sizeof(m->footer);
1378 ceph_msg_put(con->in_msg);
1379 con->in_msg = NULL;
1380 con->in_tag = CEPH_MSGR_TAG_READY;
1381 return 0;
1382 }
1383 m->middle->vec.iov_len = 0;
1384 }
1385 left = middle_len - m->middle->vec.iov_len;
1386 ret = ceph_tcp_recvmsg(con->sock,
1387 (char *)m->middle->vec.iov_base +
1388 m->middle->vec.iov_len, left);
1389 if (ret <= 0) 1391 if (ret <= 0)
1390 return ret; 1392 return ret;
1391 m->middle->vec.iov_len += ret;
1392 if (m->middle->vec.iov_len == middle_len)
1393 con->in_middle_crc = crc32c(0, m->middle->vec.iov_base,
1394 m->middle->vec.iov_len);
1395 } 1393 }
1396 1394
1397 /* (page) data */ 1395 /* (page) data */
@@ -2116,31 +2114,13 @@ out:
2116} 2114}
2117 2115
2118/* 2116/*
2119 * Generic message allocator, for incoming messages.
2120 */
2121struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
2122 struct ceph_msg_header *hdr)
2123{
2124 int type = le16_to_cpu(hdr->type);
2125 int front_len = le32_to_cpu(hdr->front_len);
2126 struct ceph_msg *msg = ceph_msg_new(type, front_len, 0, 0, NULL);
2127
2128 if (!msg) {
2129 pr_err("unable to allocate msg type %d len %d\n",
2130 type, front_len);
2131 return ERR_PTR(-ENOMEM);
2132 }
2133 return msg;
2134}
2135
2136/*
2137 * Allocate "middle" portion of a message, if it is needed and wasn't 2117 * Allocate "middle" portion of a message, if it is needed and wasn't
2138 * allocated by alloc_msg. This allows us to read a small fixed-size 2118 * allocated by alloc_msg. This allows us to read a small fixed-size
2139 * per-type header in the front and then gracefully fail (i.e., 2119 * per-type header in the front and then gracefully fail (i.e.,
2140 * propagate the error to the caller based on info in the front) when 2120 * propagate the error to the caller based on info in the front) when
2141 * the middle is too large. 2121 * the middle is too large.
2142 */ 2122 */
2143int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) 2123static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
2144{ 2124{
2145 int type = le16_to_cpu(msg->hdr.type); 2125 int type = le16_to_cpu(msg->hdr.type);
2146 int middle_len = le32_to_cpu(msg->hdr.middle_len); 2126 int middle_len = le32_to_cpu(msg->hdr.middle_len);
@@ -2156,6 +2136,48 @@ int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
2156 return 0; 2136 return 0;
2157} 2137}
2158 2138
2139/*
2140 * Generic message allocator, for incoming messages.
2141 */
2142static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
2143 struct ceph_msg_header *hdr,
2144 int *skip)
2145{
2146 int type = le16_to_cpu(hdr->type);
2147 int front_len = le32_to_cpu(hdr->front_len);
2148 int middle_len = le32_to_cpu(hdr->middle_len);
2149 struct ceph_msg *msg = NULL;
2150 int ret;
2151
2152 if (con->ops->alloc_msg) {
2153 msg = con->ops->alloc_msg(con, hdr, skip);
2154 if (IS_ERR(msg))
2155 return msg;
2156
2157 if (*skip)
2158 return NULL;
2159 }
2160 if (!msg) {
2161 *skip = 0;
2162 msg = ceph_msg_new(type, front_len, 0, 0, NULL);
2163 if (!msg) {
2164 pr_err("unable to allocate msg type %d len %d\n",
2165 type, front_len);
2166 return ERR_PTR(-ENOMEM);
2167 }
2168 }
2169
2170 if (middle_len) {
2171 ret = ceph_alloc_middle(con, msg);
2172
2173 if (ret < 0) {
2174 ceph_msg_put(msg);
2175 return msg;
2176 }
2177 }
2178 return msg;
2179}
2180
2159 2181
2160/* 2182/*
2161 * Free a generically kmalloc'd message. 2183 * Free a generically kmalloc'd message.
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h
index a7b684145092..b6bec59056d7 100644
--- a/fs/ceph/messenger.h
+++ b/fs/ceph/messenger.h
@@ -44,9 +44,8 @@ struct ceph_connection_operations {
44 void (*peer_reset) (struct ceph_connection *con); 44 void (*peer_reset) (struct ceph_connection *con);
45 45
46 struct ceph_msg * (*alloc_msg) (struct ceph_connection *con, 46 struct ceph_msg * (*alloc_msg) (struct ceph_connection *con,
47 struct ceph_msg_header *hdr); 47 struct ceph_msg_header *hdr,
48 int (*alloc_middle) (struct ceph_connection *con, 48 int *skip);
49 struct ceph_msg *msg);
50 /* an incoming message has a data payload; tell me what pages I 49 /* an incoming message has a data payload; tell me what pages I
51 * should read the data into. */ 50 * should read the data into. */
52 int (*prepare_pages) (struct ceph_connection *con, struct ceph_msg *m, 51 int (*prepare_pages) (struct ceph_connection *con, struct ceph_msg *m,
@@ -242,10 +241,6 @@ extern struct ceph_msg *ceph_msg_new(int type, int front_len,
242 struct page **pages); 241 struct page **pages);
243extern void ceph_msg_kfree(struct ceph_msg *m); 242extern void ceph_msg_kfree(struct ceph_msg *m);
244 243
245extern struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
246 struct ceph_msg_header *hdr);
247extern int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg);
248
249 244
250static inline struct ceph_msg *ceph_msg_get(struct ceph_msg *msg) 245static inline struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
251{ 246{
diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c
index 223e8bc207e3..6c00b37cc554 100644
--- a/fs/ceph/mon_client.c
+++ b/fs/ceph/mon_client.c
@@ -692,21 +692,33 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
692 * Allocate memory for incoming message 692 * Allocate memory for incoming message
693 */ 693 */
694static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, 694static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
695 struct ceph_msg_header *hdr) 695 struct ceph_msg_header *hdr,
696 int *skip)
696{ 697{
697 struct ceph_mon_client *monc = con->private; 698 struct ceph_mon_client *monc = con->private;
698 int type = le16_to_cpu(hdr->type); 699 int type = le16_to_cpu(hdr->type);
699 int front = le32_to_cpu(hdr->front_len); 700 int front_len = le32_to_cpu(hdr->front_len);
701 struct ceph_msg *m;
700 702
703 *skip = 0;
701 switch (type) { 704 switch (type) {
702 case CEPH_MSG_MON_SUBSCRIBE_ACK: 705 case CEPH_MSG_MON_SUBSCRIBE_ACK:
703 return ceph_msgpool_get(&monc->msgpool_subscribe_ack, front); 706 m = ceph_msgpool_get(&monc->msgpool_subscribe_ack, front_len);
707 break;
704 case CEPH_MSG_STATFS_REPLY: 708 case CEPH_MSG_STATFS_REPLY:
705 return ceph_msgpool_get(&monc->msgpool_statfs_reply, front); 709 m = ceph_msgpool_get(&monc->msgpool_statfs_reply, front_len);
710 break;
706 case CEPH_MSG_AUTH_REPLY: 711 case CEPH_MSG_AUTH_REPLY:
707 return ceph_msgpool_get(&monc->msgpool_auth_reply, front); 712 m = ceph_msgpool_get(&monc->msgpool_auth_reply, front_len);
713 break;
714 default:
715 return NULL;
708 } 716 }
709 return ceph_alloc_msg(con, hdr); 717
718 if (!m)
719 *skip = 1;
720
721 return m;
710} 722}
711 723
712/* 724/*
@@ -749,5 +761,4 @@ const static struct ceph_connection_operations mon_con_ops = {
749 .dispatch = dispatch, 761 .dispatch = dispatch,
750 .fault = mon_fault, 762 .fault = mon_fault,
751 .alloc_msg = mon_alloc_msg, 763 .alloc_msg = mon_alloc_msg,
752 .alloc_middle = ceph_alloc_middle,
753}; 764};
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 8417e21a3cb2..545e93617993 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -1304,18 +1304,28 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1304} 1304}
1305 1305
1306static struct ceph_msg *alloc_msg(struct ceph_connection *con, 1306static struct ceph_msg *alloc_msg(struct ceph_connection *con,
1307 struct ceph_msg_header *hdr) 1307 struct ceph_msg_header *hdr,
1308 int *skip)
1308{ 1309{
1309 struct ceph_osd *osd = con->private; 1310 struct ceph_osd *osd = con->private;
1310 struct ceph_osd_client *osdc = osd->o_osdc; 1311 struct ceph_osd_client *osdc = osd->o_osdc;
1311 int type = le16_to_cpu(hdr->type); 1312 int type = le16_to_cpu(hdr->type);
1312 int front = le32_to_cpu(hdr->front_len); 1313 int front = le32_to_cpu(hdr->front_len);
1314 struct ceph_msg *m;
1313 1315
1316 *skip = 0;
1314 switch (type) { 1317 switch (type) {
1315 case CEPH_MSG_OSD_OPREPLY: 1318 case CEPH_MSG_OSD_OPREPLY:
1316 return ceph_msgpool_get(&osdc->msgpool_op_reply, front); 1319 m = ceph_msgpool_get(&osdc->msgpool_op_reply, front);
1320 break;
1321 default:
1322 return NULL;
1317 } 1323 }
1318 return ceph_alloc_msg(con, hdr); 1324
1325 if (!m)
1326 *skip = 1;
1327
1328 return m;
1319} 1329}
1320 1330
1321/* 1331/*
@@ -1390,6 +1400,5 @@ const static struct ceph_connection_operations osd_con_ops = {
1390 .verify_authorizer_reply = verify_authorizer_reply, 1400 .verify_authorizer_reply = verify_authorizer_reply,
1391 .alloc_msg = alloc_msg, 1401 .alloc_msg = alloc_msg,
1392 .fault = osd_reset, 1402 .fault = osd_reset,
1393 .alloc_middle = ceph_alloc_middle,
1394 .prepare_pages = prepare_pages, 1403 .prepare_pages = prepare_pages,
1395}; 1404};