aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph/mds_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r--fs/ceph/mds_client.c235
1 files changed, 188 insertions, 47 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index dd440bd438a..a75ddbf9fe3 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -3,6 +3,7 @@
3#include <linux/wait.h> 3#include <linux/wait.h>
4#include <linux/slab.h> 4#include <linux/slab.h>
5#include <linux/sched.h> 5#include <linux/sched.h>
6#include <linux/smp_lock.h>
6 7
7#include "mds_client.h" 8#include "mds_client.h"
8#include "mon_client.h" 9#include "mon_client.h"
@@ -37,6 +38,11 @@
37 * are no longer valid. 38 * are no longer valid.
38 */ 39 */
39 40
41struct ceph_reconnect_state {
42 struct ceph_pagelist *pagelist;
43 bool flock;
44};
45
40static void __wake_requests(struct ceph_mds_client *mdsc, 46static void __wake_requests(struct ceph_mds_client *mdsc,
41 struct list_head *head); 47 struct list_head *head);
42 48
@@ -449,7 +455,7 @@ void ceph_mdsc_release_request(struct kref *kref)
449 kfree(req->r_path1); 455 kfree(req->r_path1);
450 kfree(req->r_path2); 456 kfree(req->r_path2);
451 put_request_session(req); 457 put_request_session(req);
452 ceph_unreserve_caps(&req->r_caps_reservation); 458 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
453 kfree(req); 459 kfree(req);
454} 460}
455 461
@@ -512,7 +518,8 @@ static void __register_request(struct ceph_mds_client *mdsc,
512{ 518{
513 req->r_tid = ++mdsc->last_tid; 519 req->r_tid = ++mdsc->last_tid;
514 if (req->r_num_caps) 520 if (req->r_num_caps)
515 ceph_reserve_caps(&req->r_caps_reservation, req->r_num_caps); 521 ceph_reserve_caps(mdsc, &req->r_caps_reservation,
522 req->r_num_caps);
516 dout("__register_request %p tid %lld\n", req, req->r_tid); 523 dout("__register_request %p tid %lld\n", req, req->r_tid);
517 ceph_mdsc_get_request(req); 524 ceph_mdsc_get_request(req);
518 __insert_request(mdsc, req); 525 __insert_request(mdsc, req);
@@ -704,6 +711,51 @@ static int __open_session(struct ceph_mds_client *mdsc,
704} 711}
705 712
706/* 713/*
714 * open sessions for any export targets for the given mds
715 *
716 * called under mdsc->mutex
717 */
718static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
719 struct ceph_mds_session *session)
720{
721 struct ceph_mds_info *mi;
722 struct ceph_mds_session *ts;
723 int i, mds = session->s_mds;
724 int target;
725
726 if (mds >= mdsc->mdsmap->m_max_mds)
727 return;
728 mi = &mdsc->mdsmap->m_info[mds];
729 dout("open_export_target_sessions for mds%d (%d targets)\n",
730 session->s_mds, mi->num_export_targets);
731
732 for (i = 0; i < mi->num_export_targets; i++) {
733 target = mi->export_targets[i];
734 ts = __ceph_lookup_mds_session(mdsc, target);
735 if (!ts) {
736 ts = register_session(mdsc, target);
737 if (IS_ERR(ts))
738 return;
739 }
740 if (session->s_state == CEPH_MDS_SESSION_NEW ||
741 session->s_state == CEPH_MDS_SESSION_CLOSING)
742 __open_session(mdsc, session);
743 else
744 dout(" mds%d target mds%d %p is %s\n", session->s_mds,
745 i, ts, session_state_name(ts->s_state));
746 ceph_put_mds_session(ts);
747 }
748}
749
750void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
751 struct ceph_mds_session *session)
752{
753 mutex_lock(&mdsc->mutex);
754 __open_export_target_sessions(mdsc, session);
755 mutex_unlock(&mdsc->mutex);
756}
757
758/*
707 * session caps 759 * session caps
708 */ 760 */
709 761
@@ -764,7 +816,7 @@ static int iterate_session_caps(struct ceph_mds_session *session,
764 last_inode = NULL; 816 last_inode = NULL;
765 } 817 }
766 if (old_cap) { 818 if (old_cap) {
767 ceph_put_cap(old_cap); 819 ceph_put_cap(session->s_mdsc, old_cap);
768 old_cap = NULL; 820 old_cap = NULL;
769 } 821 }
770 822
@@ -793,7 +845,7 @@ out:
793 if (last_inode) 845 if (last_inode)
794 iput(last_inode); 846 iput(last_inode);
795 if (old_cap) 847 if (old_cap)
796 ceph_put_cap(old_cap); 848 ceph_put_cap(session->s_mdsc, old_cap);
797 849
798 return ret; 850 return ret;
799} 851}
@@ -1067,15 +1119,16 @@ static int trim_caps(struct ceph_mds_client *mdsc,
1067 * Called under s_mutex. 1119 * Called under s_mutex.
1068 */ 1120 */
1069int ceph_add_cap_releases(struct ceph_mds_client *mdsc, 1121int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
1070 struct ceph_mds_session *session, 1122 struct ceph_mds_session *session)
1071 int extra)
1072{ 1123{
1073 struct ceph_msg *msg; 1124 struct ceph_msg *msg, *partial = NULL;
1074 struct ceph_mds_cap_release *head; 1125 struct ceph_mds_cap_release *head;
1075 int err = -ENOMEM; 1126 int err = -ENOMEM;
1127 int extra = mdsc->client->mount_args->cap_release_safety;
1128 int num;
1076 1129
1077 if (extra < 0) 1130 dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
1078 extra = mdsc->client->mount_args->cap_release_safety; 1131 extra);
1079 1132
1080 spin_lock(&session->s_cap_lock); 1133 spin_lock(&session->s_cap_lock);
1081 1134
@@ -1084,9 +1137,14 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
1084 struct ceph_msg, 1137 struct ceph_msg,
1085 list_head); 1138 list_head);
1086 head = msg->front.iov_base; 1139 head = msg->front.iov_base;
1087 extra += CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num); 1140 num = le32_to_cpu(head->num);
1141 if (num) {
1142 dout(" partial %p with (%d/%d)\n", msg, num,
1143 (int)CEPH_CAPS_PER_RELEASE);
1144 extra += CEPH_CAPS_PER_RELEASE - num;
1145 partial = msg;
1146 }
1088 } 1147 }
1089
1090 while (session->s_num_cap_releases < session->s_nr_caps + extra) { 1148 while (session->s_num_cap_releases < session->s_nr_caps + extra) {
1091 spin_unlock(&session->s_cap_lock); 1149 spin_unlock(&session->s_cap_lock);
1092 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE, 1150 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
@@ -1103,19 +1161,14 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
1103 session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE; 1161 session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
1104 } 1162 }
1105 1163
1106 if (!list_empty(&session->s_cap_releases)) { 1164 if (partial) {
1107 msg = list_first_entry(&session->s_cap_releases, 1165 head = partial->front.iov_base;
1108 struct ceph_msg, 1166 num = le32_to_cpu(head->num);
1109 list_head); 1167 dout(" queueing partial %p with %d/%d\n", partial, num,
1110 head = msg->front.iov_base; 1168 (int)CEPH_CAPS_PER_RELEASE);
1111 if (head->num) { 1169 list_move_tail(&partial->list_head,
1112 dout(" queueing non-full %p (%d)\n", msg, 1170 &session->s_cap_releases_done);
1113 le32_to_cpu(head->num)); 1171 session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
1114 list_move_tail(&msg->list_head,
1115 &session->s_cap_releases_done);
1116 session->s_num_cap_releases -=
1117 CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
1118 }
1119 } 1172 }
1120 err = 0; 1173 err = 0;
1121 spin_unlock(&session->s_cap_lock); 1174 spin_unlock(&session->s_cap_lock);
@@ -1250,6 +1303,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1250 return ERR_PTR(-ENOMEM); 1303 return ERR_PTR(-ENOMEM);
1251 1304
1252 mutex_init(&req->r_fill_mutex); 1305 mutex_init(&req->r_fill_mutex);
1306 req->r_mdsc = mdsc;
1253 req->r_started = jiffies; 1307 req->r_started = jiffies;
1254 req->r_resend_mds = -1; 1308 req->r_resend_mds = -1;
1255 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 1309 INIT_LIST_HEAD(&req->r_unsafe_dir_item);
@@ -1580,6 +1634,15 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
1580 1634
1581 req->r_mds = mds; 1635 req->r_mds = mds;
1582 req->r_attempts++; 1636 req->r_attempts++;
1637 if (req->r_inode) {
1638 struct ceph_cap *cap =
1639 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
1640
1641 if (cap)
1642 req->r_sent_on_mseq = cap->mseq;
1643 else
1644 req->r_sent_on_mseq = -1;
1645 }
1583 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, 1646 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
1584 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 1647 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
1585 1648
@@ -1914,21 +1977,40 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
1914 result = le32_to_cpu(head->result); 1977 result = le32_to_cpu(head->result);
1915 1978
1916 /* 1979 /*
1917 * Tolerate 2 consecutive ESTALEs from the same mds. 1980 * Handle an ESTALE
1918 * FIXME: we should be looking at the cap migrate_seq. 1981 * if we're not talking to the authority, send to them
1982 * if the authority has changed while we weren't looking,
1983 * send to new authority
1984 * Otherwise we just have to return an ESTALE
1919 */ 1985 */
1920 if (result == -ESTALE) { 1986 if (result == -ESTALE) {
1921 req->r_direct_mode = USE_AUTH_MDS; 1987 dout("got ESTALE on request %llu", req->r_tid);
1922 req->r_num_stale++; 1988 if (!req->r_inode) {
1923 if (req->r_num_stale <= 2) { 1989 /* do nothing; not an authority problem */
1990 } else if (req->r_direct_mode != USE_AUTH_MDS) {
1991 dout("not using auth, setting for that now");
1992 req->r_direct_mode = USE_AUTH_MDS;
1924 __do_request(mdsc, req); 1993 __do_request(mdsc, req);
1925 mutex_unlock(&mdsc->mutex); 1994 mutex_unlock(&mdsc->mutex);
1926 goto out; 1995 goto out;
1996 } else {
1997 struct ceph_inode_info *ci = ceph_inode(req->r_inode);
1998 struct ceph_cap *cap =
1999 ceph_get_cap_for_mds(ci, req->r_mds);;
2000
2001 dout("already using auth");
2002 if ((!cap || cap != ci->i_auth_cap) ||
2003 (cap->mseq != req->r_sent_on_mseq)) {
2004 dout("but cap changed, so resending");
2005 __do_request(mdsc, req);
2006 mutex_unlock(&mdsc->mutex);
2007 goto out;
2008 }
1927 } 2009 }
1928 } else { 2010 dout("have to return ESTALE on request %llu", req->r_tid);
1929 req->r_num_stale = 0;
1930 } 2011 }
1931 2012
2013
1932 if (head->safe) { 2014 if (head->safe) {
1933 req->r_got_safe = true; 2015 req->r_got_safe = true;
1934 __unregister_request(mdsc, req); 2016 __unregister_request(mdsc, req);
@@ -1985,7 +2067,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
1985 if (err == 0) { 2067 if (err == 0) {
1986 if (result == 0 && rinfo->dir_nr) 2068 if (result == 0 && rinfo->dir_nr)
1987 ceph_readdir_prepopulate(req, req->r_session); 2069 ceph_readdir_prepopulate(req, req->r_session);
1988 ceph_unreserve_caps(&req->r_caps_reservation); 2070 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
1989 } 2071 }
1990 mutex_unlock(&req->r_fill_mutex); 2072 mutex_unlock(&req->r_fill_mutex);
1991 2073
@@ -2005,7 +2087,7 @@ out_err:
2005 } 2087 }
2006 mutex_unlock(&mdsc->mutex); 2088 mutex_unlock(&mdsc->mutex);
2007 2089
2008 ceph_add_cap_releases(mdsc, req->r_session, -1); 2090 ceph_add_cap_releases(mdsc, req->r_session);
2009 mutex_unlock(&session->s_mutex); 2091 mutex_unlock(&session->s_mutex);
2010 2092
2011 /* kick calling process */ 2093 /* kick calling process */
@@ -2193,9 +2275,14 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2193static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, 2275static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2194 void *arg) 2276 void *arg)
2195{ 2277{
2196 struct ceph_mds_cap_reconnect rec; 2278 union {
2279 struct ceph_mds_cap_reconnect v2;
2280 struct ceph_mds_cap_reconnect_v1 v1;
2281 } rec;
2282 size_t reclen;
2197 struct ceph_inode_info *ci; 2283 struct ceph_inode_info *ci;
2198 struct ceph_pagelist *pagelist = arg; 2284 struct ceph_reconnect_state *recon_state = arg;
2285 struct ceph_pagelist *pagelist = recon_state->pagelist;
2199 char *path; 2286 char *path;
2200 int pathlen, err; 2287 int pathlen, err;
2201 u64 pathbase; 2288 u64 pathbase;
@@ -2228,17 +2315,44 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2228 spin_lock(&inode->i_lock); 2315 spin_lock(&inode->i_lock);
2229 cap->seq = 0; /* reset cap seq */ 2316 cap->seq = 0; /* reset cap seq */
2230 cap->issue_seq = 0; /* and issue_seq */ 2317 cap->issue_seq = 0; /* and issue_seq */
2231 rec.cap_id = cpu_to_le64(cap->cap_id); 2318
2232 rec.pathbase = cpu_to_le64(pathbase); 2319 if (recon_state->flock) {
2233 rec.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 2320 rec.v2.cap_id = cpu_to_le64(cap->cap_id);
2234 rec.issued = cpu_to_le32(cap->issued); 2321 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2235 rec.size = cpu_to_le64(inode->i_size); 2322 rec.v2.issued = cpu_to_le32(cap->issued);
2236 ceph_encode_timespec(&rec.mtime, &inode->i_mtime); 2323 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2237 ceph_encode_timespec(&rec.atime, &inode->i_atime); 2324 rec.v2.pathbase = cpu_to_le64(pathbase);
2238 rec.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 2325 rec.v2.flock_len = 0;
2326 reclen = sizeof(rec.v2);
2327 } else {
2328 rec.v1.cap_id = cpu_to_le64(cap->cap_id);
2329 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2330 rec.v1.issued = cpu_to_le32(cap->issued);
2331 rec.v1.size = cpu_to_le64(inode->i_size);
2332 ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
2333 ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
2334 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2335 rec.v1.pathbase = cpu_to_le64(pathbase);
2336 reclen = sizeof(rec.v1);
2337 }
2239 spin_unlock(&inode->i_lock); 2338 spin_unlock(&inode->i_lock);
2240 2339
2241 err = ceph_pagelist_append(pagelist, &rec, sizeof(rec)); 2340 if (recon_state->flock) {
2341 int num_fcntl_locks, num_flock_locks;
2342
2343 lock_kernel();
2344 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
2345 rec.v2.flock_len = (2*sizeof(u32) +
2346 (num_fcntl_locks+num_flock_locks) *
2347 sizeof(struct ceph_filelock));
2348
2349 err = ceph_pagelist_append(pagelist, &rec, reclen);
2350 if (!err)
2351 err = ceph_encode_locks(inode, pagelist,
2352 num_fcntl_locks,
2353 num_flock_locks);
2354 unlock_kernel();
2355 }
2242 2356
2243out: 2357out:
2244 kfree(path); 2358 kfree(path);
@@ -2267,6 +2381,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2267 int mds = session->s_mds; 2381 int mds = session->s_mds;
2268 int err = -ENOMEM; 2382 int err = -ENOMEM;
2269 struct ceph_pagelist *pagelist; 2383 struct ceph_pagelist *pagelist;
2384 struct ceph_reconnect_state recon_state;
2270 2385
2271 pr_info("mds%d reconnect start\n", mds); 2386 pr_info("mds%d reconnect start\n", mds);
2272 2387
@@ -2301,7 +2416,10 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2301 err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps); 2416 err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
2302 if (err) 2417 if (err)
2303 goto fail; 2418 goto fail;
2304 err = iterate_session_caps(session, encode_caps_cb, pagelist); 2419
2420 recon_state.pagelist = pagelist;
2421 recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
2422 err = iterate_session_caps(session, encode_caps_cb, &recon_state);
2305 if (err < 0) 2423 if (err < 0)
2306 goto fail; 2424 goto fail;
2307 2425
@@ -2326,6 +2444,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2326 } 2444 }
2327 2445
2328 reply->pagelist = pagelist; 2446 reply->pagelist = pagelist;
2447 if (recon_state.flock)
2448 reply->hdr.version = cpu_to_le16(2);
2329 reply->hdr.data_len = cpu_to_le32(pagelist->length); 2449 reply->hdr.data_len = cpu_to_le32(pagelist->length);
2330 reply->nr_pages = calc_pages_for(0, pagelist->length); 2450 reply->nr_pages = calc_pages_for(0, pagelist->length);
2331 ceph_con_send(&session->s_con, reply); 2451 ceph_con_send(&session->s_con, reply);
@@ -2376,9 +2496,11 @@ static void check_new_map(struct ceph_mds_client *mdsc,
2376 oldstate = ceph_mdsmap_get_state(oldmap, i); 2496 oldstate = ceph_mdsmap_get_state(oldmap, i);
2377 newstate = ceph_mdsmap_get_state(newmap, i); 2497 newstate = ceph_mdsmap_get_state(newmap, i);
2378 2498
2379 dout("check_new_map mds%d state %s -> %s (session %s)\n", 2499 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
2380 i, ceph_mds_state_name(oldstate), 2500 i, ceph_mds_state_name(oldstate),
2501 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
2381 ceph_mds_state_name(newstate), 2502 ceph_mds_state_name(newstate),
2503 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
2382 session_state_name(s->s_state)); 2504 session_state_name(s->s_state));
2383 2505
2384 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 2506 if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
@@ -2428,6 +2550,21 @@ static void check_new_map(struct ceph_mds_client *mdsc,
2428 wake_up_session_caps(s, 1); 2550 wake_up_session_caps(s, 1);
2429 } 2551 }
2430 } 2552 }
2553
2554 for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
2555 s = mdsc->sessions[i];
2556 if (!s)
2557 continue;
2558 if (!ceph_mdsmap_is_laggy(newmap, i))
2559 continue;
2560 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2561 s->s_state == CEPH_MDS_SESSION_HUNG ||
2562 s->s_state == CEPH_MDS_SESSION_CLOSING) {
2563 dout(" connecting to export targets of laggy mds%d\n",
2564 i);
2565 __open_export_target_sessions(mdsc, s);
2566 }
2567 }
2431} 2568}
2432 2569
2433 2570
@@ -2715,7 +2852,7 @@ static void delayed_work(struct work_struct *work)
2715 send_renew_caps(mdsc, s); 2852 send_renew_caps(mdsc, s);
2716 else 2853 else
2717 ceph_con_keepalive(&s->s_con); 2854 ceph_con_keepalive(&s->s_con);
2718 ceph_add_cap_releases(mdsc, s, -1); 2855 ceph_add_cap_releases(mdsc, s);
2719 if (s->s_state == CEPH_MDS_SESSION_OPEN || 2856 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2720 s->s_state == CEPH_MDS_SESSION_HUNG) 2857 s->s_state == CEPH_MDS_SESSION_HUNG)
2721 ceph_send_cap_releases(mdsc, s); 2858 ceph_send_cap_releases(mdsc, s);
@@ -2764,6 +2901,9 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
2764 spin_lock_init(&mdsc->dentry_lru_lock); 2901 spin_lock_init(&mdsc->dentry_lru_lock);
2765 INIT_LIST_HEAD(&mdsc->dentry_lru); 2902 INIT_LIST_HEAD(&mdsc->dentry_lru);
2766 2903
2904 ceph_caps_init(mdsc);
2905 ceph_adjust_min_caps(mdsc, client->min_caps);
2906
2767 return 0; 2907 return 0;
2768} 2908}
2769 2909
@@ -2959,6 +3099,7 @@ void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
2959 if (mdsc->mdsmap) 3099 if (mdsc->mdsmap)
2960 ceph_mdsmap_destroy(mdsc->mdsmap); 3100 ceph_mdsmap_destroy(mdsc->mdsmap);
2961 kfree(mdsc->sessions); 3101 kfree(mdsc->sessions);
3102 ceph_caps_finalize(mdsc);
2962} 3103}
2963 3104
2964 3105