aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fs/ceph/messenger.c33
-rw-r--r--fs/ceph/messenger.h4
-rw-r--r--fs/ceph/mon_client.c1
-rw-r--r--fs/ceph/osd_client.c66
4 files changed, 45 insertions, 59 deletions
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index e8742cc9ecdf..f708803e6857 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -2114,25 +2114,6 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
2114 return 0; 2114 return 0;
2115} 2115}
2116 2116
2117static int ceph_alloc_data_section(struct ceph_connection *con, struct ceph_msg *msg)
2118{
2119 int ret;
2120 int want;
2121 int data_len = le32_to_cpu(msg->hdr.data_len);
2122 unsigned data_off = le16_to_cpu(msg->hdr.data_off);
2123
2124 want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
2125 ret = -1;
2126 mutex_unlock(&con->mutex);
2127 if (con->ops->prepare_pages)
2128 ret = con->ops->prepare_pages(con, msg, want);
2129 mutex_lock(&con->mutex);
2130
2131 BUG_ON(msg->nr_pages < want);
2132
2133 return ret;
2134}
2135
2136/* 2117/*
2137 * Generic message allocator, for incoming messages. 2118 * Generic message allocator, for incoming messages.
2138 */ 2119 */
@@ -2143,12 +2124,13 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
2143 int type = le16_to_cpu(hdr->type); 2124 int type = le16_to_cpu(hdr->type);
2144 int front_len = le32_to_cpu(hdr->front_len); 2125 int front_len = le32_to_cpu(hdr->front_len);
2145 int middle_len = le32_to_cpu(hdr->middle_len); 2126 int middle_len = le32_to_cpu(hdr->middle_len);
2146 int data_len = le32_to_cpu(hdr->data_len);
2147 struct ceph_msg *msg = NULL; 2127 struct ceph_msg *msg = NULL;
2148 int ret; 2128 int ret;
2149 2129
2150 if (con->ops->alloc_msg) { 2130 if (con->ops->alloc_msg) {
2131 mutex_unlock(&con->mutex);
2151 msg = con->ops->alloc_msg(con, hdr, skip); 2132 msg = con->ops->alloc_msg(con, hdr, skip);
2133 mutex_lock(&con->mutex);
2152 if (IS_ERR(msg)) 2134 if (IS_ERR(msg))
2153 return msg; 2135 return msg;
2154 2136
@@ -2175,17 +2157,6 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
2175 } 2157 }
2176 } 2158 }
2177 2159
2178 if (data_len) {
2179 ret = ceph_alloc_data_section(con, msg);
2180
2181 if (ret < 0) {
2182 *skip = 1;
2183 ceph_msg_put(msg);
2184 return NULL;
2185 }
2186 }
2187
2188
2189 return msg; 2160 return msg;
2190} 2161}
2191 2162
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h
index b6bec59056d7..dca2d32b40de 100644
--- a/fs/ceph/messenger.h
+++ b/fs/ceph/messenger.h
@@ -46,10 +46,6 @@ struct ceph_connection_operations {
46 struct ceph_msg * (*alloc_msg) (struct ceph_connection *con, 46 struct ceph_msg * (*alloc_msg) (struct ceph_connection *con,
47 struct ceph_msg_header *hdr, 47 struct ceph_msg_header *hdr,
48 int *skip); 48 int *skip);
49 /* an incoming message has a data payload; tell me what pages I
50 * should read the data into. */
51 int (*prepare_pages) (struct ceph_connection *con, struct ceph_msg *m,
52 int want);
53}; 49};
54 50
55extern const char *ceph_name_type_str(int t); 51extern const char *ceph_name_type_str(int t);
diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c
index 6c00b37cc554..3f7ae7f73c50 100644
--- a/fs/ceph/mon_client.c
+++ b/fs/ceph/mon_client.c
@@ -701,6 +701,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
701 struct ceph_msg *m; 701 struct ceph_msg *m;
702 702
703 *skip = 0; 703 *skip = 0;
704
704 switch (type) { 705 switch (type) {
705 case CEPH_MSG_MON_SUBSCRIBE_ACK: 706 case CEPH_MSG_MON_SUBSCRIBE_ACK:
706 m = ceph_msgpool_get(&monc->msgpool_subscribe_ack, front_len); 707 m = ceph_msgpool_get(&monc->msgpool_subscribe_ack, front_len);
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 545e93617993..44abe299c69f 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -998,31 +998,26 @@ bad:
998 * find those pages. 998 * find those pages.
999 * 0 = success, -1 failure. 999 * 0 = success, -1 failure.
1000 */ 1000 */
1001static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m, 1001static int prepare_pages(struct ceph_connection *con,
1002 int want) 1002 struct ceph_msg_header *hdr,
1003 struct ceph_osd_request *req,
1004 u64 tid,
1005 struct ceph_msg *m)
1003{ 1006{
1004 struct ceph_osd *osd = con->private; 1007 struct ceph_osd *osd = con->private;
1005 struct ceph_osd_client *osdc; 1008 struct ceph_osd_client *osdc;
1006 struct ceph_osd_request *req;
1007 u64 tid;
1008 int ret = -1; 1009 int ret = -1;
1009 int type = le16_to_cpu(m->hdr.type); 1010 int data_len = le32_to_cpu(hdr->data_len);
1011 unsigned data_off = le16_to_cpu(hdr->data_off);
1012
1013 int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
1010 1014
1011 if (!osd) 1015 if (!osd)
1012 return -1; 1016 return -1;
1017
1013 osdc = osd->o_osdc; 1018 osdc = osd->o_osdc;
1014 1019
1015 dout("prepare_pages on msg %p want %d\n", m, want); 1020 dout("prepare_pages on msg %p want %d\n", m, want);
1016 if (unlikely(type != CEPH_MSG_OSD_OPREPLY))
1017 return -1; /* hmm! */
1018
1019 tid = le64_to_cpu(m->hdr.tid);
1020 mutex_lock(&osdc->request_mutex);
1021 req = __lookup_request(osdc, tid);
1022 if (!req) {
1023 dout("prepare_pages unknown tid %llu\n", tid);
1024 goto out;
1025 }
1026 dout("prepare_pages tid %llu has %d pages, want %d\n", 1021 dout("prepare_pages tid %llu has %d pages, want %d\n",
1027 tid, req->r_num_pages, want); 1022 tid, req->r_num_pages, want);
1028 if (unlikely(req->r_num_pages < want)) 1023 if (unlikely(req->r_num_pages < want))
@@ -1040,7 +1035,8 @@ static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m,
1040 m->nr_pages = req->r_num_pages; 1035 m->nr_pages = req->r_num_pages;
1041 ret = 0; /* success */ 1036 ret = 0; /* success */
1042out: 1037out:
1043 mutex_unlock(&osdc->request_mutex); 1038 BUG_ON(ret < 0 || m->nr_pages < want);
1039
1044 return ret; 1040 return ret;
1045} 1041}
1046 1042
@@ -1311,19 +1307,42 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
1311 struct ceph_osd_client *osdc = osd->o_osdc; 1307 struct ceph_osd_client *osdc = osd->o_osdc;
1312 int type = le16_to_cpu(hdr->type); 1308 int type = le16_to_cpu(hdr->type);
1313 int front = le32_to_cpu(hdr->front_len); 1309 int front = le32_to_cpu(hdr->front_len);
1310 int data_len = le32_to_cpu(hdr->data_len);
1314 struct ceph_msg *m; 1311 struct ceph_msg *m;
1312 struct ceph_osd_request *req;
1313 u64 tid;
1314 int err;
1315 1315
1316 *skip = 0; 1316 *skip = 0;
1317 switch (type) { 1317 if (type != CEPH_MSG_OSD_OPREPLY)
1318 case CEPH_MSG_OSD_OPREPLY:
1319 m = ceph_msgpool_get(&osdc->msgpool_op_reply, front);
1320 break;
1321 default:
1322 return NULL; 1318 return NULL;
1323 }
1324 1319
1325 if (!m) 1320 tid = le64_to_cpu(hdr->tid);
1321 mutex_lock(&osdc->request_mutex);
1322 req = __lookup_request(osdc, tid);
1323 if (!req) {
1324 *skip = 1;
1325 m = NULL;
1326 dout("prepare_pages unknown tid %llu\n", tid);
1327 goto out;
1328 }
1329 m = ceph_msgpool_get(&osdc->msgpool_op_reply, front);
1330 if (!m) {
1326 *skip = 1; 1331 *skip = 1;
1332 goto out;
1333 }
1334
1335 if (data_len > 0) {
1336 err = prepare_pages(con, hdr, req, tid, m);
1337 if (err < 0) {
1338 *skip = 1;
1339 ceph_msg_put(m);
1340 m = ERR_PTR(err);
1341 }
1342 }
1343
1344out:
1345 mutex_unlock(&osdc->request_mutex);
1327 1346
1328 return m; 1347 return m;
1329} 1348}
@@ -1400,5 +1419,4 @@ const static struct ceph_connection_operations osd_con_ops = {
1400 .verify_authorizer_reply = verify_authorizer_reply, 1419 .verify_authorizer_reply = verify_authorizer_reply,
1401 .alloc_msg = alloc_msg, 1420 .alloc_msg = alloc_msg,
1402 .fault = osd_reset, 1421 .fault = osd_reset,
1403 .prepare_pages = prepare_pages,
1404}; 1422};