aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Sage Weil <sage@newdream.net> 2009-12-23 15:21:51 -0500
committer Sage Weil <sage@newdream.net> 2009-12-23 15:21:51 -0500
commit 93cea5bebf91319095db866163a7e35c3e77d8f2 (patch)
tree 31fbe2ddbcf2d510535b1eb5dd227bb13ce0d5d0
parent 58bb3b374b07a2a43315213f00a48a5ffd6d0915 (diff)
ceph: use ceph_pagelist for mds reconnect message; change encoding (protocol change)
Use the ceph_pagelist to encode the MDS reconnect message. We change the message encoding (protocol change!) at the same time to make our life easier (we don't know how many snaprealms we have when we start encoding).

An empty message implies the session is closed/does not exist.

Signed-off-by: Sage Weil <sage@newdream.net>
-rw-r--r-- fs/ceph/ceph_fs.h 2
-rw-r--r-- fs/ceph/mds_client.c 156
2 files changed, 57 insertions, 101 deletions
diff --git a/fs/ceph/ceph_fs.h b/fs/ceph/ceph_fs.h
index db3fed33c4aa..d0f2557bb41b 100644
--- a/fs/ceph/ceph_fs.h
+++ b/fs/ceph/ceph_fs.h
@@ -39,7 +39,7 @@
39#define CEPH_MDS_PROTOCOL 9 /* cluster internal */ 39#define CEPH_MDS_PROTOCOL 9 /* cluster internal */
40#define CEPH_MON_PROTOCOL 5 /* cluster internal */ 40#define CEPH_MON_PROTOCOL 5 /* cluster internal */
41#define CEPH_OSDC_PROTOCOL 22 /* server/client */ 41#define CEPH_OSDC_PROTOCOL 22 /* server/client */
42#define CEPH_MDSC_PROTOCOL 30 /* server/client */ 42#define CEPH_MDSC_PROTOCOL 31 /* server/client */
43#define CEPH_MONC_PROTOCOL 15 /* server/client */ 43#define CEPH_MONC_PROTOCOL 15 /* server/client */
44 44
45 45
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index ec884e2845db..6e08f488a30f 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -9,6 +9,7 @@
9#include "messenger.h" 9#include "messenger.h"
10#include "decode.h" 10#include "decode.h"
11#include "auth.h" 11#include "auth.h"
12#include "pagelist.h"
12 13
13/* 14/*
14 * A cluster of MDS (metadata server) daemons is responsible for 15 * A cluster of MDS (metadata server) daemons is responsible for
@@ -1971,20 +1972,12 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
1971/* 1972/*
1972 * Encode information about a cap for a reconnect with the MDS. 1973 * Encode information about a cap for a reconnect with the MDS.
1973 */ 1974 */
1974struct encode_caps_data {
1975 void **pp;
1976 void *end;
1977 int *num_caps;
1978};
1979
1980static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, 1975static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
1981 void *arg) 1976 void *arg)
1982{ 1977{
1983 struct ceph_mds_cap_reconnect *rec; 1978 struct ceph_mds_cap_reconnect rec;
1984 struct ceph_inode_info *ci; 1979 struct ceph_inode_info *ci;
1985 struct encode_caps_data *data = (struct encode_caps_data *)arg; 1980 struct ceph_pagelist *pagelist = arg;
1986 void *p = *(data->pp);
1987 void *end = data->end;
1988 char *path; 1981 char *path;
1989 int pathlen, err; 1982 int pathlen, err;
1990 u64 pathbase; 1983 u64 pathbase;
@@ -1995,8 +1988,9 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
1995 dout(" adding %p ino %llx.%llx cap %p %lld %s\n", 1988 dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
1996 inode, ceph_vinop(inode), cap, cap->cap_id, 1989 inode, ceph_vinop(inode), cap, cap->cap_id,
1997 ceph_cap_string(cap->issued)); 1990 ceph_cap_string(cap->issued));
1998 ceph_decode_need(&p, end, sizeof(u64), needmore); 1991 err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
1999 ceph_encode_64(&p, ceph_ino(inode)); 1992 if (err)
1993 return err;
2000 1994
2001 dentry = d_find_alias(inode); 1995 dentry = d_find_alias(inode);
2002 if (dentry) { 1996 if (dentry) {
@@ -2009,33 +2003,29 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2009 path = NULL; 2003 path = NULL;
2010 pathlen = 0; 2004 pathlen = 0;
2011 } 2005 }
2012 ceph_decode_need(&p, end, pathlen+4, needmore); 2006 err = ceph_pagelist_encode_string(pagelist, path, pathlen);
2013 ceph_encode_string(&p, end, path, pathlen); 2007 if (err)
2008 goto out;
2014 2009
2015 ceph_decode_need(&p, end, sizeof(*rec), needmore);
2016 rec = p;
2017 p += sizeof(*rec);
2018 BUG_ON(p > end);
2019 spin_lock(&inode->i_lock); 2010 spin_lock(&inode->i_lock);
2020 cap->seq = 0; /* reset cap seq */ 2011 cap->seq = 0; /* reset cap seq */
2021 cap->issue_seq = 0; /* and issue_seq */ 2012 cap->issue_seq = 0; /* and issue_seq */
2022 rec->cap_id = cpu_to_le64(cap->cap_id); 2013 rec.cap_id = cpu_to_le64(cap->cap_id);
2023 rec->pathbase = cpu_to_le64(pathbase); 2014 rec.pathbase = cpu_to_le64(pathbase);
2024 rec->wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 2015 rec.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2025 rec->issued = cpu_to_le32(cap->issued); 2016 rec.issued = cpu_to_le32(cap->issued);
2026 rec->size = cpu_to_le64(inode->i_size); 2017 rec.size = cpu_to_le64(inode->i_size);
2027 ceph_encode_timespec(&rec->mtime, &inode->i_mtime); 2018 ceph_encode_timespec(&rec.mtime, &inode->i_mtime);
2028 ceph_encode_timespec(&rec->atime, &inode->i_atime); 2019 ceph_encode_timespec(&rec.atime, &inode->i_atime);
2029 rec->snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 2020 rec.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2030 spin_unlock(&inode->i_lock); 2021 spin_unlock(&inode->i_lock);
2031 2022
2023 err = ceph_pagelist_append(pagelist, &rec, sizeof(rec));
2024
2025out:
2032 kfree(path); 2026 kfree(path);
2033 dput(dentry); 2027 dput(dentry);
2034 (*data->num_caps)++; 2028 return err;
2035 *(data->pp) = p;
2036 return 0;
2037needmore:
2038 return -ENOSPC;
2039} 2029}
2040 2030
2041 2031
@@ -2053,19 +2043,26 @@ needmore:
2053 */ 2043 */
2054static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds) 2044static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
2055{ 2045{
2056 struct ceph_mds_session *session; 2046 struct ceph_mds_session *session = NULL;
2057 struct ceph_msg *reply; 2047 struct ceph_msg *reply;
2058 int newlen, len = 4 + 1;
2059 void *p, *end;
2060 int err; 2048 int err;
2061 int num_caps, num_realms = 0;
2062 int got; 2049 int got;
2063 u64 next_snap_ino = 0; 2050 u64 next_snap_ino = 0;
2064 __le32 *pnum_caps, *pnum_realms; 2051 struct ceph_pagelist *pagelist;
2065 struct encode_caps_data iter_args;
2066 2052
2067 pr_info("reconnect to recovering mds%d\n", mds); 2053 pr_info("reconnect to recovering mds%d\n", mds);
2068 2054
2055 pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
2056 if (!pagelist)
2057 goto fail_nopagelist;
2058 ceph_pagelist_init(pagelist);
2059
2060 reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, 0, 0, NULL);
2061 if (IS_ERR(reply)) {
2062 err = PTR_ERR(reply);
2063 goto fail_nomsg;
2064 }
2065
2069 /* find session */ 2066 /* find session */
2070 session = __ceph_lookup_mds_session(mdsc, mds); 2067 session = __ceph_lookup_mds_session(mdsc, mds);
2071 mutex_unlock(&mdsc->mutex); /* drop lock for duration */ 2068 mutex_unlock(&mdsc->mutex); /* drop lock for duration */
@@ -2081,12 +2078,6 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
2081 2078
2082 /* replay unsafe requests */ 2079 /* replay unsafe requests */
2083 replay_unsafe_requests(mdsc, session); 2080 replay_unsafe_requests(mdsc, session);
2084
2085 /* estimate needed space */
2086 len += session->s_nr_caps *
2087 (100+sizeof(struct ceph_mds_cap_reconnect));
2088 pr_info("estimating i need %d bytes for %d caps\n",
2089 len, session->s_nr_caps);
2090 } else { 2081 } else {
2091 dout("no session for mds%d, will send short reconnect\n", 2082 dout("no session for mds%d, will send short reconnect\n",
2092 mds); 2083 mds);
@@ -2094,41 +2085,18 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
2094 2085
2095 down_read(&mdsc->snap_rwsem); 2086 down_read(&mdsc->snap_rwsem);
2096 2087
2097retry: 2088 if (!session)
2098 /* build reply */
2099 reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, len, 0, 0, NULL);
2100 if (IS_ERR(reply)) {
2101 err = PTR_ERR(reply);
2102 pr_err("send_mds_reconnect ENOMEM on %d for mds%d\n",
2103 len, mds);
2104 goto out;
2105 }
2106 p = reply->front.iov_base;
2107 end = p + len;
2108
2109 if (!session) {
2110 ceph_encode_8(&p, 1); /* session was closed */
2111 ceph_encode_32(&p, 0);
2112 goto send; 2089 goto send;
2113 }
2114 dout("session %p state %s\n", session, 2090 dout("session %p state %s\n", session,
2115 session_state_name(session->s_state)); 2091 session_state_name(session->s_state));
2116 2092
2117 /* traverse this session's caps */ 2093 /* traverse this session's caps */
2118 ceph_encode_8(&p, 0); 2094 err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
2119 pnum_caps = p; 2095 if (err)
2120 ceph_encode_32(&p, session->s_nr_caps); 2096 goto fail;
2121 num_caps = 0; 2097 err = iterate_session_caps(session, encode_caps_cb, pagelist);
2122
2123 iter_args.pp = &p;
2124 iter_args.end = end;
2125 iter_args.num_caps = &num_caps;
2126 err = iterate_session_caps(session, encode_caps_cb, &iter_args);
2127 if (err == -ENOSPC)
2128 goto needmore;
2129 if (err < 0) 2098 if (err < 0)
2130 goto out; 2099 goto out;
2131 *pnum_caps = cpu_to_le32(num_caps);
2132 2100
2133 /* 2101 /*
2134 * snaprealms. we provide mds with the ino, seq (version), and 2102 * snaprealms. we provide mds with the ino, seq (version), and
@@ -2136,14 +2104,9 @@ retry:
2136 * it will tell us. 2104 * it will tell us.
2137 */ 2105 */
2138 next_snap_ino = 0; 2106 next_snap_ino = 0;
2139 /* save some space for the snaprealm count */
2140 pnum_realms = p;
2141 ceph_decode_need(&p, end, sizeof(*pnum_realms), needmore);
2142 p += sizeof(*pnum_realms);
2143 num_realms = 0;
2144 while (1) { 2107 while (1) {
2145 struct ceph_snap_realm *realm; 2108 struct ceph_snap_realm *realm;
2146 struct ceph_mds_snaprealm_reconnect *sr_rec; 2109 struct ceph_mds_snaprealm_reconnect sr_rec;
2147 got = radix_tree_gang_lookup(&mdsc->snap_realms, 2110 got = radix_tree_gang_lookup(&mdsc->snap_realms,
2148 (void **)&realm, next_snap_ino, 1); 2111 (void **)&realm, next_snap_ino, 1);
2149 if (!got) 2112 if (!got)
@@ -2151,22 +2114,19 @@ retry:
2151 2114
2152 dout(" adding snap realm %llx seq %lld parent %llx\n", 2115 dout(" adding snap realm %llx seq %lld parent %llx\n",
2153 realm->ino, realm->seq, realm->parent_ino); 2116 realm->ino, realm->seq, realm->parent_ino);
2154 ceph_decode_need(&p, end, sizeof(*sr_rec), needmore); 2117 sr_rec.ino = cpu_to_le64(realm->ino);
2155 sr_rec = p; 2118 sr_rec.seq = cpu_to_le64(realm->seq);
2156 sr_rec->ino = cpu_to_le64(realm->ino); 2119 sr_rec.parent = cpu_to_le64(realm->parent_ino);
2157 sr_rec->seq = cpu_to_le64(realm->seq); 2120 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
2158 sr_rec->parent = cpu_to_le64(realm->parent_ino); 2121 if (err)
2159 p += sizeof(*sr_rec); 2122 goto fail;
2160 num_realms++;
2161 next_snap_ino = realm->ino + 1; 2123 next_snap_ino = realm->ino + 1;
2162 } 2124 }
2163 *pnum_realms = cpu_to_le32(num_realms);
2164 2125
2165send: 2126send:
2166 reply->front.iov_len = p - reply->front.iov_base; 2127 reply->pagelist = pagelist;
2167 reply->hdr.front_len = cpu_to_le32(reply->front.iov_len); 2128 reply->hdr.data_len = cpu_to_le32(pagelist->length);
2168 dout("final len was %u (guessed %d)\n", 2129 reply->nr_pages = calc_pages_for(0, pagelist->length);
2169 (unsigned)reply->front.iov_len, len);
2170 ceph_con_send(&session->s_con, reply); 2130 ceph_con_send(&session->s_con, reply);
2171 2131
2172 if (session) { 2132 if (session) {
@@ -2183,18 +2143,14 @@ out:
2183 mutex_lock(&mdsc->mutex); 2143 mutex_lock(&mdsc->mutex);
2184 return; 2144 return;
2185 2145
2186needmore: 2146fail:
2187 /*
2188 * we need a larger buffer. this doesn't very accurately
2189 * factor in snap realms, but it's safe.
2190 */
2191 num_caps += num_realms;
2192 newlen = len * ((100 * (session->s_nr_caps+3)) / (num_caps + 1)) / 100;
2193 pr_info("i guessed %d, and did %d of %d caps, retrying with %d\n",
2194 len, num_caps, session->s_nr_caps, newlen);
2195 len = newlen;
2196 ceph_msg_put(reply); 2147 ceph_msg_put(reply);
2197 goto retry; 2148fail_nomsg:
2149 ceph_pagelist_release(pagelist);
2150 kfree(pagelist);
2151fail_nopagelist:
2152 pr_err("ENOMEM preparing reconnect for mds%d\n", mds);
2153 goto out;
2198} 2154}
2199 2155
2200 2156