aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSage Weil <sage@newdream.net>2010-02-22 18:12:16 -0500
committerSage Weil <sage@newdream.net>2010-02-23 17:26:35 -0500
commit2600d2dd5085ab6fb09540226138a60055abf335 (patch)
tree5aeb8a110eb7a41dab2d16b3a5ba8d67ad068a82
parenta6369741c48815fedfce7072b7a9cd98b5bafd56 (diff)
ceph: drop messages on unregistered mds sessions; cleanup
Verify the mds session is currently registered before handling incoming messages. Clean up message handlers to pull mds out of session->s_mds instead of less trustworthy src field. Clean up con_{get,put} debug output. Signed-off-by: Sage Weil <sage@newdream.net>
-rw-r--r--fs/ceph/caps.c2
-rw-r--r--fs/ceph/mds_client.c85
-rw-r--r--fs/ceph/snap.c17
-rw-r--r--fs/ceph/super.h1
4 files changed, 46 insertions, 59 deletions
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index b6154ffe70d..bb846164add 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -2600,7 +2600,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2600 struct inode *inode; 2600 struct inode *inode;
2601 struct ceph_cap *cap; 2601 struct ceph_cap *cap;
2602 struct ceph_mds_caps *h; 2602 struct ceph_mds_caps *h;
2603 int mds = le64_to_cpu(msg->hdr.src.name.num); 2603 int mds = session->s_mds;
2604 int op; 2604 int op;
2605 u32 seq; 2605 u32 seq;
2606 struct ceph_vino vino; 2606 struct ceph_vino vino;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 124c0c17a14..4d00ea2af00 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -309,6 +309,15 @@ static bool __have_session(struct ceph_mds_client *mdsc, int mds)
309 return mdsc->sessions[mds]; 309 return mdsc->sessions[mds];
310} 310}
311 311
312static int __verify_registered_session(struct ceph_mds_client *mdsc,
313 struct ceph_mds_session *s)
314{
315 if (s->s_mds >= mdsc->max_sessions ||
316 mdsc->sessions[s->s_mds] != s)
317 return -ENOENT;
318 return 0;
319}
320
312/* 321/*
313 * create+register a new session for given mds. 322 * create+register a new session for given mds.
314 * called under mdsc->mutex. 323 * called under mdsc->mutex.
@@ -382,10 +391,11 @@ fail_realloc:
382/* 391/*
383 * called under mdsc->mutex 392 * called under mdsc->mutex
384 */ 393 */
385static void unregister_session(struct ceph_mds_client *mdsc, 394static void __unregister_session(struct ceph_mds_client *mdsc,
386 struct ceph_mds_session *s) 395 struct ceph_mds_session *s)
387{ 396{
388 dout("unregister_session mds%d %p\n", s->s_mds, s); 397 dout("__unregister_session mds%d %p\n", s->s_mds, s);
398 BUG_ON(mdsc->sessions[s->s_mds] != s);
389 mdsc->sessions[s->s_mds] = NULL; 399 mdsc->sessions[s->s_mds] = NULL;
390 ceph_con_close(&s->s_con); 400 ceph_con_close(&s->s_con);
391 ceph_put_mds_session(s); 401 ceph_put_mds_session(s);
@@ -1740,10 +1750,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
1740 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 1750 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */
1741 u64 tid; 1751 u64 tid;
1742 int err, result; 1752 int err, result;
1743 int mds; 1753 int mds = session->s_mds;
1744 1754
1745 if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
1746 return;
1747 if (msg->front.iov_len < sizeof(*head)) { 1755 if (msg->front.iov_len < sizeof(*head)) {
1748 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 1756 pr_err("mdsc_handle_reply got corrupt (short) reply\n");
1749 ceph_msg_dump(msg); 1757 ceph_msg_dump(msg);
@@ -1760,7 +1768,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
1760 return; 1768 return;
1761 } 1769 }
1762 dout("handle_reply %p\n", req); 1770 dout("handle_reply %p\n", req);
1763 mds = le64_to_cpu(msg->hdr.src.name.num);
1764 1771
1765 /* correct session? */ 1772 /* correct session? */
1766 if (!req->r_session && req->r_session != session) { 1773 if (!req->r_session && req->r_session != session) {
@@ -1884,7 +1891,9 @@ out:
1884/* 1891/*
1885 * handle mds notification that our request has been forwarded. 1892 * handle mds notification that our request has been forwarded.
1886 */ 1893 */
1887static void handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 1894static void handle_forward(struct ceph_mds_client *mdsc,
1895 struct ceph_mds_session *session,
1896 struct ceph_msg *msg)
1888{ 1897{
1889 struct ceph_mds_request *req; 1898 struct ceph_mds_request *req;
1890 u64 tid; 1899 u64 tid;
@@ -1894,11 +1903,7 @@ static void handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
1894 int err = -EINVAL; 1903 int err = -EINVAL;
1895 void *p = msg->front.iov_base; 1904 void *p = msg->front.iov_base;
1896 void *end = p + msg->front.iov_len; 1905 void *end = p + msg->front.iov_len;
1897 int from_mds, state; 1906 int state;
1898
1899 if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
1900 goto bad;
1901 from_mds = le64_to_cpu(msg->hdr.src.name.num);
1902 1907
1903 ceph_decode_need(&p, end, sizeof(u64)+2*sizeof(u32), bad); 1908 ceph_decode_need(&p, end, sizeof(u64)+2*sizeof(u32), bad);
1904 tid = ceph_decode_64(&p); 1909 tid = ceph_decode_64(&p);
@@ -1915,6 +1920,9 @@ static void handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
1915 goto out; /* dup reply? */ 1920 goto out; /* dup reply? */
1916 } 1921 }
1917 1922
1923 if (next_mds >= mdsc->max_sessions)
1924 goto out;
1925
1918 state = mdsc->sessions[next_mds]->s_state; 1926 state = mdsc->sessions[next_mds]->s_state;
1919 if (fwd_seq <= req->r_num_fwd) { 1927 if (fwd_seq <= req->r_num_fwd) {
1920 dout("forward %llu to mds%d - old seq %d <= %d\n", 1928 dout("forward %llu to mds%d - old seq %d <= %d\n",
@@ -1945,14 +1953,10 @@ static void handle_session(struct ceph_mds_session *session,
1945 struct ceph_mds_client *mdsc = session->s_mdsc; 1953 struct ceph_mds_client *mdsc = session->s_mdsc;
1946 u32 op; 1954 u32 op;
1947 u64 seq; 1955 u64 seq;
1948 int mds; 1956 int mds = session->s_mds;
1949 struct ceph_mds_session_head *h = msg->front.iov_base; 1957 struct ceph_mds_session_head *h = msg->front.iov_base;
1950 int wake = 0; 1958 int wake = 0;
1951 1959
1952 if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
1953 return;
1954 mds = le64_to_cpu(msg->hdr.src.name.num);
1955
1956 /* decode */ 1960 /* decode */
1957 if (msg->front.iov_len != sizeof(*h)) 1961 if (msg->front.iov_len != sizeof(*h))
1958 goto bad; 1962 goto bad;
@@ -1960,6 +1964,8 @@ static void handle_session(struct ceph_mds_session *session,
1960 seq = le64_to_cpu(h->seq); 1964 seq = le64_to_cpu(h->seq);
1961 1965
1962 mutex_lock(&mdsc->mutex); 1966 mutex_lock(&mdsc->mutex);
1967 if (op == CEPH_SESSION_CLOSE)
1968 __unregister_session(mdsc, session);
1963 /* FIXME: this ttl calculation is generous */ 1969 /* FIXME: this ttl calculation is generous */
1964 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 1970 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
1965 mutex_unlock(&mdsc->mutex); 1971 mutex_unlock(&mdsc->mutex);
@@ -1990,7 +1996,6 @@ static void handle_session(struct ceph_mds_session *session,
1990 break; 1996 break;
1991 1997
1992 case CEPH_SESSION_CLOSE: 1998 case CEPH_SESSION_CLOSE:
1993 unregister_session(mdsc, session);
1994 remove_session_caps(session); 1999 remove_session_caps(session);
1995 wake = 1; /* for good measure */ 2000 wake = 1; /* for good measure */
1996 complete(&mdsc->session_close_waiters); 2001 complete(&mdsc->session_close_waiters);
@@ -2269,7 +2274,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
2269 /* the session never opened, just close it 2274 /* the session never opened, just close it
2270 * out now */ 2275 * out now */
2271 __wake_requests(mdsc, &s->s_waiting); 2276 __wake_requests(mdsc, &s->s_waiting);
2272 unregister_session(mdsc, s); 2277 __unregister_session(mdsc, s);
2273 } else { 2278 } else {
2274 /* just close it */ 2279 /* just close it */
2275 mutex_unlock(&mdsc->mutex); 2280 mutex_unlock(&mdsc->mutex);
@@ -2329,24 +2334,22 @@ void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
2329 di->lease_session = NULL; 2334 di->lease_session = NULL;
2330} 2335}
2331 2336
2332static void handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 2337static void handle_lease(struct ceph_mds_client *mdsc,
2338 struct ceph_mds_session *session,
2339 struct ceph_msg *msg)
2333{ 2340{
2334 struct super_block *sb = mdsc->client->sb; 2341 struct super_block *sb = mdsc->client->sb;
2335 struct inode *inode; 2342 struct inode *inode;
2336 struct ceph_mds_session *session;
2337 struct ceph_inode_info *ci; 2343 struct ceph_inode_info *ci;
2338 struct dentry *parent, *dentry; 2344 struct dentry *parent, *dentry;
2339 struct ceph_dentry_info *di; 2345 struct ceph_dentry_info *di;
2340 int mds; 2346 int mds = session->s_mds;
2341 struct ceph_mds_lease *h = msg->front.iov_base; 2347 struct ceph_mds_lease *h = msg->front.iov_base;
2342 struct ceph_vino vino; 2348 struct ceph_vino vino;
2343 int mask; 2349 int mask;
2344 struct qstr dname; 2350 struct qstr dname;
2345 int release = 0; 2351 int release = 0;
2346 2352
2347 if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
2348 return;
2349 mds = le64_to_cpu(msg->hdr.src.name.num);
2350 dout("handle_lease from mds%d\n", mds); 2353 dout("handle_lease from mds%d\n", mds);
2351 2354
2352 /* decode */ 2355 /* decode */
@@ -2360,15 +2363,6 @@ static void handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
2360 if (dname.len != get_unaligned_le32(h+1)) 2363 if (dname.len != get_unaligned_le32(h+1))
2361 goto bad; 2364 goto bad;
2362 2365
2363 /* find session */
2364 mutex_lock(&mdsc->mutex);
2365 session = __ceph_lookup_mds_session(mdsc, mds);
2366 mutex_unlock(&mdsc->mutex);
2367 if (!session) {
2368 pr_err("handle_lease got lease but no session mds%d\n", mds);
2369 return;
2370 }
2371
2372 mutex_lock(&session->s_mutex); 2366 mutex_lock(&session->s_mutex);
2373 session->s_seq++; 2367 session->s_seq++;
2374 2368
@@ -2437,7 +2431,6 @@ release:
2437out: 2431out:
2438 iput(inode); 2432 iput(inode);
2439 mutex_unlock(&session->s_mutex); 2433 mutex_unlock(&session->s_mutex);
2440 ceph_put_mds_session(session);
2441 return; 2434 return;
2442 2435
2443bad: 2436bad:
@@ -2794,7 +2787,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
2794 for (i = 0; i < mdsc->max_sessions; i++) { 2787 for (i = 0; i < mdsc->max_sessions; i++) {
2795 if (mdsc->sessions[i]) { 2788 if (mdsc->sessions[i]) {
2796 session = get_session(mdsc->sessions[i]); 2789 session = get_session(mdsc->sessions[i]);
2797 unregister_session(mdsc, session); 2790 __unregister_session(mdsc, session);
2798 mutex_unlock(&mdsc->mutex); 2791 mutex_unlock(&mdsc->mutex);
2799 mutex_lock(&session->s_mutex); 2792 mutex_lock(&session->s_mutex);
2800 remove_session_caps(session); 2793 remove_session_caps(session);
@@ -2891,8 +2884,7 @@ static struct ceph_connection *con_get(struct ceph_connection *con)
2891 struct ceph_mds_session *s = con->private; 2884 struct ceph_mds_session *s = con->private;
2892 2885
2893 if (get_session(s)) { 2886 if (get_session(s)) {
2894 dout("mdsc con_get %p %d -> %d\n", s, 2887 dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
2895 atomic_read(&s->s_ref) - 1, atomic_read(&s->s_ref));
2896 return con; 2888 return con;
2897 } 2889 }
2898 dout("mdsc con_get %p FAIL\n", s); 2890 dout("mdsc con_get %p FAIL\n", s);
@@ -2903,9 +2895,8 @@ static void con_put(struct ceph_connection *con)
2903{ 2895{
2904 struct ceph_mds_session *s = con->private; 2896 struct ceph_mds_session *s = con->private;
2905 2897
2906 dout("mdsc con_put %p %d -> %d\n", s, atomic_read(&s->s_ref),
2907 atomic_read(&s->s_ref) - 1);
2908 ceph_put_mds_session(s); 2898 ceph_put_mds_session(s);
2899 dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref));
2909} 2900}
2910 2901
2911/* 2902/*
@@ -2926,6 +2917,13 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
2926 struct ceph_mds_client *mdsc = s->s_mdsc; 2917 struct ceph_mds_client *mdsc = s->s_mdsc;
2927 int type = le16_to_cpu(msg->hdr.type); 2918 int type = le16_to_cpu(msg->hdr.type);
2928 2919
2920 mutex_lock(&mdsc->mutex);
2921 if (__verify_registered_session(mdsc, s) < 0) {
2922 mutex_unlock(&mdsc->mutex);
2923 goto out;
2924 }
2925 mutex_unlock(&mdsc->mutex);
2926
2929 switch (type) { 2927 switch (type) {
2930 case CEPH_MSG_MDS_MAP: 2928 case CEPH_MSG_MDS_MAP:
2931 ceph_mdsc_handle_map(mdsc, msg); 2929 ceph_mdsc_handle_map(mdsc, msg);
@@ -2937,22 +2935,23 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
2937 handle_reply(s, msg); 2935 handle_reply(s, msg);
2938 break; 2936 break;
2939 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 2937 case CEPH_MSG_CLIENT_REQUEST_FORWARD:
2940 handle_forward(mdsc, msg); 2938 handle_forward(mdsc, s, msg);
2941 break; 2939 break;
2942 case CEPH_MSG_CLIENT_CAPS: 2940 case CEPH_MSG_CLIENT_CAPS:
2943 ceph_handle_caps(s, msg); 2941 ceph_handle_caps(s, msg);
2944 break; 2942 break;
2945 case CEPH_MSG_CLIENT_SNAP: 2943 case CEPH_MSG_CLIENT_SNAP:
2946 ceph_handle_snap(mdsc, msg); 2944 ceph_handle_snap(mdsc, s, msg);
2947 break; 2945 break;
2948 case CEPH_MSG_CLIENT_LEASE: 2946 case CEPH_MSG_CLIENT_LEASE:
2949 handle_lease(mdsc, msg); 2947 handle_lease(mdsc, s, msg);
2950 break; 2948 break;
2951 2949
2952 default: 2950 default:
2953 pr_err("received unknown message type %d %s\n", type, 2951 pr_err("received unknown message type %d %s\n", type,
2954 ceph_msg_type_name(type)); 2952 ceph_msg_type_name(type));
2955 } 2953 }
2954out:
2956 ceph_msg_put(msg); 2955 ceph_msg_put(msg);
2957} 2956}
2958 2957
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index 49d0c4c59d8..bf2a5f3846a 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -713,11 +713,11 @@ static void flush_snaps(struct ceph_mds_client *mdsc)
713 * directory into another realm. 713 * directory into another realm.
714 */ 714 */
715void ceph_handle_snap(struct ceph_mds_client *mdsc, 715void ceph_handle_snap(struct ceph_mds_client *mdsc,
716 struct ceph_mds_session *session,
716 struct ceph_msg *msg) 717 struct ceph_msg *msg)
717{ 718{
718 struct super_block *sb = mdsc->client->sb; 719 struct super_block *sb = mdsc->client->sb;
719 struct ceph_mds_session *session; 720 int mds = session->s_mds;
720 int mds;
721 u64 split; 721 u64 split;
722 int op; 722 int op;
723 int trace_len; 723 int trace_len;
@@ -730,10 +730,6 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
730 int i; 730 int i;
731 int locked_rwsem = 0; 731 int locked_rwsem = 0;
732 732
733 if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
734 return;
735 mds = le64_to_cpu(msg->hdr.src.name.num);
736
737 /* decode */ 733 /* decode */
738 if (msg->front.iov_len < sizeof(*h)) 734 if (msg->front.iov_len < sizeof(*h))
739 goto bad; 735 goto bad;
@@ -749,15 +745,6 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
749 dout("handle_snap from mds%d op %s split %llx tracelen %d\n", mds, 745 dout("handle_snap from mds%d op %s split %llx tracelen %d\n", mds,
750 ceph_snap_op_name(op), split, trace_len); 746 ceph_snap_op_name(op), split, trace_len);
751 747
752 /* find session */
753 mutex_lock(&mdsc->mutex);
754 session = __ceph_lookup_mds_session(mdsc, mds);
755 mutex_unlock(&mdsc->mutex);
756 if (!session) {
757 dout("WTF, got snap but no session for mds%d\n", mds);
758 return;
759 }
760
761 mutex_lock(&session->s_mutex); 748 mutex_lock(&session->s_mutex);
762 session->s_seq++; 749 session->s_seq++;
763 mutex_unlock(&session->s_mutex); 750 mutex_unlock(&session->s_mutex);
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 384f0e2e7c6..ff7aaa32736 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -707,6 +707,7 @@ extern void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
707extern int ceph_update_snap_trace(struct ceph_mds_client *m, 707extern int ceph_update_snap_trace(struct ceph_mds_client *m,
708 void *p, void *e, bool deletion); 708 void *p, void *e, bool deletion);
709extern void ceph_handle_snap(struct ceph_mds_client *mdsc, 709extern void ceph_handle_snap(struct ceph_mds_client *mdsc,
710 struct ceph_mds_session *session,
710 struct ceph_msg *msg); 711 struct ceph_msg *msg);
711extern void ceph_queue_cap_snap(struct ceph_inode_info *ci, 712extern void ceph_queue_cap_snap(struct ceph_inode_info *ci,
712 struct ceph_snap_context *snapc); 713 struct ceph_snap_context *snapc);