diff options
author | Yehuda Sadeh <yehuda@hq.newdream.net> | 2010-01-08 16:58:34 -0500 |
---|---|---|
committer | Sage Weil <sage@newdream.net> | 2010-01-25 15:57:37 -0500 |
commit | 2450418c47b7998ad55a73f23707b1e21c371eef (patch) | |
tree | 1e17dd88f86c5daa1bfbca1aeea0c909391b5829 | |
parent | 5b1daecd59f95eb24dc629407ed80369c9929520 (diff) |
ceph: allocate middle of message before stating to read
Both front and middle parts of the message are now being
allocated at the ceph_alloc_msg().
Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
-rw-r--r-- | fs/ceph/mds_client.c | 2 | ||||
-rw-r--r-- | fs/ceph/messenger.c | 142 | ||||
-rw-r--r-- | fs/ceph/messenger.h | 9 | ||||
-rw-r--r-- | fs/ceph/mon_client.c | 25 | ||||
-rw-r--r-- | fs/ceph/osd_client.c | 17 |
5 files changed, 115 insertions, 80 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 623c67cd484b..93998a0678c4 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c | |||
@@ -2953,8 +2953,6 @@ const static struct ceph_connection_operations mds_con_ops = { | |||
2953 | .get_authorizer = get_authorizer, | 2953 | .get_authorizer = get_authorizer, |
2954 | .verify_authorizer_reply = verify_authorizer_reply, | 2954 | .verify_authorizer_reply = verify_authorizer_reply, |
2955 | .peer_reset = peer_reset, | 2955 | .peer_reset = peer_reset, |
2956 | .alloc_msg = ceph_alloc_msg, | ||
2957 | .alloc_middle = ceph_alloc_middle, | ||
2958 | }; | 2956 | }; |
2959 | 2957 | ||
2960 | 2958 | ||
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c index 1360708d7505..25de15c006b1 100644 --- a/fs/ceph/messenger.c +++ b/fs/ceph/messenger.c | |||
@@ -1279,8 +1279,34 @@ static void process_ack(struct ceph_connection *con) | |||
1279 | 1279 | ||
1280 | 1280 | ||
1281 | 1281 | ||
1282 | static int read_partial_message_section(struct ceph_connection *con, | ||
1283 | struct kvec *section, unsigned int sec_len, | ||
1284 | u32 *crc) | ||
1285 | { | ||
1286 | int left; | ||
1287 | int ret; | ||
1288 | |||
1289 | BUG_ON(!section); | ||
1290 | |||
1291 | while (section->iov_len < sec_len) { | ||
1292 | BUG_ON(section->iov_base == NULL); | ||
1293 | left = sec_len - section->iov_len; | ||
1294 | ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base + | ||
1295 | section->iov_len, left); | ||
1296 | if (ret <= 0) | ||
1297 | return ret; | ||
1298 | section->iov_len += ret; | ||
1299 | if (section->iov_len == sec_len) | ||
1300 | *crc = crc32c(0, section->iov_base, | ||
1301 | section->iov_len); | ||
1302 | } | ||
1282 | 1303 | ||
1304 | return 1; | ||
1305 | } | ||
1283 | 1306 | ||
1307 | static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, | ||
1308 | struct ceph_msg_header *hdr, | ||
1309 | int *skip); | ||
1284 | /* | 1310 | /* |
1285 | * read (part of) a message. | 1311 | * read (part of) a message. |
1286 | */ | 1312 | */ |
@@ -1292,6 +1318,7 @@ static int read_partial_message(struct ceph_connection *con) | |||
1292 | int to, want, left; | 1318 | int to, want, left; |
1293 | unsigned front_len, middle_len, data_len, data_off; | 1319 | unsigned front_len, middle_len, data_len, data_off; |
1294 | int datacrc = con->msgr->nocrc; | 1320 | int datacrc = con->msgr->nocrc; |
1321 | int skip; | ||
1295 | 1322 | ||
1296 | dout("read_partial_message con %p msg %p\n", con, m); | 1323 | dout("read_partial_message con %p msg %p\n", con, m); |
1297 | 1324 | ||
@@ -1315,7 +1342,6 @@ static int read_partial_message(struct ceph_connection *con) | |||
1315 | } | 1342 | } |
1316 | } | 1343 | } |
1317 | } | 1344 | } |
1318 | |||
1319 | front_len = le32_to_cpu(con->in_hdr.front_len); | 1345 | front_len = le32_to_cpu(con->in_hdr.front_len); |
1320 | if (front_len > CEPH_MSG_MAX_FRONT_LEN) | 1346 | if (front_len > CEPH_MSG_MAX_FRONT_LEN) |
1321 | return -EIO; | 1347 | return -EIO; |
@@ -1330,8 +1356,8 @@ static int read_partial_message(struct ceph_connection *con) | |||
1330 | if (!con->in_msg) { | 1356 | if (!con->in_msg) { |
1331 | dout("got hdr type %d front %d data %d\n", con->in_hdr.type, | 1357 | dout("got hdr type %d front %d data %d\n", con->in_hdr.type, |
1332 | con->in_hdr.front_len, con->in_hdr.data_len); | 1358 | con->in_hdr.front_len, con->in_hdr.data_len); |
1333 | con->in_msg = con->ops->alloc_msg(con, &con->in_hdr); | 1359 | con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); |
1334 | if (!con->in_msg) { | 1360 | if (skip) { |
1335 | /* skip this message */ | 1361 | /* skip this message */ |
1336 | pr_err("alloc_msg returned NULL, skipping message\n"); | 1362 | pr_err("alloc_msg returned NULL, skipping message\n"); |
1337 | con->in_base_pos = -front_len - middle_len - data_len - | 1363 | con->in_base_pos = -front_len - middle_len - data_len - |
@@ -1342,56 +1368,28 @@ static int read_partial_message(struct ceph_connection *con) | |||
1342 | if (IS_ERR(con->in_msg)) { | 1368 | if (IS_ERR(con->in_msg)) { |
1343 | ret = PTR_ERR(con->in_msg); | 1369 | ret = PTR_ERR(con->in_msg); |
1344 | con->in_msg = NULL; | 1370 | con->in_msg = NULL; |
1345 | con->error_msg = "out of memory for incoming message"; | 1371 | con->error_msg = "error allocating memory for incoming message"; |
1346 | return ret; | 1372 | return ret; |
1347 | } | 1373 | } |
1348 | m = con->in_msg; | 1374 | m = con->in_msg; |
1349 | m->front.iov_len = 0; /* haven't read it yet */ | 1375 | m->front.iov_len = 0; /* haven't read it yet */ |
1376 | if (m->middle) | ||
1377 | m->middle->vec.iov_len = 0; | ||
1350 | memcpy(&m->hdr, &con->in_hdr, sizeof(con->in_hdr)); | 1378 | memcpy(&m->hdr, &con->in_hdr, sizeof(con->in_hdr)); |
1351 | } | 1379 | } |
1352 | 1380 | ||
1353 | /* front */ | 1381 | /* front */ |
1354 | while (m->front.iov_len < front_len) { | 1382 | ret = read_partial_message_section(con, &m->front, front_len, |
1355 | BUG_ON(m->front.iov_base == NULL); | 1383 | &con->in_front_crc); |
1356 | left = front_len - m->front.iov_len; | 1384 | if (ret <= 0) |
1357 | ret = ceph_tcp_recvmsg(con->sock, (char *)m->front.iov_base + | 1385 | return ret; |
1358 | m->front.iov_len, left); | ||
1359 | if (ret <= 0) | ||
1360 | return ret; | ||
1361 | m->front.iov_len += ret; | ||
1362 | if (m->front.iov_len == front_len) | ||
1363 | con->in_front_crc = crc32c(0, m->front.iov_base, | ||
1364 | m->front.iov_len); | ||
1365 | } | ||
1366 | 1386 | ||
1367 | /* middle */ | 1387 | /* middle */ |
1368 | while (middle_len > 0 && (!m->middle || | 1388 | if (m->middle) { |
1369 | m->middle->vec.iov_len < middle_len)) { | 1389 | ret = read_partial_message_section(con, &m->middle->vec, middle_len, |
1370 | if (m->middle == NULL) { | 1390 | &con->in_middle_crc); |
1371 | ret = -EOPNOTSUPP; | ||
1372 | if (con->ops->alloc_middle) | ||
1373 | ret = con->ops->alloc_middle(con, m); | ||
1374 | if (ret < 0) { | ||
1375 | pr_err("alloc_middle fail skipping payload\n"); | ||
1376 | con->in_base_pos = -middle_len - data_len | ||
1377 | - sizeof(m->footer); | ||
1378 | ceph_msg_put(con->in_msg); | ||
1379 | con->in_msg = NULL; | ||
1380 | con->in_tag = CEPH_MSGR_TAG_READY; | ||
1381 | return 0; | ||
1382 | } | ||
1383 | m->middle->vec.iov_len = 0; | ||
1384 | } | ||
1385 | left = middle_len - m->middle->vec.iov_len; | ||
1386 | ret = ceph_tcp_recvmsg(con->sock, | ||
1387 | (char *)m->middle->vec.iov_base + | ||
1388 | m->middle->vec.iov_len, left); | ||
1389 | if (ret <= 0) | 1391 | if (ret <= 0) |
1390 | return ret; | 1392 | return ret; |
1391 | m->middle->vec.iov_len += ret; | ||
1392 | if (m->middle->vec.iov_len == middle_len) | ||
1393 | con->in_middle_crc = crc32c(0, m->middle->vec.iov_base, | ||
1394 | m->middle->vec.iov_len); | ||
1395 | } | 1393 | } |
1396 | 1394 | ||
1397 | /* (page) data */ | 1395 | /* (page) data */ |
@@ -2116,31 +2114,13 @@ out: | |||
2116 | } | 2114 | } |
2117 | 2115 | ||
2118 | /* | 2116 | /* |
2119 | * Generic message allocator, for incoming messages. | ||
2120 | */ | ||
2121 | struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, | ||
2122 | struct ceph_msg_header *hdr) | ||
2123 | { | ||
2124 | int type = le16_to_cpu(hdr->type); | ||
2125 | int front_len = le32_to_cpu(hdr->front_len); | ||
2126 | struct ceph_msg *msg = ceph_msg_new(type, front_len, 0, 0, NULL); | ||
2127 | |||
2128 | if (!msg) { | ||
2129 | pr_err("unable to allocate msg type %d len %d\n", | ||
2130 | type, front_len); | ||
2131 | return ERR_PTR(-ENOMEM); | ||
2132 | } | ||
2133 | return msg; | ||
2134 | } | ||
2135 | |||
2136 | /* | ||
2137 | * Allocate "middle" portion of a message, if it is needed and wasn't | 2117 | * Allocate "middle" portion of a message, if it is needed and wasn't |
2138 | * allocated by alloc_msg. This allows us to read a small fixed-size | 2118 | * allocated by alloc_msg. This allows us to read a small fixed-size |
2139 | * per-type header in the front and then gracefully fail (i.e., | 2119 | * per-type header in the front and then gracefully fail (i.e., |
2140 | * propagate the error to the caller based on info in the front) when | 2120 | * propagate the error to the caller based on info in the front) when |
2141 | * the middle is too large. | 2121 | * the middle is too large. |
2142 | */ | 2122 | */ |
2143 | int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) | 2123 | static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) |
2144 | { | 2124 | { |
2145 | int type = le16_to_cpu(msg->hdr.type); | 2125 | int type = le16_to_cpu(msg->hdr.type); |
2146 | int middle_len = le32_to_cpu(msg->hdr.middle_len); | 2126 | int middle_len = le32_to_cpu(msg->hdr.middle_len); |
@@ -2156,6 +2136,48 @@ int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) | |||
2156 | return 0; | 2136 | return 0; |
2157 | } | 2137 | } |
2158 | 2138 | ||
2139 | /* | ||
2140 | * Generic message allocator, for incoming messages. | ||
2141 | */ | ||
2142 | static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, | ||
2143 | struct ceph_msg_header *hdr, | ||
2144 | int *skip) | ||
2145 | { | ||
2146 | int type = le16_to_cpu(hdr->type); | ||
2147 | int front_len = le32_to_cpu(hdr->front_len); | ||
2148 | int middle_len = le32_to_cpu(hdr->middle_len); | ||
2149 | struct ceph_msg *msg = NULL; | ||
2150 | int ret; | ||
2151 | |||
2152 | if (con->ops->alloc_msg) { | ||
2153 | msg = con->ops->alloc_msg(con, hdr, skip); | ||
2154 | if (IS_ERR(msg)) | ||
2155 | return msg; | ||
2156 | |||
2157 | if (*skip) | ||
2158 | return NULL; | ||
2159 | } | ||
2160 | if (!msg) { | ||
2161 | *skip = 0; | ||
2162 | msg = ceph_msg_new(type, front_len, 0, 0, NULL); | ||
2163 | if (!msg) { | ||
2164 | pr_err("unable to allocate msg type %d len %d\n", | ||
2165 | type, front_len); | ||
2166 | return ERR_PTR(-ENOMEM); | ||
2167 | } | ||
2168 | } | ||
2169 | |||
2170 | if (middle_len) { | ||
2171 | ret = ceph_alloc_middle(con, msg); | ||
2172 | |||
2173 | if (ret < 0) { | ||
2174 | ceph_msg_put(msg); | ||
2175 | return msg; | ||
2176 | } | ||
2177 | } | ||
2178 | return msg; | ||
2179 | } | ||
2180 | |||
2159 | 2181 | ||
2160 | /* | 2182 | /* |
2161 | * Free a generically kmalloc'd message. | 2183 | * Free a generically kmalloc'd message. |
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h index a7b684145092..b6bec59056d7 100644 --- a/fs/ceph/messenger.h +++ b/fs/ceph/messenger.h | |||
@@ -44,9 +44,8 @@ struct ceph_connection_operations { | |||
44 | void (*peer_reset) (struct ceph_connection *con); | 44 | void (*peer_reset) (struct ceph_connection *con); |
45 | 45 | ||
46 | struct ceph_msg * (*alloc_msg) (struct ceph_connection *con, | 46 | struct ceph_msg * (*alloc_msg) (struct ceph_connection *con, |
47 | struct ceph_msg_header *hdr); | 47 | struct ceph_msg_header *hdr, |
48 | int (*alloc_middle) (struct ceph_connection *con, | 48 | int *skip); |
49 | struct ceph_msg *msg); | ||
50 | /* an incoming message has a data payload; tell me what pages I | 49 | /* an incoming message has a data payload; tell me what pages I |
51 | * should read the data into. */ | 50 | * should read the data into. */ |
52 | int (*prepare_pages) (struct ceph_connection *con, struct ceph_msg *m, | 51 | int (*prepare_pages) (struct ceph_connection *con, struct ceph_msg *m, |
@@ -242,10 +241,6 @@ extern struct ceph_msg *ceph_msg_new(int type, int front_len, | |||
242 | struct page **pages); | 241 | struct page **pages); |
243 | extern void ceph_msg_kfree(struct ceph_msg *m); | 242 | extern void ceph_msg_kfree(struct ceph_msg *m); |
244 | 243 | ||
245 | extern struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, | ||
246 | struct ceph_msg_header *hdr); | ||
247 | extern int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg); | ||
248 | |||
249 | 244 | ||
250 | static inline struct ceph_msg *ceph_msg_get(struct ceph_msg *msg) | 245 | static inline struct ceph_msg *ceph_msg_get(struct ceph_msg *msg) |
251 | { | 246 | { |
diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c index 223e8bc207e3..6c00b37cc554 100644 --- a/fs/ceph/mon_client.c +++ b/fs/ceph/mon_client.c | |||
@@ -692,21 +692,33 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) | |||
692 | * Allocate memory for incoming message | 692 | * Allocate memory for incoming message |
693 | */ | 693 | */ |
694 | static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, | 694 | static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, |
695 | struct ceph_msg_header *hdr) | 695 | struct ceph_msg_header *hdr, |
696 | int *skip) | ||
696 | { | 697 | { |
697 | struct ceph_mon_client *monc = con->private; | 698 | struct ceph_mon_client *monc = con->private; |
698 | int type = le16_to_cpu(hdr->type); | 699 | int type = le16_to_cpu(hdr->type); |
699 | int front = le32_to_cpu(hdr->front_len); | 700 | int front_len = le32_to_cpu(hdr->front_len); |
701 | struct ceph_msg *m; | ||
700 | 702 | ||
703 | *skip = 0; | ||
701 | switch (type) { | 704 | switch (type) { |
702 | case CEPH_MSG_MON_SUBSCRIBE_ACK: | 705 | case CEPH_MSG_MON_SUBSCRIBE_ACK: |
703 | return ceph_msgpool_get(&monc->msgpool_subscribe_ack, front); | 706 | m = ceph_msgpool_get(&monc->msgpool_subscribe_ack, front_len); |
707 | break; | ||
704 | case CEPH_MSG_STATFS_REPLY: | 708 | case CEPH_MSG_STATFS_REPLY: |
705 | return ceph_msgpool_get(&monc->msgpool_statfs_reply, front); | 709 | m = ceph_msgpool_get(&monc->msgpool_statfs_reply, front_len); |
710 | break; | ||
706 | case CEPH_MSG_AUTH_REPLY: | 711 | case CEPH_MSG_AUTH_REPLY: |
707 | return ceph_msgpool_get(&monc->msgpool_auth_reply, front); | 712 | m = ceph_msgpool_get(&monc->msgpool_auth_reply, front_len); |
713 | break; | ||
714 | default: | ||
715 | return NULL; | ||
708 | } | 716 | } |
709 | return ceph_alloc_msg(con, hdr); | 717 | |
718 | if (!m) | ||
719 | *skip = 1; | ||
720 | |||
721 | return m; | ||
710 | } | 722 | } |
711 | 723 | ||
712 | /* | 724 | /* |
@@ -749,5 +761,4 @@ const static struct ceph_connection_operations mon_con_ops = { | |||
749 | .dispatch = dispatch, | 761 | .dispatch = dispatch, |
750 | .fault = mon_fault, | 762 | .fault = mon_fault, |
751 | .alloc_msg = mon_alloc_msg, | 763 | .alloc_msg = mon_alloc_msg, |
752 | .alloc_middle = ceph_alloc_middle, | ||
753 | }; | 764 | }; |
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c index 8417e21a3cb2..545e93617993 100644 --- a/fs/ceph/osd_client.c +++ b/fs/ceph/osd_client.c | |||
@@ -1304,18 +1304,28 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) | |||
1304 | } | 1304 | } |
1305 | 1305 | ||
1306 | static struct ceph_msg *alloc_msg(struct ceph_connection *con, | 1306 | static struct ceph_msg *alloc_msg(struct ceph_connection *con, |
1307 | struct ceph_msg_header *hdr) | 1307 | struct ceph_msg_header *hdr, |
1308 | int *skip) | ||
1308 | { | 1309 | { |
1309 | struct ceph_osd *osd = con->private; | 1310 | struct ceph_osd *osd = con->private; |
1310 | struct ceph_osd_client *osdc = osd->o_osdc; | 1311 | struct ceph_osd_client *osdc = osd->o_osdc; |
1311 | int type = le16_to_cpu(hdr->type); | 1312 | int type = le16_to_cpu(hdr->type); |
1312 | int front = le32_to_cpu(hdr->front_len); | 1313 | int front = le32_to_cpu(hdr->front_len); |
1314 | struct ceph_msg *m; | ||
1313 | 1315 | ||
1316 | *skip = 0; | ||
1314 | switch (type) { | 1317 | switch (type) { |
1315 | case CEPH_MSG_OSD_OPREPLY: | 1318 | case CEPH_MSG_OSD_OPREPLY: |
1316 | return ceph_msgpool_get(&osdc->msgpool_op_reply, front); | 1319 | m = ceph_msgpool_get(&osdc->msgpool_op_reply, front); |
1320 | break; | ||
1321 | default: | ||
1322 | return NULL; | ||
1317 | } | 1323 | } |
1318 | return ceph_alloc_msg(con, hdr); | 1324 | |
1325 | if (!m) | ||
1326 | *skip = 1; | ||
1327 | |||
1328 | return m; | ||
1319 | } | 1329 | } |
1320 | 1330 | ||
1321 | /* | 1331 | /* |
@@ -1390,6 +1400,5 @@ const static struct ceph_connection_operations osd_con_ops = { | |||
1390 | .verify_authorizer_reply = verify_authorizer_reply, | 1400 | .verify_authorizer_reply = verify_authorizer_reply, |
1391 | .alloc_msg = alloc_msg, | 1401 | .alloc_msg = alloc_msg, |
1392 | .fault = osd_reset, | 1402 | .fault = osd_reset, |
1393 | .alloc_middle = ceph_alloc_middle, | ||
1394 | .prepare_pages = prepare_pages, | 1403 | .prepare_pages = prepare_pages, |
1395 | }; | 1404 | }; |