aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSage Weil <sage@newdream.net>2009-12-22 13:45:45 -0500
committerSage Weil <sage@newdream.net>2009-12-23 11:17:20 -0500
commit350b1c32ea58d29e25d63fc25e92dd48f9339546 (patch)
tree4b9e8b06de8fd7e07c2292307e9a67c121f1e079
parentec302645f4a9bd9ec757c30d185557e1c0972c1a (diff)
ceph: control access to page vector for incoming data
When we issue an OSD read, we specify a vector of pages that the data is to be read into. The request may be sent multiple times, to multiple OSDs, if the osdmap changes, which means we can get more than one reply. Only read data into the page vector if the reply is coming from the OSD we last sent the request to. Keep track of which connection is using the vector by taking a reference. If another connection was already using the vector before and a new reply comes in on the right connection, revoke the pages from the other connection. Signed-off-by: Sage Weil <sage@newdream.net>
-rw-r--r--fs/ceph/messenger.c29
-rw-r--r--fs/ceph/messenger.h2
-rw-r--r--fs/ceph/osd_client.c42
-rw-r--r--fs/ceph/osd_client.h4
4 files changed, 67 insertions, 10 deletions
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index c03b4185c143..506b638a023b 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -1976,6 +1976,35 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
1976} 1976}
1977 1977
1978/* 1978/*
1979 * Revoke a page vector that we may be reading data into
1980 */
1981void ceph_con_revoke_pages(struct ceph_connection *con, struct page **pages)
1982{
1983 mutex_lock(&con->mutex);
1984 if (con->in_msg && con->in_msg->pages == pages) {
1985 unsigned data_len = le32_to_cpu(con->in_hdr.data_len);
1986
1987 /* skip rest of message */
1988 dout("con_revoke_pages %p msg %p pages %p revoked\n", con,
1989 con->in_msg, pages);
1990 if (con->in_msg_pos.data_pos < data_len)
1991 con->in_base_pos = con->in_msg_pos.data_pos - data_len;
1992 else
1993 con->in_base_pos = con->in_base_pos -
1994 sizeof(struct ceph_msg_header) -
1995 sizeof(struct ceph_msg_footer);
1996 con->in_msg->pages = NULL;
1997 ceph_msg_put(con->in_msg);
1998 con->in_msg = NULL;
1999 con->in_tag = CEPH_MSGR_TAG_READY;
2000 } else {
2001 dout("con_revoke_pages %p msg %p pages %p no-op\n",
2002 con, con->in_msg, pages);
2003 }
2004 mutex_unlock(&con->mutex);
2005}
2006
2007/*
1979 * Queue a keepalive byte to ensure the tcp connection is alive. 2008 * Queue a keepalive byte to ensure the tcp connection is alive.
1980 */ 2009 */
1981void ceph_con_keepalive(struct ceph_connection *con) 2010void ceph_con_keepalive(struct ceph_connection *con)
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h
index 94b55de90331..7e2aab1d3ce2 100644
--- a/fs/ceph/messenger.h
+++ b/fs/ceph/messenger.h
@@ -230,6 +230,8 @@ extern void ceph_con_open(struct ceph_connection *con,
230extern void ceph_con_close(struct ceph_connection *con); 230extern void ceph_con_close(struct ceph_connection *con);
231extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg); 231extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
232extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg); 232extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg);
233extern void ceph_con_revoke_pages(struct ceph_connection *con,
234 struct page **pages);
233extern void ceph_con_keepalive(struct ceph_connection *con); 235extern void ceph_con_keepalive(struct ceph_connection *con);
234extern struct ceph_connection *ceph_con_get(struct ceph_connection *con); 236extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
235extern void ceph_con_put(struct ceph_connection *con); 237extern void ceph_con_put(struct ceph_connection *con);
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index a1800fb63237..374f0013956c 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -87,6 +87,13 @@ void ceph_osdc_release_request(struct kref *kref)
87 ceph_msg_put(req->r_request); 87 ceph_msg_put(req->r_request);
88 if (req->r_reply) 88 if (req->r_reply)
89 ceph_msg_put(req->r_reply); 89 ceph_msg_put(req->r_reply);
90 if (req->r_con_filling_pages) {
91 dout("release_request revoking pages %p from con %p\n",
92 req->r_pages, req->r_con_filling_pages);
93 ceph_con_revoke_pages(req->r_con_filling_pages,
94 req->r_pages);
95 ceph_con_put(req->r_con_filling_pages);
96 }
90 if (req->r_own_pages) 97 if (req->r_own_pages)
91 ceph_release_page_vector(req->r_pages, 98 ceph_release_page_vector(req->r_pages,
92 req->r_num_pages); 99 req->r_num_pages);
@@ -687,7 +694,8 @@ static void handle_timeout(struct work_struct *work)
687 * handle osd op reply. either call the callback if it is specified, 694 * handle osd op reply. either call the callback if it is specified,
688 * or do the completion to wake up the waiting thread. 695 * or do the completion to wake up the waiting thread.
689 */ 696 */
690static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) 697static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
698 struct ceph_connection *con)
691{ 699{
692 struct ceph_osd_reply_head *rhead = msg->front.iov_base; 700 struct ceph_osd_reply_head *rhead = msg->front.iov_base;
693 struct ceph_osd_request *req; 701 struct ceph_osd_request *req;
@@ -715,6 +723,16 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
715 ceph_osdc_get_request(req); 723 ceph_osdc_get_request(req);
716 flags = le32_to_cpu(rhead->flags); 724 flags = le32_to_cpu(rhead->flags);
717 725
726 /*
727 * if this connection filled our pages, drop our reference now, to
728 * avoid a (safe but slower) revoke later.
729 */
730 if (req->r_con_filling_pages == con && req->r_pages == msg->pages) {
731 dout(" got pages, dropping con_filling_pages ref %p\n", con);
732 req->r_con_filling_pages = NULL;
733 ceph_con_put(con);
734 }
735
718 if (req->r_reply) { 736 if (req->r_reply) {
719 /* 737 /*
720 * once we see the message has been received, we don't 738 * once we see the message has been received, we don't
@@ -1007,14 +1025,20 @@ static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m,
1007 } 1025 }
1008 dout("prepare_pages tid %llu has %d pages, want %d\n", 1026 dout("prepare_pages tid %llu has %d pages, want %d\n",
1009 tid, req->r_num_pages, want); 1027 tid, req->r_num_pages, want);
1010 if (likely(req->r_num_pages >= want && !req->r_prepared_pages)) { 1028 if (unlikely(req->r_num_pages < want))
1011 m->pages = req->r_pages; 1029 goto out;
1012 m->nr_pages = req->r_num_pages; 1030
1013 req->r_reply = m; /* only for duration of read over socket */ 1031 if (req->r_con_filling_pages) {
1014 ceph_msg_get(m); 1032 dout("revoking pages %p from old con %p\n", req->r_pages,
1015 req->r_prepared_pages = 1; 1033 req->r_con_filling_pages);
1016 ret = 0; /* success */ 1034 ceph_con_revoke_pages(req->r_con_filling_pages, req->r_pages);
1035 ceph_con_put(req->r_con_filling_pages);
1017 } 1036 }
1037 req->r_con_filling_pages = ceph_con_get(con);
1038 req->r_reply = ceph_msg_get(m); /* for duration of read over socket */
1039 m->pages = req->r_pages;
1040 m->nr_pages = req->r_num_pages;
1041 ret = 0; /* success */
1018out: 1042out:
1019 mutex_unlock(&osdc->request_mutex); 1043 mutex_unlock(&osdc->request_mutex);
1020 return ret; 1044 return ret;
@@ -1269,7 +1293,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1269 ceph_osdc_handle_map(osdc, msg); 1293 ceph_osdc_handle_map(osdc, msg);
1270 break; 1294 break;
1271 case CEPH_MSG_OSD_OPREPLY: 1295 case CEPH_MSG_OSD_OPREPLY:
1272 handle_reply(osdc, msg); 1296 handle_reply(osdc, msg, con);
1273 break; 1297 break;
1274 1298
1275 default: 1299 default:
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index 2e4cfd1e9f10..8fef71cc4457 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -43,11 +43,13 @@ struct ceph_osd_request {
43 struct list_head r_osd_item; 43 struct list_head r_osd_item;
44 struct ceph_osd *r_osd; 44 struct ceph_osd *r_osd;
45 45
46 struct ceph_connection *r_con_filling_pages;
47
46 struct ceph_msg *r_request, *r_reply; 48 struct ceph_msg *r_request, *r_reply;
47 int r_result; 49 int r_result;
48 int r_flags; /* any additional flags for the osd */ 50 int r_flags; /* any additional flags for the osd */
49 u32 r_sent; /* >0 if r_request is sending/sent */ 51 u32 r_sent; /* >0 if r_request is sending/sent */
50 int r_prepared_pages, r_got_reply; 52 int r_got_reply;
51 int r_num_prealloc_reply; 53 int r_num_prealloc_reply;
52 54
53 struct ceph_osd_client *r_osdc; 55 struct ceph_osd_client *r_osdc;