aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@hq.newdream.net>2010-01-13 20:03:23 -0500
committerSage Weil <sage@newdream.net>2010-01-25 15:58:08 -0500
commit0d59ab81c3d3adf466c3fd37d7fb6d46b05d1fd4 (patch)
tree1cdf338188bc323379ced8d5e1bdec31f500768c
parent0547a9b30a5ac8680325752b61d3ffa9d4971b6e (diff)
ceph: keep reserved replies on the request structure
This includes treating all the data preallocation and revokation at the same place, not having to have a special case for the reserved pages. Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
-rw-r--r--fs/ceph/messenger.c20
-rw-r--r--fs/ceph/messenger.h4
-rw-r--r--fs/ceph/osd_client.c118
-rw-r--r--fs/ceph/osd_client.h8
4 files changed, 100 insertions, 50 deletions
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index f708803e6857..81bc779adb90 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -1985,30 +1985,30 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
1985} 1985}
1986 1986
1987/* 1987/*
1988 * Revoke a page vector that we may be reading data into 1988 * Revoke a message that we may be reading data into
1989 */ 1989 */
1990void ceph_con_revoke_pages(struct ceph_connection *con, struct page **pages) 1990void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
1991{ 1991{
1992 mutex_lock(&con->mutex); 1992 mutex_lock(&con->mutex);
1993 if (con->in_msg && con->in_msg->pages == pages) { 1993 if (con->in_msg && con->in_msg == msg) {
1994 unsigned front_len = le32_to_cpu(con->in_hdr.front_len);
1995 unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len);
1994 unsigned data_len = le32_to_cpu(con->in_hdr.data_len); 1996 unsigned data_len = le32_to_cpu(con->in_hdr.data_len);
1995 1997
1996 /* skip rest of message */ 1998 /* skip rest of message */
1997 dout("con_revoke_pages %p msg %p pages %p revoked\n", con, 1999 dout("con_revoke_pages %p msg %p revoked\n", con, msg);
1998 con->in_msg, pages);
1999 if (con->in_msg_pos.data_pos < data_len)
2000 con->in_base_pos = con->in_msg_pos.data_pos - data_len;
2001 else
2002 con->in_base_pos = con->in_base_pos - 2000 con->in_base_pos = con->in_base_pos -
2003 sizeof(struct ceph_msg_header) - 2001 sizeof(struct ceph_msg_header) -
2002 front_len -
2003 middle_len -
2004 data_len -
2004 sizeof(struct ceph_msg_footer); 2005 sizeof(struct ceph_msg_footer);
2005 con->in_msg->pages = NULL;
2006 ceph_msg_put(con->in_msg); 2006 ceph_msg_put(con->in_msg);
2007 con->in_msg = NULL; 2007 con->in_msg = NULL;
2008 con->in_tag = CEPH_MSGR_TAG_READY; 2008 con->in_tag = CEPH_MSGR_TAG_READY;
2009 } else { 2009 } else {
2010 dout("con_revoke_pages %p msg %p pages %p no-op\n", 2010 dout("con_revoke_pages %p msg %p pages %p no-op\n",
2011 con, con->in_msg, pages); 2011 con, con->in_msg, msg);
2012 } 2012 }
2013 mutex_unlock(&con->mutex); 2013 mutex_unlock(&con->mutex);
2014} 2014}
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h
index dca2d32b40de..c26a3d8aa78c 100644
--- a/fs/ceph/messenger.h
+++ b/fs/ceph/messenger.h
@@ -226,8 +226,8 @@ extern void ceph_con_open(struct ceph_connection *con,
226extern void ceph_con_close(struct ceph_connection *con); 226extern void ceph_con_close(struct ceph_connection *con);
227extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg); 227extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
228extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg); 228extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg);
229extern void ceph_con_revoke_pages(struct ceph_connection *con, 229extern void ceph_con_revoke_message(struct ceph_connection *con,
230 struct page **pages); 230 struct ceph_msg *msg);
231extern void ceph_con_keepalive(struct ceph_connection *con); 231extern void ceph_con_keepalive(struct ceph_connection *con);
232extern struct ceph_connection *ceph_con_get(struct ceph_connection *con); 232extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
233extern void ceph_con_put(struct ceph_connection *con); 233extern void ceph_con_put(struct ceph_connection *con);
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 44abe299c69f..df2106839713 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -13,6 +13,8 @@
13#include "decode.h" 13#include "decode.h"
14#include "auth.h" 14#include "auth.h"
15 15
16#define OSD_REPLY_RESERVE_FRONT_LEN 512
17
16const static struct ceph_connection_operations osd_con_ops; 18const static struct ceph_connection_operations osd_con_ops;
17 19
18static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); 20static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
@@ -73,6 +75,16 @@ static void calc_layout(struct ceph_osd_client *osdc,
73 req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages); 75 req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages);
74} 76}
75 77
78static void remove_replies(struct ceph_osd_request *req)
79{
80 int i;
81 int max = ARRAY_SIZE(req->replies);
82
83 for (i=0; i<max; i++) {
84 if (req->replies[i])
85 ceph_msg_put(req->replies[i]);
86 }
87}
76 88
77/* 89/*
78 * requests 90 * requests
@@ -87,12 +99,13 @@ void ceph_osdc_release_request(struct kref *kref)
87 ceph_msg_put(req->r_request); 99 ceph_msg_put(req->r_request);
88 if (req->r_reply) 100 if (req->r_reply)
89 ceph_msg_put(req->r_reply); 101 ceph_msg_put(req->r_reply);
90 if (req->r_con_filling_pages) { 102 remove_replies(req);
103 if (req->r_con_filling_msg) {
91 dout("release_request revoking pages %p from con %p\n", 104 dout("release_request revoking pages %p from con %p\n",
92 req->r_pages, req->r_con_filling_pages); 105 req->r_pages, req->r_con_filling_msg);
93 ceph_con_revoke_pages(req->r_con_filling_pages, 106 ceph_con_revoke_message(req->r_con_filling_msg,
94 req->r_pages); 107 req->r_reply);
95 ceph_con_put(req->r_con_filling_pages); 108 ceph_con_put(req->r_con_filling_msg);
96 } 109 }
97 if (req->r_own_pages) 110 if (req->r_own_pages)
98 ceph_release_page_vector(req->r_pages, 111 ceph_release_page_vector(req->r_pages,
@@ -104,6 +117,60 @@ void ceph_osdc_release_request(struct kref *kref)
104 kfree(req); 117 kfree(req);
105} 118}
106 119
120static int alloc_replies(struct ceph_osd_request *req, int num_reply)
121{
122 int i;
123 int max = ARRAY_SIZE(req->replies);
124
125 BUG_ON(num_reply > max);
126
127 for (i=0; i<num_reply; i++) {
128 req->replies[i] = ceph_msg_new(0, OSD_REPLY_RESERVE_FRONT_LEN, 0, 0, NULL);
129 if (IS_ERR(req->replies[i])) {
130 int j;
131 int err = PTR_ERR(req->replies[i]);
132 for (j = 0; j<=i; j++) {
133 ceph_msg_put(req->replies[j]);
134 }
135 return err;
136 }
137 }
138
139 for (; i<max; i++) {
140 req->replies[i] = NULL;
141 }
142
143 req->cur_reply = 0;
144
145 return 0;
146}
147
148static struct ceph_msg *__get_next_reply(struct ceph_connection *con,
149 struct ceph_osd_request *req,
150 int front_len)
151{
152 struct ceph_msg *reply;
153 if (req->r_con_filling_msg) {
154 dout("revoking reply msg %p from old con %p\n", req->r_reply,
155 req->r_con_filling_msg);
156 ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
157 ceph_con_put(req->r_con_filling_msg);
158 req->cur_reply = 0;
159 }
160 reply = req->replies[req->cur_reply];
161 if (!reply || front_len > OSD_REPLY_RESERVE_FRONT_LEN) {
162 /* maybe we can allocate it now? */
163 reply = ceph_msg_new(0, front_len, 0, 0, NULL);
164 if (!reply || IS_ERR(reply)) {
165 pr_err(" reply alloc failed, front_len=%d\n", front_len);
166 return ERR_PTR(-ENOMEM);
167 }
168 }
169 req->r_con_filling_msg = ceph_con_get(con);
170 req->r_reply = ceph_msg_get(reply); /* for duration of read over socket */
171 return ceph_msg_get(reply);
172}
173
107/* 174/*
108 * build new request AND message, calculate layout, and adjust file 175 * build new request AND message, calculate layout, and adjust file
109 * extent as needed. 176 * extent as needed.
@@ -147,7 +214,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
147 if (req == NULL) 214 if (req == NULL)
148 return ERR_PTR(-ENOMEM); 215 return ERR_PTR(-ENOMEM);
149 216
150 err = ceph_msgpool_resv(&osdc->msgpool_op_reply, num_reply); 217 err = alloc_replies(req, num_reply);
151 if (err) { 218 if (err) {
152 ceph_osdc_put_request(req); 219 ceph_osdc_put_request(req);
153 return ERR_PTR(-ENOMEM); 220 return ERR_PTR(-ENOMEM);
@@ -173,7 +240,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
173 else 240 else
174 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL); 241 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL);
175 if (IS_ERR(msg)) { 242 if (IS_ERR(msg)) {
176 ceph_msgpool_resv(&osdc->msgpool_op_reply, -num_reply);
177 ceph_osdc_put_request(req); 243 ceph_osdc_put_request(req);
178 return ERR_PTR(PTR_ERR(msg)); 244 return ERR_PTR(PTR_ERR(msg));
179 } 245 }
@@ -471,8 +537,6 @@ static void __unregister_request(struct ceph_osd_client *osdc,
471 rb_erase(&req->r_node, &osdc->requests); 537 rb_erase(&req->r_node, &osdc->requests);
472 osdc->num_requests--; 538 osdc->num_requests--;
473 539
474 ceph_msgpool_resv(&osdc->msgpool_op_reply, -req->r_num_prealloc_reply);
475
476 if (req->r_osd) { 540 if (req->r_osd) {
477 /* make sure the original request isn't in flight. */ 541 /* make sure the original request isn't in flight. */
478 ceph_con_revoke(&req->r_osd->o_con, req->r_request); 542 ceph_con_revoke(&req->r_osd->o_con, req->r_request);
@@ -724,12 +788,12 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
724 flags = le32_to_cpu(rhead->flags); 788 flags = le32_to_cpu(rhead->flags);
725 789
726 /* 790 /*
727 * if this connection filled our pages, drop our reference now, to 791 * if this connection filled our message, drop our reference now, to
728 * avoid a (safe but slower) revoke later. 792 * avoid a (safe but slower) revoke later.
729 */ 793 */
730 if (req->r_con_filling_pages == con && req->r_pages == msg->pages) { 794 if (req->r_con_filling_msg == con && req->r_reply == msg) {
731 dout(" got pages, dropping con_filling_pages ref %p\n", con); 795 dout(" got pages, dropping con_filling_msg ref %p\n", con);
732 req->r_con_filling_pages = NULL; 796 req->r_con_filling_msg = NULL;
733 ceph_con_put(con); 797 ceph_con_put(con);
734 } 798 }
735 799
@@ -998,7 +1062,7 @@ bad:
998 * find those pages. 1062 * find those pages.
999 * 0 = success, -1 failure. 1063 * 0 = success, -1 failure.
1000 */ 1064 */
1001static int prepare_pages(struct ceph_connection *con, 1065static int __prepare_pages(struct ceph_connection *con,
1002 struct ceph_msg_header *hdr, 1066 struct ceph_msg_header *hdr,
1003 struct ceph_osd_request *req, 1067 struct ceph_osd_request *req,
1004 u64 tid, 1068 u64 tid,
@@ -1017,20 +1081,10 @@ static int prepare_pages(struct ceph_connection *con,
1017 1081
1018 osdc = osd->o_osdc; 1082 osdc = osd->o_osdc;
1019 1083
1020 dout("prepare_pages on msg %p want %d\n", m, want); 1084 dout("__prepare_pages on msg %p tid %llu, has %d pages, want %d\n", m,
1021 dout("prepare_pages tid %llu has %d pages, want %d\n",
1022 tid, req->r_num_pages, want); 1085 tid, req->r_num_pages, want);
1023 if (unlikely(req->r_num_pages < want)) 1086 if (unlikely(req->r_num_pages < want))
1024 goto out; 1087 goto out;
1025
1026 if (req->r_con_filling_pages) {
1027 dout("revoking pages %p from old con %p\n", req->r_pages,
1028 req->r_con_filling_pages);
1029 ceph_con_revoke_pages(req->r_con_filling_pages, req->r_pages);
1030 ceph_con_put(req->r_con_filling_pages);
1031 }
1032 req->r_con_filling_pages = ceph_con_get(con);
1033 req->r_reply = ceph_msg_get(m); /* for duration of read over socket */
1034 m->pages = req->r_pages; 1088 m->pages = req->r_pages;
1035 m->nr_pages = req->r_num_pages; 1089 m->nr_pages = req->r_num_pages;
1036 ret = 0; /* success */ 1090 ret = 0; /* success */
@@ -1164,13 +1218,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1164 err = ceph_msgpool_init(&osdc->msgpool_op, 4096, 10, true); 1218 err = ceph_msgpool_init(&osdc->msgpool_op, 4096, 10, true);
1165 if (err < 0) 1219 if (err < 0)
1166 goto out_mempool; 1220 goto out_mempool;
1167 err = ceph_msgpool_init(&osdc->msgpool_op_reply, 512, 0, false);
1168 if (err < 0)
1169 goto out_msgpool;
1170 return 0; 1221 return 0;
1171 1222
1172out_msgpool:
1173 ceph_msgpool_destroy(&osdc->msgpool_op);
1174out_mempool: 1223out_mempool:
1175 mempool_destroy(osdc->req_mempool); 1224 mempool_destroy(osdc->req_mempool);
1176out: 1225out:
@@ -1186,7 +1235,6 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
1186 } 1235 }
1187 mempool_destroy(osdc->req_mempool); 1236 mempool_destroy(osdc->req_mempool);
1188 ceph_msgpool_destroy(&osdc->msgpool_op); 1237 ceph_msgpool_destroy(&osdc->msgpool_op);
1189 ceph_msgpool_destroy(&osdc->msgpool_op_reply);
1190} 1238}
1191 1239
1192/* 1240/*
@@ -1323,17 +1371,17 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
1323 if (!req) { 1371 if (!req) {
1324 *skip = 1; 1372 *skip = 1;
1325 m = NULL; 1373 m = NULL;
1326 dout("prepare_pages unknown tid %llu\n", tid); 1374 dout("alloc_msg unknown tid %llu\n", tid);
1327 goto out; 1375 goto out;
1328 } 1376 }
1329 m = ceph_msgpool_get(&osdc->msgpool_op_reply, front); 1377 m = __get_next_reply(con, req, front);
1330 if (!m) { 1378 if (!m || IS_ERR(m)) {
1331 *skip = 1; 1379 *skip = 1;
1332 goto out; 1380 goto out;
1333 } 1381 }
1334 1382
1335 if (data_len > 0) { 1383 if (data_len > 0) {
1336 err = prepare_pages(con, hdr, req, tid, m); 1384 err = __prepare_pages(con, hdr, req, tid, m);
1337 if (err < 0) { 1385 if (err < 0) {
1338 *skip = 1; 1386 *skip = 1;
1339 ceph_msg_put(m); 1387 ceph_msg_put(m);
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index 4162c6810a8f..8d533d9406ff 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -44,7 +44,7 @@ struct ceph_osd_request {
44 struct ceph_osd *r_osd; 44 struct ceph_osd *r_osd;
45 struct ceph_pg r_pgid; 45 struct ceph_pg r_pgid;
46 46
47 struct ceph_connection *r_con_filling_pages; 47 struct ceph_connection *r_con_filling_msg;
48 48
49 struct ceph_msg *r_request, *r_reply; 49 struct ceph_msg *r_request, *r_reply;
50 int r_result; 50 int r_result;
@@ -75,6 +75,9 @@ struct ceph_osd_request {
75 struct page **r_pages; /* pages for data payload */ 75 struct page **r_pages; /* pages for data payload */
76 int r_pages_from_pool; 76 int r_pages_from_pool;
77 int r_own_pages; /* if true, i own page list */ 77 int r_own_pages; /* if true, i own page list */
78
79 struct ceph_msg *replies[2];
80 int cur_reply;
78}; 81};
79 82
80struct ceph_osd_client { 83struct ceph_osd_client {
@@ -98,8 +101,7 @@ struct ceph_osd_client {
98 101
99 mempool_t *req_mempool; 102 mempool_t *req_mempool;
100 103
101 struct ceph_msgpool msgpool_op; 104 struct ceph_msgpool msgpool_op;
102 struct ceph_msgpool msgpool_op_reply;
103}; 105};
104 106
105extern int ceph_osdc_init(struct ceph_osd_client *osdc, 107extern int ceph_osdc_init(struct ceph_osd_client *osdc,