diff options
-rw-r--r-- | fs/ceph/messenger.c | 29 | ||||
-rw-r--r-- | fs/ceph/messenger.h | 2 | ||||
-rw-r--r-- | fs/ceph/osd_client.c | 42 | ||||
-rw-r--r-- | fs/ceph/osd_client.h | 4 |
4 files changed, 67 insertions, 10 deletions
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c index c03b4185c143..506b638a023b 100644 --- a/fs/ceph/messenger.c +++ b/fs/ceph/messenger.c | |||
@@ -1976,6 +1976,35 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) | |||
1976 | } | 1976 | } |
1977 | 1977 | ||
1978 | /* | 1978 | /* |
1979 | * Revoke a page vector that we may be reading data into | ||
1980 | */ | ||
1981 | void ceph_con_revoke_pages(struct ceph_connection *con, struct page **pages) | ||
1982 | { | ||
1983 | mutex_lock(&con->mutex); | ||
1984 | if (con->in_msg && con->in_msg->pages == pages) { | ||
1985 | unsigned data_len = le32_to_cpu(con->in_hdr.data_len); | ||
1986 | |||
1987 | /* skip rest of message */ | ||
1988 | dout("con_revoke_pages %p msg %p pages %p revoked\n", con, | ||
1989 | con->in_msg, pages); | ||
1990 | if (con->in_msg_pos.data_pos < data_len) | ||
1991 | con->in_base_pos = con->in_msg_pos.data_pos - data_len; | ||
1992 | else | ||
1993 | con->in_base_pos = con->in_base_pos - | ||
1994 | sizeof(struct ceph_msg_header) - | ||
1995 | sizeof(struct ceph_msg_footer); | ||
1996 | con->in_msg->pages = NULL; | ||
1997 | ceph_msg_put(con->in_msg); | ||
1998 | con->in_msg = NULL; | ||
1999 | con->in_tag = CEPH_MSGR_TAG_READY; | ||
2000 | } else { | ||
2001 | dout("con_revoke_pages %p msg %p pages %p no-op\n", | ||
2002 | con, con->in_msg, pages); | ||
2003 | } | ||
2004 | mutex_unlock(&con->mutex); | ||
2005 | } | ||
2006 | |||
2007 | /* | ||
1979 | * Queue a keepalive byte to ensure the tcp connection is alive. | 2008 | * Queue a keepalive byte to ensure the tcp connection is alive. |
1980 | */ | 2009 | */ |
1981 | void ceph_con_keepalive(struct ceph_connection *con) | 2010 | void ceph_con_keepalive(struct ceph_connection *con) |
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h index 94b55de90331..7e2aab1d3ce2 100644 --- a/fs/ceph/messenger.h +++ b/fs/ceph/messenger.h | |||
@@ -230,6 +230,8 @@ extern void ceph_con_open(struct ceph_connection *con, | |||
230 | extern void ceph_con_close(struct ceph_connection *con); | 230 | extern void ceph_con_close(struct ceph_connection *con); |
231 | extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg); | 231 | extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg); |
232 | extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg); | 232 | extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg); |
233 | extern void ceph_con_revoke_pages(struct ceph_connection *con, | ||
234 | struct page **pages); | ||
233 | extern void ceph_con_keepalive(struct ceph_connection *con); | 235 | extern void ceph_con_keepalive(struct ceph_connection *con); |
234 | extern struct ceph_connection *ceph_con_get(struct ceph_connection *con); | 236 | extern struct ceph_connection *ceph_con_get(struct ceph_connection *con); |
235 | extern void ceph_con_put(struct ceph_connection *con); | 237 | extern void ceph_con_put(struct ceph_connection *con); |
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c index a1800fb63237..374f0013956c 100644 --- a/fs/ceph/osd_client.c +++ b/fs/ceph/osd_client.c | |||
@@ -87,6 +87,13 @@ void ceph_osdc_release_request(struct kref *kref) | |||
87 | ceph_msg_put(req->r_request); | 87 | ceph_msg_put(req->r_request); |
88 | if (req->r_reply) | 88 | if (req->r_reply) |
89 | ceph_msg_put(req->r_reply); | 89 | ceph_msg_put(req->r_reply); |
90 | if (req->r_con_filling_pages) { | ||
91 | dout("release_request revoking pages %p from con %p\n", | ||
92 | req->r_pages, req->r_con_filling_pages); | ||
93 | ceph_con_revoke_pages(req->r_con_filling_pages, | ||
94 | req->r_pages); | ||
95 | ceph_con_put(req->r_con_filling_pages); | ||
96 | } | ||
90 | if (req->r_own_pages) | 97 | if (req->r_own_pages) |
91 | ceph_release_page_vector(req->r_pages, | 98 | ceph_release_page_vector(req->r_pages, |
92 | req->r_num_pages); | 99 | req->r_num_pages); |
@@ -687,7 +694,8 @@ static void handle_timeout(struct work_struct *work) | |||
687 | * handle osd op reply. either call the callback if it is specified, | 694 | * handle osd op reply. either call the callback if it is specified, |
688 | * or do the completion to wake up the waiting thread. | 695 | * or do the completion to wake up the waiting thread. |
689 | */ | 696 | */ |
690 | static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) | 697 | static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, |
698 | struct ceph_connection *con) | ||
691 | { | 699 | { |
692 | struct ceph_osd_reply_head *rhead = msg->front.iov_base; | 700 | struct ceph_osd_reply_head *rhead = msg->front.iov_base; |
693 | struct ceph_osd_request *req; | 701 | struct ceph_osd_request *req; |
@@ -715,6 +723,16 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) | |||
715 | ceph_osdc_get_request(req); | 723 | ceph_osdc_get_request(req); |
716 | flags = le32_to_cpu(rhead->flags); | 724 | flags = le32_to_cpu(rhead->flags); |
717 | 725 | ||
726 | /* | ||
727 | * if this connection filled our pages, drop our reference now, to | ||
728 | * avoid a (safe but slower) revoke later. | ||
729 | */ | ||
730 | if (req->r_con_filling_pages == con && req->r_pages == msg->pages) { | ||
731 | dout(" got pages, dropping con_filling_pages ref %p\n", con); | ||
732 | req->r_con_filling_pages = NULL; | ||
733 | ceph_con_put(con); | ||
734 | } | ||
735 | |||
718 | if (req->r_reply) { | 736 | if (req->r_reply) { |
719 | /* | 737 | /* |
720 | * once we see the message has been received, we don't | 738 | * once we see the message has been received, we don't |
@@ -1007,14 +1025,20 @@ static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m, | |||
1007 | } | 1025 | } |
1008 | dout("prepare_pages tid %llu has %d pages, want %d\n", | 1026 | dout("prepare_pages tid %llu has %d pages, want %d\n", |
1009 | tid, req->r_num_pages, want); | 1027 | tid, req->r_num_pages, want); |
1010 | if (likely(req->r_num_pages >= want && !req->r_prepared_pages)) { | 1028 | if (unlikely(req->r_num_pages < want)) |
1011 | m->pages = req->r_pages; | 1029 | goto out; |
1012 | m->nr_pages = req->r_num_pages; | 1030 | |
1013 | req->r_reply = m; /* only for duration of read over socket */ | 1031 | if (req->r_con_filling_pages) { |
1014 | ceph_msg_get(m); | 1032 | dout("revoking pages %p from old con %p\n", req->r_pages, |
1015 | req->r_prepared_pages = 1; | 1033 | req->r_con_filling_pages); |
1016 | ret = 0; /* success */ | 1034 | ceph_con_revoke_pages(req->r_con_filling_pages, req->r_pages); |
1035 | ceph_con_put(req->r_con_filling_pages); | ||
1017 | } | 1036 | } |
1037 | req->r_con_filling_pages = ceph_con_get(con); | ||
1038 | req->r_reply = ceph_msg_get(m); /* for duration of read over socket */ | ||
1039 | m->pages = req->r_pages; | ||
1040 | m->nr_pages = req->r_num_pages; | ||
1041 | ret = 0; /* success */ | ||
1018 | out: | 1042 | out: |
1019 | mutex_unlock(&osdc->request_mutex); | 1043 | mutex_unlock(&osdc->request_mutex); |
1020 | return ret; | 1044 | return ret; |
@@ -1269,7 +1293,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) | |||
1269 | ceph_osdc_handle_map(osdc, msg); | 1293 | ceph_osdc_handle_map(osdc, msg); |
1270 | break; | 1294 | break; |
1271 | case CEPH_MSG_OSD_OPREPLY: | 1295 | case CEPH_MSG_OSD_OPREPLY: |
1272 | handle_reply(osdc, msg); | 1296 | handle_reply(osdc, msg, con); |
1273 | break; | 1297 | break; |
1274 | 1298 | ||
1275 | default: | 1299 | default: |
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h index 2e4cfd1e9f10..8fef71cc4457 100644 --- a/fs/ceph/osd_client.h +++ b/fs/ceph/osd_client.h | |||
@@ -43,11 +43,13 @@ struct ceph_osd_request { | |||
43 | struct list_head r_osd_item; | 43 | struct list_head r_osd_item; |
44 | struct ceph_osd *r_osd; | 44 | struct ceph_osd *r_osd; |
45 | 45 | ||
46 | struct ceph_connection *r_con_filling_pages; | ||
47 | |||
46 | struct ceph_msg *r_request, *r_reply; | 48 | struct ceph_msg *r_request, *r_reply; |
47 | int r_result; | 49 | int r_result; |
48 | int r_flags; /* any additional flags for the osd */ | 50 | int r_flags; /* any additional flags for the osd */ |
49 | u32 r_sent; /* >0 if r_request is sending/sent */ | 51 | u32 r_sent; /* >0 if r_request is sending/sent */ |
50 | int r_prepared_pages, r_got_reply; | 52 | int r_got_reply; |
51 | int r_num_prealloc_reply; | 53 | int r_num_prealloc_reply; |
52 | 54 | ||
53 | struct ceph_osd_client *r_osdc; | 55 | struct ceph_osd_client *r_osdc; |