author		Linus Torvalds <torvalds@linux-foundation.org>	2017-05-10 11:42:33 -0400
committer	Linus Torvalds <torvalds@linux-foundation.org>	2017-05-10 11:42:33 -0400
commit		26c5eaa1326e9703effd01e7cc3cc0d4ad4b3c19 (patch)
tree		070c518340ae308dce62695a06a118a1df78be15
parent		1176032cb12bb89ad558a3e57e82f2f25b817eff (diff)
parent		eeca958dce0a9231d1969f86196653eb50fcc9b3 (diff)
Merge tag 'ceph-for-4.12-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov:
 "The two main items are support for disabling automatic rbd exclusive
  lock transfers from myself and the long awaited -ENOSPC handling
  series from Jeff. The former will allow rbd users to take advantage
  of exclusive lock's built-in blacklist/break-lock functionality while
  staying in control of who owns the lock. With the latter in place, we
  will abort filesystem writes on -ENOSPC instead of having them block
  indefinitely.

  Beyond that we've got the usual pile of filesystem fixes from Zheng,
  some refcount_t conversion patches from Elena and a patch for an
  ancient open() flags handling bug from Alexander"

* tag 'ceph-for-4.12-rc1' of git://github.com/ceph/ceph-client: (31 commits)
  ceph: fix memory leak in __ceph_setxattr()
  ceph: fix file open flags on ppc64
  ceph: choose readdir frag based on previous readdir reply
  rbd: exclusive map option
  rbd: return ResponseMessage result from rbd_handle_request_lock()
  rbd: kill rbd_is_lock_supported()
  rbd: support updating the lock cookie without releasing the lock
  rbd: store lock cookie
  rbd: ignore unlock errors
  rbd: fix error handling around rbd_init_disk()
  rbd: move rbd_unregister_watch() call into rbd_dev_image_release()
  rbd: move rbd_dev_destroy() call out of rbd_dev_image_release()
  ceph: when seeing write errors on an inode, switch to sync writes
  Revert "ceph: SetPageError() for writeback pages if writepages fails"
  ceph: handle epoch barriers in cap messages
  libceph: add an epoch_barrier field to struct ceph_osd_client
  libceph: abort already submitted but abortable requests when map or pool goes full
  libceph: allow requests to return immediately on full conditions if caller wishes
  libceph: remove req->r_replay_version
  ceph: make seeky readdir more efficient
  ...
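The "disabling automatic exclusive lock transfers" item boils down to a small decision table in the REQUEST_LOCK notify handler (see the rbd.c diff below). As a rough standalone sketch of that convention -- simplified userspace C with made-up parameters, not the kernel code:

	/* Sketch of the REQUEST_LOCK reply convention: 1 means "send a
	 * plain ack, no ResponseMessage"; 0 or a negative errno is
	 * encoded into a ResponseMessage so the requesting peer can tell
	 * a live owner from a missing one, and -EROFS tells it the owner
	 * refuses to release because the image was mapped exclusively. */
	#include <errno.h>
	#include <stdbool.h>
	#include <stdio.h>

	static int handle_request_lock(bool is_lock_owner, bool mapped_exclusive)
	{
		int result = 1;			/* no ResponseMessage needed */

		if (is_lock_owner) {
			result = 0;		/* ResponseMessage(0): owner alive */
			if (mapped_exclusive)
				result = -EROFS; /* refuse to release the lock */
		}
		return result;
	}

	int main(void)
	{
		printf("non-owner: %d\n", handle_request_lock(false, false));
		printf("owner:     %d\n", handle_request_lock(true, false));
		printf("exclusive: %d\n", handle_request_lock(true, true));
		return 0;
	}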
-rw-r--r--	drivers/block/rbd.c	359
-rw-r--r--	fs/ceph/addr.c	10
-rw-r--r--	fs/ceph/caps.c	25
-rw-r--r--	fs/ceph/debugfs.c	21
-rw-r--r--	fs/ceph/dir.c	23
-rw-r--r--	fs/ceph/file.c	68
-rw-r--r--	fs/ceph/inode.c	17
-rw-r--r--	fs/ceph/mds_client.c	75
-rw-r--r--	fs/ceph/mds_client.h	15
-rw-r--r--	fs/ceph/mdsmap.c	44
-rw-r--r--	fs/ceph/snap.c	2
-rw-r--r--	fs/ceph/super.c	7
-rw-r--r--	fs/ceph/super.h	31
-rw-r--r--	fs/ceph/xattr.c	3
-rw-r--r--	include/linux/ceph/ceph_features.h	4
-rw-r--r--	include/linux/ceph/ceph_fs.h	14
-rw-r--r--	include/linux/ceph/cls_lock_client.h	5
-rw-r--r--	include/linux/ceph/libceph.h	8
-rw-r--r--	include/linux/ceph/mdsmap.h	7
-rw-r--r--	include/linux/ceph/osd_client.h	7
-rw-r--r--	include/linux/ceph/pagelist.h	6
-rw-r--r--	net/ceph/ceph_common.c	27
-rw-r--r--	net/ceph/cls_lock_client.c	51
-rw-r--r--	net/ceph/debugfs.c	7
-rw-r--r--	net/ceph/osd_client.c	139
-rw-r--r--	net/ceph/pagelist.c	2
-rw-r--r--	net/ceph/snapshot.c	6
27 files changed, 706 insertions(+), 277 deletions(-)
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 26812c1ed0cf..454bf9c34882 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -387,6 +387,7 @@ struct rbd_device {
 
 	struct rw_semaphore lock_rwsem;
 	enum rbd_lock_state lock_state;
+	char lock_cookie[32];
 	struct rbd_client_id owner_cid;
 	struct work_struct acquired_lock_work;
 	struct work_struct released_lock_work;
@@ -477,13 +478,6 @@ static int minor_to_rbd_dev_id(int minor)
 	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
 }
 
-static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
-{
-	return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
-	       rbd_dev->spec->snap_id == CEPH_NOSNAP &&
-	       !rbd_dev->mapping.read_only;
-}
-
 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
 {
 	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
@@ -731,7 +725,7 @@ static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
 	kref_init(&rbdc->kref);
 	INIT_LIST_HEAD(&rbdc->node);
 
-	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
+	rbdc->client = ceph_create_client(ceph_opts, rbdc);
 	if (IS_ERR(rbdc->client))
 		goto out_rbdc;
 	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
@@ -804,6 +798,7 @@ enum {
 	Opt_read_only,
 	Opt_read_write,
 	Opt_lock_on_read,
+	Opt_exclusive,
 	Opt_err
 };
 
@@ -816,6 +811,7 @@ static match_table_t rbd_opts_tokens = {
 	{Opt_read_write, "read_write"},
 	{Opt_read_write, "rw"},		/* Alternate spelling */
 	{Opt_lock_on_read, "lock_on_read"},
+	{Opt_exclusive, "exclusive"},
 	{Opt_err, NULL}
 };
 
@@ -823,11 +819,13 @@ struct rbd_options {
 	int	queue_depth;
 	bool	read_only;
 	bool	lock_on_read;
+	bool	exclusive;
 };
 
 #define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
 #define RBD_READ_ONLY_DEFAULT	false
 #define RBD_LOCK_ON_READ_DEFAULT	false
+#define RBD_EXCLUSIVE_DEFAULT	false
 
 static int parse_rbd_opts_token(char *c, void *private)
 {
@@ -866,6 +864,9 @@ static int parse_rbd_opts_token(char *c, void *private)
 	case Opt_lock_on_read:
 		rbd_opts->lock_on_read = true;
 		break;
+	case Opt_exclusive:
+		rbd_opts->exclusive = true;
+		break;
 	default:
 		/* libceph prints "bad option" msg */
 		return -EINVAL;
@@ -3079,7 +3080,8 @@ static int rbd_lock(struct rbd_device *rbd_dev)
 	char cookie[32];
 	int ret;
 
-	WARN_ON(__rbd_is_lock_owner(rbd_dev));
+	WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
+		rbd_dev->lock_cookie[0] != '\0');
 
 	format_lock_cookie(rbd_dev, cookie);
 	ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
@@ -3089,6 +3091,7 @@ static int rbd_lock(struct rbd_device *rbd_dev)
 		return ret;
 
 	rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
+	strcpy(rbd_dev->lock_cookie, cookie);
 	rbd_set_owner_cid(rbd_dev, &cid);
 	queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
 	return 0;
@@ -3097,27 +3100,24 @@ static int rbd_lock(struct rbd_device *rbd_dev)
 /*
  * lock_rwsem must be held for write
  */
-static int rbd_unlock(struct rbd_device *rbd_dev)
+static void rbd_unlock(struct rbd_device *rbd_dev)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
-	char cookie[32];
 	int ret;
 
-	WARN_ON(!__rbd_is_lock_owner(rbd_dev));
-
-	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
+	WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
+		rbd_dev->lock_cookie[0] == '\0');
 
-	format_lock_cookie(rbd_dev, cookie);
 	ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
-			      RBD_LOCK_NAME, cookie);
-	if (ret && ret != -ENOENT) {
-		rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
-		return ret;
-	}
+			      RBD_LOCK_NAME, rbd_dev->lock_cookie);
+	if (ret && ret != -ENOENT)
+		rbd_warn(rbd_dev, "failed to unlock: %d", ret);
 
+	/* treat errors as the image is unlocked */
+	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
+	rbd_dev->lock_cookie[0] = '\0';
 	rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
 	queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
-	return 0;
 }
 
 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
@@ -3447,6 +3447,18 @@ again:
 	ret = rbd_request_lock(rbd_dev);
 	if (ret == -ETIMEDOUT) {
 		goto again; /* treat this as a dead client */
+	} else if (ret == -EROFS) {
+		rbd_warn(rbd_dev, "peer will not release lock");
+		/*
+		 * If this is rbd_add_acquire_lock(), we want to fail
+		 * immediately -- reuse BLACKLISTED flag.  Otherwise we
+		 * want to block.
+		 */
+		if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
+			set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
+			/* wake "rbd map --exclusive" process */
+			wake_requests(rbd_dev, false);
+		}
 	} else if (ret < 0) {
 		rbd_warn(rbd_dev, "error requesting lock: %d", ret);
 		mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
@@ -3490,16 +3502,15 @@ static bool rbd_release_lock(struct rbd_device *rbd_dev)
 	if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
 		return false;
 
-	if (!rbd_unlock(rbd_dev))
-		/*
-		 * Give others a chance to grab the lock - we would re-acquire
-		 * almost immediately if we got new IO during ceph_osdc_sync()
-		 * otherwise.  We need to ack our own notifications, so this
-		 * lock_dwork will be requeued from rbd_wait_state_locked()
-		 * after wake_requests() in rbd_handle_released_lock().
-		 */
-		cancel_delayed_work(&rbd_dev->lock_dwork);
-
+	rbd_unlock(rbd_dev);
+	/*
+	 * Give others a chance to grab the lock - we would re-acquire
+	 * almost immediately if we got new IO during ceph_osdc_sync()
+	 * otherwise.  We need to ack our own notifications, so this
+	 * lock_dwork will be requeued from rbd_wait_state_locked()
+	 * after wake_requests() in rbd_handle_released_lock().
+	 */
+	cancel_delayed_work(&rbd_dev->lock_dwork);
 	return true;
 }
 
@@ -3580,12 +3591,16 @@ static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
 	up_read(&rbd_dev->lock_rwsem);
 }
 
-static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
-				    void **p)
+/*
+ * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
+ * ResponseMessage is needed.
+ */
+static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
+				   void **p)
 {
 	struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
 	struct rbd_client_id cid = { 0 };
-	bool need_to_send;
+	int result = 1;
 
 	if (struct_v >= 2) {
 		cid.gid = ceph_decode_64(p);
@@ -3595,19 +3610,36 @@ static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
 	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
 	     cid.handle);
 	if (rbd_cid_equal(&cid, &my_cid))
-		return false;
+		return result;
 
 	down_read(&rbd_dev->lock_rwsem);
-	need_to_send = __rbd_is_lock_owner(rbd_dev);
-	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
-		if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) {
-			dout("%s rbd_dev %p queueing unlock_work\n", __func__,
-			     rbd_dev);
-			queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
+	if (__rbd_is_lock_owner(rbd_dev)) {
+		if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
+		    rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
+			goto out_unlock;
+
+		/*
+		 * encode ResponseMessage(0) so the peer can detect
+		 * a missing owner
+		 */
+		result = 0;
+
+		if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
+			if (!rbd_dev->opts->exclusive) {
+				dout("%s rbd_dev %p queueing unlock_work\n",
+				     __func__, rbd_dev);
+				queue_work(rbd_dev->task_wq,
					   &rbd_dev->unlock_work);
+			} else {
+				/* refuse to release the lock */
+				result = -EROFS;
+			}
 		}
 	}
+
+out_unlock:
 	up_read(&rbd_dev->lock_rwsem);
-	return need_to_send;
+	return result;
 }
 
 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
@@ -3690,13 +3722,10 @@ static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
 		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
 		break;
 	case RBD_NOTIFY_OP_REQUEST_LOCK:
-		if (rbd_handle_request_lock(rbd_dev, struct_v, &p))
-			/*
-			 * send ResponseMessage(0) back so the client
-			 * can detect a missing owner
-			 */
+		ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
+		if (ret <= 0)
 			rbd_acknowledge_notify_result(rbd_dev, notify_id,
-						      cookie, 0);
+						      cookie, ret);
 		else
 			rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
 		break;
@@ -3821,24 +3850,51 @@ static void rbd_unregister_watch(struct rbd_device *rbd_dev)
 	ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
 }
 
+/*
+ * lock_rwsem must be held for write
+ */
+static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	char cookie[32];
+	int ret;
+
+	WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
+
+	format_lock_cookie(rbd_dev, cookie);
+	ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
+				  &rbd_dev->header_oloc, RBD_LOCK_NAME,
+				  CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
+				  RBD_LOCK_TAG, cookie);
+	if (ret) {
+		if (ret != -EOPNOTSUPP)
+			rbd_warn(rbd_dev, "failed to update lock cookie: %d",
+				 ret);
+
+		/*
+		 * Lock cookie cannot be updated on older OSDs, so do
+		 * a manual release and queue an acquire.
+		 */
+		if (rbd_release_lock(rbd_dev))
+			queue_delayed_work(rbd_dev->task_wq,
+					   &rbd_dev->lock_dwork, 0);
+	} else {
+		strcpy(rbd_dev->lock_cookie, cookie);
+	}
+}
+
 static void rbd_reregister_watch(struct work_struct *work)
 {
 	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
 					    struct rbd_device, watch_dwork);
-	bool was_lock_owner = false;
-	bool need_to_wake = false;
 	int ret;
 
 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
 
-	down_write(&rbd_dev->lock_rwsem);
-	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
-		was_lock_owner = rbd_release_lock(rbd_dev);
-
 	mutex_lock(&rbd_dev->watch_mutex);
 	if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
 		mutex_unlock(&rbd_dev->watch_mutex);
-		goto out;
+		return;
 	}
 
 	ret = __rbd_register_watch(rbd_dev);
@@ -3846,36 +3902,28 @@ static void rbd_reregister_watch(struct work_struct *work)
 		rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
 		if (ret == -EBLACKLISTED || ret == -ENOENT) {
 			set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
-			need_to_wake = true;
+			wake_requests(rbd_dev, true);
 		} else {
 			queue_delayed_work(rbd_dev->task_wq,
 					   &rbd_dev->watch_dwork,
 					   RBD_RETRY_DELAY);
 		}
 		mutex_unlock(&rbd_dev->watch_mutex);
-		goto out;
+		return;
 	}
 
-	need_to_wake = true;
 	rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
 	rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
 	mutex_unlock(&rbd_dev->watch_mutex);
 
+	down_write(&rbd_dev->lock_rwsem);
+	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
+		rbd_reacquire_lock(rbd_dev);
+	up_write(&rbd_dev->lock_rwsem);
+
 	ret = rbd_dev_refresh(rbd_dev);
 	if (ret)
 		rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
-
-	if (was_lock_owner) {
-		ret = rbd_try_lock(rbd_dev);
-		if (ret)
-			rbd_warn(rbd_dev, "reregisteration lock failed: %d",
-				 ret);
-	}
-
-out:
-	up_write(&rbd_dev->lock_rwsem);
-	if (need_to_wake)
-		wake_requests(rbd_dev, true);
 }
 
 /*
@@ -4034,10 +4082,6 @@ static void rbd_queue_workfn(struct work_struct *work)
 	if (op_type != OBJ_OP_READ) {
 		snapc = rbd_dev->header.snapc;
 		ceph_get_snap_context(snapc);
-		must_be_locked = rbd_is_lock_supported(rbd_dev);
-	} else {
-		must_be_locked = rbd_dev->opts->lock_on_read &&
-				 rbd_is_lock_supported(rbd_dev);
 	}
 	up_read(&rbd_dev->header_rwsem);
 
@@ -4048,14 +4092,20 @@ static void rbd_queue_workfn(struct work_struct *work)
 		goto err_rq;
 	}
 
+	must_be_locked =
+	    (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
+	    (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
 	if (must_be_locked) {
 		down_read(&rbd_dev->lock_rwsem);
 		if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
-		    !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
+		    !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
+			if (rbd_dev->opts->exclusive) {
+				rbd_warn(rbd_dev, "exclusive lock required");
+				result = -EROFS;
+				goto err_unlock;
+			}
 			rbd_wait_state_locked(rbd_dev);
-
-		WARN_ON((rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) ^
-			!test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
+		}
 		if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
 			result = -EBLACKLISTED;
 			goto err_unlock;
@@ -4114,19 +4164,10 @@ static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
 
 static void rbd_free_disk(struct rbd_device *rbd_dev)
 {
-	struct gendisk *disk = rbd_dev->disk;
-
-	if (!disk)
-		return;
-
+	blk_cleanup_queue(rbd_dev->disk->queue);
+	blk_mq_free_tag_set(&rbd_dev->tag_set);
+	put_disk(rbd_dev->disk);
 	rbd_dev->disk = NULL;
-	if (disk->flags & GENHD_FL_UP) {
-		del_gendisk(disk);
-		if (disk->queue)
-			blk_cleanup_queue(disk->queue);
-		blk_mq_free_tag_set(&rbd_dev->tag_set);
-	}
-	put_disk(disk);
 }
 
 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
@@ -4383,8 +4424,12 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
 	if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
 		q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
 
+	/*
+	 * disk_release() expects a queue ref from add_disk() and will
+	 * put it.  Hold an extra ref until add_disk() is called.
+	 */
+	WARN_ON(!blk_get_queue(q));
 	disk->queue = q;
-
 	q->queuedata = rbd_dev;
 
 	rbd_dev->disk = disk;
@@ -5624,6 +5669,7 @@ static int rbd_add_parse_args(const char *buf,
 	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
 	rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
 	rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
+	rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
 
 	copts = ceph_parse_options(options, mon_addrs,
 				   mon_addrs + mon_addrs_size - 1,
@@ -5682,6 +5728,33 @@ again:
 	return ret;
 }
 
+static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
+{
+	down_write(&rbd_dev->lock_rwsem);
+	if (__rbd_is_lock_owner(rbd_dev))
+		rbd_unlock(rbd_dev);
+	up_write(&rbd_dev->lock_rwsem);
+}
+
+static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
+{
+	if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
+		rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
+		return -EINVAL;
+	}
+
+	/* FIXME: "rbd map --exclusive" should be in interruptible */
+	down_read(&rbd_dev->lock_rwsem);
+	rbd_wait_state_locked(rbd_dev);
+	up_read(&rbd_dev->lock_rwsem);
+	if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
+		rbd_warn(rbd_dev, "failed to acquire exclusive lock");
+		return -EROFS;
+	}
+
+	return 0;
+}
+
 /*
  * An rbd format 2 image has a unique identifier, distinct from the
  * name given to it by the user.  Internally, that identifier is
@@ -5873,6 +5946,15 @@ out_err:
 	return ret;
 }
 
+static void rbd_dev_device_release(struct rbd_device *rbd_dev)
+{
+	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
+	rbd_dev_mapping_clear(rbd_dev);
+	rbd_free_disk(rbd_dev);
+	if (!single_major)
+		unregister_blkdev(rbd_dev->major, rbd_dev->name);
+}
+
 /*
  * rbd_dev->header_rwsem must be locked for write and will be unlocked
  * upon return.
@@ -5908,26 +5990,13 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
 	set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
 
-	dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
-	ret = device_add(&rbd_dev->dev);
+	ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
 	if (ret)
 		goto err_out_mapping;
 
-	/* Everything's ready.  Announce the disk to the world. */
-
 	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
 	up_write(&rbd_dev->header_rwsem);
-
-	spin_lock(&rbd_dev_list_lock);
-	list_add_tail(&rbd_dev->node, &rbd_dev_list);
-	spin_unlock(&rbd_dev_list_lock);
-
-	add_disk(rbd_dev->disk);
-	pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
-		(unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
-		rbd_dev->header.features);
-
-	return ret;
+	return 0;
 
 err_out_mapping:
 	rbd_dev_mapping_clear(rbd_dev);
@@ -5962,11 +6031,11 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev)
 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
 {
 	rbd_dev_unprobe(rbd_dev);
+	if (rbd_dev->opts)
+		rbd_unregister_watch(rbd_dev);
 	rbd_dev->image_format = 0;
 	kfree(rbd_dev->spec->image_id);
 	rbd_dev->spec->image_id = NULL;
-
-	rbd_dev_destroy(rbd_dev);
 }
 
 /*
@@ -6126,22 +6195,43 @@ static ssize_t do_rbd_add(struct bus_type *bus,
 	rbd_dev->mapping.read_only = read_only;
 
 	rc = rbd_dev_device_setup(rbd_dev);
-	if (rc) {
-		/*
-		 * rbd_unregister_watch() can't be moved into
-		 * rbd_dev_image_release() without refactoring, see
-		 * commit 1f3ef78861ac.
-		 */
-		rbd_unregister_watch(rbd_dev);
-		rbd_dev_image_release(rbd_dev);
-		goto out;
+	if (rc)
+		goto err_out_image_probe;
+
+	if (rbd_dev->opts->exclusive) {
+		rc = rbd_add_acquire_lock(rbd_dev);
+		if (rc)
+			goto err_out_device_setup;
 	}
 
+	/* Everything's ready.  Announce the disk to the world. */
+
+	rc = device_add(&rbd_dev->dev);
+	if (rc)
+		goto err_out_image_lock;
+
+	add_disk(rbd_dev->disk);
+	/* see rbd_init_disk() */
+	blk_put_queue(rbd_dev->disk->queue);
+
+	spin_lock(&rbd_dev_list_lock);
+	list_add_tail(&rbd_dev->node, &rbd_dev_list);
+	spin_unlock(&rbd_dev_list_lock);
+
+	pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
+		(unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
+		rbd_dev->header.features);
 	rc = count;
 out:
 	module_put(THIS_MODULE);
 	return rc;
 
+err_out_image_lock:
+	rbd_dev_image_unlock(rbd_dev);
+err_out_device_setup:
+	rbd_dev_device_release(rbd_dev);
+err_out_image_probe:
+	rbd_dev_image_release(rbd_dev);
err_out_rbd_dev:
 	rbd_dev_destroy(rbd_dev);
 err_out_client:
@@ -6169,21 +6259,6 @@ static ssize_t rbd_add_single_major(struct bus_type *bus,
 	return do_rbd_add(bus, buf, count);
 }
 
-static void rbd_dev_device_release(struct rbd_device *rbd_dev)
-{
-	rbd_free_disk(rbd_dev);
-
-	spin_lock(&rbd_dev_list_lock);
-	list_del_init(&rbd_dev->node);
-	spin_unlock(&rbd_dev_list_lock);
-
-	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
-	device_del(&rbd_dev->dev);
-	rbd_dev_mapping_clear(rbd_dev);
-	if (!single_major)
-		unregister_blkdev(rbd_dev->major, rbd_dev->name);
-}
-
 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
 {
 	while (rbd_dev->parent) {
@@ -6201,6 +6276,7 @@ static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
 	}
 	rbd_assert(second);
 	rbd_dev_image_release(second);
+	rbd_dev_destroy(second);
 	first->parent = NULL;
 	first->parent_overlap = 0;
 
@@ -6269,21 +6345,16 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
 		blk_set_queue_dying(rbd_dev->disk->queue);
 	}
 
-	down_write(&rbd_dev->lock_rwsem);
-	if (__rbd_is_lock_owner(rbd_dev))
-		rbd_unlock(rbd_dev);
-	up_write(&rbd_dev->lock_rwsem);
-	rbd_unregister_watch(rbd_dev);
+	del_gendisk(rbd_dev->disk);
+	spin_lock(&rbd_dev_list_lock);
+	list_del_init(&rbd_dev->node);
+	spin_unlock(&rbd_dev_list_lock);
+	device_del(&rbd_dev->dev);
 
-	/*
-	 * Don't free anything from rbd_dev->disk until after all
-	 * notifies are completely processed. Otherwise
-	 * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
-	 * in a potential use after free of rbd_dev->disk or rbd_dev.
-	 */
+	rbd_dev_image_unlock(rbd_dev);
 	rbd_dev_device_release(rbd_dev);
 	rbd_dev_image_release(rbd_dev);
-
+	rbd_dev_destroy(rbd_dev);
 	return count;
 }
 
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 9ecb2fd348cb..1e71e6ca5ddf 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -670,8 +670,12 @@ static void writepages_finish(struct ceph_osd_request *req)
 	bool remove_page;
 
 	dout("writepages_finish %p rc %d\n", inode, rc);
-	if (rc < 0)
+	if (rc < 0) {
 		mapping_set_error(mapping, rc);
+		ceph_set_error_write(ci);
+	} else {
+		ceph_clear_error_write(ci);
+	}
 
 	/*
 	 * We lost the cache cap, need to truncate the page before
@@ -703,9 +707,6 @@ static void writepages_finish(struct ceph_osd_request *req)
 			clear_bdi_congested(inode_to_bdi(inode),
 					    BLK_RW_ASYNC);
 
-		if (rc < 0)
-			SetPageError(page);
-
 		ceph_put_snap_context(page_snap_context(page));
 		page->private = 0;
 		ClearPagePrivate(page);
@@ -1892,6 +1893,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
 	err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);
 
 	wr_req->r_mtime = ci->vfs_inode.i_mtime;
+	wr_req->r_abort_on_full = true;
 	err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);
 
 	if (!err)
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 68c78be19d5b..a3ebb632294e 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1015,6 +1015,7 @@ static int send_cap_msg(struct cap_msg_args *arg)
 	void *p;
 	size_t extra_len;
 	struct timespec zerotime = {0};
+	struct ceph_osd_client *osdc = &arg->session->s_mdsc->fsc->client->osdc;
 
 	dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
 	     " seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu"
@@ -1076,8 +1077,12 @@ static int send_cap_msg(struct cap_msg_args *arg)
 	ceph_encode_64(&p, arg->inline_data ? 0 : CEPH_INLINE_NONE);
 	/* inline data size */
 	ceph_encode_32(&p, 0);
-	/* osd_epoch_barrier (version 5) */
-	ceph_encode_32(&p, 0);
+	/*
+	 * osd_epoch_barrier (version 5)
+	 * The epoch_barrier is protected osdc->lock, so READ_ONCE here in
+	 * case it was recently changed
+	 */
+	ceph_encode_32(&p, READ_ONCE(osdc->epoch_barrier));
 	/* oldest_flush_tid (version 6) */
 	ceph_encode_64(&p, arg->oldest_flush_tid);
 
@@ -1389,7 +1394,7 @@ static void __ceph_flush_snaps(struct ceph_inode_info *ci,
 		first_tid = cf->tid + 1;
 
 		capsnap = container_of(cf, struct ceph_cap_snap, cap_flush);
-		atomic_inc(&capsnap->nref);
+		refcount_inc(&capsnap->nref);
 		spin_unlock(&ci->i_ceph_lock);
 
 		dout("__flush_snaps %p capsnap %p tid %llu %s\n",
@@ -2202,7 +2207,7 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
 				inode, capsnap, cf->tid,
 				ceph_cap_string(capsnap->dirty));
 
-			atomic_inc(&capsnap->nref);
+			refcount_inc(&capsnap->nref);
 			spin_unlock(&ci->i_ceph_lock);
 
 			ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
@@ -3633,13 +3638,19 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 		p += inline_len;
 	}
 
+	if (le16_to_cpu(msg->hdr.version) >= 5) {
+		struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
+		u32 epoch_barrier;
+
+		ceph_decode_32_safe(&p, end, epoch_barrier, bad);
+		ceph_osdc_update_epoch_barrier(osdc, epoch_barrier);
+	}
+
 	if (le16_to_cpu(msg->hdr.version) >= 8) {
 		u64 flush_tid;
 		u32 caller_uid, caller_gid;
-		u32 osd_epoch_barrier;
 		u32 pool_ns_len;
-		/* version >= 5 */
-		ceph_decode_32_safe(&p, end, osd_epoch_barrier, bad);
+
 		/* version >= 6 */
 		ceph_decode_64_safe(&p, end, flush_tid, bad);
 		/* version >= 7 */
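The READ_ONCE() in send_cap_msg() above is worth a note: epoch_barrier is updated under osdc->lock, but the encoder only needs one consistent snapshot of it, so it does a single marked read instead of taking the lock. A rough standalone illustration of the pattern (the one-line READ_ONCE definition below is a simplification of the kernel's, for demonstration only):

	#include <stdio.h>

	/* Simplified stand-in for the kernel's READ_ONCE(): force exactly
	 * one load through a volatile-qualified pointer so the compiler
	 * cannot tear, repeat, or cache the read. */
	#define READ_ONCE(x) (*(const volatile __typeof__(x) *)&(x))

	static unsigned int epoch_barrier;	/* written under a lock elsewhere */

	static void encode_cap_msg(void)
	{
		/* One consistent snapshot; possibly stale, never torn. */
		unsigned int barrier = READ_ONCE(epoch_barrier);

		printf("encoding osd_epoch_barrier %u\n", barrier);
	}

	int main(void)
	{
		epoch_barrier = 42;
		encode_cap_msg();
		return 0;
	}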
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 3ef11bc8d728..4e2d112c982f 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -22,20 +22,19 @@ static int mdsmap_show(struct seq_file *s, void *p)
 {
 	int i;
 	struct ceph_fs_client *fsc = s->private;
+	struct ceph_mdsmap *mdsmap;
 
 	if (fsc->mdsc == NULL || fsc->mdsc->mdsmap == NULL)
 		return 0;
-	seq_printf(s, "epoch %d\n", fsc->mdsc->mdsmap->m_epoch);
-	seq_printf(s, "root %d\n", fsc->mdsc->mdsmap->m_root);
-	seq_printf(s, "session_timeout %d\n",
-		   fsc->mdsc->mdsmap->m_session_timeout);
-	seq_printf(s, "session_autoclose %d\n",
-		   fsc->mdsc->mdsmap->m_session_autoclose);
-	for (i = 0; i < fsc->mdsc->mdsmap->m_max_mds; i++) {
-		struct ceph_entity_addr *addr =
-			&fsc->mdsc->mdsmap->m_info[i].addr;
-		int state = fsc->mdsc->mdsmap->m_info[i].state;
-
+	mdsmap = fsc->mdsc->mdsmap;
+	seq_printf(s, "epoch %d\n", mdsmap->m_epoch);
+	seq_printf(s, "root %d\n", mdsmap->m_root);
+	seq_printf(s, "max_mds %d\n", mdsmap->m_max_mds);
+	seq_printf(s, "session_timeout %d\n", mdsmap->m_session_timeout);
+	seq_printf(s, "session_autoclose %d\n", mdsmap->m_session_autoclose);
+	for (i = 0; i < mdsmap->m_num_mds; i++) {
+		struct ceph_entity_addr *addr = &mdsmap->m_info[i].addr;
+		int state = mdsmap->m_info[i].state;
 		seq_printf(s, "\tmds%d\t%s\t(%s)\n", i,
 			   ceph_pr_addr(&addr->in_addr),
 			   ceph_mds_state_name(state));
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 3e9ad501addf..e071d23f6148 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -294,7 +294,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	int i;
 	int err;
-	u32 ftype;
+	unsigned frag = -1;
 	struct ceph_mds_reply_info_parsed *rinfo;
 
 	dout("readdir %p file %p pos %llx\n", inode, file, ctx->pos);
@@ -341,7 +341,6 @@ more:
 	/* do we have the correct frag content buffered? */
 	if (need_send_readdir(fi, ctx->pos)) {
 		struct ceph_mds_request *req;
-		unsigned frag;
 		int op = ceph_snap(inode) == CEPH_SNAPDIR ?
 			CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR;
 
@@ -352,8 +351,11 @@ more:
 		}
 
 		if (is_hash_order(ctx->pos)) {
-			frag = ceph_choose_frag(ci, fpos_hash(ctx->pos),
-						NULL, NULL);
+			/* fragtree isn't always accurate. choose frag
+			 * based on previous reply when possible. */
+			if (frag == (unsigned)-1)
+				frag = ceph_choose_frag(ci, fpos_hash(ctx->pos),
+							NULL, NULL);
 		} else {
 			frag = fpos_frag(ctx->pos);
 		}
@@ -378,7 +380,11 @@ more:
 			ceph_mdsc_put_request(req);
 			return -ENOMEM;
 		}
+	} else if (is_hash_order(ctx->pos)) {
+		req->r_args.readdir.offset_hash =
+			cpu_to_le32(fpos_hash(ctx->pos));
 	}
+
 	req->r_dir_release_cnt = fi->dir_release_count;
 	req->r_dir_ordered_cnt = fi->dir_ordered_count;
 	req->r_readdir_cache_idx = fi->readdir_cache_idx;
@@ -476,6 +482,7 @@ more:
 		struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
 		struct ceph_vino vino;
 		ino_t ino;
+		u32 ftype;
 
 		BUG_ON(rde->offset < ctx->pos);
 
@@ -498,15 +505,17 @@ more:
 		ctx->pos++;
 	}
 
+	ceph_mdsc_put_request(fi->last_readdir);
+	fi->last_readdir = NULL;
+
 	if (fi->next_offset > 2) {
-		ceph_mdsc_put_request(fi->last_readdir);
-		fi->last_readdir = NULL;
+		frag = fi->frag;
 		goto more;
 	}
 
 	/* more frags? */
 	if (!ceph_frag_is_rightmost(fi->frag)) {
-		unsigned frag = ceph_frag_next(fi->frag);
+		frag = ceph_frag_next(fi->frag);
 		if (is_hash_order(ctx->pos)) {
 			loff_t new_pos = ceph_make_fpos(ceph_frag_value(frag),
 							fi->next_offset, true);
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 18c045e2ead6..3fdde0b283c9 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -13,6 +13,38 @@
13#include "mds_client.h" 13#include "mds_client.h"
14#include "cache.h" 14#include "cache.h"
15 15
16static __le32 ceph_flags_sys2wire(u32 flags)
17{
18 u32 wire_flags = 0;
19
20 switch (flags & O_ACCMODE) {
21 case O_RDONLY:
22 wire_flags |= CEPH_O_RDONLY;
23 break;
24 case O_WRONLY:
25 wire_flags |= CEPH_O_WRONLY;
26 break;
27 case O_RDWR:
28 wire_flags |= CEPH_O_RDWR;
29 break;
30 }
31
32#define ceph_sys2wire(a) if (flags & a) { wire_flags |= CEPH_##a; flags &= ~a; }
33
34 ceph_sys2wire(O_CREAT);
35 ceph_sys2wire(O_EXCL);
36 ceph_sys2wire(O_TRUNC);
37 ceph_sys2wire(O_DIRECTORY);
38 ceph_sys2wire(O_NOFOLLOW);
39
40#undef ceph_sys2wire
41
42 if (flags)
43 dout("unused open flags: %x", flags);
44
45 return cpu_to_le32(wire_flags);
46}
47
16/* 48/*
17 * Ceph file operations 49 * Ceph file operations
18 * 50 *
@@ -120,7 +152,7 @@ prepare_open_request(struct super_block *sb, int flags, int create_mode)
120 if (IS_ERR(req)) 152 if (IS_ERR(req))
121 goto out; 153 goto out;
122 req->r_fmode = ceph_flags_to_mode(flags); 154 req->r_fmode = ceph_flags_to_mode(flags);
123 req->r_args.open.flags = cpu_to_le32(flags); 155 req->r_args.open.flags = ceph_flags_sys2wire(flags);
124 req->r_args.open.mode = cpu_to_le32(create_mode); 156 req->r_args.open.mode = cpu_to_le32(create_mode);
125out: 157out:
126 return req; 158 return req;
@@ -189,7 +221,7 @@ int ceph_renew_caps(struct inode *inode)
189 spin_lock(&ci->i_ceph_lock); 221 spin_lock(&ci->i_ceph_lock);
190 wanted = __ceph_caps_file_wanted(ci); 222 wanted = __ceph_caps_file_wanted(ci);
191 if (__ceph_is_any_real_caps(ci) && 223 if (__ceph_is_any_real_caps(ci) &&
192 (!(wanted & CEPH_CAP_ANY_WR) == 0 || ci->i_auth_cap)) { 224 (!(wanted & CEPH_CAP_ANY_WR) || ci->i_auth_cap)) {
193 int issued = __ceph_caps_issued(ci, NULL); 225 int issued = __ceph_caps_issued(ci, NULL);
194 spin_unlock(&ci->i_ceph_lock); 226 spin_unlock(&ci->i_ceph_lock);
195 dout("renew caps %p want %s issued %s updating mds_wanted\n", 227 dout("renew caps %p want %s issued %s updating mds_wanted\n",
@@ -778,6 +810,7 @@ static void ceph_aio_retry_work(struct work_struct *work)
778 req->r_callback = ceph_aio_complete_req; 810 req->r_callback = ceph_aio_complete_req;
779 req->r_inode = inode; 811 req->r_inode = inode;
780 req->r_priv = aio_req; 812 req->r_priv = aio_req;
813 req->r_abort_on_full = true;
781 814
782 ret = ceph_osdc_start_request(req->r_osdc, req, false); 815 ret = ceph_osdc_start_request(req->r_osdc, req, false);
783out: 816out:
@@ -1085,19 +1118,22 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
1085 1118
1086out: 1119out:
1087 ceph_osdc_put_request(req); 1120 ceph_osdc_put_request(req);
1088 if (ret == 0) { 1121 if (ret != 0) {
1089 pos += len; 1122 ceph_set_error_write(ci);
1090 written += len;
1091
1092 if (pos > i_size_read(inode)) {
1093 check_caps = ceph_inode_set_size(inode, pos);
1094 if (check_caps)
1095 ceph_check_caps(ceph_inode(inode),
1096 CHECK_CAPS_AUTHONLY,
1097 NULL);
1098 }
1099 } else
1100 break; 1123 break;
1124 }
1125
1126 ceph_clear_error_write(ci);
1127 pos += len;
1128 written += len;
1129 if (pos > i_size_read(inode)) {
1130 check_caps = ceph_inode_set_size(inode, pos);
1131 if (check_caps)
1132 ceph_check_caps(ceph_inode(inode),
1133 CHECK_CAPS_AUTHONLY,
1134 NULL);
1135 }
1136
1101 } 1137 }
1102 1138
1103 if (ret != -EOLDSNAPC && written > 0) { 1139 if (ret != -EOLDSNAPC && written > 0) {
@@ -1303,6 +1339,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
1303 } 1339 }
1304 1340
1305retry_snap: 1341retry_snap:
1342 /* FIXME: not complete since it doesn't account for being at quota */
1306 if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL)) { 1343 if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL)) {
1307 err = -ENOSPC; 1344 err = -ENOSPC;
1308 goto out; 1345 goto out;
@@ -1324,7 +1361,8 @@ retry_snap:
1324 inode, ceph_vinop(inode), pos, count, ceph_cap_string(got)); 1361 inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
1325 1362
1326 if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || 1363 if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
1327 (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC)) { 1364 (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC) ||
1365 (ci->i_ceph_flags & CEPH_I_ERROR_WRITE)) {
1328 struct ceph_snap_context *snapc; 1366 struct ceph_snap_context *snapc;
1329 struct iov_iter data; 1367 struct iov_iter data;
1330 inode_unlock(inode); 1368 inode_unlock(inode);
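The new ceph_flags_sys2wire() above exists because numeric O_* flag values differ between architectures (the ppc64 open-flags fix in this pull), so the host flags cannot go on the wire raw. A standalone sketch of the same token-pasting pattern, with hypothetical CEPH_O_* wire values (the real ones live in include/linux/ceph/ceph_fs.h):

	#include <fcntl.h>
	#include <stdio.h>

	/* Hypothetical wire values, for illustration only. */
	#define CEPH_O_CREAT	0x0100
	#define CEPH_O_EXCL	0x0200
	#define CEPH_O_TRUNC	0x0400

	static unsigned int flags_sys2wire(unsigned int flags)
	{
		unsigned int wire_flags = 0;

		/* Same trick as ceph_sys2wire() above: translate each known
		 * host O_* bit to its fixed wire counterpart and clear it,
		 * so whatever is left over is a flag the wire format does
		 * not understand. */
	#define MAP(a) do { if (flags & a) { wire_flags |= CEPH_##a; flags &= ~a; } } while (0)
		MAP(O_CREAT);
		MAP(O_EXCL);
		MAP(O_TRUNC);
	#undef MAP

		if (flags)
			printf("unused open flags: %#x\n", flags);
		return wire_flags;
	}

	int main(void)
	{
		printf("wire=%#x\n", flags_sys2wire(O_CREAT | O_TRUNC));
		return 0;
	}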
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index d3119fe3ab45..dcce79b84406 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -1482,10 +1482,17 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
1482 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) 1482 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
1483 return readdir_prepopulate_inodes_only(req, session); 1483 return readdir_prepopulate_inodes_only(req, session);
1484 1484
1485 if (rinfo->hash_order && req->r_path2) { 1485 if (rinfo->hash_order) {
1486 last_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash, 1486 if (req->r_path2) {
1487 req->r_path2, strlen(req->r_path2)); 1487 last_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
1488 last_hash = ceph_frag_value(last_hash); 1488 req->r_path2,
1489 strlen(req->r_path2));
1490 last_hash = ceph_frag_value(last_hash);
1491 } else if (rinfo->offset_hash) {
1492 /* mds understands offset_hash */
1493 WARN_ON_ONCE(req->r_readdir_offset != 2);
1494 last_hash = le32_to_cpu(rhead->args.readdir.offset_hash);
1495 }
1489 } 1496 }
1490 1497
1491 if (rinfo->dir_dir && 1498 if (rinfo->dir_dir &&
@@ -1510,7 +1517,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
1510 } 1517 }
1511 1518
1512 if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2 && 1519 if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2 &&
1513 !(rinfo->hash_order && req->r_path2)) { 1520 !(rinfo->hash_order && last_hash)) {
1514 /* note dir version at start of readdir so we can tell 1521 /* note dir version at start of readdir so we can tell
1515 * if any dentries get dropped */ 1522 * if any dentries get dropped */
1516 req->r_dir_release_cnt = atomic64_read(&ci->i_release_count); 1523 req->r_dir_release_cnt = atomic64_read(&ci->i_release_count);
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 1d3fa90d40b9..f38e56fa9712 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -189,6 +189,7 @@ static int parse_reply_info_dir(void **p, void *end,
189 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END); 189 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
190 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE); 190 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
191 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER); 191 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
192 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
192 } 193 }
193 if (num == 0) 194 if (num == 0)
194 goto done; 195 goto done;
@@ -378,9 +379,9 @@ const char *ceph_session_state_name(int s)
378 379
379static struct ceph_mds_session *get_session(struct ceph_mds_session *s) 380static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
380{ 381{
381 if (atomic_inc_not_zero(&s->s_ref)) { 382 if (refcount_inc_not_zero(&s->s_ref)) {
382 dout("mdsc get_session %p %d -> %d\n", s, 383 dout("mdsc get_session %p %d -> %d\n", s,
383 atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref)); 384 refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
384 return s; 385 return s;
385 } else { 386 } else {
386 dout("mdsc get_session %p 0 -- FAIL", s); 387 dout("mdsc get_session %p 0 -- FAIL", s);
@@ -391,8 +392,8 @@ static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
391void ceph_put_mds_session(struct ceph_mds_session *s) 392void ceph_put_mds_session(struct ceph_mds_session *s)
392{ 393{
393 dout("mdsc put_session %p %d -> %d\n", s, 394 dout("mdsc put_session %p %d -> %d\n", s,
394 atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1); 395 refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
395 if (atomic_dec_and_test(&s->s_ref)) { 396 if (refcount_dec_and_test(&s->s_ref)) {
396 if (s->s_auth.authorizer) 397 if (s->s_auth.authorizer)
397 ceph_auth_destroy_authorizer(s->s_auth.authorizer); 398 ceph_auth_destroy_authorizer(s->s_auth.authorizer);
398 kfree(s); 399 kfree(s);
@@ -411,7 +412,7 @@ struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
411 return NULL; 412 return NULL;
412 session = mdsc->sessions[mds]; 413 session = mdsc->sessions[mds];
413 dout("lookup_mds_session %p %d\n", session, 414 dout("lookup_mds_session %p %d\n", session,
414 atomic_read(&session->s_ref)); 415 refcount_read(&session->s_ref));
415 get_session(session); 416 get_session(session);
416 return session; 417 return session;
417} 418}
@@ -441,7 +442,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
441{ 442{
442 struct ceph_mds_session *s; 443 struct ceph_mds_session *s;
443 444
444 if (mds >= mdsc->mdsmap->m_max_mds) 445 if (mds >= mdsc->mdsmap->m_num_mds)
445 return ERR_PTR(-EINVAL); 446 return ERR_PTR(-EINVAL);
446 447
447 s = kzalloc(sizeof(*s), GFP_NOFS); 448 s = kzalloc(sizeof(*s), GFP_NOFS);
@@ -466,7 +467,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
466 INIT_LIST_HEAD(&s->s_caps); 467 INIT_LIST_HEAD(&s->s_caps);
467 s->s_nr_caps = 0; 468 s->s_nr_caps = 0;
468 s->s_trim_caps = 0; 469 s->s_trim_caps = 0;
469 atomic_set(&s->s_ref, 1); 470 refcount_set(&s->s_ref, 1);
470 INIT_LIST_HEAD(&s->s_waiting); 471 INIT_LIST_HEAD(&s->s_waiting);
471 INIT_LIST_HEAD(&s->s_unsafe); 472 INIT_LIST_HEAD(&s->s_unsafe);
472 s->s_num_cap_releases = 0; 473 s->s_num_cap_releases = 0;
@@ -494,7 +495,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
494 } 495 }
495 mdsc->sessions[mds] = s; 496 mdsc->sessions[mds] = s;
496 atomic_inc(&mdsc->num_sessions); 497 atomic_inc(&mdsc->num_sessions);
497 atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 498 refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */
498 499
499 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, 500 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
500 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 501 ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
@@ -1004,7 +1005,7 @@ static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
1004 struct ceph_mds_session *ts; 1005 struct ceph_mds_session *ts;
1005 int i, mds = session->s_mds; 1006 int i, mds = session->s_mds;
1006 1007
1007 if (mds >= mdsc->mdsmap->m_max_mds) 1008 if (mds >= mdsc->mdsmap->m_num_mds)
1008 return; 1009 return;
1009 1010
1010 mi = &mdsc->mdsmap->m_info[mds]; 1011 mi = &mdsc->mdsmap->m_info[mds];
@@ -1551,9 +1552,15 @@ void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1551 struct ceph_msg *msg = NULL; 1552 struct ceph_msg *msg = NULL;
1552 struct ceph_mds_cap_release *head; 1553 struct ceph_mds_cap_release *head;
1553 struct ceph_mds_cap_item *item; 1554 struct ceph_mds_cap_item *item;
1555 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
1554 struct ceph_cap *cap; 1556 struct ceph_cap *cap;
1555 LIST_HEAD(tmp_list); 1557 LIST_HEAD(tmp_list);
1556 int num_cap_releases; 1558 int num_cap_releases;
1559 __le32 barrier, *cap_barrier;
1560
1561 down_read(&osdc->lock);
1562 barrier = cpu_to_le32(osdc->epoch_barrier);
1563 up_read(&osdc->lock);
1557 1564
1558 spin_lock(&session->s_cap_lock); 1565 spin_lock(&session->s_cap_lock);
1559again: 1566again:
@@ -1571,7 +1578,11 @@ again:
1571 head = msg->front.iov_base; 1578 head = msg->front.iov_base;
1572 head->num = cpu_to_le32(0); 1579 head->num = cpu_to_le32(0);
1573 msg->front.iov_len = sizeof(*head); 1580 msg->front.iov_len = sizeof(*head);
1581
1582 msg->hdr.version = cpu_to_le16(2);
1583 msg->hdr.compat_version = cpu_to_le16(1);
1574 } 1584 }
1585
1575 cap = list_first_entry(&tmp_list, struct ceph_cap, 1586 cap = list_first_entry(&tmp_list, struct ceph_cap,
1576 session_caps); 1587 session_caps);
1577 list_del(&cap->session_caps); 1588 list_del(&cap->session_caps);
@@ -1589,6 +1600,11 @@ again:
1589 ceph_put_cap(mdsc, cap); 1600 ceph_put_cap(mdsc, cap);
1590 1601
1591 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 1602 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
1603 // Append cap_barrier field
1604 cap_barrier = msg->front.iov_base + msg->front.iov_len;
1605 *cap_barrier = barrier;
1606 msg->front.iov_len += sizeof(*cap_barrier);
1607
1592 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1608 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1593 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 1609 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1594 ceph_con_send(&session->s_con, msg); 1610 ceph_con_send(&session->s_con, msg);
@@ -1604,6 +1620,11 @@ again:
1604 spin_unlock(&session->s_cap_lock); 1620 spin_unlock(&session->s_cap_lock);
1605 1621
1606 if (msg) { 1622 if (msg) {
1623 // Append cap_barrier field
1624 cap_barrier = msg->front.iov_base + msg->front.iov_len;
1625 *cap_barrier = barrier;
1626 msg->front.iov_len += sizeof(*cap_barrier);
1627
1607 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1628 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1608 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 1629 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1609 ceph_con_send(&session->s_con, msg); 1630 ceph_con_send(&session->s_con, msg);
@@ -1993,7 +2014,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1993 2014
1994 if (req->r_pagelist) { 2015 if (req->r_pagelist) {
1995 struct ceph_pagelist *pagelist = req->r_pagelist; 2016 struct ceph_pagelist *pagelist = req->r_pagelist;
1996 atomic_inc(&pagelist->refcnt); 2017 refcount_inc(&pagelist->refcnt);
1997 ceph_msg_data_add_pagelist(msg, pagelist); 2018 ceph_msg_data_add_pagelist(msg, pagelist);
1998 msg->hdr.data_len = cpu_to_le32(pagelist->length); 2019 msg->hdr.data_len = cpu_to_le32(pagelist->length);
1999 } else { 2020 } else {
@@ -2640,8 +2661,10 @@ static void handle_session(struct ceph_mds_session *session,
2640 seq = le64_to_cpu(h->seq); 2661 seq = le64_to_cpu(h->seq);
2641 2662
2642 mutex_lock(&mdsc->mutex); 2663 mutex_lock(&mdsc->mutex);
2643 if (op == CEPH_SESSION_CLOSE) 2664 if (op == CEPH_SESSION_CLOSE) {
2665 get_session(session);
2644 __unregister_session(mdsc, session); 2666 __unregister_session(mdsc, session);
2667 }
2645 /* FIXME: this ttl calculation is generous */ 2668 /* FIXME: this ttl calculation is generous */
2646 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 2669 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
2647 mutex_unlock(&mdsc->mutex); 2670 mutex_unlock(&mdsc->mutex);
@@ -2730,6 +2753,8 @@ static void handle_session(struct ceph_mds_session *session,
2730 kick_requests(mdsc, mds); 2753 kick_requests(mdsc, mds);
2731 mutex_unlock(&mdsc->mutex); 2754 mutex_unlock(&mdsc->mutex);
2732 } 2755 }
2756 if (op == CEPH_SESSION_CLOSE)
2757 ceph_put_mds_session(session);
2733 return; 2758 return;
2734 2759
2735bad: 2760bad:
@@ -3109,7 +3134,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
 	dout("check_new_map new %u old %u\n",
 	     newmap->m_epoch, oldmap->m_epoch);
 
-	for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
+	for (i = 0; i < oldmap->m_num_mds && i < mdsc->max_sessions; i++) {
 		if (mdsc->sessions[i] == NULL)
 			continue;
 		s = mdsc->sessions[i];
@@ -3123,15 +3148,33 @@ static void check_new_map(struct ceph_mds_client *mdsc,
 		       ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
 		       ceph_session_state_name(s->s_state));
 
-		if (i >= newmap->m_max_mds ||
+		if (i >= newmap->m_num_mds ||
 		    memcmp(ceph_mdsmap_get_addr(oldmap, i),
 			   ceph_mdsmap_get_addr(newmap, i),
 			   sizeof(struct ceph_entity_addr))) {
 			if (s->s_state == CEPH_MDS_SESSION_OPENING) {
 				/* the session never opened, just close it
 				 * out now */
+				get_session(s);
+				__unregister_session(mdsc, s);
 				__wake_requests(mdsc, &s->s_waiting);
+				ceph_put_mds_session(s);
+			} else if (i >= newmap->m_num_mds) {
+				/* force close session for stopped mds */
+				get_session(s);
 				__unregister_session(mdsc, s);
+				__wake_requests(mdsc, &s->s_waiting);
+				kick_requests(mdsc, i);
+				mutex_unlock(&mdsc->mutex);
+
+				mutex_lock(&s->s_mutex);
+				cleanup_session_requests(mdsc, s);
+				remove_session_caps(s);
+				mutex_unlock(&s->s_mutex);
+
+				ceph_put_mds_session(s);
+
+				mutex_lock(&mdsc->mutex);
 			} else {
 				/* just close it */
 				mutex_unlock(&mdsc->mutex);
@@ -3169,7 +3212,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
 		}
 	}
 
-	for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
+	for (i = 0; i < newmap->m_num_mds && i < mdsc->max_sessions; i++) {
 		s = mdsc->sessions[i];
 		if (!s)
 			continue;
@@ -3883,7 +3926,7 @@ static struct ceph_connection *con_get(struct ceph_connection *con)
 	struct ceph_mds_session *s = con->private;
 
 	if (get_session(s)) {
-		dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
+		dout("mdsc con_get %p ok (%d)\n", s, refcount_read(&s->s_ref));
 		return con;
 	}
 	dout("mdsc con_get %p FAIL\n", s);
@@ -3894,7 +3937,7 @@ static void con_put(struct ceph_connection *con)
 {
 	struct ceph_mds_session *s = con->private;
 
-	dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
+	dout("mdsc con_put %p (%d)\n", s, refcount_read(&s->s_ref) - 1);
 	ceph_put_mds_session(s);
 }
 
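The atomic_t to refcount_t conversions here (s_ref, and o_ref, nref, refcnt in the files below) are behavior-preserving, but refcount_t saturates rather than wrapping and warns on increment-from-zero and underflow, so refcounting bugs fail safe. A userspace analogue of the get/put pattern the kernel expresses with refcount_inc_not_zero()/refcount_dec_and_test() (a sketch, with C11 atomics standing in for the kernel API):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdlib.h>

struct obj {
	atomic_uint ref;
};

/* Like refcount_inc_not_zero(): take a reference only if one is held. */
static bool get_ref(struct obj *o)
{
	unsigned int cur = atomic_load(&o->ref);

	while (cur != 0)
		if (atomic_compare_exchange_weak(&o->ref, &cur, cur + 1))
			return true;
	return false;	/* object already dying; caller must not use it */
}

/* Like refcount_dec_and_test(): the final put frees the object. */
static void put_ref(struct obj *o)
{
	if (atomic_fetch_sub(&o->ref, 1) == 1)
		free(o);
}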
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index ac0475a2daa7..db57ae98ed34 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -7,6 +7,7 @@
 #include <linux/mutex.h>
 #include <linux/rbtree.h>
 #include <linux/spinlock.h>
+#include <linux/refcount.h>
 
 #include <linux/ceph/types.h>
 #include <linux/ceph/messenger.h>
@@ -82,9 +83,10 @@ struct ceph_mds_reply_info_parsed {
 	struct ceph_mds_reply_dirfrag *dir_dir;
 	size_t dir_buf_size;
 	int dir_nr;
-	bool dir_complete;
 	bool dir_end;
+	bool dir_complete;
 	bool hash_order;
+	bool offset_hash;
 	struct ceph_mds_reply_dir_entry *dir_entries;
 };
 
@@ -104,10 +106,13 @@ struct ceph_mds_reply_info_parsed {
 
 /*
  * cap releases are batched and sent to the MDS en masse.
+ *
+ * Account for per-message overhead of mds_cap_release header
+ * and __le32 for osd epoch barrier trailing field.
  */
-#define CEPH_CAPS_PER_RELEASE ((PAGE_SIZE -			\
+#define CEPH_CAPS_PER_RELEASE ((PAGE_SIZE - sizeof(u32) -	\
 				sizeof(struct ceph_mds_cap_release)) /	\
 			       sizeof(struct ceph_mds_cap_item))
 
 
 /*
@@ -156,7 +161,7 @@ struct ceph_mds_session {
 	unsigned long s_renew_requested; /* last time we sent a renew req */
 	u64 s_renew_seq;
 
-	atomic_t s_ref;
+	refcount_t s_ref;
 	struct list_head s_waiting;  /* waiting requests */
 	struct list_head s_unsafe;   /* unsafe requests */
 };
@@ -373,7 +378,7 @@ __ceph_lookup_mds_session(struct ceph_mds_client *, int mds);
 static inline struct ceph_mds_session *
 ceph_get_mds_session(struct ceph_mds_session *s)
 {
-	atomic_inc(&s->s_ref);
+	refcount_inc(&s->s_ref);
 	return s;
 }
 
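Working the new CEPH_CAPS_PER_RELEASE arithmetic: with 4 KiB pages, and assuming sizeof(struct ceph_mds_cap_release) == 4 and sizeof(struct ceph_mds_cap_item) == 24 (both sizes are assumptions made for this example only), the macro gives (4096 - 4 - 4) / 24 = 170 items per message, and the reserved u32 guarantees the trailing barrier always fits after a full batch. The invariant as a compile-time check:

#include <assert.h>	/* static_assert, C11 */
#include <stdint.h>

#define PAGE_SZ		4096u
#define RELEASE_SZ	4u	/* assumed header size */
#define ITEM_SZ		24u	/* assumed per-cap item size */
#define CAPS_PER_REL	((PAGE_SZ - sizeof(uint32_t) - RELEASE_SZ) / ITEM_SZ)

static_assert(RELEASE_SZ + CAPS_PER_REL * ITEM_SZ + sizeof(uint32_t) <= PAGE_SZ,
	      "a full batch plus the barrier word must fit in one page");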
diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c
index 5454e2327a5f..1a748cf88535 100644
--- a/fs/ceph/mdsmap.c
+++ b/fs/ceph/mdsmap.c
@@ -22,11 +22,11 @@ int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m)
 	int i;
 
 	/* special case for one mds */
-	if (1 == m->m_max_mds && m->m_info[0].state > 0)
+	if (1 == m->m_num_mds && m->m_info[0].state > 0)
 		return 0;
 
 	/* count */
-	for (i = 0; i < m->m_max_mds; i++)
+	for (i = 0; i < m->m_num_mds; i++)
 		if (m->m_info[i].state > 0)
 			n++;
 	if (n == 0)
@@ -135,8 +135,9 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
 	m->m_session_autoclose = ceph_decode_32(p);
 	m->m_max_file_size = ceph_decode_64(p);
 	m->m_max_mds = ceph_decode_32(p);
+	m->m_num_mds = m->m_max_mds;
 
-	m->m_info = kcalloc(m->m_max_mds, sizeof(*m->m_info), GFP_NOFS);
+	m->m_info = kcalloc(m->m_num_mds, sizeof(*m->m_info), GFP_NOFS);
 	if (m->m_info == NULL)
 		goto nomem;
 
@@ -207,9 +208,20 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
 		       ceph_pr_addr(&addr.in_addr),
 		       ceph_mds_state_name(state));
 
-		if (mds < 0 || mds >= m->m_max_mds || state <= 0)
+		if (mds < 0 || state <= 0)
 			continue;
 
+		if (mds >= m->m_num_mds) {
+			int new_num = max(mds + 1, m->m_num_mds * 2);
+			void *new_m_info = krealloc(m->m_info,
+						new_num * sizeof(*m->m_info),
+						GFP_NOFS | __GFP_ZERO);
+			if (!new_m_info)
+				goto nomem;
+			m->m_info = new_m_info;
+			m->m_num_mds = new_num;
+		}
+
 		info = &m->m_info[mds];
 		info->global_id = global_id;
 		info->state = state;
@@ -229,6 +241,14 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
 			info->export_targets = NULL;
 		}
 	}
+	if (m->m_num_mds > m->m_max_mds) {
+		/* find max up mds */
+		for (i = m->m_num_mds; i >= m->m_max_mds; i--) {
+			if (i == 0 || m->m_info[i-1].state > 0)
+				break;
+		}
+		m->m_num_mds = i;
+	}
 
 	/* pg_pools */
 	ceph_decode_32_safe(p, end, n, bad);
@@ -270,12 +290,22 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
 
 		for (i = 0; i < n; i++) {
 			s32 mds = ceph_decode_32(p);
-			if (mds >= 0 && mds < m->m_max_mds) {
+			if (mds >= 0 && mds < m->m_num_mds) {
 				if (m->m_info[mds].laggy)
 					num_laggy++;
 			}
 		}
 		m->m_num_laggy = num_laggy;
+
+		if (n > m->m_num_mds) {
+			void *new_m_info = krealloc(m->m_info,
+						n * sizeof(*m->m_info),
+						GFP_NOFS | __GFP_ZERO);
+			if (!new_m_info)
+				goto nomem;
+			m->m_info = new_m_info;
+		}
+		m->m_num_mds = n;
 	}
 
 	/* inc */
@@ -341,7 +371,7 @@ void ceph_mdsmap_destroy(struct ceph_mdsmap *m)
 {
 	int i;
 
-	for (i = 0; i < m->m_max_mds; i++)
+	for (i = 0; i < m->m_num_mds; i++)
 		kfree(m->m_info[i].export_targets);
 	kfree(m->m_info);
 	kfree(m->m_data_pg_pools);
@@ -357,7 +387,7 @@ bool ceph_mdsmap_is_cluster_available(struct ceph_mdsmap *m)
 		return false;
 	if (m->m_num_laggy > 0)
 		return false;
-	for (i = 0; i < m->m_max_mds; i++) {
+	for (i = 0; i < m->m_num_mds; i++) {
 		if (m->m_info[i].state == CEPH_MDS_STATE_ACTIVE)
 			nr_active++;
 	}
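The decoder now distinguishes m_num_mds (how many slots m_info actually has) from m_max_mds (the cluster's configured ceiling), because a resizing MDS cluster can report ranks at or above max_mds. The array grows on demand with amortized doubling; the same pattern in portable C (a sketch; krealloc with __GFP_ZERO plays the role of realloc plus the explicit memset):

#include <stdlib.h>
#include <string.h>

/* Grow 'arr' (of *num elements of size 'elem') so index 'need' is valid,
 * zeroing the newly exposed tail. Returns NULL on allocation failure and
 * leaves the old array intact, as the kernel code does via goto nomem. */
static void *grow_array(void *arr, size_t elem, int *num, int need)
{
	int new_num;
	void *n;

	if (need < *num)
		return arr;
	new_num = (need + 1 > *num * 2) ? need + 1 : *num * 2;
	n = realloc(arr, (size_t)new_num * elem);
	if (!n)
		return NULL;
	memset((char *)n + (size_t)*num * elem, 0,
	       (size_t)(new_num - *num) * elem);
	*num = new_num;
	return n;
}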
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index 8f8b41c2ef0f..dab5d6732345 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -519,7 +519,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
 		  capsnap->need_flush ? "" : "no_flush");
 	ihold(inode);
 
-	atomic_set(&capsnap->nref, 1);
+	refcount_set(&capsnap->nref, 1);
 	INIT_LIST_HEAD(&capsnap->ci_item);
 
 	capsnap->follows = old_snapc->seq;
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index a8c81b2052ca..8d7918ce694a 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -544,10 +544,6 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
 					struct ceph_options *opt)
 {
 	struct ceph_fs_client *fsc;
-	const u64 supported_features =
-		CEPH_FEATURE_FLOCK | CEPH_FEATURE_DIRLAYOUTHASH |
-		CEPH_FEATURE_MDSENC | CEPH_FEATURE_MDS_INLINE_DATA;
-	const u64 required_features = 0;
 	int page_count;
 	size_t size;
 	int err = -ENOMEM;
@@ -556,8 +552,7 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
 	if (!fsc)
 		return ERR_PTR(-ENOMEM);
 
-	fsc->client = ceph_create_client(opt, fsc, supported_features,
-					 required_features);
+	fsc->client = ceph_create_client(opt, fsc);
 	if (IS_ERR(fsc->client)) {
 		err = PTR_ERR(fsc->client);
 		goto fail;
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 176186b12457..a973acd8beaf 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -14,6 +14,7 @@
 #include <linux/writeback.h>
 #include <linux/slab.h>
 #include <linux/posix_acl.h>
+#include <linux/refcount.h>
 
 #include <linux/ceph/libceph.h>
 
@@ -160,7 +161,7 @@ struct ceph_cap_flush {
  * data before flushing the snapped state (tracked here) back to the MDS.
  */
 struct ceph_cap_snap {
-	atomic_t nref;
+	refcount_t nref;
 	struct list_head ci_item;
 
 	struct ceph_cap_flush cap_flush;
@@ -189,7 +190,7 @@ struct ceph_cap_snap {
 
 static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
 {
-	if (atomic_dec_and_test(&capsnap->nref)) {
+	if (refcount_dec_and_test(&capsnap->nref)) {
 		if (capsnap->xattr_blob)
 			ceph_buffer_put(capsnap->xattr_blob);
 		kfree(capsnap);
@@ -471,6 +472,32 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
 #define CEPH_I_CAP_DROPPED	(1 << 8)  /* caps were forcibly dropped */
 #define CEPH_I_KICK_FLUSH	(1 << 9)  /* kick flushing caps */
 #define CEPH_I_FLUSH_SNAPS	(1 << 10) /* need flush snapss */
+#define CEPH_I_ERROR_WRITE	(1 << 11) /* have seen write errors */
+
+/*
+ * We set the ERROR_WRITE bit when we start seeing write errors on an inode
+ * and then clear it when they start succeeding. Note that we do a lockless
+ * check first, and only take the lock if it looks like it needs to be changed.
+ * The write submission code just takes this as a hint, so we're not too
+ * worried if a few slip through in either direction.
+ */
+static inline void ceph_set_error_write(struct ceph_inode_info *ci)
+{
+	if (!(READ_ONCE(ci->i_ceph_flags) & CEPH_I_ERROR_WRITE)) {
+		spin_lock(&ci->i_ceph_lock);
+		ci->i_ceph_flags |= CEPH_I_ERROR_WRITE;
+		spin_unlock(&ci->i_ceph_lock);
+	}
+}
+
+static inline void ceph_clear_error_write(struct ceph_inode_info *ci)
+{
+	if (READ_ONCE(ci->i_ceph_flags) & CEPH_I_ERROR_WRITE) {
+		spin_lock(&ci->i_ceph_lock);
+		ci->i_ceph_flags &= ~CEPH_I_ERROR_WRITE;
+		spin_unlock(&ci->i_ceph_lock);
+	}
+}
 
 static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
 					   long long release_count,
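These helpers back the switch-to-sync-writes-on-error change: the writeback path is expected to set CEPH_I_ERROR_WRITE when a write fails and clear it once writes succeed again, and because both sides test the flag locklessly first, the bit is explicitly a hint rather than a guarantee. A hypothetical consumer, to show the intended shape (this function is illustrative and not part of fs/ceph):

/* Illustrative only: decide whether to fall back to the sync write path. */
static bool want_sync_write(struct ceph_inode_info *ci, int last_write_err)
{
	if (last_write_err)
		ceph_set_error_write(ci);	/* start hinting sync writes */
	else
		ceph_clear_error_write(ci);	/* healthy again, fast path ok */

	return READ_ONCE(ci->i_ceph_flags) & CEPH_I_ERROR_WRITE;
}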
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index febc28f9e2c2..75267cdd5dfd 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -392,6 +392,7 @@ static int __set_xattr(struct ceph_inode_info *ci,
 
 	if (update_xattr) {
 		int err = 0;
+
 		if (xattr && (flags & XATTR_CREATE))
 			err = -EEXIST;
 		else if (!xattr && (flags & XATTR_REPLACE))
@@ -399,12 +400,14 @@ static int __set_xattr(struct ceph_inode_info *ci,
 		if (err) {
 			kfree(name);
 			kfree(val);
+			kfree(*newxattr);
 			return err;
 		}
 		if (update_xattr < 0) {
 			if (xattr)
 				__remove_xattr(ci, xattr);
 			kfree(name);
+			kfree(*newxattr);
 			return 0;
 		}
 	}
diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h
index ae2f66833762..fd8b2953c78f 100644
--- a/include/linux/ceph/ceph_features.h
+++ b/include/linux/ceph/ceph_features.h
@@ -105,8 +105,10 @@ static inline u64 ceph_sanitize_features(u64 features)
  */
 #define CEPH_FEATURES_SUPPORTED_DEFAULT		\
 	(CEPH_FEATURE_NOSRCADDR |		\
+	 CEPH_FEATURE_FLOCK |			\
 	 CEPH_FEATURE_SUBSCRIBE2 |		\
 	 CEPH_FEATURE_RECONNECT_SEQ |		\
+	 CEPH_FEATURE_DIRLAYOUTHASH |		\
 	 CEPH_FEATURE_PGID64 |			\
 	 CEPH_FEATURE_PGPOOL3 |			\
 	 CEPH_FEATURE_OSDENC |			\
@@ -114,11 +116,13 @@ static inline u64 ceph_sanitize_features(u64 features)
 	 CEPH_FEATURE_MSG_AUTH |		\
 	 CEPH_FEATURE_CRUSH_TUNABLES2 |		\
 	 CEPH_FEATURE_REPLY_CREATE_INODE |	\
+	 CEPH_FEATURE_MDSENC |			\
 	 CEPH_FEATURE_OSDHASHPSPOOL |		\
 	 CEPH_FEATURE_OSD_CACHEPOOL |		\
 	 CEPH_FEATURE_CRUSH_V2 |		\
 	 CEPH_FEATURE_EXPORT_PEER |		\
 	 CEPH_FEATURE_OSDMAP_ENC |		\
+	 CEPH_FEATURE_MDS_INLINE_DATA |		\
 	 CEPH_FEATURE_CRUSH_TUNABLES3 |		\
 	 CEPH_FEATURE_OSD_PRIMARY_AFFINITY |	\
 	 CEPH_FEATURE_MSGR_KEEPALIVE2 |		\
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index f4b2ee18f38c..ad078ebe25d6 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -365,6 +365,19 @@ extern const char *ceph_mds_op_name(int op);
 #define CEPH_READDIR_FRAG_END		(1<<0)
 #define CEPH_READDIR_FRAG_COMPLETE	(1<<8)
 #define CEPH_READDIR_HASH_ORDER		(1<<9)
+#define CEPH_READDIR_OFFSET_HASH	(1<<10)
+
+/*
+ * open request flags
+ */
+#define CEPH_O_RDONLY		00000000
+#define CEPH_O_WRONLY		00000001
+#define CEPH_O_RDWR		00000002
+#define CEPH_O_CREAT		00000100
+#define CEPH_O_EXCL		00000200
+#define CEPH_O_TRUNC		00001000
+#define CEPH_O_DIRECTORY	00200000
+#define CEPH_O_NOFOLLOW		00400000
 
 union ceph_mds_request_args {
 	struct {
@@ -384,6 +397,7 @@ union ceph_mds_request_args {
 		__le32 max_entries;     /* how many dentries to grab */
 		__le32 max_bytes;
 		__le16 flags;
+		__le32 offset_hash;
 	} __attribute__ ((packed)) readdir;
 	struct {
 		__le32 mode;
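The CEPH_O_* constants pin the wire encoding of open flags, which is the actual ppc64 fix: the numeric values of O_DIRECTORY and friends differ between architectures, so copying the local O_* bits into the request, as the client previously did, produces wrong flags wherever the local values disagree with the wire ones. The translation has to be done bit by bit, along these lines (a sketch of the idea, not the kernel's code; the in-kernel helper of this shape lives in fs/ceph/file.c, and the defines are repeated here only so the example is self-contained):

#define _GNU_SOURCE
#include <fcntl.h>

#define CEPH_O_RDONLY	 00000000
#define CEPH_O_WRONLY	 00000001
#define CEPH_O_RDWR	 00000002
#define CEPH_O_CREAT	 00000100
#define CEPH_O_EXCL	 00000200
#define CEPH_O_TRUNC	 00001000
#define CEPH_O_DIRECTORY 00200000
#define CEPH_O_NOFOLLOW	 00400000

/* Translate local open flags to the fixed ceph wire values. */
static unsigned int flags_sys2wire(unsigned int flags)
{
	unsigned int wire = 0;

	switch (flags & O_ACCMODE) {
	case O_RDONLY:	wire |= CEPH_O_RDONLY;	break;
	case O_WRONLY:	wire |= CEPH_O_WRONLY;	break;
	case O_RDWR:	wire |= CEPH_O_RDWR;	break;
	}
	if (flags & O_CREAT)	 wire |= CEPH_O_CREAT;
	if (flags & O_EXCL)	 wire |= CEPH_O_EXCL;
	if (flags & O_TRUNC)	 wire |= CEPH_O_TRUNC;
	if (flags & O_DIRECTORY) wire |= CEPH_O_DIRECTORY;
	if (flags & O_NOFOLLOW)	 wire |= CEPH_O_NOFOLLOW;
	return wire;
}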
diff --git a/include/linux/ceph/cls_lock_client.h b/include/linux/ceph/cls_lock_client.h
index 84884d8d4710..0594d3bba774 100644
--- a/include/linux/ceph/cls_lock_client.h
+++ b/include/linux/ceph/cls_lock_client.h
@@ -37,6 +37,11 @@ int ceph_cls_break_lock(struct ceph_osd_client *osdc,
 			struct ceph_object_locator *oloc,
 			char *lock_name, char *cookie,
 			struct ceph_entity_name *locker);
+int ceph_cls_set_cookie(struct ceph_osd_client *osdc,
+			struct ceph_object_id *oid,
+			struct ceph_object_locator *oloc,
+			char *lock_name, u8 type, char *old_cookie,
+			char *tag, char *new_cookie);
 
 void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers);
 
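ceph_cls_set_cookie() lets a lock holder swap its cookie in place on the OSD; unlocking and re-locking instead would leave a window in which another client could grab the lock. This is what allows rbd to update the lock cookie without releasing the lock. A hedged usage sketch (the oid/oloc/cookie variables are illustrative, and "rbd_lock" is shown as the lock name rbd is understood to use):

	ret = ceph_cls_set_cookie(osdc, &header_oid, &header_oloc,
				  "rbd_lock", CLS_LOCK_EXCLUSIVE,
				  old_cookie, "" /* tag */, new_cookie);
	if (ret)
		pr_err("failed to update lock cookie: %d\n", ret);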
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 88cd5dc8e238..3229ae6c7846 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -14,6 +14,7 @@
 #include <linux/wait.h>
 #include <linux/writeback.h>
 #include <linux/slab.h>
+#include <linux/refcount.h>
 
 #include <linux/ceph/types.h>
 #include <linux/ceph/messenger.h>
@@ -161,7 +162,7 @@ struct ceph_client {
  * dirtied.
  */
 struct ceph_snap_context {
-	atomic_t nref;
+	refcount_t nref;
 	u64 seq;
 	u32 num_snaps;
 	u64 snaps[];
@@ -262,10 +263,7 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client);
 extern void ceph_destroy_options(struct ceph_options *opt);
 extern int ceph_compare_options(struct ceph_options *new_opt,
 				struct ceph_client *client);
-extern struct ceph_client *ceph_create_client(struct ceph_options *opt,
-					      void *private,
-					      u64 supported_features,
-					      u64 required_features);
+struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private);
 struct ceph_entity_addr *ceph_client_addr(struct ceph_client *client);
 u64 ceph_client_gid(struct ceph_client *client);
 extern void ceph_destroy_client(struct ceph_client *client);
diff --git a/include/linux/ceph/mdsmap.h b/include/linux/ceph/mdsmap.h
index 8ed5dc505fbb..d5f783f3226a 100644
--- a/include/linux/ceph/mdsmap.h
+++ b/include/linux/ceph/mdsmap.h
@@ -25,6 +25,7 @@ struct ceph_mdsmap {
 	u32 m_session_autoclose;        /* seconds */
 	u64 m_max_file_size;
 	u32 m_max_mds;                  /* size of m_addr, m_state arrays */
+	int m_num_mds;
 	struct ceph_mds_info *m_info;
 
 	/* which object pools file data can be stored in */
@@ -40,7 +41,7 @@ struct ceph_mdsmap {
 static inline struct ceph_entity_addr *
 ceph_mdsmap_get_addr(struct ceph_mdsmap *m, int w)
 {
-	if (w >= m->m_max_mds)
+	if (w >= m->m_num_mds)
 		return NULL;
 	return &m->m_info[w].addr;
 }
@@ -48,14 +49,14 @@ ceph_mdsmap_get_addr(struct ceph_mdsmap *m, int w)
 static inline int ceph_mdsmap_get_state(struct ceph_mdsmap *m, int w)
 {
 	BUG_ON(w < 0);
-	if (w >= m->m_max_mds)
+	if (w >= m->m_num_mds)
 		return CEPH_MDS_STATE_DNE;
 	return m->m_info[w].state;
 }
 
 static inline bool ceph_mdsmap_is_laggy(struct ceph_mdsmap *m, int w)
 {
-	if (w >= 0 && w < m->m_max_mds)
+	if (w >= 0 && w < m->m_num_mds)
 		return m->m_info[w].laggy;
 	return false;
 }
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index c125b5d9e13c..85650b415e73 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -5,6 +5,7 @@
 #include <linux/kref.h>
 #include <linux/mempool.h>
 #include <linux/rbtree.h>
+#include <linux/refcount.h>
 
 #include <linux/ceph/types.h>
 #include <linux/ceph/osdmap.h>
@@ -27,7 +28,7 @@ typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *);
 
 /* a given osd we're communicating with */
 struct ceph_osd {
-	atomic_t o_ref;
+	refcount_t o_ref;
 	struct ceph_osd_client *o_osdc;
 	int o_osd;
 	int o_incarnation;
@@ -186,12 +187,12 @@ struct ceph_osd_request {
 	struct timespec r_mtime;              /* ditto */
 	u64 r_data_offset;                    /* ditto */
 	bool r_linger;                        /* don't resend on failure */
+	bool r_abort_on_full;		      /* return ENOSPC when full */
 
 	/* internal */
 	unsigned long r_stamp;                /* jiffies, send or check time */
 	unsigned long r_start_stamp;          /* jiffies */
 	int r_attempts;
-	struct ceph_eversion r_replay_version; /* aka reassert_version */
 	u32 r_last_force_resend;
 	u32 r_map_dne_bound;
 
@@ -266,6 +267,7 @@ struct ceph_osd_client {
 	struct rb_root osds;          /* osds */
 	struct list_head osd_lru;     /* idle osds */
 	spinlock_t osd_lru_lock;
+	u32 epoch_barrier;
 	struct ceph_osd homeless_osd;
 	atomic64_t last_tid;          /* tid of last request */
 	u64 last_linger_id;
@@ -304,6 +306,7 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
 				   struct ceph_msg *msg);
 extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
 				 struct ceph_msg *msg);
+void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb);
 
 extern void osd_req_op_init(struct ceph_osd_request *osd_req,
 			    unsigned int which, u16 opcode, u32 flags);
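epoch_barrier acts as a floor on the osdmap epoch: requests are paused, not sent, while the client's map is older than the barrier, which keeps writes that were aborted with -ENOSPC from being replayed against a stale map. The exported setter is cheap to call optimistically; a sketch of a caller (the surrounding function is hypothetical):

/* Sketch: publish a barrier learned from, e.g., an MDS cap message. */
static void note_epoch_barrier(struct ceph_osd_client *osdc, u32 epoch)
{
	/* Takes osdc->lock shared and upgrades only if the barrier moves. */
	ceph_osdc_update_epoch_barrier(osdc, epoch);
}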
diff --git a/include/linux/ceph/pagelist.h b/include/linux/ceph/pagelist.h
index 13d71fe18b0c..75a7db21457d 100644
--- a/include/linux/ceph/pagelist.h
+++ b/include/linux/ceph/pagelist.h
@@ -2,7 +2,7 @@
 #define __FS_CEPH_PAGELIST_H
 
 #include <asm/byteorder.h>
-#include <linux/atomic.h>
+#include <linux/refcount.h>
 #include <linux/list.h>
 #include <linux/types.h>
 
@@ -13,7 +13,7 @@ struct ceph_pagelist {
 	size_t room;
 	struct list_head free_list;
 	size_t num_pages_free;
-	atomic_t refcnt;
+	refcount_t refcnt;
 };
 
 struct ceph_pagelist_cursor {
@@ -30,7 +30,7 @@ static inline void ceph_pagelist_init(struct ceph_pagelist *pl)
 	pl->room = 0;
 	INIT_LIST_HEAD(&pl->free_list);
 	pl->num_pages_free = 0;
-	atomic_set(&pl->refcnt, 1);
+	refcount_set(&pl->refcnt, 1);
 }
 
 extern void ceph_pagelist_release(struct ceph_pagelist *pl);
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 4eb773ccce11..4fd02831beed 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -45,6 +45,17 @@ bool libceph_compatible(void *data)
 }
 EXPORT_SYMBOL(libceph_compatible);
 
+static int param_get_supported_features(char *buffer,
+					const struct kernel_param *kp)
+{
+	return sprintf(buffer, "0x%llx", CEPH_FEATURES_SUPPORTED_DEFAULT);
+}
+static const struct kernel_param_ops param_ops_supported_features = {
+	.get = param_get_supported_features,
+};
+module_param_cb(supported_features, &param_ops_supported_features, NULL,
+		S_IRUGO);
+
 /*
  * find filename portion of a path (/foo/bar/baz -> baz)
  */
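A practical side effect of the module_param_cb() hook above: the feature mask becomes discoverable at runtime, normally as the read-only file /sys/module/libceph/parameters/supported_features (standard behavior for an S_IRUGO module parameter), so userspace can check what a given kernel speaks without consulting the sources.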
@@ -596,9 +607,7 @@ EXPORT_SYMBOL(ceph_client_gid);
 /*
  * create a fresh client instance
  */
-struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
-				       u64 supported_features,
-				       u64 required_features)
+struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private)
 {
 	struct ceph_client *client;
 	struct ceph_entity_addr *myaddr = NULL;
@@ -615,14 +624,12 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
 	init_waitqueue_head(&client->auth_wq);
 	client->auth_err = 0;
 
-	if (!ceph_test_opt(client, NOMSGAUTH))
-		required_features |= CEPH_FEATURE_MSG_AUTH;
-
 	client->extra_mon_dispatch = NULL;
-	client->supported_features = CEPH_FEATURES_SUPPORTED_DEFAULT |
-		supported_features;
-	client->required_features = CEPH_FEATURES_REQUIRED_DEFAULT |
-		required_features;
+	client->supported_features = CEPH_FEATURES_SUPPORTED_DEFAULT;
+	client->required_features = CEPH_FEATURES_REQUIRED_DEFAULT;
+
+	if (!ceph_test_opt(client, NOMSGAUTH))
+		client->required_features |= CEPH_FEATURE_MSG_AUTH;
 
 	/* msgr */
 	if (ceph_test_opt(client, MYIP))
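With both masks folded into libceph, call sites shrink to options plus a private pointer; the fs/ceph conversion shown earlier is the whole pattern. In outline (error handling abbreviated relative to the real caller):

	fsc->client = ceph_create_client(opt, fsc);
	if (IS_ERR(fsc->client))
		return ERR_CAST(fsc->client);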
diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c
index b9233b990399..08ada893f01e 100644
--- a/net/ceph/cls_lock_client.c
+++ b/net/ceph/cls_lock_client.c
@@ -179,6 +179,57 @@ int ceph_cls_break_lock(struct ceph_osd_client *osdc,
 }
 EXPORT_SYMBOL(ceph_cls_break_lock);
 
+int ceph_cls_set_cookie(struct ceph_osd_client *osdc,
+			struct ceph_object_id *oid,
+			struct ceph_object_locator *oloc,
+			char *lock_name, u8 type, char *old_cookie,
+			char *tag, char *new_cookie)
+{
+	int cookie_op_buf_size;
+	int name_len = strlen(lock_name);
+	int old_cookie_len = strlen(old_cookie);
+	int tag_len = strlen(tag);
+	int new_cookie_len = strlen(new_cookie);
+	void *p, *end;
+	struct page *cookie_op_page;
+	int ret;
+
+	cookie_op_buf_size = name_len + sizeof(__le32) +
+			     old_cookie_len + sizeof(__le32) +
+			     tag_len + sizeof(__le32) +
+			     new_cookie_len + sizeof(__le32) +
+			     sizeof(u8) + CEPH_ENCODING_START_BLK_LEN;
+	if (cookie_op_buf_size > PAGE_SIZE)
+		return -E2BIG;
+
+	cookie_op_page = alloc_page(GFP_NOIO);
+	if (!cookie_op_page)
+		return -ENOMEM;
+
+	p = page_address(cookie_op_page);
+	end = p + cookie_op_buf_size;
+
+	/* encode cls_lock_set_cookie_op struct */
+	ceph_start_encoding(&p, 1, 1,
+			    cookie_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
+	ceph_encode_string(&p, end, lock_name, name_len);
+	ceph_encode_8(&p, type);
+	ceph_encode_string(&p, end, old_cookie, old_cookie_len);
+	ceph_encode_string(&p, end, tag, tag_len);
+	ceph_encode_string(&p, end, new_cookie, new_cookie_len);
+
+	dout("%s lock_name %s type %d old_cookie %s tag %s new_cookie %s\n",
+	     __func__, lock_name, type, old_cookie, tag, new_cookie);
+	ret = ceph_osdc_call(osdc, oid, oloc, "lock", "set_cookie",
+			     CEPH_OSD_FLAG_WRITE, cookie_op_page,
+			     cookie_op_buf_size, NULL, NULL);
+
+	dout("%s: status %d\n", __func__, ret);
+	__free_page(cookie_op_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_set_cookie);
+
 void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers)
 {
 	int i;
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index c62b2b029a6e..71ba13927b3d 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -62,7 +62,8 @@ static int osdmap_show(struct seq_file *s, void *p)
 		return 0;
 
 	down_read(&osdc->lock);
-	seq_printf(s, "epoch %d flags 0x%x\n", map->epoch, map->flags);
+	seq_printf(s, "epoch %u barrier %u flags 0x%x\n", map->epoch,
+		   osdc->epoch_barrier, map->flags);
 
 	for (n = rb_first(&map->pg_pools); n; n = rb_next(n)) {
 		struct ceph_pg_pool_info *pi =
@@ -177,9 +178,7 @@ static void dump_request(struct seq_file *s, struct ceph_osd_request *req)
 	seq_printf(s, "%llu\t", req->r_tid);
 	dump_target(s, &req->r_t);
 
-	seq_printf(s, "\t%d\t%u'%llu", req->r_attempts,
-		   le32_to_cpu(req->r_replay_version.epoch),
-		   le64_to_cpu(req->r_replay_version.version));
+	seq_printf(s, "\t%d", req->r_attempts);
 
 	for (i = 0; i < req->r_num_ops; i++) {
 		struct ceph_osd_req_op *op = &req->r_ops[i];
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 242d7c0d92f8..924f07c36ddb 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -961,6 +961,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 				       truncate_size, truncate_seq);
 	}
 
+	req->r_abort_on_full = true;
 	req->r_flags = flags;
 	req->r_base_oloc.pool = layout->pool_id;
 	req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
@@ -1005,7 +1006,7 @@ static bool osd_registered(struct ceph_osd *osd)
  */
 static void osd_init(struct ceph_osd *osd)
 {
-	atomic_set(&osd->o_ref, 1);
+	refcount_set(&osd->o_ref, 1);
 	RB_CLEAR_NODE(&osd->o_node);
 	osd->o_requests = RB_ROOT;
 	osd->o_linger_requests = RB_ROOT;
@@ -1050,9 +1051,9 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
 
 static struct ceph_osd *get_osd(struct ceph_osd *osd)
 {
-	if (atomic_inc_not_zero(&osd->o_ref)) {
-		dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
-		     atomic_read(&osd->o_ref));
+	if (refcount_inc_not_zero(&osd->o_ref)) {
+		dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1,
+		     refcount_read(&osd->o_ref));
 		return osd;
 	} else {
 		dout("get_osd %p FAIL\n", osd);
@@ -1062,9 +1063,9 @@ static struct ceph_osd *get_osd(struct ceph_osd *osd)
 
 static void put_osd(struct ceph_osd *osd)
 {
-	dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
-	     atomic_read(&osd->o_ref) - 1);
-	if (atomic_dec_and_test(&osd->o_ref)) {
+	dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref),
+	     refcount_read(&osd->o_ref) - 1);
+	if (refcount_dec_and_test(&osd->o_ref)) {
 		osd_cleanup(osd);
 		kfree(osd);
 	}
@@ -1297,8 +1298,9 @@ static bool target_should_be_paused(struct ceph_osd_client *osdc,
 		       __pool_full(pi);
 
 	WARN_ON(pi->id != t->base_oloc.pool);
-	return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
-	       (t->flags & CEPH_OSD_FLAG_WRITE && pausewr);
+	return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) ||
+	       ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) ||
+	       (osdc->osdmap->epoch < osdc->epoch_barrier);
 }
 
 enum calc_target_result {
@@ -1503,9 +1505,10 @@ static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
 	ceph_encode_32(&p, req->r_flags);
 	ceph_encode_timespec(p, &req->r_mtime);
 	p += sizeof(struct ceph_timespec);
-	/* aka reassert_version */
-	memcpy(p, &req->r_replay_version, sizeof(req->r_replay_version));
-	p += sizeof(req->r_replay_version);
+
+	/* reassert_version */
+	memset(p, 0, sizeof(struct ceph_eversion));
+	p += sizeof(struct ceph_eversion);
 
 	/* oloc */
 	ceph_start_encoding(&p, 5, 4,
@@ -1626,6 +1629,7 @@ static void maybe_request_map(struct ceph_osd_client *osdc)
 	ceph_monc_renew_subs(&osdc->client->monc);
 }
 
+static void complete_request(struct ceph_osd_request *req, int err);
 static void send_map_check(struct ceph_osd_request *req);
 
 static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
@@ -1635,6 +1639,7 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
 	enum calc_target_result ct_res;
 	bool need_send = false;
 	bool promoted = false;
+	bool need_abort = false;
 
 	WARN_ON(req->r_tid);
 	dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
@@ -1650,8 +1655,13 @@ again:
 		goto promote;
 	}
 
-	if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
-	    ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
+	if (osdc->osdmap->epoch < osdc->epoch_barrier) {
+		dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch,
+		     osdc->epoch_barrier);
+		req->r_t.paused = true;
+		maybe_request_map(osdc);
+	} else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
+		   ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
 		dout("req %p pausewr\n", req);
 		req->r_t.paused = true;
 		maybe_request_map(osdc);
@@ -1669,6 +1679,8 @@ again:
 		pr_warn_ratelimited("FULL or reached pool quota\n");
 		req->r_t.paused = true;
 		maybe_request_map(osdc);
+		if (req->r_abort_on_full)
+			need_abort = true;
 	} else if (!osd_homeless(osd)) {
 		need_send = true;
 	} else {
@@ -1685,6 +1697,8 @@ again:
 	link_request(osd, req);
 	if (need_send)
 		send_request(req);
+	else if (need_abort)
+		complete_request(req, -ENOSPC);
 	mutex_unlock(&osd->lock);
 
 	if (ct_res == CALC_TARGET_POOL_DNE)
@@ -1799,6 +1813,97 @@ static void abort_request(struct ceph_osd_request *req, int err)
 	complete_request(req, err);
 }
 
+static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
+{
+	if (likely(eb > osdc->epoch_barrier)) {
+		dout("updating epoch_barrier from %u to %u\n",
+		     osdc->epoch_barrier, eb);
+		osdc->epoch_barrier = eb;
+		/* Request map if we're not to the barrier yet */
+		if (eb > osdc->osdmap->epoch)
+			maybe_request_map(osdc);
+	}
+}
+
+void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
+{
+	down_read(&osdc->lock);
+	if (unlikely(eb > osdc->epoch_barrier)) {
+		up_read(&osdc->lock);
+		down_write(&osdc->lock);
+		update_epoch_barrier(osdc, eb);
+		up_write(&osdc->lock);
+	} else {
+		up_read(&osdc->lock);
+	}
+}
+EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
+
+/*
+ * Drop all pending requests that are stalled waiting on a full condition to
+ * clear, and complete them with ENOSPC as the return code. Set the
+ * osdc->epoch_barrier to the latest map epoch that we've seen if any were
+ * cancelled.
+ */
+static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc)
+{
+	struct rb_node *n;
+	bool victims = false;
+
+	dout("enter abort_on_full\n");
+
+	if (!ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !have_pool_full(osdc))
+		goto out;
+
+	/* Scan list and see if there is anything to abort */
+	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+		struct rb_node *m;
+
+		m = rb_first(&osd->o_requests);
+		while (m) {
+			struct ceph_osd_request *req = rb_entry(m,
+					struct ceph_osd_request, r_node);
+			m = rb_next(m);
+
+			if (req->r_abort_on_full) {
+				victims = true;
+				break;
+			}
+		}
+		if (victims)
+			break;
+	}
+
+	if (!victims)
+		goto out;
+
+	/*
+	 * Update the barrier to current epoch if it's behind that point,
+	 * since we know we have some calls to be aborted in the tree.
+	 */
+	update_epoch_barrier(osdc, osdc->osdmap->epoch);
+
+	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+		struct rb_node *m;
+
+		m = rb_first(&osd->o_requests);
+		while (m) {
+			struct ceph_osd_request *req = rb_entry(m,
+					struct ceph_osd_request, r_node);
+			m = rb_next(m);
+
+			if (req->r_abort_on_full &&
+			    (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
+			     pool_full(osdc, req->r_t.target_oloc.pool)))
+				abort_request(req, -ENOSPC);
+		}
+	}
+out:
+	dout("return abort_on_full barrier=%u\n", osdc->epoch_barrier);
+}
+
 static void check_pool_dne(struct ceph_osd_request *req)
 {
 	struct ceph_osd_client *osdc = req->r_osdc;
@@ -3252,11 +3357,13 @@ done:
 	pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
 		  ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
 		  have_pool_full(osdc);
-	if (was_pauserd || was_pausewr || pauserd || pausewr)
+	if (was_pauserd || was_pausewr || pauserd || pausewr ||
+	    osdc->osdmap->epoch < osdc->epoch_barrier)
 		maybe_request_map(osdc);
 
 	kick_requests(osdc, &need_resend, &need_resend_linger);
 
+	ceph_osdc_abort_on_full(osdc);
 	ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
 			  osdc->osdmap->epoch);
 	up_write(&osdc->lock);
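Tying the -ENOSPC series together: requests built through ceph_osdc_new_request() default to r_abort_on_full, __submit_request() completes such a request immediately when it lands on a full cluster or pool, and the map handler above sweeps already-submitted requests via ceph_osdc_abort_on_full(). A hedged sketch of a synchronous submitter observing the new behavior (setup and teardown omitted):

	ret = ceph_osdc_start_request(osdc, req, false);
	if (!ret)
		ret = ceph_osdc_wait_request(osdc, req);
	if (ret == -ENOSPC)
		pr_warn("write aborted: cluster or pool is full\n");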
@@ -4126,7 +4233,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
 		close_osd(osd);
 	}
 	up_write(&osdc->lock);
-	WARN_ON(atomic_read(&osdc->homeless_osd.o_ref) != 1);
+	WARN_ON(refcount_read(&osdc->homeless_osd.o_ref) != 1);
 	osd_cleanup(&osdc->homeless_osd);
 
 	WARN_ON(!list_empty(&osdc->osd_lru));
diff --git a/net/ceph/pagelist.c b/net/ceph/pagelist.c
index 6864007e64fc..ce09f73be759 100644
--- a/net/ceph/pagelist.c
+++ b/net/ceph/pagelist.c
@@ -16,7 +16,7 @@ static void ceph_pagelist_unmap_tail(struct ceph_pagelist *pl)
 
 void ceph_pagelist_release(struct ceph_pagelist *pl)
 {
-	if (!atomic_dec_and_test(&pl->refcnt))
+	if (!refcount_dec_and_test(&pl->refcnt))
 		return;
 	ceph_pagelist_unmap_tail(pl);
 	while (!list_empty(&pl->head)) {
diff --git a/net/ceph/snapshot.c b/net/ceph/snapshot.c
index 705414e78ae0..e14a5d038656 100644
--- a/net/ceph/snapshot.c
+++ b/net/ceph/snapshot.c
@@ -49,7 +49,7 @@ struct ceph_snap_context *ceph_create_snap_context(u32 snap_count,
 	if (!snapc)
 		return NULL;
 
-	atomic_set(&snapc->nref, 1);
+	refcount_set(&snapc->nref, 1);
 	snapc->num_snaps = snap_count;
 
 	return snapc;
@@ -59,7 +59,7 @@ EXPORT_SYMBOL(ceph_create_snap_context);
 struct ceph_snap_context *ceph_get_snap_context(struct ceph_snap_context *sc)
 {
 	if (sc)
-		atomic_inc(&sc->nref);
+		refcount_inc(&sc->nref);
 	return sc;
 }
 EXPORT_SYMBOL(ceph_get_snap_context);
@@ -68,7 +68,7 @@ void ceph_put_snap_context(struct ceph_snap_context *sc)
 {
 	if (!sc)
 		return;
-	if (atomic_dec_and_test(&sc->nref)) {
+	if (refcount_dec_and_test(&sc->nref)) {
 		/*printk(" deleting snap_context %p\n", sc);*/
 		kfree(sc);
 	}