aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fs/ceph/messenger.c29
-rw-r--r--fs/ceph/messenger.h2
-rw-r--r--fs/ceph/osd_client.c42
-rw-r--r--fs/ceph/osd_client.h4
4 files changed, 67 insertions, 10 deletions
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index c03b4185c143..506b638a023b 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -1976,6 +1976,35 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
1976} 1976}
1977 1977
1978/* 1978/*
1979 * Revoke a page vector that we may be reading data into
1980 */
1981void ceph_con_revoke_pages(struct ceph_connection *con, struct page **pages)
1982{
1983 mutex_lock(&con->mutex);
1984 if (con->in_msg && con->in_msg->pages == pages) {
1985 unsigned data_len = le32_to_cpu(con->in_hdr.data_len);
1986
1987 /* skip rest of message */
1988 dout("con_revoke_pages %p msg %p pages %p revoked\n", con,
1989 con->in_msg, pages);
1990 if (con->in_msg_pos.data_pos < data_len)
1991 con->in_base_pos = con->in_msg_pos.data_pos - data_len;
1992 else
1993 con->in_base_pos = con->in_base_pos -
1994 sizeof(struct ceph_msg_header) -
1995 sizeof(struct ceph_msg_footer);
1996 con->in_msg->pages = NULL;
1997 ceph_msg_put(con->in_msg);
1998 con->in_msg = NULL;
1999 con->in_tag = CEPH_MSGR_TAG_READY;
2000 } else {
2001 dout("con_revoke_pages %p msg %p pages %p no-op\n",
2002 con, con->in_msg, pages);
2003 }
2004 mutex_unlock(&con->mutex);
2005}
2006
2007/*
1979 * Queue a keepalive byte to ensure the tcp connection is alive. 2008 * Queue a keepalive byte to ensure the tcp connection is alive.
1980 */ 2009 */
1981void ceph_con_keepalive(struct ceph_connection *con) 2010void ceph_con_keepalive(struct ceph_connection *con)
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h
index 94b55de90331..7e2aab1d3ce2 100644
--- a/fs/ceph/messenger.h
+++ b/fs/ceph/messenger.h
@@ -230,6 +230,8 @@ extern void ceph_con_open(struct ceph_connection *con,
230extern void ceph_con_close(struct ceph_connection *con); 230extern void ceph_con_close(struct ceph_connection *con);
231extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg); 231extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
232extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg); 232extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg);
233extern void ceph_con_revoke_pages(struct ceph_connection *con,
234 struct page **pages);
233extern void ceph_con_keepalive(struct ceph_connection *con); 235extern void ceph_con_keepalive(struct ceph_connection *con);
234extern struct ceph_connection *ceph_con_get(struct ceph_connection *con); 236extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
235extern void ceph_con_put(struct ceph_connection *con); 237extern void ceph_con_put(struct ceph_connection *con);
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index a1800fb63237..374f0013956c 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -87,6 +87,13 @@ void ceph_osdc_release_request(struct kref *kref)
87 ceph_msg_put(req->r_request); 87 ceph_msg_put(req->r_request);
88 if (req->r_reply) 88 if (req->r_reply)
89 ceph_msg_put(req->r_reply); 89 ceph_msg_put(req->r_reply);
90 if (req->r_con_filling_pages) {
91 dout("release_request revoking pages %p from con %p\n",
92 req->r_pages, req->r_con_filling_pages);
93 ceph_con_revoke_pages(req->r_con_filling_pages,
94 req->r_pages);
95 ceph_con_put(req->r_con_filling_pages);
96 }
90 if (req->r_own_pages) 97 if (req->r_own_pages)
91 ceph_release_page_vector(req->r_pages, 98 ceph_release_page_vector(req->r_pages,
92 req->r_num_pages); 99 req->r_num_pages);
@@ -687,7 +694,8 @@ static void handle_timeout(struct work_struct *work)
687 * handle osd op reply. either call the callback if it is specified, 694 * handle osd op reply. either call the callback if it is specified,
688 * or do the completion to wake up the waiting thread. 695 * or do the completion to wake up the waiting thread.
689 */ 696 */
690static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) 697static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
698 struct ceph_connection *con)
691{ 699{
692 struct ceph_osd_reply_head *rhead = msg->front.iov_base; 700 struct ceph_osd_reply_head *rhead = msg->front.iov_base;
693 struct ceph_osd_request *req; 701 struct ceph_osd_request *req;
@@ -715,6 +723,16 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
715 ceph_osdc_get_request(req); 723 ceph_osdc_get_request(req);
716 flags = le32_to_cpu(rhead->flags); 724 flags = le32_to_cpu(rhead->flags);
717 725
726 /*
727 * if this connection filled our pages, drop our reference now, to
728 * avoid a (safe but slower) revoke later.
729 */
730 if (req->r_con_filling_pages == con && req->r_pages == msg->pages) {
731 dout(" got pages, dropping con_filling_pages ref %p\n", con);
732 req->r_con_filling_pages = NULL;
733 ceph_con_put(con);
734 }
735
718 if (req->r_reply) { 736 if (req->r_reply) {
719 /* 737 /*
720 * once we see the message has been received, we don't 738 * once we see the message has been received, we don't
@@ -1007,14 +1025,20 @@ static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m,
1007 } 1025 }
1008 dout("prepare_pages tid %llu has %d pages, want %d\n", 1026 dout("prepare_pages tid %llu has %d pages, want %d\n",
1009 tid, req->r_num_pages, want); 1027 tid, req->r_num_pages, want);
1010 if (likely(req->r_num_pages >= want && !req->r_prepared_pages)) { 1028 if (unlikely(req->r_num_pages < want))
1011 m->pages = req->r_pages; 1029 goto out;
1012 m->nr_pages = req->r_num_pages; 1030
1013 req->r_reply = m; /* only for duration of read over socket */ 1031 if (req->r_con_filling_pages) {
1014 ceph_msg_get(m); 1032 dout("revoking pages %p from old con %p\n", req->r_pages,
1015 req->r_prepared_pages = 1; 1033 req->r_con_filling_pages);
1016 ret = 0; /* success */ 1034 ceph_con_revoke_pages(req->r_con_filling_pages, req->r_pages);
1035 ceph_con_put(req->r_con_filling_pages);
1017 } 1036 }
1037 req->r_con_filling_pages = ceph_con_get(con);
1038 req->r_reply = ceph_msg_get(m); /* for duration of read over socket */
1039 m->pages = req->r_pages;
1040 m->nr_pages = req->r_num_pages;
1041 ret = 0; /* success */
1018out: 1042out:
1019 mutex_unlock(&osdc->request_mutex); 1043 mutex_unlock(&osdc->request_mutex);
1020 return ret; 1044 return ret;
@@ -1269,7 +1293,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1269 ceph_osdc_handle_map(osdc, msg); 1293 ceph_osdc_handle_map(osdc, msg);
1270 break; 1294 break;
1271 case CEPH_MSG_OSD_OPREPLY: 1295 case CEPH_MSG_OSD_OPREPLY:
1272 handle_reply(osdc, msg); 1296 handle_reply(osdc, msg, con);
1273 break; 1297 break;
1274 1298
1275 default: 1299 default:
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index 2e4cfd1e9f10..8fef71cc4457 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -43,11 +43,13 @@ struct ceph_osd_request {
43 struct list_head r_osd_item; 43 struct list_head r_osd_item;
44 struct ceph_osd *r_osd; 44 struct ceph_osd *r_osd;
45 45
46 struct ceph_connection *r_con_filling_pages;
47
46 struct ceph_msg *r_request, *r_reply; 48 struct ceph_msg *r_request, *r_reply;
47 int r_result; 49 int r_result;
48 int r_flags; /* any additional flags for the osd */ 50 int r_flags; /* any additional flags for the osd */
49 u32 r_sent; /* >0 if r_request is sending/sent */ 51 u32 r_sent; /* >0 if r_request is sending/sent */
50 int r_prepared_pages, r_got_reply; 52 int r_got_reply;
51 int r_num_prealloc_reply; 53 int r_num_prealloc_reply;
52 54
53 struct ceph_osd_client *r_osdc; 55 struct ceph_osd_client *r_osdc;