diff options
author | Sage Weil <sage@newdream.net> | 2009-12-22 13:45:45 -0500 |
---|---|---|
committer | Sage Weil <sage@newdream.net> | 2009-12-23 11:17:20 -0500 |
commit | 350b1c32ea58d29e25d63fc25e92dd48f9339546 (patch) | |
tree | 4b9e8b06de8fd7e07c2292307e9a67c121f1e079 /fs/ceph/osd_client.c | |
parent | ec302645f4a9bd9ec757c30d185557e1c0972c1a (diff) |
ceph: control access to page vector for incoming data
When we issue an OSD read, we specify a vector of pages that the data is to
be read into. The request may be sent multiple times, to multiple OSDs, if
the osdmap changes, which means we can get more than one reply.
Only read data into the page vector if the reply is coming from the
OSD we last sent the request to. Keep track of which connection is using
the vector by taking a reference. If another connection was already
using the vector before and a new reply comes in on the right connection,
revoke the pages from the other connection.
Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'fs/ceph/osd_client.c')
-rw-r--r-- | fs/ceph/osd_client.c | 42 |
1 files changed, 33 insertions, 9 deletions
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c index a1800fb63237..374f0013956c 100644 --- a/fs/ceph/osd_client.c +++ b/fs/ceph/osd_client.c | |||
@@ -87,6 +87,13 @@ void ceph_osdc_release_request(struct kref *kref) | |||
87 | ceph_msg_put(req->r_request); | 87 | ceph_msg_put(req->r_request); |
88 | if (req->r_reply) | 88 | if (req->r_reply) |
89 | ceph_msg_put(req->r_reply); | 89 | ceph_msg_put(req->r_reply); |
90 | if (req->r_con_filling_pages) { | ||
91 | dout("release_request revoking pages %p from con %p\n", | ||
92 | req->r_pages, req->r_con_filling_pages); | ||
93 | ceph_con_revoke_pages(req->r_con_filling_pages, | ||
94 | req->r_pages); | ||
95 | ceph_con_put(req->r_con_filling_pages); | ||
96 | } | ||
90 | if (req->r_own_pages) | 97 | if (req->r_own_pages) |
91 | ceph_release_page_vector(req->r_pages, | 98 | ceph_release_page_vector(req->r_pages, |
92 | req->r_num_pages); | 99 | req->r_num_pages); |
@@ -687,7 +694,8 @@ static void handle_timeout(struct work_struct *work) | |||
687 | * handle osd op reply. either call the callback if it is specified, | 694 | * handle osd op reply. either call the callback if it is specified, |
688 | * or do the completion to wake up the waiting thread. | 695 | * or do the completion to wake up the waiting thread. |
689 | */ | 696 | */ |
690 | static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) | 697 | static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, |
698 | struct ceph_connection *con) | ||
691 | { | 699 | { |
692 | struct ceph_osd_reply_head *rhead = msg->front.iov_base; | 700 | struct ceph_osd_reply_head *rhead = msg->front.iov_base; |
693 | struct ceph_osd_request *req; | 701 | struct ceph_osd_request *req; |
@@ -715,6 +723,16 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) | |||
715 | ceph_osdc_get_request(req); | 723 | ceph_osdc_get_request(req); |
716 | flags = le32_to_cpu(rhead->flags); | 724 | flags = le32_to_cpu(rhead->flags); |
717 | 725 | ||
726 | /* | ||
727 | * if this connection filled our pages, drop our reference now, to | ||
728 | * avoid a (safe but slower) revoke later. | ||
729 | */ | ||
730 | if (req->r_con_filling_pages == con && req->r_pages == msg->pages) { | ||
731 | dout(" got pages, dropping con_filling_pages ref %p\n", con); | ||
732 | req->r_con_filling_pages = NULL; | ||
733 | ceph_con_put(con); | ||
734 | } | ||
735 | |||
718 | if (req->r_reply) { | 736 | if (req->r_reply) { |
719 | /* | 737 | /* |
720 | * once we see the message has been received, we don't | 738 | * once we see the message has been received, we don't |
@@ -1007,14 +1025,20 @@ static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m, | |||
1007 | } | 1025 | } |
1008 | dout("prepare_pages tid %llu has %d pages, want %d\n", | 1026 | dout("prepare_pages tid %llu has %d pages, want %d\n", |
1009 | tid, req->r_num_pages, want); | 1027 | tid, req->r_num_pages, want); |
1010 | if (likely(req->r_num_pages >= want && !req->r_prepared_pages)) { | 1028 | if (unlikely(req->r_num_pages < want)) |
1011 | m->pages = req->r_pages; | 1029 | goto out; |
1012 | m->nr_pages = req->r_num_pages; | 1030 | |
1013 | req->r_reply = m; /* only for duration of read over socket */ | 1031 | if (req->r_con_filling_pages) { |
1014 | ceph_msg_get(m); | 1032 | dout("revoking pages %p from old con %p\n", req->r_pages, |
1015 | req->r_prepared_pages = 1; | 1033 | req->r_con_filling_pages); |
1016 | ret = 0; /* success */ | 1034 | ceph_con_revoke_pages(req->r_con_filling_pages, req->r_pages); |
1035 | ceph_con_put(req->r_con_filling_pages); | ||
1017 | } | 1036 | } |
1037 | req->r_con_filling_pages = ceph_con_get(con); | ||
1038 | req->r_reply = ceph_msg_get(m); /* for duration of read over socket */ | ||
1039 | m->pages = req->r_pages; | ||
1040 | m->nr_pages = req->r_num_pages; | ||
1041 | ret = 0; /* success */ | ||
1018 | out: | 1042 | out: |
1019 | mutex_unlock(&osdc->request_mutex); | 1043 | mutex_unlock(&osdc->request_mutex); |
1020 | return ret; | 1044 | return ret; |
@@ -1269,7 +1293,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) | |||
1269 | ceph_osdc_handle_map(osdc, msg); | 1293 | ceph_osdc_handle_map(osdc, msg); |
1270 | break; | 1294 | break; |
1271 | case CEPH_MSG_OSD_OPREPLY: | 1295 | case CEPH_MSG_OSD_OPREPLY: |
1272 | handle_reply(osdc, msg); | 1296 | handle_reply(osdc, msg, con); |
1273 | break; | 1297 | break; |
1274 | 1298 | ||
1275 | default: | 1299 | default: |