aboutsummaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
authorYan, Zheng <zheng.z.yan@intel.com>2013-11-24 01:43:46 -0500
committerYan, Zheng <zheng.z.yan@intel.com>2014-01-21 03:30:28 -0500
commit4ee6a914edbbd2543884f0ad7d58ea471136be32 (patch)
tree459a9db93b394a25317c8ab29338668a0973d57c /fs
parent186e4f7a4b1883f3f46aa15366c0bcebc28fdda7 (diff)
ceph: remove exported caps when handling cap import message
Version 3 cap import message includes the ID of the exported caps. It allow us to remove the exported caps if we still haven't received the corresponding cap export message. We remove the exported caps because they are stale, keeping them can compromise consistence. Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
Diffstat (limited to 'fs')
-rw-r--r--fs/ceph/caps.c79
1 files changed, 52 insertions, 27 deletions
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index d65ff334901c..98f3ca4a5ddf 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -611,6 +611,7 @@ retry:
611 if (ci->i_auth_cap == NULL || 611 if (ci->i_auth_cap == NULL ||
612 ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) 612 ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0)
613 ci->i_auth_cap = cap; 613 ci->i_auth_cap = cap;
614 ci->i_cap_exporting_issued = 0;
614 } else if (ci->i_auth_cap == cap) { 615 } else if (ci->i_auth_cap == cap) {
615 ci->i_auth_cap = NULL; 616 ci->i_auth_cap = NULL;
616 spin_lock(&mdsc->cap_dirty_lock); 617 spin_lock(&mdsc->cap_dirty_lock);
@@ -2823,10 +2824,12 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
2823 */ 2824 */
2824static void handle_cap_import(struct ceph_mds_client *mdsc, 2825static void handle_cap_import(struct ceph_mds_client *mdsc,
2825 struct inode *inode, struct ceph_mds_caps *im, 2826 struct inode *inode, struct ceph_mds_caps *im,
2827 struct ceph_mds_cap_peer *ph,
2826 struct ceph_mds_session *session, 2828 struct ceph_mds_session *session,
2827 void *snaptrace, int snaptrace_len) 2829 void *snaptrace, int snaptrace_len)
2828{ 2830{
2829 struct ceph_inode_info *ci = ceph_inode(inode); 2831 struct ceph_inode_info *ci = ceph_inode(inode);
2832 struct ceph_cap *cap;
2830 int mds = session->s_mds; 2833 int mds = session->s_mds;
2831 unsigned issued = le32_to_cpu(im->caps); 2834 unsigned issued = le32_to_cpu(im->caps);
2832 unsigned wanted = le32_to_cpu(im->wanted); 2835 unsigned wanted = le32_to_cpu(im->wanted);
@@ -2834,28 +2837,44 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
2834 unsigned mseq = le32_to_cpu(im->migrate_seq); 2837 unsigned mseq = le32_to_cpu(im->migrate_seq);
2835 u64 realmino = le64_to_cpu(im->realm); 2838 u64 realmino = le64_to_cpu(im->realm);
2836 u64 cap_id = le64_to_cpu(im->cap_id); 2839 u64 cap_id = le64_to_cpu(im->cap_id);
2840 u64 p_cap_id;
2841 int peer;
2837 2842
2838 if (ci->i_cap_exporting_mds >= 0 && 2843 if (ph) {
2839 ceph_seq_cmp(ci->i_cap_exporting_mseq, mseq) < 0) { 2844 p_cap_id = le64_to_cpu(ph->cap_id);
2840 dout("handle_cap_import inode %p ci %p mds%d mseq %d" 2845 peer = le32_to_cpu(ph->mds);
2841 " - cleared exporting from mds%d\n", 2846 } else {
2842 inode, ci, mds, mseq, 2847 p_cap_id = 0;
2843 ci->i_cap_exporting_mds); 2848 peer = -1;
2844 ci->i_cap_exporting_issued = 0; 2849 }
2845 ci->i_cap_exporting_mseq = 0;
2846 ci->i_cap_exporting_mds = -1;
2847 2850
2848 spin_lock(&mdsc->cap_dirty_lock); 2851 dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n",
2849 if (!list_empty(&ci->i_dirty_item)) { 2852 inode, ci, mds, mseq, peer);
2850 dout(" moving %p back to cap_dirty\n", inode); 2853
2851 list_move(&ci->i_dirty_item, &mdsc->cap_dirty); 2854 spin_lock(&ci->i_ceph_lock);
2855 cap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
2856 if (cap && cap->cap_id == p_cap_id) {
2857 dout(" remove export cap %p mds%d flags %d\n",
2858 cap, peer, ph->flags);
2859 if ((ph->flags & CEPH_CAP_FLAG_AUTH) &&
2860 (cap->seq != le32_to_cpu(ph->seq) ||
2861 cap->mseq != le32_to_cpu(ph->mseq))) {
2862 pr_err("handle_cap_import: mismatched seq/mseq: "
2863 "ino (%llx.%llx) mds%d seq %d mseq %d "
2864 "importer mds%d has peer seq %d mseq %d\n",
2865 ceph_vinop(inode), peer, cap->seq,
2866 cap->mseq, mds, le32_to_cpu(ph->seq),
2867 le32_to_cpu(ph->mseq));
2852 } 2868 }
2853 spin_unlock(&mdsc->cap_dirty_lock); 2869 ci->i_cap_exporting_issued = cap->issued;
2854 } else { 2870 __ceph_remove_cap(cap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
2855 dout("handle_cap_import inode %p ci %p mds%d mseq %d\n",
2856 inode, ci, mds, mseq);
2857 } 2871 }
2858 2872
2873 /* make sure we re-request max_size, if necessary */
2874 ci->i_wanted_max_size = 0;
2875 ci->i_requested_max_size = 0;
2876 spin_unlock(&ci->i_ceph_lock);
2877
2859 down_write(&mdsc->snap_rwsem); 2878 down_write(&mdsc->snap_rwsem);
2860 ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len, 2879 ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len,
2861 false); 2880 false);
@@ -2866,11 +2885,6 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
2866 kick_flushing_inode_caps(mdsc, session, inode); 2885 kick_flushing_inode_caps(mdsc, session, inode);
2867 up_read(&mdsc->snap_rwsem); 2886 up_read(&mdsc->snap_rwsem);
2868 2887
2869 /* make sure we re-request max_size, if necessary */
2870 spin_lock(&ci->i_ceph_lock);
2871 ci->i_wanted_max_size = 0; /* reset */
2872 ci->i_requested_max_size = 0;
2873 spin_unlock(&ci->i_ceph_lock);
2874} 2888}
2875 2889
2876/* 2890/*
@@ -2888,6 +2902,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2888 struct ceph_inode_info *ci; 2902 struct ceph_inode_info *ci;
2889 struct ceph_cap *cap; 2903 struct ceph_cap *cap;
2890 struct ceph_mds_caps *h; 2904 struct ceph_mds_caps *h;
2905 struct ceph_mds_cap_peer *peer = NULL;
2891 int mds = session->s_mds; 2906 int mds = session->s_mds;
2892 int op; 2907 int op;
2893 u32 seq, mseq; 2908 u32 seq, mseq;
@@ -2898,12 +2913,14 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2898 void *snaptrace; 2913 void *snaptrace;
2899 size_t snaptrace_len; 2914 size_t snaptrace_len;
2900 void *flock; 2915 void *flock;
2916 void *end;
2901 u32 flock_len; 2917 u32 flock_len;
2902 int open_target_sessions = 0; 2918 int open_target_sessions = 0;
2903 2919
2904 dout("handle_caps from mds%d\n", mds); 2920 dout("handle_caps from mds%d\n", mds);
2905 2921
2906 /* decode */ 2922 /* decode */
2923 end = msg->front.iov_base + msg->front.iov_len;
2907 tid = le64_to_cpu(msg->hdr.tid); 2924 tid = le64_to_cpu(msg->hdr.tid);
2908 if (msg->front.iov_len < sizeof(*h)) 2925 if (msg->front.iov_len < sizeof(*h))
2909 goto bad; 2926 goto bad;
@@ -2921,17 +2938,25 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2921 snaptrace_len = le32_to_cpu(h->snap_trace_len); 2938 snaptrace_len = le32_to_cpu(h->snap_trace_len);
2922 2939
2923 if (le16_to_cpu(msg->hdr.version) >= 2) { 2940 if (le16_to_cpu(msg->hdr.version) >= 2) {
2924 void *p, *end; 2941 void *p = snaptrace + snaptrace_len;
2925
2926 p = snaptrace + snaptrace_len;
2927 end = msg->front.iov_base + msg->front.iov_len;
2928 ceph_decode_32_safe(&p, end, flock_len, bad); 2942 ceph_decode_32_safe(&p, end, flock_len, bad);
2943 if (p + flock_len > end)
2944 goto bad;
2929 flock = p; 2945 flock = p;
2930 } else { 2946 } else {
2931 flock = NULL; 2947 flock = NULL;
2932 flock_len = 0; 2948 flock_len = 0;
2933 } 2949 }
2934 2950
2951 if (le16_to_cpu(msg->hdr.version) >= 3) {
2952 if (op == CEPH_CAP_OP_IMPORT) {
2953 void *p = flock + flock_len;
2954 if (p + sizeof(*peer) > end)
2955 goto bad;
2956 peer = p;
2957 }
2958 }
2959
2935 mutex_lock(&session->s_mutex); 2960 mutex_lock(&session->s_mutex);
2936 session->s_seq++; 2961 session->s_seq++;
2937 dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, 2962 dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
@@ -2968,7 +2993,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2968 goto done; 2993 goto done;
2969 2994
2970 case CEPH_CAP_OP_IMPORT: 2995 case CEPH_CAP_OP_IMPORT:
2971 handle_cap_import(mdsc, inode, h, session, 2996 handle_cap_import(mdsc, inode, h, peer, session,
2972 snaptrace, snaptrace_len); 2997 snaptrace, snaptrace_len);
2973 } 2998 }
2974 2999