aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSage Weil <sage@newdream.net>2010-03-01 16:02:00 -0500
committerSage Weil <sage@newdream.net>2010-03-01 18:20:02 -0500
commitc16e786927b977cb880873214bbd815e8d5ec4ba (patch)
tree82774e0b06015f2d4813b6cfd3d1924ad96da5d7
parent1679f876a641d209e7b22e43ebda0693c71003cf (diff)
ceph: use single osd op reply msg
Use a single ceph_msg for the osd reply, even when we are getting multiple replies. Signed-off-by: Sage Weil <sage@newdream.net>
-rw-r--r--fs/ceph/osd_client.c138
-rw-r--r--fs/ceph/osd_client.h5
2 files changed, 46 insertions, 97 deletions
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 3a631f27cc9e..ffe1f4064ccd 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -13,7 +13,8 @@
13#include "decode.h" 13#include "decode.h"
14#include "auth.h" 14#include "auth.h"
15 15
16#define OSD_REPLY_RESERVE_FRONT_LEN 512 16#define OSD_OP_FRONT_LEN 4096
17#define OSD_OPREPLY_FRONT_LEN 512
17 18
18const static struct ceph_connection_operations osd_con_ops; 19const static struct ceph_connection_operations osd_con_ops;
19 20
@@ -75,17 +76,6 @@ static void calc_layout(struct ceph_osd_client *osdc,
75 req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages); 76 req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages);
76} 77}
77 78
78static void remove_replies(struct ceph_osd_request *req)
79{
80 int i;
81 int max = ARRAY_SIZE(req->replies);
82
83 for (i=0; i<max; i++) {
84 if (req->replies[i])
85 ceph_msg_put(req->replies[i]);
86 }
87}
88
89/* 79/*
90 * requests 80 * requests
91 */ 81 */
@@ -99,7 +89,6 @@ void ceph_osdc_release_request(struct kref *kref)
99 ceph_msg_put(req->r_request); 89 ceph_msg_put(req->r_request);
100 if (req->r_reply) 90 if (req->r_reply)
101 ceph_msg_put(req->r_reply); 91 ceph_msg_put(req->r_reply);
102 remove_replies(req);
103 if (req->r_con_filling_msg) { 92 if (req->r_con_filling_msg) {
104 dout("release_request revoking pages %p from con %p\n", 93 dout("release_request revoking pages %p from con %p\n",
105 req->r_pages, req->r_con_filling_msg); 94 req->r_pages, req->r_con_filling_msg);
@@ -117,60 +106,6 @@ void ceph_osdc_release_request(struct kref *kref)
117 kfree(req); 106 kfree(req);
118} 107}
119 108
120static int alloc_replies(struct ceph_osd_request *req, int num_reply)
121{
122 int i;
123 int max = ARRAY_SIZE(req->replies);
124
125 BUG_ON(num_reply > max);
126
127 for (i=0; i<num_reply; i++) {
128 req->replies[i] = ceph_msg_new(0, OSD_REPLY_RESERVE_FRONT_LEN, 0, 0, NULL);
129 if (IS_ERR(req->replies[i])) {
130 int j;
131 int err = PTR_ERR(req->replies[i]);
132 for (j = 0; j<=i; j++) {
133 ceph_msg_put(req->replies[j]);
134 }
135 return err;
136 }
137 }
138
139 for (; i<max; i++) {
140 req->replies[i] = NULL;
141 }
142
143 req->cur_reply = 0;
144
145 return 0;
146}
147
148static struct ceph_msg *__get_next_reply(struct ceph_connection *con,
149 struct ceph_osd_request *req,
150 int front_len)
151{
152 struct ceph_msg *reply;
153 if (req->r_con_filling_msg) {
154 dout("revoking reply msg %p from old con %p\n", req->r_reply,
155 req->r_con_filling_msg);
156 ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
157 ceph_con_put(req->r_con_filling_msg);
158 req->cur_reply = 0;
159 }
160 reply = req->replies[req->cur_reply];
161 if (!reply || front_len > OSD_REPLY_RESERVE_FRONT_LEN) {
162 /* maybe we can allocate it now? */
163 reply = ceph_msg_new(0, front_len, 0, 0, NULL);
164 if (!reply || IS_ERR(reply)) {
165 pr_err(" reply alloc failed, front_len=%d\n", front_len);
166 return ERR_PTR(-ENOMEM);
167 }
168 }
169 req->r_con_filling_msg = ceph_con_get(con);
170 req->r_reply = ceph_msg_get(reply); /* for duration of read over socket */
171 return ceph_msg_get(reply);
172}
173
174/* 109/*
175 * build new request AND message, calculate layout, and adjust file 110 * build new request AND message, calculate layout, and adjust file
176 * extent as needed. 111 * extent as needed.
@@ -201,7 +136,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
201 void *p; 136 void *p;
202 int num_op = 1 + do_sync; 137 int num_op = 1 + do_sync;
203 size_t msg_size = sizeof(*head) + num_op*sizeof(*op); 138 size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
204 int err, i; 139 int i;
205 140
206 if (use_mempool) { 141 if (use_mempool) {
207 req = mempool_alloc(osdc->req_mempool, GFP_NOFS); 142 req = mempool_alloc(osdc->req_mempool, GFP_NOFS);
@@ -212,13 +147,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
212 if (req == NULL) 147 if (req == NULL)
213 return ERR_PTR(-ENOMEM); 148 return ERR_PTR(-ENOMEM);
214 149
215 err = alloc_replies(req, num_reply);
216 if (err) {
217 ceph_osdc_put_request(req);
218 return ERR_PTR(-ENOMEM);
219 }
220 req->r_num_prealloc_reply = num_reply;
221
222 req->r_osdc = osdc; 150 req->r_osdc = osdc;
223 req->r_mempool = use_mempool; 151 req->r_mempool = use_mempool;
224 kref_init(&req->r_kref); 152 kref_init(&req->r_kref);
@@ -229,7 +157,19 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
229 157
230 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); 158 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
231 159
232 /* create message; allow space for oid */ 160 /* create reply message */
161 if (use_mempool)
162 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
163 else
164 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
165 OSD_OPREPLY_FRONT_LEN, 0, 0, NULL);
166 if (IS_ERR(msg)) {
167 ceph_osdc_put_request(req);
168 return ERR_PTR(PTR_ERR(msg));
169 }
170 req->r_reply = msg;
171
172 /* create request message; allow space for oid */
233 msg_size += 40; 173 msg_size += 40;
234 if (snapc) 174 if (snapc)
235 msg_size += sizeof(u64) * snapc->num_snaps; 175 msg_size += sizeof(u64) * snapc->num_snaps;
@@ -819,21 +759,11 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
819 * avoid a (safe but slower) revoke later. 759 * avoid a (safe but slower) revoke later.
820 */ 760 */
821 if (req->r_con_filling_msg == con && req->r_reply == msg) { 761 if (req->r_con_filling_msg == con && req->r_reply == msg) {
822 dout(" got pages, dropping con_filling_msg ref %p\n", con); 762 dout(" dropping con_filling_msg ref %p\n", con);
823 req->r_con_filling_msg = NULL; 763 req->r_con_filling_msg = NULL;
824 ceph_con_put(con); 764 ceph_con_put(con);
825 } 765 }
826 766
827 if (req->r_reply) {
828 /*
829 * once we see the message has been received, we don't
830 * need a ref (which is only needed for revoking
831 * pages)
832 */
833 ceph_msg_put(req->r_reply);
834 req->r_reply = NULL;
835 }
836
837 if (!req->r_got_reply) { 767 if (!req->r_got_reply) {
838 unsigned bytes; 768 unsigned bytes;
839 769
@@ -1249,11 +1179,17 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1249 if (!osdc->req_mempool) 1179 if (!osdc->req_mempool)
1250 goto out; 1180 goto out;
1251 1181
1252 err = ceph_msgpool_init(&osdc->msgpool_op, 4096, 10, true); 1182 err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true);
1253 if (err < 0) 1183 if (err < 0)
1254 goto out_mempool; 1184 goto out_mempool;
1185 err = ceph_msgpool_init(&osdc->msgpool_op_reply,
1186 OSD_OPREPLY_FRONT_LEN, 10, true);
1187 if (err < 0)
1188 goto out_msgpool;
1255 return 0; 1189 return 0;
1256 1190
1191out_msgpool:
1192 ceph_msgpool_destroy(&osdc->msgpool_op);
1257out_mempool: 1193out_mempool:
1258 mempool_destroy(osdc->req_mempool); 1194 mempool_destroy(osdc->req_mempool);
1259out: 1195out:
@@ -1271,6 +1207,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
1271 remove_old_osds(osdc, 1); 1207 remove_old_osds(osdc, 1);
1272 mempool_destroy(osdc->req_mempool); 1208 mempool_destroy(osdc->req_mempool);
1273 ceph_msgpool_destroy(&osdc->msgpool_op); 1209 ceph_msgpool_destroy(&osdc->msgpool_op);
1210 ceph_msgpool_destroy(&osdc->msgpool_op_reply);
1274} 1211}
1275 1212
1276/* 1213/*
@@ -1405,16 +1342,29 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
1405 if (!req) { 1342 if (!req) {
1406 *skip = 1; 1343 *skip = 1;
1407 m = NULL; 1344 m = NULL;
1408 pr_info("alloc_msg unknown tid %llu from osd%d\n", tid, 1345 pr_info("get_reply unknown tid %llu from osd%d\n", tid,
1409 osd->o_osd); 1346 osd->o_osd);
1410 goto out; 1347 goto out;
1411 } 1348 }
1412 m = __get_next_reply(con, req, front); 1349
1413 if (!m || IS_ERR(m)) { 1350 if (req->r_con_filling_msg) {
1414 *skip = 1; 1351 dout("get_reply revoking msg %p from old con %p\n",
1415 goto out; 1352 req->r_reply, req->r_con_filling_msg);
1353 ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
1354 ceph_con_put(req->r_con_filling_msg);
1416 } 1355 }
1417 1356
1357 if (front > req->r_reply->front.iov_len) {
1358 pr_warning("get_reply front %d > preallocated %d\n",
1359 front, (int)req->r_reply->front.iov_len);
1360 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, 0, 0, NULL);
1361 if (IS_ERR(m))
1362 goto out;
1363 ceph_msg_put(req->r_reply);
1364 req->r_reply = m;
1365 }
1366 m = ceph_msg_get(req->r_reply);
1367
1418 if (data_len > 0) { 1368 if (data_len > 0) {
1419 err = __prepare_pages(con, hdr, req, tid, m); 1369 err = __prepare_pages(con, hdr, req, tid, m);
1420 if (err < 0) { 1370 if (err < 0) {
@@ -1424,6 +1374,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
1424 } 1374 }
1425 } 1375 }
1426 *skip = 0; 1376 *skip = 0;
1377 req->r_con_filling_msg = ceph_con_get(con);
1378 dout("get_reply tid %lld %p\n", tid, m);
1427 1379
1428out: 1380out:
1429 mutex_unlock(&osdc->request_mutex); 1381 mutex_unlock(&osdc->request_mutex);
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index 70f31b61f02c..f256eba6fe7a 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -53,7 +53,6 @@ struct ceph_osd_request {
53 int r_flags; /* any additional flags for the osd */ 53 int r_flags; /* any additional flags for the osd */
54 u32 r_sent; /* >0 if r_request is sending/sent */ 54 u32 r_sent; /* >0 if r_request is sending/sent */
55 int r_got_reply; 55 int r_got_reply;
56 int r_num_prealloc_reply;
57 56
58 struct ceph_osd_client *r_osdc; 57 struct ceph_osd_client *r_osdc;
59 struct kref r_kref; 58 struct kref r_kref;
@@ -77,9 +76,6 @@ struct ceph_osd_request {
77 struct page **r_pages; /* pages for data payload */ 76 struct page **r_pages; /* pages for data payload */
78 int r_pages_from_pool; 77 int r_pages_from_pool;
79 int r_own_pages; /* if true, i own page list */ 78 int r_own_pages; /* if true, i own page list */
80
81 struct ceph_msg *replies[2];
82 int cur_reply;
83}; 79};
84 80
85struct ceph_osd_client { 81struct ceph_osd_client {
@@ -106,6 +102,7 @@ struct ceph_osd_client {
106 mempool_t *req_mempool; 102 mempool_t *req_mempool;
107 103
108 struct ceph_msgpool msgpool_op; 104 struct ceph_msgpool msgpool_op;
105 struct ceph_msgpool msgpool_op_reply;
109}; 106};
110 107
111extern int ceph_osdc_init(struct ceph_osd_client *osdc, 108extern int ceph_osdc_init(struct ceph_osd_client *osdc,