aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@hq.newdream.net>2010-01-11 17:47:13 -0500
committerSage Weil <sage@newdream.net>2010-01-25 15:57:46 -0500
commit0547a9b30a5ac8680325752b61d3ffa9d4971b6e (patch)
tree0e54e227d44fab7b98c97ee4c3bed185a1238487 /fs/ceph
parent9d7f0f139edfdce1a1539b100c617fd9182b0829 (diff)
ceph: alloc message data pages and check if tid exists
Now doing it in the same callback that is also responsible for allocating the 'front' part of the message. If we get a message that we haven't got a corresponding tid for, mark it for skipping. Moving the mutex unlock/lock from the osd alloc_msg callback to the calling function in the messenger. Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Diffstat (limited to 'fs/ceph')
-rw-r--r--fs/ceph/messenger.c33
-rw-r--r--fs/ceph/messenger.h4
-rw-r--r--fs/ceph/mon_client.c1
-rw-r--r--fs/ceph/osd_client.c66
4 files changed, 45 insertions, 59 deletions
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index e8742cc9ecdf..f708803e6857 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -2114,25 +2114,6 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
2114 return 0; 2114 return 0;
2115} 2115}
2116 2116
2117static int ceph_alloc_data_section(struct ceph_connection *con, struct ceph_msg *msg)
2118{
2119 int ret;
2120 int want;
2121 int data_len = le32_to_cpu(msg->hdr.data_len);
2122 unsigned data_off = le16_to_cpu(msg->hdr.data_off);
2123
2124 want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
2125 ret = -1;
2126 mutex_unlock(&con->mutex);
2127 if (con->ops->prepare_pages)
2128 ret = con->ops->prepare_pages(con, msg, want);
2129 mutex_lock(&con->mutex);
2130
2131 BUG_ON(msg->nr_pages < want);
2132
2133 return ret;
2134}
2135
2136/* 2117/*
2137 * Generic message allocator, for incoming messages. 2118 * Generic message allocator, for incoming messages.
2138 */ 2119 */
@@ -2143,12 +2124,13 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
2143 int type = le16_to_cpu(hdr->type); 2124 int type = le16_to_cpu(hdr->type);
2144 int front_len = le32_to_cpu(hdr->front_len); 2125 int front_len = le32_to_cpu(hdr->front_len);
2145 int middle_len = le32_to_cpu(hdr->middle_len); 2126 int middle_len = le32_to_cpu(hdr->middle_len);
2146 int data_len = le32_to_cpu(hdr->data_len);
2147 struct ceph_msg *msg = NULL; 2127 struct ceph_msg *msg = NULL;
2148 int ret; 2128 int ret;
2149 2129
2150 if (con->ops->alloc_msg) { 2130 if (con->ops->alloc_msg) {
2131 mutex_unlock(&con->mutex);
2151 msg = con->ops->alloc_msg(con, hdr, skip); 2132 msg = con->ops->alloc_msg(con, hdr, skip);
2133 mutex_lock(&con->mutex);
2152 if (IS_ERR(msg)) 2134 if (IS_ERR(msg))
2153 return msg; 2135 return msg;
2154 2136
@@ -2175,17 +2157,6 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
2175 } 2157 }
2176 } 2158 }
2177 2159
2178 if (data_len) {
2179 ret = ceph_alloc_data_section(con, msg);
2180
2181 if (ret < 0) {
2182 *skip = 1;
2183 ceph_msg_put(msg);
2184 return NULL;
2185 }
2186 }
2187
2188
2189 return msg; 2160 return msg;
2190} 2161}
2191 2162
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h
index b6bec59056d7..dca2d32b40de 100644
--- a/fs/ceph/messenger.h
+++ b/fs/ceph/messenger.h
@@ -46,10 +46,6 @@ struct ceph_connection_operations {
46 struct ceph_msg * (*alloc_msg) (struct ceph_connection *con, 46 struct ceph_msg * (*alloc_msg) (struct ceph_connection *con,
47 struct ceph_msg_header *hdr, 47 struct ceph_msg_header *hdr,
48 int *skip); 48 int *skip);
49 /* an incoming message has a data payload; tell me what pages I
50 * should read the data into. */
51 int (*prepare_pages) (struct ceph_connection *con, struct ceph_msg *m,
52 int want);
53}; 49};
54 50
55extern const char *ceph_name_type_str(int t); 51extern const char *ceph_name_type_str(int t);
diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c
index 6c00b37cc554..3f7ae7f73c50 100644
--- a/fs/ceph/mon_client.c
+++ b/fs/ceph/mon_client.c
@@ -701,6 +701,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
701 struct ceph_msg *m; 701 struct ceph_msg *m;
702 702
703 *skip = 0; 703 *skip = 0;
704
704 switch (type) { 705 switch (type) {
705 case CEPH_MSG_MON_SUBSCRIBE_ACK: 706 case CEPH_MSG_MON_SUBSCRIBE_ACK:
706 m = ceph_msgpool_get(&monc->msgpool_subscribe_ack, front_len); 707 m = ceph_msgpool_get(&monc->msgpool_subscribe_ack, front_len);
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 545e93617993..44abe299c69f 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -998,31 +998,26 @@ bad:
998 * find those pages. 998 * find those pages.
999 * 0 = success, -1 failure. 999 * 0 = success, -1 failure.
1000 */ 1000 */
1001static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m, 1001static int prepare_pages(struct ceph_connection *con,
1002 int want) 1002 struct ceph_msg_header *hdr,
1003 struct ceph_osd_request *req,
1004 u64 tid,
1005 struct ceph_msg *m)
1003{ 1006{
1004 struct ceph_osd *osd = con->private; 1007 struct ceph_osd *osd = con->private;
1005 struct ceph_osd_client *osdc; 1008 struct ceph_osd_client *osdc;
1006 struct ceph_osd_request *req;
1007 u64 tid;
1008 int ret = -1; 1009 int ret = -1;
1009 int type = le16_to_cpu(m->hdr.type); 1010 int data_len = le32_to_cpu(hdr->data_len);
1011 unsigned data_off = le16_to_cpu(hdr->data_off);
1012
1013 int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
1010 1014
1011 if (!osd) 1015 if (!osd)
1012 return -1; 1016 return -1;
1017
1013 osdc = osd->o_osdc; 1018 osdc = osd->o_osdc;
1014 1019
1015 dout("prepare_pages on msg %p want %d\n", m, want); 1020 dout("prepare_pages on msg %p want %d\n", m, want);
1016 if (unlikely(type != CEPH_MSG_OSD_OPREPLY))
1017 return -1; /* hmm! */
1018
1019 tid = le64_to_cpu(m->hdr.tid);
1020 mutex_lock(&osdc->request_mutex);
1021 req = __lookup_request(osdc, tid);
1022 if (!req) {
1023 dout("prepare_pages unknown tid %llu\n", tid);
1024 goto out;
1025 }
1026 dout("prepare_pages tid %llu has %d pages, want %d\n", 1021 dout("prepare_pages tid %llu has %d pages, want %d\n",
1027 tid, req->r_num_pages, want); 1022 tid, req->r_num_pages, want);
1028 if (unlikely(req->r_num_pages < want)) 1023 if (unlikely(req->r_num_pages < want))
@@ -1040,7 +1035,8 @@ static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m,
1040 m->nr_pages = req->r_num_pages; 1035 m->nr_pages = req->r_num_pages;
1041 ret = 0; /* success */ 1036 ret = 0; /* success */
1042out: 1037out:
1043 mutex_unlock(&osdc->request_mutex); 1038 BUG_ON(ret < 0 || m->nr_pages < want);
1039
1044 return ret; 1040 return ret;
1045} 1041}
1046 1042
@@ -1311,19 +1307,42 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
1311 struct ceph_osd_client *osdc = osd->o_osdc; 1307 struct ceph_osd_client *osdc = osd->o_osdc;
1312 int type = le16_to_cpu(hdr->type); 1308 int type = le16_to_cpu(hdr->type);
1313 int front = le32_to_cpu(hdr->front_len); 1309 int front = le32_to_cpu(hdr->front_len);
1310 int data_len = le32_to_cpu(hdr->data_len);
1314 struct ceph_msg *m; 1311 struct ceph_msg *m;
1312 struct ceph_osd_request *req;
1313 u64 tid;
1314 int err;
1315 1315
1316 *skip = 0; 1316 *skip = 0;
1317 switch (type) { 1317 if (type != CEPH_MSG_OSD_OPREPLY)
1318 case CEPH_MSG_OSD_OPREPLY:
1319 m = ceph_msgpool_get(&osdc->msgpool_op_reply, front);
1320 break;
1321 default:
1322 return NULL; 1318 return NULL;
1323 }
1324 1319
1325 if (!m) 1320 tid = le64_to_cpu(hdr->tid);
1321 mutex_lock(&osdc->request_mutex);
1322 req = __lookup_request(osdc, tid);
1323 if (!req) {
1324 *skip = 1;
1325 m = NULL;
1326 dout("prepare_pages unknown tid %llu\n", tid);
1327 goto out;
1328 }
1329 m = ceph_msgpool_get(&osdc->msgpool_op_reply, front);
1330 if (!m) {
1326 *skip = 1; 1331 *skip = 1;
1332 goto out;
1333 }
1334
1335 if (data_len > 0) {
1336 err = prepare_pages(con, hdr, req, tid, m);
1337 if (err < 0) {
1338 *skip = 1;
1339 ceph_msg_put(m);
1340 m = ERR_PTR(err);
1341 }
1342 }
1343
1344out:
1345 mutex_unlock(&osdc->request_mutex);
1327 1346
1328 return m; 1347 return m;
1329} 1348}
@@ -1400,5 +1419,4 @@ const static struct ceph_connection_operations osd_con_ops = {
1400 .verify_authorizer_reply = verify_authorizer_reply, 1419 .verify_authorizer_reply = verify_authorizer_reply,
1401 .alloc_msg = alloc_msg, 1420 .alloc_msg = alloc_msg,
1402 .fault = osd_reset, 1421 .fault = osd_reset,
1403 .prepare_pages = prepare_pages,
1404}; 1422};