diff options
author | Yehuda Sadeh <yehuda@hq.newdream.net> | 2010-01-13 20:03:23 -0500 |
---|---|---|
committer | Sage Weil <sage@newdream.net> | 2010-01-25 15:58:08 -0500 |
commit | 0d59ab81c3d3adf466c3fd37d7fb6d46b05d1fd4 (patch) | |
tree | 1cdf338188bc323379ced8d5e1bdec31f500768c /fs/ceph | |
parent | 0547a9b30a5ac8680325752b61d3ffa9d4971b6e (diff) |
ceph: keep reserved replies on the request structure
This includes treating all the data preallocation and revokation
at the same place, not having to have a special case for
the reserved pages.
Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Diffstat (limited to 'fs/ceph')
-rw-r--r-- | fs/ceph/messenger.c | 20 | ||||
-rw-r--r-- | fs/ceph/messenger.h | 4 | ||||
-rw-r--r-- | fs/ceph/osd_client.c | 118 | ||||
-rw-r--r-- | fs/ceph/osd_client.h | 8 |
4 files changed, 100 insertions, 50 deletions
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c index f708803e6857..81bc779adb90 100644 --- a/fs/ceph/messenger.c +++ b/fs/ceph/messenger.c | |||
@@ -1985,30 +1985,30 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) | |||
1985 | } | 1985 | } |
1986 | 1986 | ||
1987 | /* | 1987 | /* |
1988 | * Revoke a page vector that we may be reading data into | 1988 | * Revoke a message that we may be reading data into |
1989 | */ | 1989 | */ |
1990 | void ceph_con_revoke_pages(struct ceph_connection *con, struct page **pages) | 1990 | void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) |
1991 | { | 1991 | { |
1992 | mutex_lock(&con->mutex); | 1992 | mutex_lock(&con->mutex); |
1993 | if (con->in_msg && con->in_msg->pages == pages) { | 1993 | if (con->in_msg && con->in_msg == msg) { |
1994 | unsigned front_len = le32_to_cpu(con->in_hdr.front_len); | ||
1995 | unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len); | ||
1994 | unsigned data_len = le32_to_cpu(con->in_hdr.data_len); | 1996 | unsigned data_len = le32_to_cpu(con->in_hdr.data_len); |
1995 | 1997 | ||
1996 | /* skip rest of message */ | 1998 | /* skip rest of message */ |
1997 | dout("con_revoke_pages %p msg %p pages %p revoked\n", con, | 1999 | dout("con_revoke_pages %p msg %p revoked\n", con, msg); |
1998 | con->in_msg, pages); | ||
1999 | if (con->in_msg_pos.data_pos < data_len) | ||
2000 | con->in_base_pos = con->in_msg_pos.data_pos - data_len; | ||
2001 | else | ||
2002 | con->in_base_pos = con->in_base_pos - | 2000 | con->in_base_pos = con->in_base_pos - |
2003 | sizeof(struct ceph_msg_header) - | 2001 | sizeof(struct ceph_msg_header) - |
2002 | front_len - | ||
2003 | middle_len - | ||
2004 | data_len - | ||
2004 | sizeof(struct ceph_msg_footer); | 2005 | sizeof(struct ceph_msg_footer); |
2005 | con->in_msg->pages = NULL; | ||
2006 | ceph_msg_put(con->in_msg); | 2006 | ceph_msg_put(con->in_msg); |
2007 | con->in_msg = NULL; | 2007 | con->in_msg = NULL; |
2008 | con->in_tag = CEPH_MSGR_TAG_READY; | 2008 | con->in_tag = CEPH_MSGR_TAG_READY; |
2009 | } else { | 2009 | } else { |
2010 | dout("con_revoke_pages %p msg %p pages %p no-op\n", | 2010 | dout("con_revoke_pages %p msg %p pages %p no-op\n", |
2011 | con, con->in_msg, pages); | 2011 | con, con->in_msg, msg); |
2012 | } | 2012 | } |
2013 | mutex_unlock(&con->mutex); | 2013 | mutex_unlock(&con->mutex); |
2014 | } | 2014 | } |
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h index dca2d32b40de..c26a3d8aa78c 100644 --- a/fs/ceph/messenger.h +++ b/fs/ceph/messenger.h | |||
@@ -226,8 +226,8 @@ extern void ceph_con_open(struct ceph_connection *con, | |||
226 | extern void ceph_con_close(struct ceph_connection *con); | 226 | extern void ceph_con_close(struct ceph_connection *con); |
227 | extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg); | 227 | extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg); |
228 | extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg); | 228 | extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg); |
229 | extern void ceph_con_revoke_pages(struct ceph_connection *con, | 229 | extern void ceph_con_revoke_message(struct ceph_connection *con, |
230 | struct page **pages); | 230 | struct ceph_msg *msg); |
231 | extern void ceph_con_keepalive(struct ceph_connection *con); | 231 | extern void ceph_con_keepalive(struct ceph_connection *con); |
232 | extern struct ceph_connection *ceph_con_get(struct ceph_connection *con); | 232 | extern struct ceph_connection *ceph_con_get(struct ceph_connection *con); |
233 | extern void ceph_con_put(struct ceph_connection *con); | 233 | extern void ceph_con_put(struct ceph_connection *con); |
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c index 44abe299c69f..df2106839713 100644 --- a/fs/ceph/osd_client.c +++ b/fs/ceph/osd_client.c | |||
@@ -13,6 +13,8 @@ | |||
13 | #include "decode.h" | 13 | #include "decode.h" |
14 | #include "auth.h" | 14 | #include "auth.h" |
15 | 15 | ||
16 | #define OSD_REPLY_RESERVE_FRONT_LEN 512 | ||
17 | |||
16 | const static struct ceph_connection_operations osd_con_ops; | 18 | const static struct ceph_connection_operations osd_con_ops; |
17 | 19 | ||
18 | static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); | 20 | static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); |
@@ -73,6 +75,16 @@ static void calc_layout(struct ceph_osd_client *osdc, | |||
73 | req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages); | 75 | req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages); |
74 | } | 76 | } |
75 | 77 | ||
78 | static void remove_replies(struct ceph_osd_request *req) | ||
79 | { | ||
80 | int i; | ||
81 | int max = ARRAY_SIZE(req->replies); | ||
82 | |||
83 | for (i=0; i<max; i++) { | ||
84 | if (req->replies[i]) | ||
85 | ceph_msg_put(req->replies[i]); | ||
86 | } | ||
87 | } | ||
76 | 88 | ||
77 | /* | 89 | /* |
78 | * requests | 90 | * requests |
@@ -87,12 +99,13 @@ void ceph_osdc_release_request(struct kref *kref) | |||
87 | ceph_msg_put(req->r_request); | 99 | ceph_msg_put(req->r_request); |
88 | if (req->r_reply) | 100 | if (req->r_reply) |
89 | ceph_msg_put(req->r_reply); | 101 | ceph_msg_put(req->r_reply); |
90 | if (req->r_con_filling_pages) { | 102 | remove_replies(req); |
103 | if (req->r_con_filling_msg) { | ||
91 | dout("release_request revoking pages %p from con %p\n", | 104 | dout("release_request revoking pages %p from con %p\n", |
92 | req->r_pages, req->r_con_filling_pages); | 105 | req->r_pages, req->r_con_filling_msg); |
93 | ceph_con_revoke_pages(req->r_con_filling_pages, | 106 | ceph_con_revoke_message(req->r_con_filling_msg, |
94 | req->r_pages); | 107 | req->r_reply); |
95 | ceph_con_put(req->r_con_filling_pages); | 108 | ceph_con_put(req->r_con_filling_msg); |
96 | } | 109 | } |
97 | if (req->r_own_pages) | 110 | if (req->r_own_pages) |
98 | ceph_release_page_vector(req->r_pages, | 111 | ceph_release_page_vector(req->r_pages, |
@@ -104,6 +117,60 @@ void ceph_osdc_release_request(struct kref *kref) | |||
104 | kfree(req); | 117 | kfree(req); |
105 | } | 118 | } |
106 | 119 | ||
120 | static int alloc_replies(struct ceph_osd_request *req, int num_reply) | ||
121 | { | ||
122 | int i; | ||
123 | int max = ARRAY_SIZE(req->replies); | ||
124 | |||
125 | BUG_ON(num_reply > max); | ||
126 | |||
127 | for (i=0; i<num_reply; i++) { | ||
128 | req->replies[i] = ceph_msg_new(0, OSD_REPLY_RESERVE_FRONT_LEN, 0, 0, NULL); | ||
129 | if (IS_ERR(req->replies[i])) { | ||
130 | int j; | ||
131 | int err = PTR_ERR(req->replies[i]); | ||
132 | for (j = 0; j<=i; j++) { | ||
133 | ceph_msg_put(req->replies[j]); | ||
134 | } | ||
135 | return err; | ||
136 | } | ||
137 | } | ||
138 | |||
139 | for (; i<max; i++) { | ||
140 | req->replies[i] = NULL; | ||
141 | } | ||
142 | |||
143 | req->cur_reply = 0; | ||
144 | |||
145 | return 0; | ||
146 | } | ||
147 | |||
148 | static struct ceph_msg *__get_next_reply(struct ceph_connection *con, | ||
149 | struct ceph_osd_request *req, | ||
150 | int front_len) | ||
151 | { | ||
152 | struct ceph_msg *reply; | ||
153 | if (req->r_con_filling_msg) { | ||
154 | dout("revoking reply msg %p from old con %p\n", req->r_reply, | ||
155 | req->r_con_filling_msg); | ||
156 | ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply); | ||
157 | ceph_con_put(req->r_con_filling_msg); | ||
158 | req->cur_reply = 0; | ||
159 | } | ||
160 | reply = req->replies[req->cur_reply]; | ||
161 | if (!reply || front_len > OSD_REPLY_RESERVE_FRONT_LEN) { | ||
162 | /* maybe we can allocate it now? */ | ||
163 | reply = ceph_msg_new(0, front_len, 0, 0, NULL); | ||
164 | if (!reply || IS_ERR(reply)) { | ||
165 | pr_err(" reply alloc failed, front_len=%d\n", front_len); | ||
166 | return ERR_PTR(-ENOMEM); | ||
167 | } | ||
168 | } | ||
169 | req->r_con_filling_msg = ceph_con_get(con); | ||
170 | req->r_reply = ceph_msg_get(reply); /* for duration of read over socket */ | ||
171 | return ceph_msg_get(reply); | ||
172 | } | ||
173 | |||
107 | /* | 174 | /* |
108 | * build new request AND message, calculate layout, and adjust file | 175 | * build new request AND message, calculate layout, and adjust file |
109 | * extent as needed. | 176 | * extent as needed. |
@@ -147,7 +214,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, | |||
147 | if (req == NULL) | 214 | if (req == NULL) |
148 | return ERR_PTR(-ENOMEM); | 215 | return ERR_PTR(-ENOMEM); |
149 | 216 | ||
150 | err = ceph_msgpool_resv(&osdc->msgpool_op_reply, num_reply); | 217 | err = alloc_replies(req, num_reply); |
151 | if (err) { | 218 | if (err) { |
152 | ceph_osdc_put_request(req); | 219 | ceph_osdc_put_request(req); |
153 | return ERR_PTR(-ENOMEM); | 220 | return ERR_PTR(-ENOMEM); |
@@ -173,7 +240,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, | |||
173 | else | 240 | else |
174 | msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL); | 241 | msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL); |
175 | if (IS_ERR(msg)) { | 242 | if (IS_ERR(msg)) { |
176 | ceph_msgpool_resv(&osdc->msgpool_op_reply, -num_reply); | ||
177 | ceph_osdc_put_request(req); | 243 | ceph_osdc_put_request(req); |
178 | return ERR_PTR(PTR_ERR(msg)); | 244 | return ERR_PTR(PTR_ERR(msg)); |
179 | } | 245 | } |
@@ -471,8 +537,6 @@ static void __unregister_request(struct ceph_osd_client *osdc, | |||
471 | rb_erase(&req->r_node, &osdc->requests); | 537 | rb_erase(&req->r_node, &osdc->requests); |
472 | osdc->num_requests--; | 538 | osdc->num_requests--; |
473 | 539 | ||
474 | ceph_msgpool_resv(&osdc->msgpool_op_reply, -req->r_num_prealloc_reply); | ||
475 | |||
476 | if (req->r_osd) { | 540 | if (req->r_osd) { |
477 | /* make sure the original request isn't in flight. */ | 541 | /* make sure the original request isn't in flight. */ |
478 | ceph_con_revoke(&req->r_osd->o_con, req->r_request); | 542 | ceph_con_revoke(&req->r_osd->o_con, req->r_request); |
@@ -724,12 +788,12 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, | |||
724 | flags = le32_to_cpu(rhead->flags); | 788 | flags = le32_to_cpu(rhead->flags); |
725 | 789 | ||
726 | /* | 790 | /* |
727 | * if this connection filled our pages, drop our reference now, to | 791 | * if this connection filled our message, drop our reference now, to |
728 | * avoid a (safe but slower) revoke later. | 792 | * avoid a (safe but slower) revoke later. |
729 | */ | 793 | */ |
730 | if (req->r_con_filling_pages == con && req->r_pages == msg->pages) { | 794 | if (req->r_con_filling_msg == con && req->r_reply == msg) { |
731 | dout(" got pages, dropping con_filling_pages ref %p\n", con); | 795 | dout(" got pages, dropping con_filling_msg ref %p\n", con); |
732 | req->r_con_filling_pages = NULL; | 796 | req->r_con_filling_msg = NULL; |
733 | ceph_con_put(con); | 797 | ceph_con_put(con); |
734 | } | 798 | } |
735 | 799 | ||
@@ -998,7 +1062,7 @@ bad: | |||
998 | * find those pages. | 1062 | * find those pages. |
999 | * 0 = success, -1 failure. | 1063 | * 0 = success, -1 failure. |
1000 | */ | 1064 | */ |
1001 | static int prepare_pages(struct ceph_connection *con, | 1065 | static int __prepare_pages(struct ceph_connection *con, |
1002 | struct ceph_msg_header *hdr, | 1066 | struct ceph_msg_header *hdr, |
1003 | struct ceph_osd_request *req, | 1067 | struct ceph_osd_request *req, |
1004 | u64 tid, | 1068 | u64 tid, |
@@ -1017,20 +1081,10 @@ static int prepare_pages(struct ceph_connection *con, | |||
1017 | 1081 | ||
1018 | osdc = osd->o_osdc; | 1082 | osdc = osd->o_osdc; |
1019 | 1083 | ||
1020 | dout("prepare_pages on msg %p want %d\n", m, want); | 1084 | dout("__prepare_pages on msg %p tid %llu, has %d pages, want %d\n", m, |
1021 | dout("prepare_pages tid %llu has %d pages, want %d\n", | ||
1022 | tid, req->r_num_pages, want); | 1085 | tid, req->r_num_pages, want); |
1023 | if (unlikely(req->r_num_pages < want)) | 1086 | if (unlikely(req->r_num_pages < want)) |
1024 | goto out; | 1087 | goto out; |
1025 | |||
1026 | if (req->r_con_filling_pages) { | ||
1027 | dout("revoking pages %p from old con %p\n", req->r_pages, | ||
1028 | req->r_con_filling_pages); | ||
1029 | ceph_con_revoke_pages(req->r_con_filling_pages, req->r_pages); | ||
1030 | ceph_con_put(req->r_con_filling_pages); | ||
1031 | } | ||
1032 | req->r_con_filling_pages = ceph_con_get(con); | ||
1033 | req->r_reply = ceph_msg_get(m); /* for duration of read over socket */ | ||
1034 | m->pages = req->r_pages; | 1088 | m->pages = req->r_pages; |
1035 | m->nr_pages = req->r_num_pages; | 1089 | m->nr_pages = req->r_num_pages; |
1036 | ret = 0; /* success */ | 1090 | ret = 0; /* success */ |
@@ -1164,13 +1218,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) | |||
1164 | err = ceph_msgpool_init(&osdc->msgpool_op, 4096, 10, true); | 1218 | err = ceph_msgpool_init(&osdc->msgpool_op, 4096, 10, true); |
1165 | if (err < 0) | 1219 | if (err < 0) |
1166 | goto out_mempool; | 1220 | goto out_mempool; |
1167 | err = ceph_msgpool_init(&osdc->msgpool_op_reply, 512, 0, false); | ||
1168 | if (err < 0) | ||
1169 | goto out_msgpool; | ||
1170 | return 0; | 1221 | return 0; |
1171 | 1222 | ||
1172 | out_msgpool: | ||
1173 | ceph_msgpool_destroy(&osdc->msgpool_op); | ||
1174 | out_mempool: | 1223 | out_mempool: |
1175 | mempool_destroy(osdc->req_mempool); | 1224 | mempool_destroy(osdc->req_mempool); |
1176 | out: | 1225 | out: |
@@ -1186,7 +1235,6 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc) | |||
1186 | } | 1235 | } |
1187 | mempool_destroy(osdc->req_mempool); | 1236 | mempool_destroy(osdc->req_mempool); |
1188 | ceph_msgpool_destroy(&osdc->msgpool_op); | 1237 | ceph_msgpool_destroy(&osdc->msgpool_op); |
1189 | ceph_msgpool_destroy(&osdc->msgpool_op_reply); | ||
1190 | } | 1238 | } |
1191 | 1239 | ||
1192 | /* | 1240 | /* |
@@ -1323,17 +1371,17 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con, | |||
1323 | if (!req) { | 1371 | if (!req) { |
1324 | *skip = 1; | 1372 | *skip = 1; |
1325 | m = NULL; | 1373 | m = NULL; |
1326 | dout("prepare_pages unknown tid %llu\n", tid); | 1374 | dout("alloc_msg unknown tid %llu\n", tid); |
1327 | goto out; | 1375 | goto out; |
1328 | } | 1376 | } |
1329 | m = ceph_msgpool_get(&osdc->msgpool_op_reply, front); | 1377 | m = __get_next_reply(con, req, front); |
1330 | if (!m) { | 1378 | if (!m || IS_ERR(m)) { |
1331 | *skip = 1; | 1379 | *skip = 1; |
1332 | goto out; | 1380 | goto out; |
1333 | } | 1381 | } |
1334 | 1382 | ||
1335 | if (data_len > 0) { | 1383 | if (data_len > 0) { |
1336 | err = prepare_pages(con, hdr, req, tid, m); | 1384 | err = __prepare_pages(con, hdr, req, tid, m); |
1337 | if (err < 0) { | 1385 | if (err < 0) { |
1338 | *skip = 1; | 1386 | *skip = 1; |
1339 | ceph_msg_put(m); | 1387 | ceph_msg_put(m); |
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h index 4162c6810a8f..8d533d9406ff 100644 --- a/fs/ceph/osd_client.h +++ b/fs/ceph/osd_client.h | |||
@@ -44,7 +44,7 @@ struct ceph_osd_request { | |||
44 | struct ceph_osd *r_osd; | 44 | struct ceph_osd *r_osd; |
45 | struct ceph_pg r_pgid; | 45 | struct ceph_pg r_pgid; |
46 | 46 | ||
47 | struct ceph_connection *r_con_filling_pages; | 47 | struct ceph_connection *r_con_filling_msg; |
48 | 48 | ||
49 | struct ceph_msg *r_request, *r_reply; | 49 | struct ceph_msg *r_request, *r_reply; |
50 | int r_result; | 50 | int r_result; |
@@ -75,6 +75,9 @@ struct ceph_osd_request { | |||
75 | struct page **r_pages; /* pages for data payload */ | 75 | struct page **r_pages; /* pages for data payload */ |
76 | int r_pages_from_pool; | 76 | int r_pages_from_pool; |
77 | int r_own_pages; /* if true, i own page list */ | 77 | int r_own_pages; /* if true, i own page list */ |
78 | |||
79 | struct ceph_msg *replies[2]; | ||
80 | int cur_reply; | ||
78 | }; | 81 | }; |
79 | 82 | ||
80 | struct ceph_osd_client { | 83 | struct ceph_osd_client { |
@@ -98,8 +101,7 @@ struct ceph_osd_client { | |||
98 | 101 | ||
99 | mempool_t *req_mempool; | 102 | mempool_t *req_mempool; |
100 | 103 | ||
101 | struct ceph_msgpool msgpool_op; | 104 | struct ceph_msgpool msgpool_op; |
102 | struct ceph_msgpool msgpool_op_reply; | ||
103 | }; | 105 | }; |
104 | 106 | ||
105 | extern int ceph_osdc_init(struct ceph_osd_client *osdc, | 107 | extern int ceph_osdc_init(struct ceph_osd_client *osdc, |