aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph/osd_client.c
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@hq.newdream.net>2010-01-11 17:47:13 -0500
committerSage Weil <sage@newdream.net>2010-01-25 15:57:46 -0500
commit0547a9b30a5ac8680325752b61d3ffa9d4971b6e (patch)
tree0e54e227d44fab7b98c97ee4c3bed185a1238487 /fs/ceph/osd_client.c
parent9d7f0f139edfdce1a1539b100c617fd9182b0829 (diff)
ceph: alloc message data pages and check if tid exists
Now doing it in the same callback that is also responsible for allocating the 'front' part of the message. If we get a message that we haven't got a corresponding tid for, mark it for skipping. Moving the mutex unlock/lock from the osd alloc_msg callback to the calling function in the messenger. Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Diffstat (limited to 'fs/ceph/osd_client.c')
-rw-r--r--fs/ceph/osd_client.c66
1 files changed, 42 insertions, 24 deletions
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 545e93617993..44abe299c69f 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -998,31 +998,26 @@ bad:
998 * find those pages. 998 * find those pages.
999 * 0 = success, -1 failure. 999 * 0 = success, -1 failure.
1000 */ 1000 */
1001static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m, 1001static int prepare_pages(struct ceph_connection *con,
1002 int want) 1002 struct ceph_msg_header *hdr,
1003 struct ceph_osd_request *req,
1004 u64 tid,
1005 struct ceph_msg *m)
1003{ 1006{
1004 struct ceph_osd *osd = con->private; 1007 struct ceph_osd *osd = con->private;
1005 struct ceph_osd_client *osdc; 1008 struct ceph_osd_client *osdc;
1006 struct ceph_osd_request *req;
1007 u64 tid;
1008 int ret = -1; 1009 int ret = -1;
1009 int type = le16_to_cpu(m->hdr.type); 1010 int data_len = le32_to_cpu(hdr->data_len);
1011 unsigned data_off = le16_to_cpu(hdr->data_off);
1012
1013 int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
1010 1014
1011 if (!osd) 1015 if (!osd)
1012 return -1; 1016 return -1;
1017
1013 osdc = osd->o_osdc; 1018 osdc = osd->o_osdc;
1014 1019
1015 dout("prepare_pages on msg %p want %d\n", m, want); 1020 dout("prepare_pages on msg %p want %d\n", m, want);
1016 if (unlikely(type != CEPH_MSG_OSD_OPREPLY))
1017 return -1; /* hmm! */
1018
1019 tid = le64_to_cpu(m->hdr.tid);
1020 mutex_lock(&osdc->request_mutex);
1021 req = __lookup_request(osdc, tid);
1022 if (!req) {
1023 dout("prepare_pages unknown tid %llu\n", tid);
1024 goto out;
1025 }
1026 dout("prepare_pages tid %llu has %d pages, want %d\n", 1021 dout("prepare_pages tid %llu has %d pages, want %d\n",
1027 tid, req->r_num_pages, want); 1022 tid, req->r_num_pages, want);
1028 if (unlikely(req->r_num_pages < want)) 1023 if (unlikely(req->r_num_pages < want))
@@ -1040,7 +1035,8 @@ static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m,
1040 m->nr_pages = req->r_num_pages; 1035 m->nr_pages = req->r_num_pages;
1041 ret = 0; /* success */ 1036 ret = 0; /* success */
1042out: 1037out:
1043 mutex_unlock(&osdc->request_mutex); 1038 BUG_ON(ret < 0 || m->nr_pages < want);
1039
1044 return ret; 1040 return ret;
1045} 1041}
1046 1042
@@ -1311,19 +1307,42 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
1311 struct ceph_osd_client *osdc = osd->o_osdc; 1307 struct ceph_osd_client *osdc = osd->o_osdc;
1312 int type = le16_to_cpu(hdr->type); 1308 int type = le16_to_cpu(hdr->type);
1313 int front = le32_to_cpu(hdr->front_len); 1309 int front = le32_to_cpu(hdr->front_len);
1310 int data_len = le32_to_cpu(hdr->data_len);
1314 struct ceph_msg *m; 1311 struct ceph_msg *m;
1312 struct ceph_osd_request *req;
1313 u64 tid;
1314 int err;
1315 1315
1316 *skip = 0; 1316 *skip = 0;
1317 switch (type) { 1317 if (type != CEPH_MSG_OSD_OPREPLY)
1318 case CEPH_MSG_OSD_OPREPLY:
1319 m = ceph_msgpool_get(&osdc->msgpool_op_reply, front);
1320 break;
1321 default:
1322 return NULL; 1318 return NULL;
1323 }
1324 1319
1325 if (!m) 1320 tid = le64_to_cpu(hdr->tid);
1321 mutex_lock(&osdc->request_mutex);
1322 req = __lookup_request(osdc, tid);
1323 if (!req) {
1324 *skip = 1;
1325 m = NULL;
1326 dout("prepare_pages unknown tid %llu\n", tid);
1327 goto out;
1328 }
1329 m = ceph_msgpool_get(&osdc->msgpool_op_reply, front);
1330 if (!m) {
1326 *skip = 1; 1331 *skip = 1;
1332 goto out;
1333 }
1334
1335 if (data_len > 0) {
1336 err = prepare_pages(con, hdr, req, tid, m);
1337 if (err < 0) {
1338 *skip = 1;
1339 ceph_msg_put(m);
1340 m = ERR_PTR(err);
1341 }
1342 }
1343
1344out:
1345 mutex_unlock(&osdc->request_mutex);
1327 1346
1328 return m; 1347 return m;
1329} 1348}
@@ -1400,5 +1419,4 @@ const static struct ceph_connection_operations osd_con_ops = {
1400 .verify_authorizer_reply = verify_authorizer_reply, 1419 .verify_authorizer_reply = verify_authorizer_reply,
1401 .alloc_msg = alloc_msg, 1420 .alloc_msg = alloc_msg,
1402 .fault = osd_reset, 1421 .fault = osd_reset,
1403 .prepare_pages = prepare_pages,
1404}; 1422};