author    Linus Torvalds <torvalds@linux-foundation.org>    2014-08-13 19:43:29 -0400
committer Linus Torvalds <torvalds@linux-foundation.org>    2014-08-13 19:43:29 -0400
commit    8d2d441ac4af223eae466c3c31ff737cc31a1411 (patch)
tree      d14b0f72e80f94c1575c281bd21d43a86de0a92d
parent    89838b80bbbf9774cf010905851db7913c9331f0 (diff)
parent    5f740d7e1531099b888410e6bab13f68da9b1a4d (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil:
 "There is a lot of refactoring and hardening of the libceph and rbd
  code here from Ilya that fix various smaller bugs, and a few more
  important fixes with clone overlap.  The main fix is a critical
  change to the request_fn handling to not sleep that was exposed by
  the recent mutex changes (which will also go to the 3.16 stable
  series).

  Yan Zheng has several fixes in here for CephFS fixing ACL handling,
  time stamps, and request resends when the MDS restarts.

  Finally, there are a few cleanups from Himangi Saraogi based on
  Coccinelle"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (39 commits)
  libceph: set last_piece in ceph_msg_data_pages_cursor_init() correctly
  rbd: remove extra newlines from rbd_warn() messages
  rbd: allocate img_request with GFP_NOIO instead GFP_ATOMIC
  rbd: rework rbd_request_fn()
  ceph: fix kick_requests()
  ceph: fix append mode write
  ceph: fix sizeof(struct tYpO *) typo
  ceph: remove redundant memset(0)
  rbd: take snap_id into account when reading in parent info
  rbd: do not read in parent info before snap context
  rbd: update mapping size only on refresh
  rbd: harden rbd_dev_refresh() and callers a bit
  rbd: split rbd_dev_spec_update() into two functions
  rbd: remove unnecessary asserts in rbd_dev_image_probe()
  rbd: introduce rbd_dev_header_info()
  rbd: show the entire chain of parent images
  ceph: replace comma with a semicolon
  rbd: use rbd_segment_name_free() instead of kfree()
  ceph: check zero length in ceph_sync_read()
  ceph: reset r_resend_mds after receiving -ESTALE
  ...
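The "critical change to the request_fn handling to not sleep" is the rbd_request_fn() rework visible in drivers/block/rbd.c below: the block layer invokes request_fn with q->queue_lock held and interrupts disabled, so anything that can sleep there (allocations, mutexes) is unsafe. The rework only dequeues requests onto a list and punts the real submission to a workqueue. The following is a minimal sketch of that pattern, not the rbd code itself; the mydev_* names, and the assumption that the device's spinlock was registered as the queue lock via blk_init_queue(), are illustrative:

/*
 * Sketch only: defer request handling from a non-sleeping request_fn
 * to a workqueue (pre-blk-mq single-queue API).
 */
#include <linux/blkdev.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/workqueue.h>

struct mydev {
	spinlock_t lock;		/* assumed registered as q->queue_lock */
	struct list_head rq_queue;	/* incoming requests */
	struct workqueue_struct *rq_wq;	/* from alloc_workqueue() at setup */
	struct work_struct rq_work;	/* INIT_WORK(..., mydev_request_workfn) */
};

/* Process context: sleeping is allowed while building/submitting I/O. */
static void mydev_request_workfn(struct work_struct *work)
{
	struct mydev *dev = container_of(work, struct mydev, rq_work);
	struct request *rq, *next;
	LIST_HEAD(requests);

	/* Grab everything queued so far in one go. */
	spin_lock_irq(&dev->lock);
	list_splice_init(&dev->rq_queue, &requests);
	spin_unlock_irq(&dev->lock);

	list_for_each_entry_safe(rq, next, &requests, queuelist) {
		list_del_init(&rq->queuelist);
		/* ... do the real work here, may sleep ... */
		blk_end_request_all(rq, 0);	/* takes queue_lock itself */
	}
}

/* Called with q->queue_lock (== dev->lock) held, IRQs off: no sleeping. */
static void mydev_request_fn(struct request_queue *q)
{
	struct mydev *dev = q->queuedata;
	struct request *rq;
	int queued = 0;

	while ((rq = blk_fetch_request(q))) {
		list_add_tail(&rq->queuelist, &dev->rq_queue);
		queued++;
	}

	if (queued)
		queue_work(dev->rq_wq, &dev->rq_work);
}

rbd follows the same shape with rbd_dev->rq_queue, rq_wq and rq_work in the diff below; the img_request allocation in that path also moves from GFP_ATOMIC to GFP_NOIO, since the workqueue context may sleep but must not recurse into block I/O.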
-rw-r--r--   Documentation/ABI/testing/sysfs-bus-rbd |   4
-rw-r--r--   drivers/block/rbd.c                     | 689
-rw-r--r--   fs/ceph/acl.c                           |  14
-rw-r--r--   fs/ceph/caps.c                          |   2
-rw-r--r--   fs/ceph/file.c                          |  24
-rw-r--r--   fs/ceph/mds_client.c                    |  16
-rw-r--r--   fs/ceph/xattr.c                         |   4
-rw-r--r--   include/linux/ceph/messenger.h          |  14
-rw-r--r--   include/linux/ceph/osd_client.h         |  18
-rw-r--r--   net/ceph/messenger.c                    |  47
-rw-r--r--   net/ceph/osd_client.c                   | 129
11 files changed, 538 insertions, 423 deletions
diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd
index 501adc2a9ec7..2ddd680929d8 100644
--- a/Documentation/ABI/testing/sysfs-bus-rbd
+++ b/Documentation/ABI/testing/sysfs-bus-rbd
@@ -94,5 +94,5 @@ current_snap
 
 parent
 
-	Information identifying the pool, image, and snapshot id for
-	the parent image in a layered rbd image (format 2 only).
+	Information identifying the chain of parent images in a layered rbd
+	image.  Entries are separated by empty lines.
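For illustration, reading the parent attribute of a device whose image is a clone of a clone would now produce output along these lines; every id and name below is a made-up example, and the field layout simply mirrors the format string added to rbd_parent_show() further down:

pool_id 2
pool_name rbd
image_id 10064aab5ec59
image_name child-of-base
snap_id 18
snap_name snap1
overlap 4194304

pool_id 2
pool_name rbd
image_id 4156ec2ae8944a
image_name base-image
snap_id 5
snap_name gold
overlap 4194304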
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index b2c98c1bc037..623c84145b79 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -42,6 +42,7 @@
 #include <linux/blkdev.h>
 #include <linux/slab.h>
 #include <linux/idr.h>
+#include <linux/workqueue.h>
 
 #include "rbd_types.h"
 
@@ -332,7 +333,10 @@ struct rbd_device {
 
 	char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 
+	struct list_head rq_queue;	/* incoming rq queue */
 	spinlock_t lock;		/* queue, flags, open_count */
+	struct workqueue_struct *rq_wq;
+	struct work_struct rq_work;
 
 	struct rbd_image_header header;
 	unsigned long flags;		/* possibly lock protected */
@@ -514,7 +518,8 @@ static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
 
 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
-static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
+static int rbd_dev_header_info(struct rbd_device *rbd_dev);
+static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
 					u64 snap_id);
 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
@@ -971,12 +976,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev,
 	header->snap_names = snap_names;
 	header->snap_sizes = snap_sizes;
 
-	/* Make sure mapping size is consistent with header info */
-
-	if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
-		if (rbd_dev->mapping.size != header->image_size)
-			rbd_dev->mapping.size = header->image_size;
-
 	return 0;
 out_2big:
 	ret = -EIO;
@@ -1139,6 +1138,13 @@ static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
 	rbd_dev->mapping.features = 0;
 }
 
+static void rbd_segment_name_free(const char *name)
+{
+	/* The explicit cast here is needed to drop the const qualifier */
+
+	kmem_cache_free(rbd_segment_name_cache, (void *)name);
+}
+
 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
 {
 	char *name;
@@ -1158,20 +1164,13 @@ static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
 	if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
 		pr_err("error formatting segment name for #%llu (%d)\n",
 			segment, ret);
-		kfree(name);
+		rbd_segment_name_free(name);
 		name = NULL;
 	}
 
 	return name;
 }
 
-static void rbd_segment_name_free(const char *name)
-{
-	/* The explicit cast here is needed to drop the const qualifier */
-
-	kmem_cache_free(rbd_segment_name_cache, (void *)name);
-}
-
 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
 {
 	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
@@ -1371,7 +1370,7 @@ static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
 		struct rbd_device *rbd_dev;
 
 		rbd_dev = obj_request->img_request->rbd_dev;
-		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
+		rbd_warn(rbd_dev, "obj_request %p already marked img_data",
 			obj_request);
 	}
 }
@@ -1389,7 +1388,7 @@ static void obj_request_done_set(struct rbd_obj_request *obj_request)
 
 		if (obj_request_img_data_test(obj_request))
 			rbd_dev = obj_request->img_request->rbd_dev;
-		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
+		rbd_warn(rbd_dev, "obj_request %p already marked done",
 			obj_request);
 	}
 }
@@ -1527,11 +1526,37 @@ static bool obj_request_type_valid(enum obj_request_type type)
 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
 				struct rbd_obj_request *obj_request)
 {
-	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
-
+	dout("%s %p\n", __func__, obj_request);
 	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
 }
 
+static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
+{
+	dout("%s %p\n", __func__, obj_request);
+	ceph_osdc_cancel_request(obj_request->osd_req);
+}
+
+/*
+ * Wait for an object request to complete.  If interrupted, cancel the
+ * underlying osd request.
+ */
+static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
+{
+	int ret;
+
+	dout("%s %p\n", __func__, obj_request);
+
+	ret = wait_for_completion_interruptible(&obj_request->completion);
+	if (ret < 0) {
+		dout("%s %p interrupted\n", __func__, obj_request);
+		rbd_obj_request_end(obj_request);
+		return ret;
+	}
+
+	dout("%s %p done\n", __func__, obj_request);
+	return 0;
+}
+
 static void rbd_img_request_complete(struct rbd_img_request *img_request)
 {
 
@@ -1558,15 +1583,6 @@ static void rbd_img_request_complete(struct rbd_img_request *img_request)
 	rbd_img_request_put(img_request);
 }
 
-/* Caller is responsible for rbd_obj_request_destroy(obj_request) */
-
-static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
-{
-	dout("%s: obj %p\n", __func__, obj_request);
-
-	return wait_for_completion_interruptible(&obj_request->completion);
-}
-
 /*
  * The default/initial value for all image request flags is 0.  Each
  * is conditionally set to 1 at image request initialization time
@@ -1763,7 +1779,7 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
 		rbd_osd_trivial_callback(obj_request);
 		break;
 	default:
-		rbd_warn(NULL, "%s: unsupported op %hu\n",
+		rbd_warn(NULL, "%s: unsupported op %hu",
 			obj_request->object_name, (unsigned short) opcode);
 		break;
 	}
@@ -1998,7 +2014,7 @@ static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
 	if (!counter)
 		rbd_dev_unparent(rbd_dev);
 	else
-		rbd_warn(rbd_dev, "parent reference underflow\n");
+		rbd_warn(rbd_dev, "parent reference underflow");
 }
 
 /*
@@ -2028,7 +2044,7 @@ static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
 	/* Image was flattened, but parent is not yet torn down */
 
 	if (counter < 0)
-		rbd_warn(rbd_dev, "parent reference overflow\n");
+		rbd_warn(rbd_dev, "parent reference overflow");
 
 	return false;
 }
@@ -2045,7 +2061,7 @@ static struct rbd_img_request *rbd_img_request_create(
 {
 	struct rbd_img_request *img_request;
 
-	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
+	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
 	if (!img_request)
 		return NULL;
 
@@ -2161,11 +2177,11 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
 	if (result) {
 		struct rbd_device *rbd_dev = img_request->rbd_dev;
 
-		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
+		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
 			img_request_write_test(img_request) ? "write" : "read",
 			obj_request->length, obj_request->img_offset,
 			obj_request->offset);
-		rbd_warn(rbd_dev, "  result %d xferred %x\n",
+		rbd_warn(rbd_dev, "  result %d xferred %x",
 			result, xferred);
 		if (!img_request->result)
 			img_request->result = result;
@@ -2946,154 +2962,135 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
 	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
 		rbd_dev->header_name, (unsigned long long)notify_id,
 		(unsigned int)opcode);
+
+	/*
+	 * Until adequate refresh error handling is in place, there is
+	 * not much we can do here, except warn.
+	 *
+	 * See http://tracker.ceph.com/issues/5040
+	 */
 	ret = rbd_dev_refresh(rbd_dev);
 	if (ret)
-		rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);
+		rbd_warn(rbd_dev, "refresh failed: %d", ret);
 
-	rbd_obj_notify_ack_sync(rbd_dev, notify_id);
+	ret = rbd_obj_notify_ack_sync(rbd_dev, notify_id);
+	if (ret)
+		rbd_warn(rbd_dev, "notify_ack ret %d", ret);
 }
 
 /*
- * Initiate a watch request, synchronously.
+ * Send a (un)watch request and wait for the ack.  Return a request
+ * with a ref held on success or error.
  */
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
+static struct rbd_obj_request *rbd_obj_watch_request_helper(
+						struct rbd_device *rbd_dev,
+						bool watch)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct rbd_obj_request *obj_request;
 	int ret;
 
-	rbd_assert(!rbd_dev->watch_event);
-	rbd_assert(!rbd_dev->watch_request);
-
-	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
-				     &rbd_dev->watch_event);
-	if (ret < 0)
-		return ret;
-
-	rbd_assert(rbd_dev->watch_event);
-
 	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
 					     OBJ_REQUEST_NODATA);
-	if (!obj_request) {
-		ret = -ENOMEM;
-		goto out_cancel;
-	}
+	if (!obj_request)
+		return ERR_PTR(-ENOMEM);
 
 	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
 						  obj_request);
 	if (!obj_request->osd_req) {
 		ret = -ENOMEM;
-		goto out_put;
+		goto out;
 	}
 
-	ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
-
 	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
-			      rbd_dev->watch_event->cookie, 0, 1);
+			      rbd_dev->watch_event->cookie, 0, watch);
 	rbd_osd_req_format_write(obj_request);
 
+	if (watch)
+		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
+
 	ret = rbd_obj_request_submit(osdc, obj_request);
 	if (ret)
-		goto out_linger;
+		goto out;
 
 	ret = rbd_obj_request_wait(obj_request);
 	if (ret)
-		goto out_linger;
+		goto out;
 
 	ret = obj_request->result;
-	if (ret)
-		goto out_linger;
-
-	/*
-	 * A watch request is set to linger, so the underlying osd
-	 * request won't go away until we unregister it.  We retain
-	 * a pointer to the object request during that time (in
-	 * rbd_dev->watch_request), so we'll keep a reference to
-	 * it.  We'll drop that reference (below) after we've
-	 * unregistered it.
-	 */
-	rbd_dev->watch_request = obj_request;
+	if (ret) {
+		if (watch)
+			rbd_obj_request_end(obj_request);
+		goto out;
+	}
 
-	return 0;
+	return obj_request;
 
-out_linger:
-	ceph_osdc_unregister_linger_request(osdc, obj_request->osd_req);
-out_put:
+out:
 	rbd_obj_request_put(obj_request);
-out_cancel:
-	ceph_osdc_cancel_event(rbd_dev->watch_event);
-	rbd_dev->watch_event = NULL;
-
-	return ret;
+	return ERR_PTR(ret);
 }
 
 /*
- * Tear down a watch request, synchronously.
+ * Initiate a watch request, synchronously.
  */
-static int __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct rbd_obj_request *obj_request;
 	int ret;
 
-	rbd_assert(rbd_dev->watch_event);
-	rbd_assert(rbd_dev->watch_request);
+	rbd_assert(!rbd_dev->watch_event);
+	rbd_assert(!rbd_dev->watch_request);
 
-	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
-					     OBJ_REQUEST_NODATA);
-	if (!obj_request) {
-		ret = -ENOMEM;
-		goto out_cancel;
-	}
+	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
+				     &rbd_dev->watch_event);
+	if (ret < 0)
+		return ret;
 
-	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
-						  obj_request);
-	if (!obj_request->osd_req) {
-		ret = -ENOMEM;
-		goto out_put;
+	obj_request = rbd_obj_watch_request_helper(rbd_dev, true);
+	if (IS_ERR(obj_request)) {
+		ceph_osdc_cancel_event(rbd_dev->watch_event);
+		rbd_dev->watch_event = NULL;
+		return PTR_ERR(obj_request);
 	}
 
-	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
-			      rbd_dev->watch_event->cookie, 0, 0);
-	rbd_osd_req_format_write(obj_request);
-
-	ret = rbd_obj_request_submit(osdc, obj_request);
-	if (ret)
-		goto out_put;
+	/*
+	 * A watch request is set to linger, so the underlying osd
+	 * request won't go away until we unregister it.  We retain
+	 * a pointer to the object request during that time (in
+	 * rbd_dev->watch_request), so we'll keep a reference to it.
+	 * We'll drop that reference after we've unregistered it in
+	 * rbd_dev_header_unwatch_sync().
+	 */
+	rbd_dev->watch_request = obj_request;
 
-	ret = rbd_obj_request_wait(obj_request);
-	if (ret)
-		goto out_put;
+	return 0;
+}
 
-	ret = obj_request->result;
-	if (ret)
-		goto out_put;
+/*
+ * Tear down a watch request, synchronously.
+ */
+static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+{
+	struct rbd_obj_request *obj_request;
 
-	/* We have successfully torn down the watch request */
+	rbd_assert(rbd_dev->watch_event);
+	rbd_assert(rbd_dev->watch_request);
 
-	ceph_osdc_unregister_linger_request(osdc,
-					    rbd_dev->watch_request->osd_req);
+	rbd_obj_request_end(rbd_dev->watch_request);
 	rbd_obj_request_put(rbd_dev->watch_request);
 	rbd_dev->watch_request = NULL;
 
-out_put:
-	rbd_obj_request_put(obj_request);
-out_cancel:
+	obj_request = rbd_obj_watch_request_helper(rbd_dev, false);
+	if (!IS_ERR(obj_request))
+		rbd_obj_request_put(obj_request);
+	else
+		rbd_warn(rbd_dev, "unable to tear down watch request (%ld)",
+			 PTR_ERR(obj_request));
+
 	ceph_osdc_cancel_event(rbd_dev->watch_event);
 	rbd_dev->watch_event = NULL;
-
-	return ret;
-}
-
-static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
-{
-	int ret;
-
-	ret = __rbd_dev_header_unwatch_sync(rbd_dev);
-	if (ret) {
-		rbd_warn(rbd_dev, "unable to tear down watch request: %d\n",
-			 ret);
-	}
 }
 
 /*
@@ -3183,102 +3180,129 @@ out:
 	return ret;
 }
 
-static void rbd_request_fn(struct request_queue *q)
-		__releases(q->queue_lock) __acquires(q->queue_lock)
-{
-	struct rbd_device *rbd_dev = q->queuedata;
-	struct request *rq;
-	int result;
-
-	while ((rq = blk_fetch_request(q))) {
-		bool write_request = rq_data_dir(rq) == WRITE;
-		struct rbd_img_request *img_request;
-		u64 offset;
-		u64 length;
-
-		/* Ignore any non-FS requests that filter through. */
-
-		if (rq->cmd_type != REQ_TYPE_FS) {
-			dout("%s: non-fs request type %d\n", __func__,
-				(int) rq->cmd_type);
-			__blk_end_request_all(rq, 0);
-			continue;
-		}
-
-		/* Ignore/skip any zero-length requests */
-
-		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
-		length = (u64) blk_rq_bytes(rq);
-
-		if (!length) {
-			dout("%s: zero-length request\n", __func__);
-			__blk_end_request_all(rq, 0);
-			continue;
-		}
-
-		spin_unlock_irq(q->queue_lock);
-
-		/* Disallow writes to a read-only device */
-
-		if (write_request) {
-			result = -EROFS;
-			if (rbd_dev->mapping.read_only)
-				goto end_request;
-			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
-		}
-
-		/*
-		 * Quit early if the mapped snapshot no longer
-		 * exists.  It's still possible the snapshot will
-		 * have disappeared by the time our request arrives
-		 * at the osd, but there's no sense in sending it if
-		 * we already know.
-		 */
-		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
-			dout("request for non-existent snapshot");
-			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
-			result = -ENXIO;
-			goto end_request;
-		}
-
-		result = -EINVAL;
-		if (offset && length > U64_MAX - offset + 1) {
-			rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
-				offset, length);
-			goto end_request;	/* Shouldn't happen */
-		}
-
-		result = -EIO;
-		if (offset + length > rbd_dev->mapping.size) {
-			rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
-				offset, length, rbd_dev->mapping.size);
-			goto end_request;
-		}
-
-		result = -ENOMEM;
-		img_request = rbd_img_request_create(rbd_dev, offset, length,
-							write_request);
-		if (!img_request)
-			goto end_request;
-
-		img_request->rq = rq;
-
-		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
-						rq->bio);
-		if (!result)
-			result = rbd_img_request_submit(img_request);
-		if (result)
-			rbd_img_request_put(img_request);
-end_request:
-		spin_lock_irq(q->queue_lock);
-		if (result < 0) {
-			rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
-				write_request ? "write" : "read",
-				length, offset, result);
-
-			__blk_end_request_all(rq, result);
-		}
-	}
-}
+static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
+{
+	struct rbd_img_request *img_request;
+	u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
+	u64 length = blk_rq_bytes(rq);
+	bool wr = rq_data_dir(rq) == WRITE;
+	int result;
+
+	/* Ignore/skip any zero-length requests */
+
+	if (!length) {
+		dout("%s: zero-length request\n", __func__);
+		result = 0;
+		goto err_rq;
+	}
+
+	/* Disallow writes to a read-only device */
+
+	if (wr) {
+		if (rbd_dev->mapping.read_only) {
+			result = -EROFS;
+			goto err_rq;
+		}
+		rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
+	}
+
+	/*
+	 * Quit early if the mapped snapshot no longer exists.  It's
+	 * still possible the snapshot will have disappeared by the
+	 * time our request arrives at the osd, but there's no sense in
+	 * sending it if we already know.
+	 */
+	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
+		dout("request for non-existent snapshot");
+		rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
+		result = -ENXIO;
+		goto err_rq;
+	}
+
+	if (offset && length > U64_MAX - offset + 1) {
+		rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
+			 length);
+		result = -EINVAL;
+		goto err_rq;	/* Shouldn't happen */
+	}
+
+	if (offset + length > rbd_dev->mapping.size) {
+		rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
+			 length, rbd_dev->mapping.size);
+		result = -EIO;
+		goto err_rq;
+	}
+
+	img_request = rbd_img_request_create(rbd_dev, offset, length, wr);
+	if (!img_request) {
+		result = -ENOMEM;
+		goto err_rq;
+	}
+	img_request->rq = rq;
+
+	result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio);
+	if (result)
+		goto err_img_request;
+
+	result = rbd_img_request_submit(img_request);
+	if (result)
+		goto err_img_request;
+
+	return;
+
+err_img_request:
+	rbd_img_request_put(img_request);
+err_rq:
+	if (result)
+		rbd_warn(rbd_dev, "%s %llx at %llx result %d",
+			 wr ? "write" : "read", length, offset, result);
+	blk_end_request_all(rq, result);
+}
+
+static void rbd_request_workfn(struct work_struct *work)
+{
+	struct rbd_device *rbd_dev =
+	    container_of(work, struct rbd_device, rq_work);
+	struct request *rq, *next;
+	LIST_HEAD(requests);
+
+	spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */
+	list_splice_init(&rbd_dev->rq_queue, &requests);
+	spin_unlock_irq(&rbd_dev->lock);
+
+	list_for_each_entry_safe(rq, next, &requests, queuelist) {
+		list_del_init(&rq->queuelist);
+		rbd_handle_request(rbd_dev, rq);
+	}
+}
+
+/*
+ * Called with q->queue_lock held and interrupts disabled, possibly on
+ * the way to schedule().  Do not sleep here!
+ */
+static void rbd_request_fn(struct request_queue *q)
+{
+	struct rbd_device *rbd_dev = q->queuedata;
+	struct request *rq;
+	int queued = 0;
+
+	rbd_assert(rbd_dev);
+
+	while ((rq = blk_fetch_request(q))) {
+		/* Ignore any non-FS requests that filter through. */
+		if (rq->cmd_type != REQ_TYPE_FS) {
+			dout("%s: non-fs request type %d\n", __func__,
+				(int) rq->cmd_type);
+			__blk_end_request_all(rq, 0);
+			continue;
+		}
+
+		list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
+		queued++;
+	}
+
+	if (queued)
+		queue_work(rbd_dev->rq_wq, &rbd_dev->rq_work);
+}
 
 /*
@@ -3517,24 +3541,37 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
 	u64 mapping_size;
 	int ret;
 
-	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
 	down_write(&rbd_dev->header_rwsem);
 	mapping_size = rbd_dev->mapping.size;
-	if (rbd_dev->image_format == 1)
-		ret = rbd_dev_v1_header_info(rbd_dev);
-	else
-		ret = rbd_dev_v2_header_info(rbd_dev);
 
-	/* If it's a mapped snapshot, validate its EXISTS flag */
+	ret = rbd_dev_header_info(rbd_dev);
+	if (ret)
+		return ret;
+
+	/*
+	 * If there is a parent, see if it has disappeared due to the
+	 * mapped image getting flattened.
+	 */
+	if (rbd_dev->parent) {
+		ret = rbd_dev_v2_parent_info(rbd_dev);
+		if (ret)
+			return ret;
+	}
+
+	if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
+		if (rbd_dev->mapping.size != rbd_dev->header.image_size)
+			rbd_dev->mapping.size = rbd_dev->header.image_size;
+	} else {
+		/* validate mapped snapshot's EXISTS flag */
+		rbd_exists_validate(rbd_dev);
+	}
 
-	rbd_exists_validate(rbd_dev);
 	up_write(&rbd_dev->header_rwsem);
 
-	if (mapping_size != rbd_dev->mapping.size) {
+	if (mapping_size != rbd_dev->mapping.size)
 		rbd_dev_update_size(rbd_dev);
-	}
 
-	return ret;
+	return 0;
 }
 
 static int rbd_init_disk(struct rbd_device *rbd_dev)
@@ -3696,46 +3733,36 @@ static ssize_t rbd_snap_show(struct device *dev,
 }
 
 /*
- * For an rbd v2 image, shows the pool id, image id, and snapshot id
- * for the parent image.  If there is no parent, simply shows
- * "(no parent image)".
+ * For a v2 image, shows the chain of parent images, separated by empty
+ * lines.  For v1 images or if there is no parent, shows "(no parent
+ * image)".
  */
 static ssize_t rbd_parent_show(struct device *dev,
 			       struct device_attribute *attr,
 			       char *buf)
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
-	struct rbd_spec *spec = rbd_dev->parent_spec;
-	int count;
-	char *bufp = buf;
+	ssize_t count = 0;
 
-	if (!spec)
+	if (!rbd_dev->parent)
 		return sprintf(buf, "(no parent image)\n");
 
-	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
-			(unsigned long long) spec->pool_id, spec->pool_name);
-	if (count < 0)
-		return count;
-	bufp += count;
-
-	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
-			spec->image_name ? spec->image_name : "(unknown)");
-	if (count < 0)
-		return count;
-	bufp += count;
-
-	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
-			(unsigned long long) spec->snap_id, spec->snap_name);
-	if (count < 0)
-		return count;
-	bufp += count;
-
-	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
-	if (count < 0)
-		return count;
-	bufp += count;
+	for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
+		struct rbd_spec *spec = rbd_dev->parent_spec;
+
+		count += sprintf(&buf[count], "%s"
+			    "pool_id %llu\npool_name %s\n"
+			    "image_id %s\nimage_name %s\n"
+			    "snap_id %llu\nsnap_name %s\n"
+			    "overlap %llu\n",
+			    !count ? "" : "\n", /* first? */
+			    spec->pool_id, spec->pool_name,
+			    spec->image_id, spec->image_name ?: "(unknown)",
+			    spec->snap_id, spec->snap_name,
+			    rbd_dev->parent_overlap);
+	}
 
-	return (ssize_t) (bufp - buf);
+	return count;
 }
 
 static ssize_t rbd_image_refresh(struct device *dev,
@@ -3748,9 +3775,9 @@ static ssize_t rbd_image_refresh(struct device *dev,
 
 	ret = rbd_dev_refresh(rbd_dev);
 	if (ret)
-		rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
+		return ret;
 
-	return ret < 0 ? ret : size;
+	return size;
 }
 
 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
@@ -3822,6 +3849,9 @@ static struct rbd_spec *rbd_spec_alloc(void)
 	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
 	if (!spec)
 		return NULL;
+
+	spec->pool_id = CEPH_NOPOOL;
+	spec->snap_id = CEPH_NOSNAP;
 	kref_init(&spec->kref);
 
 	return spec;
@@ -3848,6 +3878,8 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
 		return NULL;
 
 	spin_lock_init(&rbd_dev->lock);
+	INIT_LIST_HEAD(&rbd_dev->rq_queue);
+	INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
 	rbd_dev->flags = 0;
 	atomic_set(&rbd_dev->parent_ref, 0);
 	INIT_LIST_HEAD(&rbd_dev->node);
@@ -4021,7 +4053,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 		goto out_err;
 	}
 
-	snapid = cpu_to_le64(CEPH_NOSNAP);
+	snapid = cpu_to_le64(rbd_dev->spec->snap_id);
 	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 				"rbd", "get_parent",
 				&snapid, sizeof (snapid),
@@ -4059,7 +4091,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 
 	ret = -EIO;
 	if (pool_id > (u64)U32_MAX) {
-		rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
+		rbd_warn(NULL, "parent pool id too large (%llu > %u)",
 			(unsigned long long)pool_id, U32_MAX);
 		goto out_err;
 	}
@@ -4083,6 +4115,8 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 		parent_spec->snap_id = snap_id;
 		rbd_dev->parent_spec = parent_spec;
 		parent_spec = NULL;	/* rbd_dev now owns this */
+	} else {
+		kfree(image_id);
 	}
 
 	/*
@@ -4110,8 +4144,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 			 * overlap is zero we just pretend there was
 			 * no parent image.
 			 */
-			rbd_warn(rbd_dev, "ignoring parent of "
-						"clone with overlap 0\n");
+			rbd_warn(rbd_dev, "ignoring parent with overlap 0");
 		}
 	}
 out:
@@ -4279,18 +4312,38 @@ static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
 }
 
 /*
- * When an rbd image has a parent image, it is identified by the
- * pool, image, and snapshot ids (not names).  This function fills
- * in the names for those ids.  (It's OK if we can't figure out the
- * name for an image id, but the pool and snapshot ids should always
- * exist and have names.)  All names in an rbd spec are dynamically
- * allocated.
+ * An image being mapped will have everything but the snap id.
+ */
+static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
+{
+	struct rbd_spec *spec = rbd_dev->spec;
+
+	rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
+	rbd_assert(spec->image_id && spec->image_name);
+	rbd_assert(spec->snap_name);
+
+	if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
+		u64 snap_id;
+
+		snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
+		if (snap_id == CEPH_NOSNAP)
+			return -ENOENT;
+
+		spec->snap_id = snap_id;
+	} else {
+		spec->snap_id = CEPH_NOSNAP;
+	}
+
+	return 0;
+}
+
+/*
+ * A parent image will have all ids but none of the names.
  *
- * When an image being mapped (not a parent) is probed, we have the
- * pool name and pool id, image name and image id, and the snapshot
- * name.  The only thing we're missing is the snapshot id.
+ * All names in an rbd spec are dynamically allocated.  It's OK if we
+ * can't figure out the name for an image id.
  */
-static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
+static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct rbd_spec *spec = rbd_dev->spec;
@@ -4299,24 +4352,9 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
 	const char *snap_name;
 	int ret;
 
-	/*
-	 * An image being mapped will have the pool name (etc.), but
-	 * we need to look up the snapshot id.
-	 */
-	if (spec->pool_name) {
-		if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
-			u64 snap_id;
-
-			snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
-			if (snap_id == CEPH_NOSNAP)
-				return -ENOENT;
-			spec->snap_id = snap_id;
-		} else {
-			spec->snap_id = CEPH_NOSNAP;
-		}
-
-		return 0;
-	}
+	rbd_assert(spec->pool_id != CEPH_NOPOOL);
+	rbd_assert(spec->image_id);
+	rbd_assert(spec->snap_id != CEPH_NOSNAP);
 
 	/* Get the pool name; we have to make our own copy of this */
 
@@ -4335,7 +4373,7 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
 	if (!image_name)
 		rbd_warn(rbd_dev, "unable to get image name");
 
-	/* Look up the snapshot name, and make a copy */
+	/* Fetch the snapshot name */
 
 	snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
 	if (IS_ERR(snap_name)) {
@@ -4348,10 +4386,10 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
 	spec->snap_name = snap_name;
 
 	return 0;
+
 out_err:
 	kfree(image_name);
 	kfree(pool_name);
-
 	return ret;
 }
 
@@ -4483,43 +4521,22 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
 		return ret;
 	}
 
-	/*
-	 * If the image supports layering, get the parent info.  We
-	 * need to probe the first time regardless.  Thereafter we
-	 * only need to if there's a parent, to see if it has
-	 * disappeared due to the mapped image getting flattened.
-	 */
-	if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
-			(first_time || rbd_dev->parent_spec)) {
-		bool warn;
-
-		ret = rbd_dev_v2_parent_info(rbd_dev);
-		if (ret)
-			return ret;
-
-		/*
-		 * Print a warning if this is the initial probe and
-		 * the image has a parent.  Don't print it if the
-		 * image now being probed is itself a parent.  We
-		 * can tell at this point because we won't know its
-		 * pool name yet (just its pool id).
-		 */
-		warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
-		if (first_time && warn)
-			rbd_warn(rbd_dev, "WARNING: kernel layering "
-					"is EXPERIMENTAL!");
-	}
-
-	if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
-		if (rbd_dev->mapping.size != rbd_dev->header.image_size)
-			rbd_dev->mapping.size = rbd_dev->header.image_size;
-
 	ret = rbd_dev_v2_snap_context(rbd_dev);
 	dout("rbd_dev_v2_snap_context returned %d\n", ret);
 
 	return ret;
 }
 
+static int rbd_dev_header_info(struct rbd_device *rbd_dev)
+{
+	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+
+	if (rbd_dev->image_format == 1)
+		return rbd_dev_v1_header_info(rbd_dev);
+
+	return rbd_dev_v2_header_info(rbd_dev);
+}
+
 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
 {
 	struct device *dev;
@@ -5066,12 +5083,17 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 	ret = rbd_dev_mapping_set(rbd_dev);
 	if (ret)
 		goto err_out_disk;
+
 	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
 	set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
 
+	rbd_dev->rq_wq = alloc_workqueue(rbd_dev->disk->disk_name, 0, 0);
+	if (!rbd_dev->rq_wq)
+		goto err_out_mapping;
+
 	ret = rbd_bus_add_dev(rbd_dev);
 	if (ret)
-		goto err_out_mapping;
+		goto err_out_workqueue;
 
 	/* Everything's ready.  Announce the disk to the world. */
 
@@ -5083,6 +5105,9 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 
 	return ret;
 
+err_out_workqueue:
+	destroy_workqueue(rbd_dev->rq_wq);
+	rbd_dev->rq_wq = NULL;
 err_out_mapping:
 	rbd_dev_mapping_clear(rbd_dev);
 err_out_disk:
@@ -5155,8 +5180,6 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
 	ret = rbd_dev_image_id(rbd_dev);
 	if (ret)
 		return ret;
-	rbd_assert(rbd_dev->spec->image_id);
-	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
 
 	ret = rbd_dev_header_name(rbd_dev);
 	if (ret)
@@ -5168,25 +5191,45 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
 		goto out_header_name;
 	}
 
-	if (rbd_dev->image_format == 1)
-		ret = rbd_dev_v1_header_info(rbd_dev);
-	else
-		ret = rbd_dev_v2_header_info(rbd_dev);
+	ret = rbd_dev_header_info(rbd_dev);
 	if (ret)
 		goto err_out_watch;
 
-	ret = rbd_dev_spec_update(rbd_dev);
+	/*
+	 * If this image is the one being mapped, we have pool name and
+	 * id, image name and id, and snap name - need to fill snap id.
+	 * Otherwise this is a parent image, identified by pool, image
+	 * and snap ids - need to fill in names for those ids.
+	 */
+	if (mapping)
+		ret = rbd_spec_fill_snap_id(rbd_dev);
+	else
+		ret = rbd_spec_fill_names(rbd_dev);
 	if (ret)
 		goto err_out_probe;
 
+	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
+		ret = rbd_dev_v2_parent_info(rbd_dev);
+		if (ret)
+			goto err_out_probe;
+
+		/*
+		 * Need to warn users if this image is the one being
+		 * mapped and has a parent.
+		 */
+		if (mapping && rbd_dev->parent_spec)
+			rbd_warn(rbd_dev,
+				 "WARNING: kernel layering is EXPERIMENTAL!");
+	}
+
 	ret = rbd_dev_probe_parent(rbd_dev);
 	if (ret)
 		goto err_out_probe;
 
 	dout("discovered format %u image, header name is %s\n",
 		rbd_dev->image_format, rbd_dev->header_name);
-
 	return 0;
+
 err_out_probe:
 	rbd_dev_unprobe(rbd_dev);
 err_out_watch:
@@ -5199,9 +5242,6 @@ err_out_format:
 	rbd_dev->image_format = 0;
 	kfree(rbd_dev->spec->image_id);
 	rbd_dev->spec->image_id = NULL;
-
-	dout("probe failed, returning %d\n", ret);
-
 	return ret;
 }
 
@@ -5243,7 +5283,7 @@ static ssize_t do_rbd_add(struct bus_type *bus,
 	/* The ceph file layout needs to fit pool id in 32 bits */
 
 	if (spec->pool_id > (u64)U32_MAX) {
-		rbd_warn(NULL, "pool id too large (%llu > %u)\n",
+		rbd_warn(NULL, "pool id too large (%llu > %u)",
 			(unsigned long long)spec->pool_id, U32_MAX);
 		rc = -EIO;
 		goto err_out_client;
@@ -5314,6 +5354,7 @@ static void rbd_dev_device_release(struct device *dev)
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
+	destroy_workqueue(rbd_dev->rq_wq);
 	rbd_free_disk(rbd_dev);
 	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
 	rbd_dev_mapping_clear(rbd_dev);
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index 469f2e8657e8..cebf2ebefb55 100644
--- a/fs/ceph/acl.c
+++ b/fs/ceph/acl.c
@@ -172,14 +172,24 @@ out:
 int ceph_init_acl(struct dentry *dentry, struct inode *inode, struct inode *dir)
 {
 	struct posix_acl *default_acl, *acl;
+	umode_t new_mode = inode->i_mode;
 	int error;
 
-	error = posix_acl_create(dir, &inode->i_mode, &default_acl, &acl);
+	error = posix_acl_create(dir, &new_mode, &default_acl, &acl);
 	if (error)
 		return error;
 
-	if (!default_acl && !acl)
+	if (!default_acl && !acl) {
 		cache_no_acl(inode);
+		if (new_mode != inode->i_mode) {
+			struct iattr newattrs = {
+				.ia_mode = new_mode,
+				.ia_valid = ATTR_MODE,
+			};
+			error = ceph_setattr(dentry, &newattrs);
+		}
+		return error;
+	}
 
 	if (default_acl) {
 		error = ceph_set_acl(inode, default_acl, ACL_TYPE_DEFAULT);
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 1fde164b74b5..6d1cd45dca89 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -3277,7 +3277,7 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
 	rel->ino = cpu_to_le64(ceph_ino(inode));
 	rel->cap_id = cpu_to_le64(cap->cap_id);
 	rel->seq = cpu_to_le32(cap->seq);
-	rel->issue_seq = cpu_to_le32(cap->issue_seq),
+	rel->issue_seq = cpu_to_le32(cap->issue_seq);
 	rel->mseq = cpu_to_le32(cap->mseq);
 	rel->caps = cpu_to_le32(cap->implemented);
 	rel->wanted = cpu_to_le32(cap->mds_wanted);
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 302085100c28..2eb02f80a0ab 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -423,6 +423,9 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
 	dout("sync_read on file %p %llu~%u %s\n", file, off,
 	     (unsigned)len,
 	     (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
+
+	if (!len)
+		return 0;
 	/*
 	 * flush any page cache pages in this range.  this
 	 * will make concurrent normal and sync io slow,
@@ -470,8 +473,11 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
 		size_t left = ret;
 
 		while (left) {
-			int copy = min_t(size_t, PAGE_SIZE, left);
-			l = copy_page_to_iter(pages[k++], 0, copy, i);
+			size_t page_off = off & ~PAGE_MASK;
+			size_t copy = min_t(size_t,
+					    PAGE_SIZE - page_off, left);
+			l = copy_page_to_iter(pages[k++], page_off,
+					      copy, i);
 			off += l;
 			left -= l;
 			if (l < copy)
@@ -531,7 +537,7 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
  * objects, rollback on failure, etc.)
  */
 static ssize_t
-ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from)
+ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file_inode(file);
@@ -547,7 +553,6 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from)
 	int check_caps = 0;
 	int ret;
 	struct timespec mtime = CURRENT_TIME;
-	loff_t pos = iocb->ki_pos;
 	size_t count = iov_iter_count(from);
 
 	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
@@ -646,7 +651,8 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from)
  * correct atomic write, we should e.g. take write locks on all
  * objects, rollback on failure, etc.)
  */
-static ssize_t ceph_sync_write(struct kiocb *iocb, struct iov_iter *from)
+static ssize_t
+ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file_inode(file);
@@ -663,7 +669,6 @@ static ssize_t ceph_sync_write(struct kiocb *iocb, struct iov_iter *from)
 	int check_caps = 0;
 	int ret;
 	struct timespec mtime = CURRENT_TIME;
-	loff_t pos = iocb->ki_pos;
 	size_t count = iov_iter_count(from);
 
 	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
@@ -918,9 +923,9 @@ retry_snap:
 	/* we might need to revert back to that point */
 	data = *from;
 	if (file->f_flags & O_DIRECT)
-		written = ceph_sync_direct_write(iocb, &data);
+		written = ceph_sync_direct_write(iocb, &data, pos);
 	else
-		written = ceph_sync_write(iocb, &data);
+		written = ceph_sync_write(iocb, &data, pos);
 	if (written == -EOLDSNAPC) {
 		dout("aio_write %p %llx.%llx %llu~%u"
 			"got EOLDSNAPC, retrying\n",
@@ -1177,6 +1182,9 @@ static long ceph_fallocate(struct file *file, int mode,
 	loff_t endoff = 0;
 	loff_t size;
 
+	if (mode & ~(FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE))
+		return -EOPNOTSUPP;
+
 	if (!S_ISREG(inode->i_mode))
 		return -EOPNOTSUPP;
 
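The ceph_sync_read() hunk above fixes copying when a read does not start on a page boundary: the old loop copied each page from offset 0. A worked example of the new arithmetic (illustrative numbers, assuming 4 KiB pages): for off = 5000, page_off = off & ~PAGE_MASK = 5000 & 4095 = 904, so the first copy_page_to_iter() starts at byte 904 of the page and copies at most PAGE_SIZE - page_off = 3192 bytes; off then advances by the amount actually copied, keeping every subsequent page copy correctly aligned.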
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 92a2548278fc..bad07c09f91e 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1904,6 +1904,7 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
 		req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
 
 	if (req->r_got_unsafe) {
+		void *p;
 		/*
 		 * Replay.  Do not regenerate message (and rebuild
 		 * paths, etc.); just use the original message.
@@ -1924,8 +1925,13 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
 
 		/* remove cap/dentry releases from message */
 		rhead->num_releases = 0;
-		msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset);
-		msg->front.iov_len = req->r_request_release_offset;
+
+		/* time stamp */
+		p = msg->front.iov_base + req->r_request_release_offset;
+		ceph_encode_copy(&p, &req->r_stamp, sizeof(req->r_stamp));
+
+		msg->front.iov_len = p - msg->front.iov_base;
+		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
 		return 0;
 	}
 
@@ -2061,11 +2067,12 @@ static void __wake_requests(struct ceph_mds_client *mdsc,
 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
 {
 	struct ceph_mds_request *req;
-	struct rb_node *p;
+	struct rb_node *p = rb_first(&mdsc->request_tree);
 
 	dout("kick_requests mds%d\n", mds);
-	for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) {
+	while (p) {
 		req = rb_entry(p, struct ceph_mds_request, r_node);
+		p = rb_next(p);
 		if (req->r_got_unsafe)
 			continue;
 		if (req->r_session &&
@@ -2248,6 +2255,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 	 */
 	if (result == -ESTALE) {
 		dout("got ESTALE on request %llu", req->r_tid);
+		req->r_resend_mds = -1;
 		if (req->r_direct_mode != USE_AUTH_MDS) {
 			dout("not using auth, setting for that now");
 			req->r_direct_mode = USE_AUTH_MDS;
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index c9c2b887381e..12f58d22e017 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -592,12 +592,12 @@ start:
 		xattr_version = ci->i_xattrs.version;
 		spin_unlock(&ci->i_ceph_lock);
 
-		xattrs = kcalloc(numattr, sizeof(struct ceph_xattr *),
+		xattrs = kcalloc(numattr, sizeof(struct ceph_inode_xattr *),
 				 GFP_NOFS);
 		err = -ENOMEM;
 		if (!xattrs)
 			goto bad_lock;
-		memset(xattrs, 0, numattr*sizeof(struct ceph_xattr *));
+
 		for (i = 0; i < numattr; i++) {
 			xattrs[i] = kmalloc(sizeof(struct ceph_inode_xattr),
 					    GFP_NOFS);
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index d21f2dba0731..40ae58e3e9db 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -285,19 +285,9 @@ extern void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
 
 extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 				     bool can_fail);
-extern void ceph_msg_kfree(struct ceph_msg *m);
 
-
-static inline struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
-{
-	kref_get(&msg->kref);
-	return msg;
-}
-extern void ceph_msg_last_put(struct kref *kref);
-static inline void ceph_msg_put(struct ceph_msg *msg)
-{
-	kref_put(&msg->kref, ceph_msg_last_put);
-}
+extern struct ceph_msg *ceph_msg_get(struct ceph_msg *msg);
+extern void ceph_msg_put(struct ceph_msg *msg);
 
 extern void ceph_msg_dump(struct ceph_msg *msg);
 
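
The header now exposes ceph_msg_get()/ceph_msg_put() as ordinary functions instead of static inlines. That keeps the kref and the release callback private to net/ceph/messenger.c (see the hunks below) and gives the implementation room for debugging output. A sketch of the split, using a hypothetical refcounted type foo rather than ceph_msg:

	/* foo.h - nothing about kref or the release callback leaks out;
	 * get() returns its argument so callers can write g = foo_get(f) */
	extern struct foo *foo_get(struct foo *f);
	extern void foo_put(struct foo *f);
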
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 94ec69672164..03aeb27fcc69 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -117,7 +117,7 @@ struct ceph_osd_request {
 	struct list_head r_req_lru_item;
 	struct list_head r_osd_item;
 	struct list_head r_linger_item;
-	struct list_head r_linger_osd;
+	struct list_head r_linger_osd_item;
 	struct ceph_osd *r_osd;
 	struct ceph_pg r_pgid;
 	int r_pg_osds[CEPH_PG_MAX_SIZE];
@@ -325,22 +325,14 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
 
 extern void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
 					 struct ceph_osd_request *req);
-extern void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
-						struct ceph_osd_request *req);
 
-static inline void ceph_osdc_get_request(struct ceph_osd_request *req)
-{
-	kref_get(&req->r_kref);
-}
-extern void ceph_osdc_release_request(struct kref *kref);
-static inline void ceph_osdc_put_request(struct ceph_osd_request *req)
-{
-	kref_put(&req->r_kref, ceph_osdc_release_request);
-}
+extern void ceph_osdc_get_request(struct ceph_osd_request *req);
+extern void ceph_osdc_put_request(struct ceph_osd_request *req);
 
 extern int ceph_osdc_start_request(struct ceph_osd_client *osdc,
 				   struct ceph_osd_request *req,
 				   bool nofail);
+extern void ceph_osdc_cancel_request(struct ceph_osd_request *req);
 extern int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
 				  struct ceph_osd_request *req);
 extern void ceph_osdc_sync(struct ceph_osd_client *osdc);
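
Taken together, the two hunks reshape the public request API: get/put move out of line, ceph_osdc_release_request() becomes private, and ceph_osdc_cancel_request() replaces the old ceph_osdc_unregister_linger_request(). A hedged lifecycle sketch under these declarations — op setup is elided and the surrounding caller is invented, not code from the commit:

	struct ceph_osd_request *req;
	int ret;

	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOFS);
	if (!req)
		return -ENOMEM;
	/* ... osd_req_op_*() setup, message encoding ... */

	ret = ceph_osdc_start_request(osdc, req, false);
	if (!ret)
		ret = ceph_osdc_wait_request(osdc, req);
	ceph_osdc_put_request(req);	/* release runs internally on last ref */
	return ret;

If the wait is interrupted by a signal, the reworked ceph_osdc_wait_request() (last hunk of this commit) calls ceph_osdc_cancel_request() itself, so the caller only has to drop its reference.
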
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 1948d592aa54..b2f571dd933d 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -174,6 +174,7 @@ static struct lock_class_key socket_class;
 #define SKIP_BUF_SIZE	1024
 
 static void queue_con(struct ceph_connection *con);
+static void cancel_con(struct ceph_connection *con);
 static void con_work(struct work_struct *);
 static void con_fault(struct ceph_connection *con);
 
@@ -680,7 +681,7 @@ void ceph_con_close(struct ceph_connection *con)
 
 	reset_connection(con);
 	con->peer_global_seq = 0;
-	cancel_delayed_work(&con->work);
+	cancel_con(con);
 	con_close_socket(con);
 	mutex_unlock(&con->mutex);
 }
@@ -900,7 +901,7 @@ static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
 	BUG_ON(page_count > (int)USHRT_MAX);
 	cursor->page_count = (unsigned short)page_count;
 	BUG_ON(length > SIZE_MAX - cursor->page_offset);
-	cursor->last_piece = (size_t)cursor->page_offset + length <= PAGE_SIZE;
+	cursor->last_piece = cursor->page_offset + cursor->resid <= PAGE_SIZE;
 }
 
 static struct page *
@@ -2667,19 +2668,16 @@ static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
 {
 	if (!con->ops->get(con)) {
 		dout("%s %p ref count 0\n", __func__, con);
-
 		return -ENOENT;
 	}
 
 	if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
 		dout("%s %p - already queued\n", __func__, con);
 		con->ops->put(con);
-
 		return -EBUSY;
 	}
 
 	dout("%s %p %lu\n", __func__, con, delay);
-
 	return 0;
 }
 
@@ -2688,6 +2686,14 @@ static void queue_con(struct ceph_connection *con)
 	(void) queue_con_delay(con, 0);
 }
 
+static void cancel_con(struct ceph_connection *con)
+{
+	if (cancel_delayed_work(&con->work)) {
+		dout("%s %p\n", __func__, con);
+		con->ops->put(con);
+	}
+}
+
 static bool con_sock_closed(struct ceph_connection *con)
 {
 	if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED))
@@ -3269,24 +3275,21 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
 /*
  * Free a generically kmalloc'd message.
  */
-void ceph_msg_kfree(struct ceph_msg *m)
+static void ceph_msg_free(struct ceph_msg *m)
 {
-	dout("msg_kfree %p\n", m);
+	dout("%s %p\n", __func__, m);
 	ceph_kvfree(m->front.iov_base);
 	kmem_cache_free(ceph_msg_cache, m);
 }
 
-/*
- * Drop a msg ref. Destroy as needed.
- */
-void ceph_msg_last_put(struct kref *kref)
+static void ceph_msg_release(struct kref *kref)
 {
 	struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
 	LIST_HEAD(data);
 	struct list_head *links;
 	struct list_head *next;
 
-	dout("ceph_msg_put last one on %p\n", m);
+	dout("%s %p\n", __func__, m);
 	WARN_ON(!list_empty(&m->list_head));
 
 	/* drop middle, data, if any */
@@ -3308,9 +3311,25 @@ void ceph_msg_last_put(struct kref *kref)
 	if (m->pool)
 		ceph_msgpool_put(m->pool, m);
 	else
-		ceph_msg_kfree(m);
+		ceph_msg_free(m);
+}
+
+struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
+{
+	dout("%s %p (was %d)\n", __func__, msg,
+	     atomic_read(&msg->kref.refcount));
+	kref_get(&msg->kref);
+	return msg;
+}
+EXPORT_SYMBOL(ceph_msg_get);
+
+void ceph_msg_put(struct ceph_msg *msg)
+{
+	dout("%s %p (was %d)\n", __func__, msg,
+	     atomic_read(&msg->kref.refcount));
+	kref_put(&msg->kref, ceph_msg_release);
 }
-EXPORT_SYMBOL(ceph_msg_last_put);
+EXPORT_SYMBOL(ceph_msg_put);
 
 void ceph_msg_dump(struct ceph_msg *msg)
 {
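
ceph_msg_get()/ceph_msg_put() now log the refcount and funnel into a static release callback. This is the standard kref pattern; a self-contained sketch with a hypothetical struct foo (not code from the commit):

	struct foo {
		struct kref kref;
		/* ... payload ... */
	};

	static void foo_release(struct kref *kref)
	{
		struct foo *f = container_of(kref, struct foo, kref);

		kfree(f);		/* last reference is gone */
	}

	struct foo *foo_get(struct foo *f)
	{
		kref_get(&f->kref);
		return f;		/* allows g = foo_get(f) */
	}

	void foo_put(struct foo *f)
	{
		kref_put(&f->kref, foo_release);
	}
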
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 05be0c181695..30f6faf3584f 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -297,12 +297,21 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
 /*
  * requests
  */
-void ceph_osdc_release_request(struct kref *kref)
+static void ceph_osdc_release_request(struct kref *kref)
 {
-	struct ceph_osd_request *req;
+	struct ceph_osd_request *req = container_of(kref,
+					    struct ceph_osd_request, r_kref);
 	unsigned int which;
 
-	req = container_of(kref, struct ceph_osd_request, r_kref);
+	dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
+	     req->r_request, req->r_reply);
+	WARN_ON(!RB_EMPTY_NODE(&req->r_node));
+	WARN_ON(!list_empty(&req->r_req_lru_item));
+	WARN_ON(!list_empty(&req->r_osd_item));
+	WARN_ON(!list_empty(&req->r_linger_item));
+	WARN_ON(!list_empty(&req->r_linger_osd_item));
+	WARN_ON(req->r_osd);
+
 	if (req->r_request)
 		ceph_msg_put(req->r_request);
 	if (req->r_reply) {
@@ -320,7 +329,22 @@ void ceph_osdc_release_request(struct kref *kref)
 	kmem_cache_free(ceph_osd_request_cache, req);
 
 }
-EXPORT_SYMBOL(ceph_osdc_release_request);
+
+void ceph_osdc_get_request(struct ceph_osd_request *req)
+{
+	dout("%s %p (was %d)\n", __func__, req,
+	     atomic_read(&req->r_kref.refcount));
+	kref_get(&req->r_kref);
+}
+EXPORT_SYMBOL(ceph_osdc_get_request);
+
+void ceph_osdc_put_request(struct ceph_osd_request *req)
+{
+	dout("%s %p (was %d)\n", __func__, req,
+	     atomic_read(&req->r_kref.refcount));
+	kref_put(&req->r_kref, ceph_osdc_release_request);
+}
+EXPORT_SYMBOL(ceph_osdc_put_request);
 
 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 					struct ceph_snap_context *snapc,
@@ -364,7 +388,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 	RB_CLEAR_NODE(&req->r_node);
 	INIT_LIST_HEAD(&req->r_unsafe_item);
 	INIT_LIST_HEAD(&req->r_linger_item);
-	INIT_LIST_HEAD(&req->r_linger_osd);
+	INIT_LIST_HEAD(&req->r_linger_osd_item);
 	INIT_LIST_HEAD(&req->r_req_lru_item);
 	INIT_LIST_HEAD(&req->r_osd_item);
 
@@ -916,7 +940,7 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
 	 * list at the end to keep things in tid order.
 	 */
 	list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
-				 r_linger_osd) {
+				 r_linger_osd_item) {
 		/*
 		 * reregister request prior to unregistering linger so
 		 * that r_osd is preserved.
@@ -1008,6 +1032,8 @@ static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {
 	dout("__remove_osd %p\n", osd);
 	BUG_ON(!list_empty(&osd->o_requests));
+	BUG_ON(!list_empty(&osd->o_linger_requests));
+
 	rb_erase(&osd->o_node, &osdc->osds);
 	list_del_init(&osd->o_osd_lru);
 	ceph_con_close(&osd->o_con);
@@ -1029,12 +1055,23 @@ static void remove_all_osds(struct ceph_osd_client *osdc)
 static void __move_osd_to_lru(struct ceph_osd_client *osdc,
 			      struct ceph_osd *osd)
 {
-	dout("__move_osd_to_lru %p\n", osd);
+	dout("%s %p\n", __func__, osd);
 	BUG_ON(!list_empty(&osd->o_osd_lru));
+
 	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
 	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
 }
 
+static void maybe_move_osd_to_lru(struct ceph_osd_client *osdc,
+				  struct ceph_osd *osd)
+{
+	dout("%s %p\n", __func__, osd);
+
+	if (list_empty(&osd->o_requests) &&
+	    list_empty(&osd->o_linger_requests))
+		__move_osd_to_lru(osdc, osd);
+}
+
 static void __remove_osd_from_lru(struct ceph_osd *osd)
 {
 	dout("__remove_osd_from_lru %p\n", osd);
@@ -1175,6 +1212,7 @@ static void __unregister_request(struct ceph_osd_client *osdc,
 
 	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
 	rb_erase(&req->r_node, &osdc->requests);
+	RB_CLEAR_NODE(&req->r_node);
 	osdc->num_requests--;
 
 	if (req->r_osd) {
@@ -1182,12 +1220,8 @@ static void __unregister_request(struct ceph_osd_client *osdc,
 		ceph_msg_revoke(req->r_request);
 
 		list_del_init(&req->r_osd_item);
-		if (list_empty(&req->r_osd->o_requests) &&
-		    list_empty(&req->r_osd->o_linger_requests)) {
-			dout("moving osd to %p lru\n", req->r_osd);
-			__move_osd_to_lru(osdc, req->r_osd);
-		}
-		if (list_empty(&req->r_linger_item))
+		maybe_move_osd_to_lru(osdc, req->r_osd);
+		if (list_empty(&req->r_linger_osd_item))
 			req->r_osd = NULL;
 	}
 
@@ -1214,45 +1248,39 @@ static void __cancel_request(struct ceph_osd_request *req)
 static void __register_linger_request(struct ceph_osd_client *osdc,
 				      struct ceph_osd_request *req)
 {
-	dout("__register_linger_request %p\n", req);
+	dout("%s %p tid %llu\n", __func__, req, req->r_tid);
+	WARN_ON(!req->r_linger);
+
 	ceph_osdc_get_request(req);
 	list_add_tail(&req->r_linger_item, &osdc->req_linger);
 	if (req->r_osd)
-		list_add_tail(&req->r_linger_osd,
+		list_add_tail(&req->r_linger_osd_item,
 			      &req->r_osd->o_linger_requests);
 }
 
 static void __unregister_linger_request(struct ceph_osd_client *osdc,
 					struct ceph_osd_request *req)
 {
-	dout("__unregister_linger_request %p\n", req);
+	WARN_ON(!req->r_linger);
+
+	if (list_empty(&req->r_linger_item)) {
+		dout("%s %p tid %llu not registered\n", __func__, req,
+		     req->r_tid);
+		return;
+	}
+
+	dout("%s %p tid %llu\n", __func__, req, req->r_tid);
 	list_del_init(&req->r_linger_item);
-	if (req->r_osd) {
-		list_del_init(&req->r_linger_osd);
 
-		if (list_empty(&req->r_osd->o_requests) &&
-		    list_empty(&req->r_osd->o_linger_requests)) {
-			dout("moving osd to %p lru\n", req->r_osd);
-			__move_osd_to_lru(osdc, req->r_osd);
-		}
+	if (req->r_osd) {
+		list_del_init(&req->r_linger_osd_item);
+		maybe_move_osd_to_lru(osdc, req->r_osd);
 		if (list_empty(&req->r_osd_item))
 			req->r_osd = NULL;
 	}
 	ceph_osdc_put_request(req);
 }
 
-void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
-					 struct ceph_osd_request *req)
-{
-	mutex_lock(&osdc->request_mutex);
-	if (req->r_linger) {
-		req->r_linger = 0;
-		__unregister_linger_request(osdc, req);
-	}
-	mutex_unlock(&osdc->request_mutex);
-}
-EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);
-
 void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
 				  struct ceph_osd_request *req)
 {
@@ -2430,6 +2458,25 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
 EXPORT_SYMBOL(ceph_osdc_start_request);
 
 /*
+ * Unregister a registered request. The request is not completed (i.e.
+ * no callbacks or wakeups) - higher layers are supposed to know what
+ * they are canceling.
+ */
+void ceph_osdc_cancel_request(struct ceph_osd_request *req)
+{
+	struct ceph_osd_client *osdc = req->r_osdc;
+
+	mutex_lock(&osdc->request_mutex);
+	if (req->r_linger)
+		__unregister_linger_request(osdc, req);
+	__unregister_request(osdc, req);
+	mutex_unlock(&osdc->request_mutex);
+
+	dout("%s %p tid %llu canceled\n", __func__, req, req->r_tid);
+}
+EXPORT_SYMBOL(ceph_osdc_cancel_request);
+
+/*
  * wait for a request to complete
  */
 int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
@@ -2437,18 +2484,18 @@ int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
 {
 	int rc;
 
+	dout("%s %p tid %llu\n", __func__, req, req->r_tid);
+
 	rc = wait_for_completion_interruptible(&req->r_completion);
 	if (rc < 0) {
-		mutex_lock(&osdc->request_mutex);
-		__cancel_request(req);
-		__unregister_request(osdc, req);
-		mutex_unlock(&osdc->request_mutex);
+		dout("%s %p tid %llu interrupted\n", __func__, req, req->r_tid);
+		ceph_osdc_cancel_request(req);
 		complete_request(req);
-		dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
 		return rc;
 	}
 
-	dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
+	dout("%s %p tid %llu result %d\n", __func__, req, req->r_tid,
+	     req->r_result);
 	return req->r_result;
 }
 EXPORT_SYMBOL(ceph_osdc_wait_request);
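
The net effect of the osd_client changes: a request must be out of the tree and off every list before its last reference drops (that is what the new WARN_ONs in ceph_osdc_release_request() assert), and ceph_osdc_cancel_request() is the single helper that restores this invariant for a request being abandoned, whether or not it was lingering. A hedged caller-side sketch — the abort path is invented, not from the commit:

	/* abandoning an in-flight request without waiting for it */
	ceph_osdc_cancel_request(req);	/* unhooks request + linger state;
					 * no callbacks, no wakeups */
	ceph_osdc_put_request(req);	/* drop our ref; if it was the last,
					 * release_request() runs and its
					 * WARN_ON()s should stay quiet */
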