about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
-rw-r--r-- Documentation/filesystems/ceph.txt 8
-rw-r--r-- drivers/block/rbd.c 11
-rw-r--r-- fs/ceph/addr.c 1
-rw-r--r-- fs/ceph/caps.c 160
-rw-r--r-- fs/ceph/dir.c 2
-rw-r--r-- fs/ceph/file.c 1
-rw-r--r-- fs/ceph/inode.c 67
-rw-r--r-- fs/ceph/super.c 35
-rw-r--r-- fs/ceph/xattr.c 60
-rw-r--r-- include/linux/ceph/ceph_fs.h 1
-rw-r--r-- include/linux/ceph/osd_client.h 8
-rw-r--r-- include/linux/ceph/osdmap.h 8
-rw-r--r-- net/ceph/messenger.c 31
-rw-r--r-- net/ceph/osd_client.c 216
-rw-r--r-- net/ceph/osdmap.c 19
15 files changed, 372 insertions, 256 deletions
diff --git a/Documentation/filesystems/ceph.txt b/Documentation/filesystems/ceph.txt
index d7f011ddc150..8bf62240e10d 100644
--- a/Documentation/filesystems/ceph.txt
+++ b/Documentation/filesystems/ceph.txt
@@ -105,15 +105,13 @@ Mount Options
105 address its connection to the monitor originates from. 105 address its connection to the monitor originates from.
106 106
107 wsize=X 107 wsize=X
108 Specify the maximum write size in bytes. By default there is no 108 Specify the maximum write size in bytes. Default: 16 MB.
109 maximum. Ceph will normally size writes based on the file stripe
110 size.
111 109
112 rsize=X 110 rsize=X
113 Specify the maximum read size in bytes. Default: 64 MB. 111 Specify the maximum read size in bytes. Default: 16 MB.
114 112
115 rasize=X 113 rasize=X
116 Specify the maximum readahead. Default: 8 MB. 114 Specify the maximum readahead size in bytes. Default: 8 MB.
117 115
118 mount_timeout=X 116 mount_timeout=X
119 Specify the timeout value for mount (in seconds), in the case 117 Specify the timeout value for mount (in seconds), in the case
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index af354047ac4b..fa0729c1e776 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -2339,6 +2339,7 @@ static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
2339static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes) 2339static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
2340{ 2340{
2341 unsigned int num_osd_ops = obj_req->osd_req->r_num_ops; 2341 unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;
2342 int ret;
2342 2343
2343 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes); 2344 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
2344 rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT); 2345 rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
@@ -2353,6 +2354,11 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
2353 if (!obj_req->osd_req) 2354 if (!obj_req->osd_req)
2354 return -ENOMEM; 2355 return -ENOMEM;
2355 2356
2357 ret = osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd",
2358 "copyup");
2359 if (ret)
2360 return ret;
2361
2356 /* 2362 /*
2357 * Only send non-zero copyup data to save some I/O and network 2363 * Only send non-zero copyup data to save some I/O and network
2358 * bandwidth -- zero copyup data is equivalent to the object not 2364 * bandwidth -- zero copyup data is equivalent to the object not
@@ -2362,9 +2368,6 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
2362 dout("%s obj_req %p detected zeroes\n", __func__, obj_req); 2368 dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
2363 bytes = 0; 2369 bytes = 0;
2364 } 2370 }
2365
2366 osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd",
2367 "copyup");
2368 osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0, 2371 osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
2369 obj_req->copyup_bvecs, 2372 obj_req->copyup_bvecs,
2370 obj_req->copyup_bvec_count, 2373 obj_req->copyup_bvec_count,
@@ -3397,7 +3400,6 @@ static void cancel_tasks_sync(struct rbd_device *rbd_dev)
3397{ 3400{
3398 dout("%s rbd_dev %p\n", __func__, rbd_dev); 3401 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3399 3402
3400 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3401 cancel_work_sync(&rbd_dev->acquired_lock_work); 3403 cancel_work_sync(&rbd_dev->acquired_lock_work);
3402 cancel_work_sync(&rbd_dev->released_lock_work); 3404 cancel_work_sync(&rbd_dev->released_lock_work);
3403 cancel_delayed_work_sync(&rbd_dev->lock_dwork); 3405 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
@@ -3415,6 +3417,7 @@ static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3415 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED; 3417 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3416 mutex_unlock(&rbd_dev->watch_mutex); 3418 mutex_unlock(&rbd_dev->watch_mutex);
3417 3419
3420 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3418 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc); 3421 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3419} 3422}
3420 3423
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index c9cb2f33a6d6..afcc59ed7090 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -1936,7 +1936,6 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
1936 err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false); 1936 err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);
1937 1937
1938 wr_req->r_mtime = ci->vfs_inode.i_mtime; 1938 wr_req->r_mtime = ci->vfs_inode.i_mtime;
1939 wr_req->r_abort_on_full = true;
1940 err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false); 1939 err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);
1941 1940
1942 if (!err) 1941 if (!err)
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 23dbfae16156..0ae41854d676 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -69,6 +69,8 @@ static char *gcap_string(char *s, int c)
69 *s++ = 'w'; 69 *s++ = 'w';
70 if (c & CEPH_CAP_GBUFFER) 70 if (c & CEPH_CAP_GBUFFER)
71 *s++ = 'b'; 71 *s++ = 'b';
72 if (c & CEPH_CAP_GWREXTEND)
73 *s++ = 'a';
72 if (c & CEPH_CAP_GLAZYIO) 74 if (c & CEPH_CAP_GLAZYIO)
73 *s++ = 'l'; 75 *s++ = 'l';
74 return s; 76 return s;
@@ -3022,30 +3024,41 @@ static void invalidate_aliases(struct inode *inode)
3022 dput(prev); 3024 dput(prev);
3023} 3025}
3024 3026
3027struct cap_extra_info {
3028 struct ceph_string *pool_ns;
3029 /* inline data */
3030 u64 inline_version;
3031 void *inline_data;
3032 u32 inline_len;
3033 /* dirstat */
3034 bool dirstat_valid;
3035 u64 nfiles;
3036 u64 nsubdirs;
3037 /* currently issued */
3038 int issued;
3039};
3040
3025/* 3041/*
3026 * Handle a cap GRANT message from the MDS. (Note that a GRANT may 3042 * Handle a cap GRANT message from the MDS. (Note that a GRANT may
3027 * actually be a revocation if it specifies a smaller cap set.) 3043 * actually be a revocation if it specifies a smaller cap set.)
3028 * 3044 *
3029 * caller holds s_mutex and i_ceph_lock, we drop both. 3045 * caller holds s_mutex and i_ceph_lock, we drop both.
3030 */ 3046 */
3031static void handle_cap_grant(struct ceph_mds_client *mdsc, 3047static void handle_cap_grant(struct inode *inode,
3032 struct inode *inode, struct ceph_mds_caps *grant,
3033 struct ceph_string **pns, u64 inline_version,
3034 void *inline_data, u32 inline_len,
3035 struct ceph_buffer *xattr_buf,
3036 struct ceph_mds_session *session, 3048 struct ceph_mds_session *session,
3037 struct ceph_cap *cap, int issued) 3049 struct ceph_cap *cap,
3050 struct ceph_mds_caps *grant,
3051 struct ceph_buffer *xattr_buf,
3052 struct cap_extra_info *extra_info)
3038 __releases(ci->i_ceph_lock) 3053 __releases(ci->i_ceph_lock)
3039 __releases(mdsc->snap_rwsem) 3054 __releases(session->s_mdsc->snap_rwsem)
3040{ 3055{
3041 struct ceph_inode_info *ci = ceph_inode(inode); 3056 struct ceph_inode_info *ci = ceph_inode(inode);
3042 int mds = session->s_mds;
3043 int seq = le32_to_cpu(grant->seq); 3057 int seq = le32_to_cpu(grant->seq);
3044 int newcaps = le32_to_cpu(grant->caps); 3058 int newcaps = le32_to_cpu(grant->caps);
3045 int used, wanted, dirty; 3059 int used, wanted, dirty;
3046 u64 size = le64_to_cpu(grant->size); 3060 u64 size = le64_to_cpu(grant->size);
3047 u64 max_size = le64_to_cpu(grant->max_size); 3061 u64 max_size = le64_to_cpu(grant->max_size);
3048 struct timespec mtime, atime, ctime;
3049 int check_caps = 0; 3062 int check_caps = 0;
3050 bool wake = false; 3063 bool wake = false;
3051 bool writeback = false; 3064 bool writeback = false;
@@ -3055,7 +3068,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
3055 bool fill_inline = false; 3068 bool fill_inline = false;
3056 3069
3057 dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n", 3070 dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
3058 inode, cap, mds, seq, ceph_cap_string(newcaps)); 3071 inode, cap, session->s_mds, seq, ceph_cap_string(newcaps));
3059 dout(" size %llu max_size %llu, i_size %llu\n", size, max_size, 3072 dout(" size %llu max_size %llu, i_size %llu\n", size, max_size,
3060 inode->i_size); 3073 inode->i_size);
3061 3074
@@ -3101,7 +3114,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
3101 __check_cap_issue(ci, cap, newcaps); 3114 __check_cap_issue(ci, cap, newcaps);
3102 3115
3103 if ((newcaps & CEPH_CAP_AUTH_SHARED) && 3116 if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
3104 (issued & CEPH_CAP_AUTH_EXCL) == 0) { 3117 (extra_info->issued & CEPH_CAP_AUTH_EXCL) == 0) {
3105 inode->i_mode = le32_to_cpu(grant->mode); 3118 inode->i_mode = le32_to_cpu(grant->mode);
3106 inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid)); 3119 inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid));
3107 inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid)); 3120 inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid));
@@ -3110,15 +3123,16 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
3110 from_kgid(&init_user_ns, inode->i_gid)); 3123 from_kgid(&init_user_ns, inode->i_gid));
3111 } 3124 }
3112 3125
3113 if ((newcaps & CEPH_CAP_AUTH_SHARED) && 3126 if ((newcaps & CEPH_CAP_LINK_SHARED) &&
3114 (issued & CEPH_CAP_LINK_EXCL) == 0) { 3127 (extra_info->issued & CEPH_CAP_LINK_EXCL) == 0) {
3115 set_nlink(inode, le32_to_cpu(grant->nlink)); 3128 set_nlink(inode, le32_to_cpu(grant->nlink));
3116 if (inode->i_nlink == 0 && 3129 if (inode->i_nlink == 0 &&
3117 (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL))) 3130 (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL)))
3118 deleted_inode = true; 3131 deleted_inode = true;
3119 } 3132 }
3120 3133
3121 if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && grant->xattr_len) { 3134 if ((extra_info->issued & CEPH_CAP_XATTR_EXCL) == 0 &&
3135 grant->xattr_len) {
3122 int len = le32_to_cpu(grant->xattr_len); 3136 int len = le32_to_cpu(grant->xattr_len);
3123 u64 version = le64_to_cpu(grant->xattr_version); 3137 u64 version = le64_to_cpu(grant->xattr_version);
3124 3138
@@ -3134,15 +3148,21 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
3134 } 3148 }
3135 3149
3136 if (newcaps & CEPH_CAP_ANY_RD) { 3150 if (newcaps & CEPH_CAP_ANY_RD) {
3151 struct timespec mtime, atime, ctime;
3137 /* ctime/mtime/atime? */ 3152 /* ctime/mtime/atime? */
3138 ceph_decode_timespec(&mtime, &grant->mtime); 3153 ceph_decode_timespec(&mtime, &grant->mtime);
3139 ceph_decode_timespec(&atime, &grant->atime); 3154 ceph_decode_timespec(&atime, &grant->atime);
3140 ceph_decode_timespec(&ctime, &grant->ctime); 3155 ceph_decode_timespec(&ctime, &grant->ctime);
3141 ceph_fill_file_time(inode, issued, 3156 ceph_fill_file_time(inode, extra_info->issued,
3142 le32_to_cpu(grant->time_warp_seq), 3157 le32_to_cpu(grant->time_warp_seq),
3143 &ctime, &mtime, &atime); 3158 &ctime, &mtime, &atime);
3144 } 3159 }
3145 3160
3161 if ((newcaps & CEPH_CAP_FILE_SHARED) && extra_info->dirstat_valid) {
3162 ci->i_files = extra_info->nfiles;
3163 ci->i_subdirs = extra_info->nsubdirs;
3164 }
3165
3146 if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) { 3166 if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
3147 /* file layout may have changed */ 3167 /* file layout may have changed */
3148 s64 old_pool = ci->i_layout.pool_id; 3168 s64 old_pool = ci->i_layout.pool_id;
@@ -3151,15 +3171,16 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
3151 ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout); 3171 ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout);
3152 old_ns = rcu_dereference_protected(ci->i_layout.pool_ns, 3172 old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
3153 lockdep_is_held(&ci->i_ceph_lock)); 3173 lockdep_is_held(&ci->i_ceph_lock));
3154 rcu_assign_pointer(ci->i_layout.pool_ns, *pns); 3174 rcu_assign_pointer(ci->i_layout.pool_ns, extra_info->pool_ns);
3155 3175
3156 if (ci->i_layout.pool_id != old_pool || *pns != old_ns) 3176 if (ci->i_layout.pool_id != old_pool ||
3177 extra_info->pool_ns != old_ns)
3157 ci->i_ceph_flags &= ~CEPH_I_POOL_PERM; 3178 ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
3158 3179
3159 *pns = old_ns; 3180 extra_info->pool_ns = old_ns;
3160 3181
3161 /* size/truncate_seq? */ 3182 /* size/truncate_seq? */
3162 queue_trunc = ceph_fill_file_size(inode, issued, 3183 queue_trunc = ceph_fill_file_size(inode, extra_info->issued,
3163 le32_to_cpu(grant->truncate_seq), 3184 le32_to_cpu(grant->truncate_seq),
3164 le64_to_cpu(grant->truncate_size), 3185 le64_to_cpu(grant->truncate_size),
3165 size); 3186 size);
@@ -3238,24 +3259,26 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
3238 } 3259 }
3239 BUG_ON(cap->issued & ~cap->implemented); 3260 BUG_ON(cap->issued & ~cap->implemented);
3240 3261
3241 if (inline_version > 0 && inline_version >= ci->i_inline_version) { 3262 if (extra_info->inline_version > 0 &&
3242 ci->i_inline_version = inline_version; 3263 extra_info->inline_version >= ci->i_inline_version) {
3264 ci->i_inline_version = extra_info->inline_version;
3243 if (ci->i_inline_version != CEPH_INLINE_NONE && 3265 if (ci->i_inline_version != CEPH_INLINE_NONE &&
3244 (newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO))) 3266 (newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)))
3245 fill_inline = true; 3267 fill_inline = true;
3246 } 3268 }
3247 3269
3248 if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) { 3270 if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
3249 if (newcaps & ~issued) 3271 if (newcaps & ~extra_info->issued)
3250 wake = true; 3272 wake = true;
3251 kick_flushing_inode_caps(mdsc, session, inode); 3273 kick_flushing_inode_caps(session->s_mdsc, session, inode);
3252 up_read(&mdsc->snap_rwsem); 3274 up_read(&session->s_mdsc->snap_rwsem);
3253 } else { 3275 } else {
3254 spin_unlock(&ci->i_ceph_lock); 3276 spin_unlock(&ci->i_ceph_lock);
3255 } 3277 }
3256 3278
3257 if (fill_inline) 3279 if (fill_inline)
3258 ceph_fill_inline_data(inode, NULL, inline_data, inline_len); 3280 ceph_fill_inline_data(inode, NULL, extra_info->inline_data,
3281 extra_info->inline_len);
3259 3282
3260 if (queue_trunc) 3283 if (queue_trunc)
3261 ceph_queue_vmtruncate(inode); 3284 ceph_queue_vmtruncate(inode);
@@ -3720,31 +3743,25 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3720 struct ceph_msg *msg) 3743 struct ceph_msg *msg)
3721{ 3744{
3722 struct ceph_mds_client *mdsc = session->s_mdsc; 3745 struct ceph_mds_client *mdsc = session->s_mdsc;
3723 struct super_block *sb = mdsc->fsc->sb;
3724 struct inode *inode; 3746 struct inode *inode;
3725 struct ceph_inode_info *ci; 3747 struct ceph_inode_info *ci;
3726 struct ceph_cap *cap; 3748 struct ceph_cap *cap;
3727 struct ceph_mds_caps *h; 3749 struct ceph_mds_caps *h;
3728 struct ceph_mds_cap_peer *peer = NULL; 3750 struct ceph_mds_cap_peer *peer = NULL;
3729 struct ceph_snap_realm *realm = NULL; 3751 struct ceph_snap_realm *realm = NULL;
3730 struct ceph_string *pool_ns = NULL; 3752 int op;
3731 int mds = session->s_mds; 3753 int msg_version = le16_to_cpu(msg->hdr.version);
3732 int op, issued;
3733 u32 seq, mseq; 3754 u32 seq, mseq;
3734 struct ceph_vino vino; 3755 struct ceph_vino vino;
3735 u64 tid;
3736 u64 inline_version = 0;
3737 void *inline_data = NULL;
3738 u32 inline_len = 0;
3739 void *snaptrace; 3756 void *snaptrace;
3740 size_t snaptrace_len; 3757 size_t snaptrace_len;
3741 void *p, *end; 3758 void *p, *end;
3759 struct cap_extra_info extra_info = {};
3742 3760
3743 dout("handle_caps from mds%d\n", mds); 3761 dout("handle_caps from mds%d\n", session->s_mds);
3744 3762
3745 /* decode */ 3763 /* decode */
3746 end = msg->front.iov_base + msg->front.iov_len; 3764 end = msg->front.iov_base + msg->front.iov_len;
3747 tid = le64_to_cpu(msg->hdr.tid);
3748 if (msg->front.iov_len < sizeof(*h)) 3765 if (msg->front.iov_len < sizeof(*h))
3749 goto bad; 3766 goto bad;
3750 h = msg->front.iov_base; 3767 h = msg->front.iov_base;
@@ -3758,7 +3775,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3758 snaptrace_len = le32_to_cpu(h->snap_trace_len); 3775 snaptrace_len = le32_to_cpu(h->snap_trace_len);
3759 p = snaptrace + snaptrace_len; 3776 p = snaptrace + snaptrace_len;
3760 3777
3761 if (le16_to_cpu(msg->hdr.version) >= 2) { 3778 if (msg_version >= 2) {
3762 u32 flock_len; 3779 u32 flock_len;
3763 ceph_decode_32_safe(&p, end, flock_len, bad); 3780 ceph_decode_32_safe(&p, end, flock_len, bad);
3764 if (p + flock_len > end) 3781 if (p + flock_len > end)
@@ -3766,7 +3783,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3766 p += flock_len; 3783 p += flock_len;
3767 } 3784 }
3768 3785
3769 if (le16_to_cpu(msg->hdr.version) >= 3) { 3786 if (msg_version >= 3) {
3770 if (op == CEPH_CAP_OP_IMPORT) { 3787 if (op == CEPH_CAP_OP_IMPORT) {
3771 if (p + sizeof(*peer) > end) 3788 if (p + sizeof(*peer) > end)
3772 goto bad; 3789 goto bad;
@@ -3778,16 +3795,16 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3778 } 3795 }
3779 } 3796 }
3780 3797
3781 if (le16_to_cpu(msg->hdr.version) >= 4) { 3798 if (msg_version >= 4) {
3782 ceph_decode_64_safe(&p, end, inline_version, bad); 3799 ceph_decode_64_safe(&p, end, extra_info.inline_version, bad);
3783 ceph_decode_32_safe(&p, end, inline_len, bad); 3800 ceph_decode_32_safe(&p, end, extra_info.inline_len, bad);
3784 if (p + inline_len > end) 3801 if (p + extra_info.inline_len > end)
3785 goto bad; 3802 goto bad;
3786 inline_data = p; 3803 extra_info.inline_data = p;
3787 p += inline_len; 3804 p += extra_info.inline_len;
3788 } 3805 }
3789 3806
3790 if (le16_to_cpu(msg->hdr.version) >= 5) { 3807 if (msg_version >= 5) {
3791 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 3808 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
3792 u32 epoch_barrier; 3809 u32 epoch_barrier;
3793 3810
@@ -3795,7 +3812,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3795 ceph_osdc_update_epoch_barrier(osdc, epoch_barrier); 3812 ceph_osdc_update_epoch_barrier(osdc, epoch_barrier);
3796 } 3813 }
3797 3814
3798 if (le16_to_cpu(msg->hdr.version) >= 8) { 3815 if (msg_version >= 8) {
3799 u64 flush_tid; 3816 u64 flush_tid;
3800 u32 caller_uid, caller_gid; 3817 u32 caller_uid, caller_gid;
3801 u32 pool_ns_len; 3818 u32 pool_ns_len;
@@ -3809,13 +3826,33 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3809 ceph_decode_32_safe(&p, end, pool_ns_len, bad); 3826 ceph_decode_32_safe(&p, end, pool_ns_len, bad);
3810 if (pool_ns_len > 0) { 3827 if (pool_ns_len > 0) {
3811 ceph_decode_need(&p, end, pool_ns_len, bad); 3828 ceph_decode_need(&p, end, pool_ns_len, bad);
3812 pool_ns = ceph_find_or_create_string(p, pool_ns_len); 3829 extra_info.pool_ns =
3830 ceph_find_or_create_string(p, pool_ns_len);
3813 p += pool_ns_len; 3831 p += pool_ns_len;
3814 } 3832 }
3815 } 3833 }
3816 3834
3835 if (msg_version >= 11) {
3836 struct ceph_timespec *btime;
3837 u64 change_attr;
3838 u32 flags;
3839
3840 /* version >= 9 */
3841 if (p + sizeof(*btime) > end)
3842 goto bad;
3843 btime = p;
3844 p += sizeof(*btime);
3845 ceph_decode_64_safe(&p, end, change_attr, bad);
3846 /* version >= 10 */
3847 ceph_decode_32_safe(&p, end, flags, bad);
3848 /* version >= 11 */
3849 extra_info.dirstat_valid = true;
3850 ceph_decode_64_safe(&p, end, extra_info.nfiles, bad);
3851 ceph_decode_64_safe(&p, end, extra_info.nsubdirs, bad);
3852 }
3853
3817 /* lookup ino */ 3854 /* lookup ino */
3818 inode = ceph_find_inode(sb, vino); 3855 inode = ceph_find_inode(mdsc->fsc->sb, vino);
3819 ci = ceph_inode(inode); 3856 ci = ceph_inode(inode);
3820 dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino, 3857 dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino,
3821 vino.snap, inode); 3858 vino.snap, inode);
@@ -3848,7 +3885,8 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3848 /* these will work even if we don't have a cap yet */ 3885 /* these will work even if we don't have a cap yet */
3849 switch (op) { 3886 switch (op) {
3850 case CEPH_CAP_OP_FLUSHSNAP_ACK: 3887 case CEPH_CAP_OP_FLUSHSNAP_ACK:
3851 handle_cap_flushsnap_ack(inode, tid, h, session); 3888 handle_cap_flushsnap_ack(inode, le64_to_cpu(msg->hdr.tid),
3889 h, session);
3852 goto done; 3890 goto done;
3853 3891
3854 case CEPH_CAP_OP_EXPORT: 3892 case CEPH_CAP_OP_EXPORT:
@@ -3867,10 +3905,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3867 down_read(&mdsc->snap_rwsem); 3905 down_read(&mdsc->snap_rwsem);
3868 } 3906 }
3869 handle_cap_import(mdsc, inode, h, peer, session, 3907 handle_cap_import(mdsc, inode, h, peer, session,
3870 &cap, &issued); 3908 &cap, &extra_info.issued);
3871 handle_cap_grant(mdsc, inode, h, &pool_ns, 3909 handle_cap_grant(inode, session, cap,
3872 inline_version, inline_data, inline_len, 3910 h, msg->middle, &extra_info);
3873 msg->middle, session, cap, issued);
3874 if (realm) 3911 if (realm)
3875 ceph_put_snap_realm(mdsc, realm); 3912 ceph_put_snap_realm(mdsc, realm);
3876 goto done_unlocked; 3913 goto done_unlocked;
@@ -3878,10 +3915,11 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3878 3915
3879 /* the rest require a cap */ 3916 /* the rest require a cap */
3880 spin_lock(&ci->i_ceph_lock); 3917 spin_lock(&ci->i_ceph_lock);
3881 cap = __get_cap_for_mds(ceph_inode(inode), mds); 3918 cap = __get_cap_for_mds(ceph_inode(inode), session->s_mds);
3882 if (!cap) { 3919 if (!cap) {
3883 dout(" no cap on %p ino %llx.%llx from mds%d\n", 3920 dout(" no cap on %p ino %llx.%llx from mds%d\n",
3884 inode, ceph_ino(inode), ceph_snap(inode), mds); 3921 inode, ceph_ino(inode), ceph_snap(inode),
3922 session->s_mds);
3885 spin_unlock(&ci->i_ceph_lock); 3923 spin_unlock(&ci->i_ceph_lock);
3886 goto flush_cap_releases; 3924 goto flush_cap_releases;
3887 } 3925 }
@@ -3890,15 +3928,15 @@ void ceph_handle_caps(struct ceph_mds_session *session,
3890 switch (op) { 3928 switch (op) {
3891 case CEPH_CAP_OP_REVOKE: 3929 case CEPH_CAP_OP_REVOKE:
3892 case CEPH_CAP_OP_GRANT: 3930 case CEPH_CAP_OP_GRANT:
3893 __ceph_caps_issued(ci, &issued); 3931 __ceph_caps_issued(ci, &extra_info.issued);
3894 issued |= __ceph_caps_dirty(ci); 3932 extra_info.issued |= __ceph_caps_dirty(ci);
3895 handle_cap_grant(mdsc, inode, h, &pool_ns, 3933 handle_cap_grant(inode, session, cap,
3896 inline_version, inline_data, inline_len, 3934 h, msg->middle, &extra_info);
3897 msg->middle, session, cap, issued);
3898 goto done_unlocked; 3935 goto done_unlocked;
3899 3936
3900 case CEPH_CAP_OP_FLUSH_ACK: 3937 case CEPH_CAP_OP_FLUSH_ACK:
3901 handle_cap_flush_ack(inode, tid, h, session, cap); 3938 handle_cap_flush_ack(inode, le64_to_cpu(msg->hdr.tid),
3939 h, session, cap);
3902 break; 3940 break;
3903 3941
3904 case CEPH_CAP_OP_TRUNC: 3942 case CEPH_CAP_OP_TRUNC:
@@ -3925,7 +3963,7 @@ done:
3925 mutex_unlock(&session->s_mutex); 3963 mutex_unlock(&session->s_mutex);
3926done_unlocked: 3964done_unlocked:
3927 iput(inode); 3965 iput(inode);
3928 ceph_put_string(pool_ns); 3966 ceph_put_string(extra_info.pool_ns);
3929 return; 3967 return;
3930 3968
3931bad: 3969bad:
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 1a78dd6f8bf2..036ac0f3a393 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -1486,6 +1486,8 @@ const struct file_operations ceph_dir_fops = {
1486 .release = ceph_release, 1486 .release = ceph_release,
1487 .unlocked_ioctl = ceph_ioctl, 1487 .unlocked_ioctl = ceph_ioctl,
1488 .fsync = ceph_fsync, 1488 .fsync = ceph_fsync,
1489 .lock = ceph_lock,
1490 .flock = ceph_flock,
1489}; 1491};
1490 1492
1491const struct file_operations ceph_snapdir_fops = { 1493const struct file_operations ceph_snapdir_fops = {
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index cf0e45b10121..6b9f7f3cd237 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -895,7 +895,6 @@ static void ceph_aio_retry_work(struct work_struct *work)
895 req->r_callback = ceph_aio_complete_req; 895 req->r_callback = ceph_aio_complete_req;
896 req->r_inode = inode; 896 req->r_inode = inode;
897 req->r_priv = aio_req; 897 req->r_priv = aio_req;
898 req->r_abort_on_full = true;
899 898
900 ret = ceph_osdc_start_request(req->r_osdc, req, false); 899 ret = ceph_osdc_start_request(req->r_osdc, req, false);
901out: 900out:
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index ae056927080d..4fda7a9d4c9d 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -739,7 +739,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
739 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; 739 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
740 struct ceph_mds_reply_inode *info = iinfo->in; 740 struct ceph_mds_reply_inode *info = iinfo->in;
741 struct ceph_inode_info *ci = ceph_inode(inode); 741 struct ceph_inode_info *ci = ceph_inode(inode);
742 int issued = 0, implemented, new_issued; 742 int issued, new_issued, info_caps;
743 struct timespec mtime, atime, ctime; 743 struct timespec mtime, atime, ctime;
744 struct ceph_buffer *xattr_blob = NULL; 744 struct ceph_buffer *xattr_blob = NULL;
745 struct ceph_string *pool_ns = NULL; 745 struct ceph_string *pool_ns = NULL;
@@ -754,8 +754,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
754 inode, ceph_vinop(inode), le64_to_cpu(info->version), 754 inode, ceph_vinop(inode), le64_to_cpu(info->version),
755 ci->i_version); 755 ci->i_version);
756 756
757 info_caps = le32_to_cpu(info->cap.caps);
758
757 /* prealloc new cap struct */ 759 /* prealloc new cap struct */
758 if (info->cap.caps && ceph_snap(inode) == CEPH_NOSNAP) 760 if (info_caps && ceph_snap(inode) == CEPH_NOSNAP)
759 new_cap = ceph_get_cap(mdsc, caps_reservation); 761 new_cap = ceph_get_cap(mdsc, caps_reservation);
760 762
761 /* 763 /*
@@ -792,9 +794,9 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
792 le64_to_cpu(info->version) > (ci->i_version & ~1))) 794 le64_to_cpu(info->version) > (ci->i_version & ~1)))
793 new_version = true; 795 new_version = true;
794 796
795 issued = __ceph_caps_issued(ci, &implemented); 797 __ceph_caps_issued(ci, &issued);
796 issued |= implemented | __ceph_caps_dirty(ci); 798 issued |= __ceph_caps_dirty(ci);
797 new_issued = ~issued & le32_to_cpu(info->cap.caps); 799 new_issued = ~issued & info_caps;
798 800
799 /* update inode */ 801 /* update inode */
800 inode->i_rdev = le32_to_cpu(info->rdev); 802 inode->i_rdev = le32_to_cpu(info->rdev);
@@ -826,6 +828,11 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
826 &ctime, &mtime, &atime); 828 &ctime, &mtime, &atime);
827 } 829 }
828 830
831 if (new_version || (info_caps & CEPH_CAP_FILE_SHARED)) {
832 ci->i_files = le64_to_cpu(info->files);
833 ci->i_subdirs = le64_to_cpu(info->subdirs);
834 }
835
829 if (new_version || 836 if (new_version ||
830 (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) { 837 (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
831 s64 old_pool = ci->i_layout.pool_id; 838 s64 old_pool = ci->i_layout.pool_id;
@@ -854,6 +861,18 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
854 } 861 }
855 } 862 }
856 863
864 /* layout and rstat are not tracked by capability, update them if
865 * the inode info is from auth mds */
866 if (new_version || (info->cap.flags & CEPH_CAP_FLAG_AUTH)) {
867 if (S_ISDIR(inode->i_mode)) {
868 ci->i_dir_layout = iinfo->dir_layout;
869 ci->i_rbytes = le64_to_cpu(info->rbytes);
870 ci->i_rfiles = le64_to_cpu(info->rfiles);
871 ci->i_rsubdirs = le64_to_cpu(info->rsubdirs);
872 ceph_decode_timespec(&ci->i_rctime, &info->rctime);
873 }
874 }
875
857 /* xattrs */ 876 /* xattrs */
858 /* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. */ 877 /* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. */
859 if ((ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL)) && 878 if ((ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL)) &&
@@ -870,7 +889,8 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
870 } 889 }
871 890
872 /* finally update i_version */ 891 /* finally update i_version */
873 ci->i_version = le64_to_cpu(info->version); 892 if (le64_to_cpu(info->version) > ci->i_version)
893 ci->i_version = le64_to_cpu(info->version);
874 894
875 inode->i_mapping->a_ops = &ceph_aops; 895 inode->i_mapping->a_ops = &ceph_aops;
876 896
@@ -918,15 +938,6 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
918 case S_IFDIR: 938 case S_IFDIR:
919 inode->i_op = &ceph_dir_iops; 939 inode->i_op = &ceph_dir_iops;
920 inode->i_fop = &ceph_dir_fops; 940 inode->i_fop = &ceph_dir_fops;
921
922 ci->i_dir_layout = iinfo->dir_layout;
923
924 ci->i_files = le64_to_cpu(info->files);
925 ci->i_subdirs = le64_to_cpu(info->subdirs);
926 ci->i_rbytes = le64_to_cpu(info->rbytes);
927 ci->i_rfiles = le64_to_cpu(info->rfiles);
928 ci->i_rsubdirs = le64_to_cpu(info->rsubdirs);
929 ceph_decode_timespec(&ci->i_rctime, &info->rctime);
930 break; 941 break;
931 default: 942 default:
932 pr_err("fill_inode %llx.%llx BAD mode 0%o\n", 943 pr_err("fill_inode %llx.%llx BAD mode 0%o\n",
@@ -934,12 +945,11 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
934 } 945 }
935 946
936 /* were we issued a capability? */ 947 /* were we issued a capability? */
937 if (info->cap.caps) { 948 if (info_caps) {
938 if (ceph_snap(inode) == CEPH_NOSNAP) { 949 if (ceph_snap(inode) == CEPH_NOSNAP) {
939 unsigned caps = le32_to_cpu(info->cap.caps);
940 ceph_add_cap(inode, session, 950 ceph_add_cap(inode, session,
941 le64_to_cpu(info->cap.cap_id), 951 le64_to_cpu(info->cap.cap_id),
942 cap_fmode, caps, 952 cap_fmode, info_caps,
943 le32_to_cpu(info->cap.wanted), 953 le32_to_cpu(info->cap.wanted),
944 le32_to_cpu(info->cap.seq), 954 le32_to_cpu(info->cap.seq),
945 le32_to_cpu(info->cap.mseq), 955 le32_to_cpu(info->cap.mseq),
@@ -949,7 +959,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
949 /* set dir completion flag? */ 959 /* set dir completion flag? */
950 if (S_ISDIR(inode->i_mode) && 960 if (S_ISDIR(inode->i_mode) &&
951 ci->i_files == 0 && ci->i_subdirs == 0 && 961 ci->i_files == 0 && ci->i_subdirs == 0 &&
952 (caps & CEPH_CAP_FILE_SHARED) && 962 (info_caps & CEPH_CAP_FILE_SHARED) &&
953 (issued & CEPH_CAP_FILE_EXCL) == 0 && 963 (issued & CEPH_CAP_FILE_EXCL) == 0 &&
954 !__ceph_dir_is_complete(ci)) { 964 !__ceph_dir_is_complete(ci)) {
955 dout(" marking %p complete (empty)\n", inode); 965 dout(" marking %p complete (empty)\n", inode);
@@ -962,8 +972,8 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
962 wake = true; 972 wake = true;
963 } else { 973 } else {
964 dout(" %p got snap_caps %s\n", inode, 974 dout(" %p got snap_caps %s\n", inode,
965 ceph_cap_string(le32_to_cpu(info->cap.caps))); 975 ceph_cap_string(info_caps));
966 ci->i_snap_caps |= le32_to_cpu(info->cap.caps); 976 ci->i_snap_caps |= info_caps;
967 if (cap_fmode >= 0) 977 if (cap_fmode >= 0)
968 __ceph_get_fmode(ci, cap_fmode); 978 __ceph_get_fmode(ci, cap_fmode);
969 } 979 }
@@ -978,8 +988,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
978 int cache_caps = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; 988 int cache_caps = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
979 ci->i_inline_version = iinfo->inline_version; 989 ci->i_inline_version = iinfo->inline_version;
980 if (ci->i_inline_version != CEPH_INLINE_NONE && 990 if (ci->i_inline_version != CEPH_INLINE_NONE &&
981 (locked_page || 991 (locked_page || (info_caps & cache_caps)))
982 (le32_to_cpu(info->cap.caps) & cache_caps)))
983 fill_inline = true; 992 fill_inline = true;
984 } 993 }
985 994
@@ -2178,6 +2187,7 @@ int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
2178 struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb); 2187 struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
2179 struct ceph_mds_client *mdsc = fsc->mdsc; 2188 struct ceph_mds_client *mdsc = fsc->mdsc;
2180 struct ceph_mds_request *req; 2189 struct ceph_mds_request *req;
2190 int mode;
2181 int err; 2191 int err;
2182 2192
2183 if (ceph_snap(inode) == CEPH_SNAPDIR) { 2193 if (ceph_snap(inode) == CEPH_SNAPDIR) {
@@ -2190,7 +2200,8 @@ int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
2190 if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1)) 2200 if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
2191 return 0; 2201 return 0;
2192 2202
2193 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS); 2203 mode = (mask & CEPH_STAT_RSTAT) ? USE_AUTH_MDS : USE_ANY_MDS;
2204 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, mode);
2194 if (IS_ERR(req)) 2205 if (IS_ERR(req))
2195 return PTR_ERR(req); 2206 return PTR_ERR(req);
2196 req->r_inode = inode; 2207 req->r_inode = inode;
@@ -2261,6 +2272,14 @@ int ceph_getattr(const struct path *path, struct kstat *stat,
2261 stat->size = ci->i_files + ci->i_subdirs; 2272 stat->size = ci->i_files + ci->i_subdirs;
2262 stat->blocks = 0; 2273 stat->blocks = 0;
2263 stat->blksize = 65536; 2274 stat->blksize = 65536;
2275 /*
2276 * Some applications rely on the number of st_nlink
2277 * value on directories to be either 0 (if unlinked)
2278 * or 2 + number of subdirectories.
2279 */
2280 if (stat->nlink == 1)
2281 /* '.' + '..' + subdirs */
2282 stat->nlink = 1 + 1 + ci->i_subdirs;
2264 } 2283 }
2265 } 2284 }
2266 return err; 2285 return err;
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index b33082e6878f..95a3b3ac9b6e 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -45,7 +45,7 @@ static void ceph_put_super(struct super_block *s)
45static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf) 45static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
46{ 46{
47 struct ceph_fs_client *fsc = ceph_inode_to_client(d_inode(dentry)); 47 struct ceph_fs_client *fsc = ceph_inode_to_client(d_inode(dentry));
48 struct ceph_monmap *monmap = fsc->client->monc.monmap; 48 struct ceph_mon_client *monc = &fsc->client->monc;
49 struct ceph_statfs st; 49 struct ceph_statfs st;
50 u64 fsid; 50 u64 fsid;
51 int err; 51 int err;
@@ -58,7 +58,7 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
58 } 58 }
59 59
60 dout("statfs\n"); 60 dout("statfs\n");
61 err = ceph_monc_do_statfs(&fsc->client->monc, data_pool, &st); 61 err = ceph_monc_do_statfs(monc, data_pool, &st);
62 if (err < 0) 62 if (err < 0)
63 return err; 63 return err;
64 64
@@ -94,8 +94,11 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
94 buf->f_namelen = NAME_MAX; 94 buf->f_namelen = NAME_MAX;
95 95
96 /* Must convert the fsid, for consistent values across arches */ 96 /* Must convert the fsid, for consistent values across arches */
97 fsid = le64_to_cpu(*(__le64 *)(&monmap->fsid)) ^ 97 mutex_lock(&monc->mutex);
98 le64_to_cpu(*((__le64 *)&monmap->fsid + 1)); 98 fsid = le64_to_cpu(*(__le64 *)(&monc->monmap->fsid)) ^
99 le64_to_cpu(*((__le64 *)&monc->monmap->fsid + 1));
100 mutex_unlock(&monc->mutex);
101
99 buf->f_fsid.val[0] = fsid & 0xffffffff; 102 buf->f_fsid.val[0] = fsid & 0xffffffff;
100 buf->f_fsid.val[1] = fsid >> 32; 103 buf->f_fsid.val[1] = fsid >> 32;
101 104
@@ -256,19 +259,19 @@ static int parse_fsopt_token(char *c, void *private)
256 break; 259 break;
257 /* misc */ 260 /* misc */
258 case Opt_wsize: 261 case Opt_wsize:
259 if (intval < PAGE_SIZE || intval > CEPH_MAX_WRITE_SIZE) 262 if (intval < (int)PAGE_SIZE || intval > CEPH_MAX_WRITE_SIZE)
260 return -EINVAL; 263 return -EINVAL;
261 fsopt->wsize = ALIGN(intval, PAGE_SIZE); 264 fsopt->wsize = ALIGN(intval, PAGE_SIZE);
262 break; 265 break;
263 case Opt_rsize: 266 case Opt_rsize:
264 if (intval < PAGE_SIZE || intval > CEPH_MAX_READ_SIZE) 267 if (intval < (int)PAGE_SIZE || intval > CEPH_MAX_READ_SIZE)
265 return -EINVAL; 268 return -EINVAL;
266 fsopt->rsize = ALIGN(intval, PAGE_SIZE); 269 fsopt->rsize = ALIGN(intval, PAGE_SIZE);
267 break; 270 break;
268 case Opt_rasize: 271 case Opt_rasize:
269 if (intval < 0) 272 if (intval < 0)
270 return -EINVAL; 273 return -EINVAL;
271 fsopt->rasize = ALIGN(intval + PAGE_SIZE - 1, PAGE_SIZE); 274 fsopt->rasize = ALIGN(intval, PAGE_SIZE);
272 break; 275 break;
273 case Opt_caps_wanted_delay_min: 276 case Opt_caps_wanted_delay_min:
274 if (intval < 1) 277 if (intval < 1)
@@ -286,7 +289,7 @@ static int parse_fsopt_token(char *c, void *private)
286 fsopt->max_readdir = intval; 289 fsopt->max_readdir = intval;
287 break; 290 break;
288 case Opt_readdir_max_bytes: 291 case Opt_readdir_max_bytes:
289 if (intval < PAGE_SIZE && intval != 0) 292 if (intval < (int)PAGE_SIZE && intval != 0)
290 return -EINVAL; 293 return -EINVAL;
291 fsopt->max_readdir_bytes = intval; 294 fsopt->max_readdir_bytes = intval;
292 break; 295 break;
@@ -534,6 +537,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
534 seq_puts(m, ",noasyncreaddir"); 537 seq_puts(m, ",noasyncreaddir");
535 if ((fsopt->flags & CEPH_MOUNT_OPT_DCACHE) == 0) 538 if ((fsopt->flags & CEPH_MOUNT_OPT_DCACHE) == 0)
536 seq_puts(m, ",nodcache"); 539 seq_puts(m, ",nodcache");
540 if (fsopt->flags & CEPH_MOUNT_OPT_INO32)
541 seq_puts(m, ",ino32");
537 if (fsopt->flags & CEPH_MOUNT_OPT_FSCACHE) { 542 if (fsopt->flags & CEPH_MOUNT_OPT_FSCACHE) {
538 seq_show_option(m, "fsc", fsopt->fscache_uniq); 543 seq_show_option(m, "fsc", fsopt->fscache_uniq);
539 } 544 }
@@ -551,7 +556,7 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
551 556
552 if (fsopt->mds_namespace) 557 if (fsopt->mds_namespace)
553 seq_show_option(m, "mds_namespace", fsopt->mds_namespace); 558 seq_show_option(m, "mds_namespace", fsopt->mds_namespace);
554 if (fsopt->wsize) 559 if (fsopt->wsize != CEPH_MAX_WRITE_SIZE)
555 seq_printf(m, ",wsize=%d", fsopt->wsize); 560 seq_printf(m, ",wsize=%d", fsopt->wsize);
556 if (fsopt->rsize != CEPH_MAX_READ_SIZE) 561 if (fsopt->rsize != CEPH_MAX_READ_SIZE)
557 seq_printf(m, ",rsize=%d", fsopt->rsize); 562 seq_printf(m, ",rsize=%d", fsopt->rsize);
@@ -616,7 +621,9 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
616 err = PTR_ERR(fsc->client); 621 err = PTR_ERR(fsc->client);
617 goto fail; 622 goto fail;
618 } 623 }
624
619 fsc->client->extra_mon_dispatch = extra_mon_dispatch; 625 fsc->client->extra_mon_dispatch = extra_mon_dispatch;
626 fsc->client->osdc.abort_on_full = true;
620 627
621 if (!fsopt->mds_namespace) { 628 if (!fsopt->mds_namespace) {
622 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 629 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
@@ -674,6 +681,13 @@ fail:
674 return ERR_PTR(err); 681 return ERR_PTR(err);
675} 682}
676 683
684static void flush_fs_workqueues(struct ceph_fs_client *fsc)
685{
686 flush_workqueue(fsc->wb_wq);
687 flush_workqueue(fsc->pg_inv_wq);
688 flush_workqueue(fsc->trunc_wq);
689}
690
677static void destroy_fs_client(struct ceph_fs_client *fsc) 691static void destroy_fs_client(struct ceph_fs_client *fsc)
678{ 692{
679 dout("destroy_fs_client %p\n", fsc); 693 dout("destroy_fs_client %p\n", fsc);
@@ -793,6 +807,7 @@ static void ceph_umount_begin(struct super_block *sb)
793 if (!fsc) 807 if (!fsc)
794 return; 808 return;
795 fsc->mount_state = CEPH_MOUNT_SHUTDOWN; 809 fsc->mount_state = CEPH_MOUNT_SHUTDOWN;
810 ceph_osdc_abort_requests(&fsc->client->osdc, -EIO);
796 ceph_mdsc_force_umount(fsc->mdsc); 811 ceph_mdsc_force_umount(fsc->mdsc);
797 return; 812 return;
798} 813}
@@ -1088,6 +1103,8 @@ static void ceph_kill_sb(struct super_block *s)
1088 dout("kill_sb %p\n", s); 1103 dout("kill_sb %p\n", s);
1089 1104
1090 ceph_mdsc_pre_umount(fsc->mdsc); 1105 ceph_mdsc_pre_umount(fsc->mdsc);
1106 flush_fs_workqueues(fsc);
1107
1091 generic_shutdown_super(s); 1108 generic_shutdown_super(s);
1092 1109
1093 fsc->client->extra_mon_dispatch = NULL; 1110 fsc->client->extra_mon_dispatch = NULL;
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 315f7e63e7cc..5bc8edb4c2a6 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -50,10 +50,14 @@ struct ceph_vxattr {
50 size_t name_size; /* strlen(name) + 1 (for '\0') */ 50 size_t name_size; /* strlen(name) + 1 (for '\0') */
51 size_t (*getxattr_cb)(struct ceph_inode_info *ci, char *val, 51 size_t (*getxattr_cb)(struct ceph_inode_info *ci, char *val,
52 size_t size); 52 size_t size);
53 bool readonly, hidden;
54 bool (*exists_cb)(struct ceph_inode_info *ci); 53 bool (*exists_cb)(struct ceph_inode_info *ci);
54 unsigned int flags;
55}; 55};
56 56
57#define VXATTR_FLAG_READONLY (1<<0)
58#define VXATTR_FLAG_HIDDEN (1<<1)
59#define VXATTR_FLAG_RSTAT (1<<2)
60
57/* layouts */ 61/* layouts */
58 62
59static bool ceph_vxattrcb_layout_exists(struct ceph_inode_info *ci) 63static bool ceph_vxattrcb_layout_exists(struct ceph_inode_info *ci)
@@ -262,32 +266,31 @@ static size_t ceph_vxattrcb_quota_max_files(struct ceph_inode_info *ci,
262#define CEPH_XATTR_NAME2(_type, _name, _name2) \ 266#define CEPH_XATTR_NAME2(_type, _name, _name2) \
263 XATTR_CEPH_PREFIX #_type "." #_name "." #_name2 267 XATTR_CEPH_PREFIX #_type "." #_name "." #_name2
264 268
265#define XATTR_NAME_CEPH(_type, _name) \ 269#define XATTR_NAME_CEPH(_type, _name, _flags) \
266 { \ 270 { \
267 .name = CEPH_XATTR_NAME(_type, _name), \ 271 .name = CEPH_XATTR_NAME(_type, _name), \
268 .name_size = sizeof (CEPH_XATTR_NAME(_type, _name)), \ 272 .name_size = sizeof (CEPH_XATTR_NAME(_type, _name)), \
269 .getxattr_cb = ceph_vxattrcb_ ## _type ## _ ## _name, \ 273 .getxattr_cb = ceph_vxattrcb_ ## _type ## _ ## _name, \
270 .readonly = true, \ 274 .exists_cb = NULL, \
271 .hidden = false, \ 275 .flags = (VXATTR_FLAG_READONLY | _flags), \
272 .exists_cb = NULL, \
273 } 276 }
277#define XATTR_RSTAT_FIELD(_type, _name) \
278 XATTR_NAME_CEPH(_type, _name, VXATTR_FLAG_RSTAT)
274#define XATTR_LAYOUT_FIELD(_type, _name, _field) \ 279#define XATTR_LAYOUT_FIELD(_type, _name, _field) \
275 { \ 280 { \
276 .name = CEPH_XATTR_NAME2(_type, _name, _field), \ 281 .name = CEPH_XATTR_NAME2(_type, _name, _field), \
277 .name_size = sizeof (CEPH_XATTR_NAME2(_type, _name, _field)), \ 282 .name_size = sizeof (CEPH_XATTR_NAME2(_type, _name, _field)), \
278 .getxattr_cb = ceph_vxattrcb_ ## _name ## _ ## _field, \ 283 .getxattr_cb = ceph_vxattrcb_ ## _name ## _ ## _field, \
279 .readonly = false, \
280 .hidden = true, \
281 .exists_cb = ceph_vxattrcb_layout_exists, \ 284 .exists_cb = ceph_vxattrcb_layout_exists, \
285 .flags = VXATTR_FLAG_HIDDEN, \
282 } 286 }
283#define XATTR_QUOTA_FIELD(_type, _name) \ 287#define XATTR_QUOTA_FIELD(_type, _name) \
284 { \ 288 { \
285 .name = CEPH_XATTR_NAME(_type, _name), \ 289 .name = CEPH_XATTR_NAME(_type, _name), \
286 .name_size = sizeof(CEPH_XATTR_NAME(_type, _name)), \ 290 .name_size = sizeof(CEPH_XATTR_NAME(_type, _name)), \
287 .getxattr_cb = ceph_vxattrcb_ ## _type ## _ ## _name, \ 291 .getxattr_cb = ceph_vxattrcb_ ## _type ## _ ## _name, \
288 .readonly = false, \
289 .hidden = true, \
290 .exists_cb = ceph_vxattrcb_quota_exists, \ 292 .exists_cb = ceph_vxattrcb_quota_exists, \
293 .flags = VXATTR_FLAG_HIDDEN, \
291 } 294 }
292 295
293static struct ceph_vxattr ceph_dir_vxattrs[] = { 296static struct ceph_vxattr ceph_dir_vxattrs[] = {
@@ -295,30 +298,28 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = {
295 .name = "ceph.dir.layout", 298 .name = "ceph.dir.layout",
296 .name_size = sizeof("ceph.dir.layout"), 299 .name_size = sizeof("ceph.dir.layout"),
297 .getxattr_cb = ceph_vxattrcb_layout, 300 .getxattr_cb = ceph_vxattrcb_layout,
298 .readonly = false,
299 .hidden = true,
300 .exists_cb = ceph_vxattrcb_layout_exists, 301 .exists_cb = ceph_vxattrcb_layout_exists,
302 .flags = VXATTR_FLAG_HIDDEN,
301 }, 303 },
302 XATTR_LAYOUT_FIELD(dir, layout, stripe_unit), 304 XATTR_LAYOUT_FIELD(dir, layout, stripe_unit),
303 XATTR_LAYOUT_FIELD(dir, layout, stripe_count), 305 XATTR_LAYOUT_FIELD(dir, layout, stripe_count),
304 XATTR_LAYOUT_FIELD(dir, layout, object_size), 306 XATTR_LAYOUT_FIELD(dir, layout, object_size),
305 XATTR_LAYOUT_FIELD(dir, layout, pool), 307 XATTR_LAYOUT_FIELD(dir, layout, pool),
306 XATTR_LAYOUT_FIELD(dir, layout, pool_namespace), 308 XATTR_LAYOUT_FIELD(dir, layout, pool_namespace),
307 XATTR_NAME_CEPH(dir, entries), 309 XATTR_NAME_CEPH(dir, entries, 0),
308 XATTR_NAME_CEPH(dir, files), 310 XATTR_NAME_CEPH(dir, files, 0),
309 XATTR_NAME_CEPH(dir, subdirs), 311 XATTR_NAME_CEPH(dir, subdirs, 0),
310 XATTR_NAME_CEPH(dir, rentries), 312 XATTR_RSTAT_FIELD(dir, rentries),
311 XATTR_NAME_CEPH(dir, rfiles), 313 XATTR_RSTAT_FIELD(dir, rfiles),
312 XATTR_NAME_CEPH(dir, rsubdirs), 314 XATTR_RSTAT_FIELD(dir, rsubdirs),
313 XATTR_NAME_CEPH(dir, rbytes), 315 XATTR_RSTAT_FIELD(dir, rbytes),
314 XATTR_NAME_CEPH(dir, rctime), 316 XATTR_RSTAT_FIELD(dir, rctime),
315 { 317 {
316 .name = "ceph.quota", 318 .name = "ceph.quota",
317 .name_size = sizeof("ceph.quota"), 319 .name_size = sizeof("ceph.quota"),
318 .getxattr_cb = ceph_vxattrcb_quota, 320 .getxattr_cb = ceph_vxattrcb_quota,
319 .readonly = false,
320 .hidden = true,
321 .exists_cb = ceph_vxattrcb_quota_exists, 321 .exists_cb = ceph_vxattrcb_quota_exists,
322 .flags = VXATTR_FLAG_HIDDEN,
322 }, 323 },
323 XATTR_QUOTA_FIELD(quota, max_bytes), 324 XATTR_QUOTA_FIELD(quota, max_bytes),
324 XATTR_QUOTA_FIELD(quota, max_files), 325 XATTR_QUOTA_FIELD(quota, max_files),
@@ -333,9 +334,8 @@ static struct ceph_vxattr ceph_file_vxattrs[] = {
333 .name = "ceph.file.layout", 334 .name = "ceph.file.layout",
334 .name_size = sizeof("ceph.file.layout"), 335 .name_size = sizeof("ceph.file.layout"),
335 .getxattr_cb = ceph_vxattrcb_layout, 336 .getxattr_cb = ceph_vxattrcb_layout,
336 .readonly = false,
337 .hidden = true,
338 .exists_cb = ceph_vxattrcb_layout_exists, 337 .exists_cb = ceph_vxattrcb_layout_exists,
338 .flags = VXATTR_FLAG_HIDDEN,
339 }, 339 },
340 XATTR_LAYOUT_FIELD(file, layout, stripe_unit), 340 XATTR_LAYOUT_FIELD(file, layout, stripe_unit),
341 XATTR_LAYOUT_FIELD(file, layout, stripe_count), 341 XATTR_LAYOUT_FIELD(file, layout, stripe_count),
@@ -374,9 +374,10 @@ static size_t __init vxattrs_name_size(struct ceph_vxattr *vxattrs)
374 struct ceph_vxattr *vxattr; 374 struct ceph_vxattr *vxattr;
375 size_t size = 0; 375 size_t size = 0;
376 376
377 for (vxattr = vxattrs; vxattr->name; vxattr++) 377 for (vxattr = vxattrs; vxattr->name; vxattr++) {
378 if (!vxattr->hidden) 378 if (!(vxattr->flags & VXATTR_FLAG_HIDDEN))
379 size += vxattr->name_size; 379 size += vxattr->name_size;
380 }
380 381
381 return size; 382 return size;
382} 383}
@@ -809,7 +810,10 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
809 /* let's see if a virtual xattr was requested */ 810 /* let's see if a virtual xattr was requested */
810 vxattr = ceph_match_vxattr(inode, name); 811 vxattr = ceph_match_vxattr(inode, name);
811 if (vxattr) { 812 if (vxattr) {
812 err = ceph_do_getattr(inode, 0, true); 813 int mask = 0;
814 if (vxattr->flags & VXATTR_FLAG_RSTAT)
815 mask |= CEPH_STAT_RSTAT;
816 err = ceph_do_getattr(inode, mask, true);
813 if (err) 817 if (err)
814 return err; 818 return err;
815 err = -ENODATA; 819 err = -ENODATA;
@@ -919,7 +923,7 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
919 err = namelen; 923 err = namelen;
920 if (vxattrs) { 924 if (vxattrs) {
921 for (i = 0; vxattrs[i].name; i++) { 925 for (i = 0; vxattrs[i].name; i++) {
922 if (!vxattrs[i].hidden && 926 if (!(vxattrs[i].flags & VXATTR_FLAG_HIDDEN) &&
923 !(vxattrs[i].exists_cb && 927 !(vxattrs[i].exists_cb &&
924 !vxattrs[i].exists_cb(ci))) { 928 !vxattrs[i].exists_cb(ci))) {
925 len = sprintf(names, "%s", vxattrs[i].name); 929 len = sprintf(names, "%s", vxattrs[i].name);
@@ -1024,7 +1028,7 @@ int __ceph_setxattr(struct inode *inode, const char *name,
1024 1028
1025 vxattr = ceph_match_vxattr(inode, name); 1029 vxattr = ceph_match_vxattr(inode, name);
1026 if (vxattr) { 1030 if (vxattr) {
1027 if (vxattr->readonly) 1031 if (vxattr->flags & VXATTR_FLAG_READONLY)
1028 return -EOPNOTSUPP; 1032 return -EOPNOTSUPP;
1029 if (value && !strncmp(vxattr->name, "ceph.quota", 10)) 1033 if (value && !strncmp(vxattr->name, "ceph.quota", 10))
1030 check_realm = true; 1034 check_realm = true;
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index 7ecfc88314d8..4903deb0777a 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -628,6 +628,7 @@ int ceph_flags_to_mode(int flags);
628 CEPH_CAP_XATTR_SHARED) 628 CEPH_CAP_XATTR_SHARED)
629#define CEPH_STAT_CAP_INLINE_DATA (CEPH_CAP_FILE_SHARED | \ 629#define CEPH_STAT_CAP_INLINE_DATA (CEPH_CAP_FILE_SHARED | \
630 CEPH_CAP_FILE_RD) 630 CEPH_CAP_FILE_RD)
631#define CEPH_STAT_RSTAT CEPH_CAP_FILE_WREXTEND
631 632
632#define CEPH_CAP_ANY_SHARED (CEPH_CAP_AUTH_SHARED | \ 633#define CEPH_CAP_ANY_SHARED (CEPH_CAP_AUTH_SHARED | \
633 CEPH_CAP_LINK_SHARED | \ 634 CEPH_CAP_LINK_SHARED | \
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 96bb32285989..0d6ee04b4c41 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -170,6 +170,7 @@ struct ceph_osd_request {
170 u64 r_tid; /* unique for this client */ 170 u64 r_tid; /* unique for this client */
171 struct rb_node r_node; 171 struct rb_node r_node;
172 struct rb_node r_mc_node; /* map check */ 172 struct rb_node r_mc_node; /* map check */
173 struct work_struct r_complete_work;
173 struct ceph_osd *r_osd; 174 struct ceph_osd *r_osd;
174 175
175 struct ceph_osd_request_target r_t; 176 struct ceph_osd_request_target r_t;
@@ -201,7 +202,6 @@ struct ceph_osd_request {
201 struct timespec r_mtime; /* ditto */ 202 struct timespec r_mtime; /* ditto */
202 u64 r_data_offset; /* ditto */ 203 u64 r_data_offset; /* ditto */
203 bool r_linger; /* don't resend on failure */ 204 bool r_linger; /* don't resend on failure */
204 bool r_abort_on_full; /* return ENOSPC when full */
205 205
206 /* internal */ 206 /* internal */
207 unsigned long r_stamp; /* jiffies, send or check time */ 207 unsigned long r_stamp; /* jiffies, send or check time */
@@ -347,6 +347,8 @@ struct ceph_osd_client {
347 struct rb_root linger_map_checks; 347 struct rb_root linger_map_checks;
348 atomic_t num_requests; 348 atomic_t num_requests;
349 atomic_t num_homeless; 349 atomic_t num_homeless;
350 bool abort_on_full; /* abort w/ ENOSPC when full */
351 int abort_err;
350 struct delayed_work timeout_work; 352 struct delayed_work timeout_work;
351 struct delayed_work osds_timeout_work; 353 struct delayed_work osds_timeout_work;
352#ifdef CONFIG_DEBUG_FS 354#ifdef CONFIG_DEBUG_FS
@@ -359,6 +361,7 @@ struct ceph_osd_client {
359 struct ceph_msgpool msgpool_op_reply; 361 struct ceph_msgpool msgpool_op_reply;
360 362
361 struct workqueue_struct *notify_wq; 363 struct workqueue_struct *notify_wq;
364 struct workqueue_struct *completion_wq;
362}; 365};
363 366
364static inline bool ceph_osdmap_flag(struct ceph_osd_client *osdc, int flag) 367static inline bool ceph_osdmap_flag(struct ceph_osd_client *osdc, int flag)
@@ -378,6 +381,7 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
378extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc, 381extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
379 struct ceph_msg *msg); 382 struct ceph_msg *msg);
380void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb); 383void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb);
384void ceph_osdc_abort_requests(struct ceph_osd_client *osdc, int err);
381 385
382extern void osd_req_op_init(struct ceph_osd_request *osd_req, 386extern void osd_req_op_init(struct ceph_osd_request *osd_req,
383 unsigned int which, u16 opcode, u32 flags); 387 unsigned int which, u16 opcode, u32 flags);
@@ -440,7 +444,7 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
440 struct page **pages, u64 length, 444 struct page **pages, u64 length,
441 u32 alignment, bool pages_from_pool, 445 u32 alignment, bool pages_from_pool,
442 bool own_pages); 446 bool own_pages);
443extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req, 447extern int osd_req_op_cls_init(struct ceph_osd_request *osd_req,
444 unsigned int which, u16 opcode, 448 unsigned int which, u16 opcode,
445 const char *class, const char *method); 449 const char *class, const char *method);
446extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which, 450extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
index e71fb222c7c3..5675b1f09bc5 100644
--- a/include/linux/ceph/osdmap.h
+++ b/include/linux/ceph/osdmap.h
@@ -279,10 +279,10 @@ bool ceph_osds_changed(const struct ceph_osds *old_acting,
279 const struct ceph_osds *new_acting, 279 const struct ceph_osds *new_acting,
280 bool any_change); 280 bool any_change);
281 281
282int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi, 282void __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
283 const struct ceph_object_id *oid, 283 const struct ceph_object_id *oid,
284 const struct ceph_object_locator *oloc, 284 const struct ceph_object_locator *oloc,
285 struct ceph_pg *raw_pgid); 285 struct ceph_pg *raw_pgid);
286int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap, 286int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
287 const struct ceph_object_id *oid, 287 const struct ceph_object_id *oid,
288 const struct ceph_object_locator *oloc, 288 const struct ceph_object_locator *oloc,
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 3b3d33ea9ed8..c6413c360771 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -168,12 +168,6 @@ static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
168static struct lock_class_key socket_class; 168static struct lock_class_key socket_class;
169#endif 169#endif
170 170
171/*
172 * When skipping (ignoring) a block of input we read it into a "skip
173 * buffer," which is this many bytes in size.
174 */
175#define SKIP_BUF_SIZE 1024
176
177static void queue_con(struct ceph_connection *con); 171static void queue_con(struct ceph_connection *con);
178static void cancel_con(struct ceph_connection *con); 172static void cancel_con(struct ceph_connection *con);
179static void ceph_con_workfn(struct work_struct *); 173static void ceph_con_workfn(struct work_struct *);
@@ -520,12 +514,18 @@ static int ceph_tcp_connect(struct ceph_connection *con)
520 return 0; 514 return 0;
521} 515}
522 516
517/*
518 * If @buf is NULL, discard up to @len bytes.
519 */
523static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) 520static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
524{ 521{
525 struct kvec iov = {buf, len}; 522 struct kvec iov = {buf, len};
526 struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; 523 struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
527 int r; 524 int r;
528 525
526 if (!buf)
527 msg.msg_flags |= MSG_TRUNC;
528
529 iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, len); 529 iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, len);
530 r = sock_recvmsg(sock, &msg, msg.msg_flags); 530 r = sock_recvmsg(sock, &msg, msg.msg_flags);
531 if (r == -EAGAIN) 531 if (r == -EAGAIN)
@@ -2575,9 +2575,6 @@ static int try_write(struct ceph_connection *con)
2575 con->state != CON_STATE_OPEN) 2575 con->state != CON_STATE_OPEN)
2576 return 0; 2576 return 0;
2577 2577
2578more:
2579 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
2580
2581 /* open the socket first? */ 2578 /* open the socket first? */
2582 if (con->state == CON_STATE_PREOPEN) { 2579 if (con->state == CON_STATE_PREOPEN) {
2583 BUG_ON(con->sock); 2580 BUG_ON(con->sock);
@@ -2598,7 +2595,8 @@ more:
2598 } 2595 }
2599 } 2596 }
2600 2597
2601more_kvec: 2598more:
2599 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
2602 BUG_ON(!con->sock); 2600 BUG_ON(!con->sock);
2603 2601
2604 /* kvec data queued? */ 2602 /* kvec data queued? */
@@ -2623,7 +2621,7 @@ more_kvec:
2623 2621
2624 ret = write_partial_message_data(con); 2622 ret = write_partial_message_data(con);
2625 if (ret == 1) 2623 if (ret == 1)
2626 goto more_kvec; /* we need to send the footer, too! */ 2624 goto more; /* we need to send the footer, too! */
2627 if (ret == 0) 2625 if (ret == 0)
2628 goto out; 2626 goto out;
2629 if (ret < 0) { 2627 if (ret < 0) {
@@ -2659,8 +2657,6 @@ out:
2659 return ret; 2657 return ret;
2660} 2658}
2661 2659
2662
2663
2664/* 2660/*
2665 * Read what we can from the socket. 2661 * Read what we can from the socket.
2666 */ 2662 */
@@ -2721,16 +2717,11 @@ more:
2721 if (con->in_base_pos < 0) { 2717 if (con->in_base_pos < 0) {
2722 /* 2718 /*
2723 * skipping + discarding content. 2719 * skipping + discarding content.
2724 *
2725 * FIXME: there must be a better way to do this!
2726 */ 2720 */
2727 static char buf[SKIP_BUF_SIZE]; 2721 ret = ceph_tcp_recvmsg(con->sock, NULL, -con->in_base_pos);
2728 int skip = min((int) sizeof (buf), -con->in_base_pos);
2729
2730 dout("skipping %d / %d bytes\n", skip, -con->in_base_pos);
2731 ret = ceph_tcp_recvmsg(con->sock, buf, skip);
2732 if (ret <= 0) 2722 if (ret <= 0)
2733 goto out; 2723 goto out;
2724 dout("skipped %d / %d bytes\n", ret, -con->in_base_pos);
2734 con->in_base_pos += ret; 2725 con->in_base_pos += ret;
2735 if (con->in_base_pos) 2726 if (con->in_base_pos)
2736 goto more; 2727 goto more;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 69a2581ddbba..a00c74f1154e 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -766,7 +766,7 @@ void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
766} 766}
767EXPORT_SYMBOL(osd_req_op_extent_dup_last); 767EXPORT_SYMBOL(osd_req_op_extent_dup_last);
768 768
769void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, 769int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
770 u16 opcode, const char *class, const char *method) 770 u16 opcode, const char *class, const char *method)
771{ 771{
772 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 772 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
@@ -778,7 +778,9 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
778 BUG_ON(opcode != CEPH_OSD_OP_CALL); 778 BUG_ON(opcode != CEPH_OSD_OP_CALL);
779 779
780 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); 780 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
781 BUG_ON(!pagelist); 781 if (!pagelist)
782 return -ENOMEM;
783
782 ceph_pagelist_init(pagelist); 784 ceph_pagelist_init(pagelist);
783 785
784 op->cls.class_name = class; 786 op->cls.class_name = class;
@@ -798,6 +800,7 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
798 osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist); 800 osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
799 801
800 op->indata_len = payload_len; 802 op->indata_len = payload_len;
803 return 0;
801} 804}
802EXPORT_SYMBOL(osd_req_op_cls_init); 805EXPORT_SYMBOL(osd_req_op_cls_init);
803 806
@@ -1026,7 +1029,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
1026 truncate_size, truncate_seq); 1029 truncate_size, truncate_seq);
1027 } 1030 }
1028 1031
1029 req->r_abort_on_full = true;
1030 req->r_flags = flags; 1032 req->r_flags = flags;
1031 req->r_base_oloc.pool = layout->pool_id; 1033 req->r_base_oloc.pool = layout->pool_id;
1032 req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns); 1034 req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
@@ -1054,6 +1056,38 @@ EXPORT_SYMBOL(ceph_osdc_new_request);
1054DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node) 1056DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
1055DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node) 1057DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)
1056 1058
1059/*
1060 * Call @fn on each OSD request as long as @fn returns 0.
1061 */
1062static void for_each_request(struct ceph_osd_client *osdc,
1063 int (*fn)(struct ceph_osd_request *req, void *arg),
1064 void *arg)
1065{
1066 struct rb_node *n, *p;
1067
1068 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
1069 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
1070
1071 for (p = rb_first(&osd->o_requests); p; ) {
1072 struct ceph_osd_request *req =
1073 rb_entry(p, struct ceph_osd_request, r_node);
1074
1075 p = rb_next(p);
1076 if (fn(req, arg))
1077 return;
1078 }
1079 }
1080
1081 for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
1082 struct ceph_osd_request *req =
1083 rb_entry(p, struct ceph_osd_request, r_node);
1084
1085 p = rb_next(p);
1086 if (fn(req, arg))
1087 return;
1088 }
1089}
1090
1057static bool osd_homeless(struct ceph_osd *osd) 1091static bool osd_homeless(struct ceph_osd *osd)
1058{ 1092{
1059 return osd->o_osd == CEPH_HOMELESS_OSD; 1093 return osd->o_osd == CEPH_HOMELESS_OSD;
@@ -1395,7 +1429,6 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
1395 bool recovery_deletes = ceph_osdmap_flag(osdc, 1429 bool recovery_deletes = ceph_osdmap_flag(osdc,
1396 CEPH_OSDMAP_RECOVERY_DELETES); 1430 CEPH_OSDMAP_RECOVERY_DELETES);
1397 enum calc_target_result ct_res; 1431 enum calc_target_result ct_res;
1398 int ret;
1399 1432
1400 t->epoch = osdc->osdmap->epoch; 1433 t->epoch = osdc->osdmap->epoch;
1401 pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool); 1434 pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
@@ -1431,14 +1464,7 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
1431 } 1464 }
1432 } 1465 }
1433 1466
1434 ret = __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc, 1467 __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc, &pgid);
1435 &pgid);
1436 if (ret) {
1437 WARN_ON(ret != -ENOENT);
1438 t->osd = CEPH_HOMELESS_OSD;
1439 ct_res = CALC_TARGET_POOL_DNE;
1440 goto out;
1441 }
1442 last_pgid.pool = pgid.pool; 1468 last_pgid.pool = pgid.pool;
1443 last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask); 1469 last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);
1444 1470
@@ -2161,9 +2187,9 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
2161 struct ceph_osd_client *osdc = req->r_osdc; 2187 struct ceph_osd_client *osdc = req->r_osdc;
2162 struct ceph_osd *osd; 2188 struct ceph_osd *osd;
2163 enum calc_target_result ct_res; 2189 enum calc_target_result ct_res;
2190 int err = 0;
2164 bool need_send = false; 2191 bool need_send = false;
2165 bool promoted = false; 2192 bool promoted = false;
2166 bool need_abort = false;
2167 2193
2168 WARN_ON(req->r_tid); 2194 WARN_ON(req->r_tid);
2169 dout("%s req %p wrlocked %d\n", __func__, req, wrlocked); 2195 dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
@@ -2179,7 +2205,10 @@ again:
2179 goto promote; 2205 goto promote;
2180 } 2206 }
2181 2207
2182 if (osdc->osdmap->epoch < osdc->epoch_barrier) { 2208 if (osdc->abort_err) {
2209 dout("req %p abort_err %d\n", req, osdc->abort_err);
2210 err = osdc->abort_err;
2211 } else if (osdc->osdmap->epoch < osdc->epoch_barrier) {
2183 dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch, 2212 dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch,
2184 osdc->epoch_barrier); 2213 osdc->epoch_barrier);
2185 req->r_t.paused = true; 2214 req->r_t.paused = true;
@@ -2200,11 +2229,13 @@ again:
2200 (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 2229 (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2201 pool_full(osdc, req->r_t.base_oloc.pool))) { 2230 pool_full(osdc, req->r_t.base_oloc.pool))) {
2202 dout("req %p full/pool_full\n", req); 2231 dout("req %p full/pool_full\n", req);
2203 pr_warn_ratelimited("FULL or reached pool quota\n"); 2232 if (osdc->abort_on_full) {
2204 req->r_t.paused = true; 2233 err = -ENOSPC;
2205 maybe_request_map(osdc); 2234 } else {
2206 if (req->r_abort_on_full) 2235 pr_warn_ratelimited("FULL or reached pool quota\n");
2207 need_abort = true; 2236 req->r_t.paused = true;
2237 maybe_request_map(osdc);
2238 }
2208 } else if (!osd_homeless(osd)) { 2239 } else if (!osd_homeless(osd)) {
2209 need_send = true; 2240 need_send = true;
2210 } else { 2241 } else {
@@ -2221,11 +2252,11 @@ again:
2221 link_request(osd, req); 2252 link_request(osd, req);
2222 if (need_send) 2253 if (need_send)
2223 send_request(req); 2254 send_request(req);
2224 else if (need_abort) 2255 else if (err)
2225 complete_request(req, -ENOSPC); 2256 complete_request(req, err);
2226 mutex_unlock(&osd->lock); 2257 mutex_unlock(&osd->lock);
2227 2258
2228 if (ct_res == CALC_TARGET_POOL_DNE) 2259 if (!err && ct_res == CALC_TARGET_POOL_DNE)
2229 send_map_check(req); 2260 send_map_check(req);
2230 2261
2231 if (promoted) 2262 if (promoted)
@@ -2281,11 +2312,21 @@ static void finish_request(struct ceph_osd_request *req)
2281 2312
2282static void __complete_request(struct ceph_osd_request *req) 2313static void __complete_request(struct ceph_osd_request *req)
2283{ 2314{
2284 if (req->r_callback) { 2315 dout("%s req %p tid %llu cb %pf result %d\n", __func__, req,
2285 dout("%s req %p tid %llu cb %pf result %d\n", __func__, req, 2316 req->r_tid, req->r_callback, req->r_result);
2286 req->r_tid, req->r_callback, req->r_result); 2317
2318 if (req->r_callback)
2287 req->r_callback(req); 2319 req->r_callback(req);
2288 } 2320 complete_all(&req->r_completion);
2321 ceph_osdc_put_request(req);
2322}
2323
2324static void complete_request_workfn(struct work_struct *work)
2325{
2326 struct ceph_osd_request *req =
2327 container_of(work, struct ceph_osd_request, r_complete_work);
2328
2329 __complete_request(req);
2289} 2330}
2290 2331
2291/* 2332/*
@@ -2297,9 +2338,9 @@ static void complete_request(struct ceph_osd_request *req, int err)
2297 2338
2298 req->r_result = err; 2339 req->r_result = err;
2299 finish_request(req); 2340 finish_request(req);
2300 __complete_request(req); 2341
2301 complete_all(&req->r_completion); 2342 INIT_WORK(&req->r_complete_work, complete_request_workfn);
2302 ceph_osdc_put_request(req); 2343 queue_work(req->r_osdc->completion_wq, &req->r_complete_work);
2303} 2344}
2304 2345
2305static void cancel_map_check(struct ceph_osd_request *req) 2346static void cancel_map_check(struct ceph_osd_request *req)
@@ -2336,6 +2377,28 @@ static void abort_request(struct ceph_osd_request *req, int err)
2336 complete_request(req, err); 2377 complete_request(req, err);
2337} 2378}
2338 2379
2380static int abort_fn(struct ceph_osd_request *req, void *arg)
2381{
2382 int err = *(int *)arg;
2383
2384 abort_request(req, err);
2385 return 0; /* continue iteration */
2386}
2387
2388/*
2389 * Abort all in-flight requests with @err and arrange for all future
2390 * requests to be failed immediately.
2391 */
2392void ceph_osdc_abort_requests(struct ceph_osd_client *osdc, int err)
2393{
2394 dout("%s osdc %p err %d\n", __func__, osdc, err);
2395 down_write(&osdc->lock);
2396 for_each_request(osdc, abort_fn, &err);
2397 osdc->abort_err = err;
2398 up_write(&osdc->lock);
2399}
2400EXPORT_SYMBOL(ceph_osdc_abort_requests);
2401
2339static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb) 2402static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
2340{ 2403{
2341 if (likely(eb > osdc->epoch_barrier)) { 2404 if (likely(eb > osdc->epoch_barrier)) {
@@ -2363,6 +2426,30 @@ void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
2363EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier); 2426EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
2364 2427
2365/* 2428/*
2429 * We can end up releasing caps as a result of abort_request().
2430 * In that case, we probably want to ensure that the cap release message
2431 * has an updated epoch barrier in it, so set the epoch barrier prior to
2432 * aborting the first request.
2433 */
2434static int abort_on_full_fn(struct ceph_osd_request *req, void *arg)
2435{
2436 struct ceph_osd_client *osdc = req->r_osdc;
2437 bool *victims = arg;
2438
2439 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
2440 (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2441 pool_full(osdc, req->r_t.base_oloc.pool))) {
2442 if (!*victims) {
2443 update_epoch_barrier(osdc, osdc->osdmap->epoch);
2444 *victims = true;
2445 }
2446 abort_request(req, -ENOSPC);
2447 }
2448
2449 return 0; /* continue iteration */
2450}
2451
2452/*
2366 * Drop all pending requests that are stalled waiting on a full condition to 2453 * Drop all pending requests that are stalled waiting on a full condition to
2367 * clear, and complete them with ENOSPC as the return code. Set the 2454 * clear, and complete them with ENOSPC as the return code. Set the
2368 * osdc->epoch_barrier to the latest map epoch that we've seen if any were 2455 * osdc->epoch_barrier to the latest map epoch that we've seen if any were
@@ -2370,61 +2457,11 @@ EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
2370 */ 2457 */
2371static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc) 2458static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc)
2372{ 2459{
2373 struct rb_node *n;
2374 bool victims = false; 2460 bool victims = false;
2375 2461
2376 dout("enter abort_on_full\n"); 2462 if (osdc->abort_on_full &&
2377 2463 (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || have_pool_full(osdc)))
2378 if (!ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !have_pool_full(osdc)) 2464 for_each_request(osdc, abort_on_full_fn, &victims);
2379 goto out;
2380
2381 /* Scan list and see if there is anything to abort */
2382 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
2383 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
2384 struct rb_node *m;
2385
2386 m = rb_first(&osd->o_requests);
2387 while (m) {
2388 struct ceph_osd_request *req = rb_entry(m,
2389 struct ceph_osd_request, r_node);
2390 m = rb_next(m);
2391
2392 if (req->r_abort_on_full) {
2393 victims = true;
2394 break;
2395 }
2396 }
2397 if (victims)
2398 break;
2399 }
2400
2401 if (!victims)
2402 goto out;
2403
2404 /*
2405 * Update the barrier to current epoch if it's behind that point,
2406 * since we know we have some calls to be aborted in the tree.
2407 */
2408 update_epoch_barrier(osdc, osdc->osdmap->epoch);
2409
2410 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
2411 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
2412 struct rb_node *m;
2413
2414 m = rb_first(&osd->o_requests);
2415 while (m) {
2416 struct ceph_osd_request *req = rb_entry(m,
2417 struct ceph_osd_request, r_node);
2418 m = rb_next(m);
2419
2420 if (req->r_abort_on_full &&
2421 (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2422 pool_full(osdc, req->r_t.target_oloc.pool)))
2423 abort_request(req, -ENOSPC);
2424 }
2425 }
2426out:
2427 dout("return abort_on_full barrier=%u\n", osdc->epoch_barrier);
2428} 2465}
2429 2466
2430static void check_pool_dne(struct ceph_osd_request *req) 2467static void check_pool_dne(struct ceph_osd_request *req)
@@ -3541,8 +3578,6 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
3541 up_read(&osdc->lock); 3578 up_read(&osdc->lock);
3542 3579
3543 __complete_request(req); 3580 __complete_request(req);
3544 complete_all(&req->r_completion);
3545 ceph_osdc_put_request(req);
3546 return; 3581 return;
3547 3582
3548fail_request: 3583fail_request:
@@ -4927,7 +4962,10 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
4927 if (ret) 4962 if (ret)
4928 goto out_put_req; 4963 goto out_put_req;
4929 4964
4930 osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method); 4965 ret = osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
4966 if (ret)
4967 goto out_put_req;
4968
4931 if (req_page) 4969 if (req_page)
4932 osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len, 4970 osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
4933 0, false, false); 4971 0, false, false);
@@ -4996,6 +5034,10 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
4996 if (!osdc->notify_wq) 5034 if (!osdc->notify_wq)
4997 goto out_msgpool_reply; 5035 goto out_msgpool_reply;
4998 5036
5037 osdc->completion_wq = create_singlethread_workqueue("ceph-completion");
5038 if (!osdc->completion_wq)
5039 goto out_notify_wq;
5040
4999 schedule_delayed_work(&osdc->timeout_work, 5041 schedule_delayed_work(&osdc->timeout_work,
5000 osdc->client->options->osd_keepalive_timeout); 5042 osdc->client->options->osd_keepalive_timeout);
5001 schedule_delayed_work(&osdc->osds_timeout_work, 5043 schedule_delayed_work(&osdc->osds_timeout_work,
@@ -5003,6 +5045,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
5003 5045
5004 return 0; 5046 return 0;
5005 5047
5048out_notify_wq:
5049 destroy_workqueue(osdc->notify_wq);
5006out_msgpool_reply: 5050out_msgpool_reply:
5007 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 5051 ceph_msgpool_destroy(&osdc->msgpool_op_reply);
5008out_msgpool: 5052out_msgpool:
@@ -5017,7 +5061,7 @@ out:
5017 5061
5018void ceph_osdc_stop(struct ceph_osd_client *osdc) 5062void ceph_osdc_stop(struct ceph_osd_client *osdc)
5019{ 5063{
5020 flush_workqueue(osdc->notify_wq); 5064 destroy_workqueue(osdc->completion_wq);
5021 destroy_workqueue(osdc->notify_wq); 5065 destroy_workqueue(osdc->notify_wq);
5022 cancel_delayed_work_sync(&osdc->timeout_work); 5066 cancel_delayed_work_sync(&osdc->timeout_work);
5023 cancel_delayed_work_sync(&osdc->osds_timeout_work); 5067 cancel_delayed_work_sync(&osdc->osds_timeout_work);
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index e22820e24f50..98c0ff3d6441 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -2146,10 +2146,10 @@ bool ceph_osds_changed(const struct ceph_osds *old_acting,
2146 * Should only be called with target_oid and target_oloc (as opposed to 2146 * Should only be called with target_oid and target_oloc (as opposed to
2147 * base_oid and base_oloc), since tiering isn't taken into account. 2147 * base_oid and base_oloc), since tiering isn't taken into account.
2148 */ 2148 */
2149int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi, 2149void __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
2150 const struct ceph_object_id *oid, 2150 const struct ceph_object_id *oid,
2151 const struct ceph_object_locator *oloc, 2151 const struct ceph_object_locator *oloc,
2152 struct ceph_pg *raw_pgid) 2152 struct ceph_pg *raw_pgid)
2153{ 2153{
2154 WARN_ON(pi->id != oloc->pool); 2154 WARN_ON(pi->id != oloc->pool);
2155 2155
@@ -2165,11 +2165,8 @@ int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
2165 int nsl = oloc->pool_ns->len; 2165 int nsl = oloc->pool_ns->len;
2166 size_t total = nsl + 1 + oid->name_len; 2166 size_t total = nsl + 1 + oid->name_len;
2167 2167
2168 if (total > sizeof(stack_buf)) { 2168 if (total > sizeof(stack_buf))
2169 buf = kmalloc(total, GFP_NOIO); 2169 buf = kmalloc(total, GFP_NOIO | __GFP_NOFAIL);
2170 if (!buf)
2171 return -ENOMEM;
2172 }
2173 memcpy(buf, oloc->pool_ns->str, nsl); 2170 memcpy(buf, oloc->pool_ns->str, nsl);
2174 buf[nsl] = '\037'; 2171 buf[nsl] = '\037';
2175 memcpy(buf + nsl + 1, oid->name, oid->name_len); 2172 memcpy(buf + nsl + 1, oid->name, oid->name_len);
@@ -2181,7 +2178,6 @@ int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
2181 oid->name, nsl, oloc->pool_ns->str, 2178 oid->name, nsl, oloc->pool_ns->str,
2182 raw_pgid->pool, raw_pgid->seed); 2179 raw_pgid->pool, raw_pgid->seed);
2183 } 2180 }
2184 return 0;
2185} 2181}
2186 2182
2187int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap, 2183int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
@@ -2195,7 +2191,8 @@ int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
2195 if (!pi) 2191 if (!pi)
2196 return -ENOENT; 2192 return -ENOENT;
2197 2193
2198 return __ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid); 2194 __ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid);
2195 return 0;
2199} 2196}
2200EXPORT_SYMBOL(ceph_object_locator_to_pg); 2197EXPORT_SYMBOL(ceph_object_locator_to_pg);
2201 2198