aboutsummaryrefslogtreecommitdiffstats
path: root/drivers
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2011-03-22 19:25:25 -0400
committerLinus Torvalds <torvalds@linux-foundation.org>2011-03-22 19:25:25 -0400
commit0adfc56ce8fdc5c17630434e49f30536ba7b8559 (patch)
treeed63f34e74998a8a1550d4af61b3178e68a5d60d /drivers
parentf23eb2b2b28547fc70df82dd5049eb39bec5ba12 (diff)
parent59c2be1e4d42c0d4949cecdeef3f37070a1fbc13 (diff)
Merge git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
* git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: rbd: use watch/notify for changes in rbd header libceph: add lingering request and watch/notify event framework rbd: update email address in Documentation ceph: rename dentry_release -> d_release, fix comment ceph: add request to the tail of unsafe write list ceph: remove request from unsafe list if it is canceled/timed out ceph: move readahead default to fs/ceph from libceph ceph: add ino32 mount option ceph: update common header files ceph: remove debugfs debug cruft libceph: fix osd request queuing on osdmap updates ceph: preserve I_COMPLETE across rename libceph: Fix base64-decoding when input ends in newline.
Diffstat (limited to 'drivers')
-rw-r--r--drivers/block/rbd.c361
1 files changed, 335 insertions, 26 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index e1e38b11f48a..16dc3645291c 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -31,6 +31,7 @@
31#include <linux/ceph/osd_client.h> 31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h> 32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h> 33#include <linux/ceph/decode.h>
34#include <linux/parser.h>
34 35
35#include <linux/kernel.h> 36#include <linux/kernel.h>
36#include <linux/device.h> 37#include <linux/device.h>
@@ -54,6 +55,8 @@
54 55
55#define DEV_NAME_LEN 32 56#define DEV_NAME_LEN 32
56 57
58#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
59
57/* 60/*
58 * block device image metadata (in-memory version) 61 * block device image metadata (in-memory version)
59 */ 62 */
@@ -71,6 +74,12 @@ struct rbd_image_header {
71 74
72 char *snap_names; 75 char *snap_names;
73 u64 *snap_sizes; 76 u64 *snap_sizes;
77
78 u64 obj_version;
79};
80
81struct rbd_options {
82 int notify_timeout;
74}; 83};
75 84
76/* 85/*
@@ -78,6 +87,7 @@ struct rbd_image_header {
78 */ 87 */
79struct rbd_client { 88struct rbd_client {
80 struct ceph_client *client; 89 struct ceph_client *client;
90 struct rbd_options *rbd_opts;
81 struct kref kref; 91 struct kref kref;
82 struct list_head node; 92 struct list_head node;
83}; 93};
@@ -124,6 +134,9 @@ struct rbd_device {
124 char pool_name[RBD_MAX_POOL_NAME_LEN]; 134 char pool_name[RBD_MAX_POOL_NAME_LEN];
125 int poolid; 135 int poolid;
126 136
137 struct ceph_osd_event *watch_event;
138 struct ceph_osd_request *watch_request;
139
127 char snap_name[RBD_MAX_SNAP_NAME_LEN]; 140 char snap_name[RBD_MAX_SNAP_NAME_LEN];
128 u32 cur_snap; /* index+1 of current snapshot within snap context 141 u32 cur_snap; /* index+1 of current snapshot within snap context
129 0 - for the head */ 142 0 - for the head */
@@ -177,6 +190,8 @@ static void rbd_put_dev(struct rbd_device *rbd_dev)
177 put_device(&rbd_dev->dev); 190 put_device(&rbd_dev->dev);
178} 191}
179 192
193static int __rbd_update_snaps(struct rbd_device *rbd_dev);
194
180static int rbd_open(struct block_device *bdev, fmode_t mode) 195static int rbd_open(struct block_device *bdev, fmode_t mode)
181{ 196{
182 struct gendisk *disk = bdev->bd_disk; 197 struct gendisk *disk = bdev->bd_disk;
@@ -211,7 +226,8 @@ static const struct block_device_operations rbd_bd_ops = {
211 * Initialize an rbd client instance. 226 * Initialize an rbd client instance.
212 * We own *opt. 227 * We own *opt.
213 */ 228 */
214static struct rbd_client *rbd_client_create(struct ceph_options *opt) 229static struct rbd_client *rbd_client_create(struct ceph_options *opt,
230 struct rbd_options *rbd_opts)
215{ 231{
216 struct rbd_client *rbdc; 232 struct rbd_client *rbdc;
217 int ret = -ENOMEM; 233 int ret = -ENOMEM;
@@ -233,6 +249,8 @@ static struct rbd_client *rbd_client_create(struct ceph_options *opt)
233 if (ret < 0) 249 if (ret < 0)
234 goto out_err; 250 goto out_err;
235 251
252 rbdc->rbd_opts = rbd_opts;
253
236 spin_lock(&node_lock); 254 spin_lock(&node_lock);
237 list_add_tail(&rbdc->node, &rbd_client_list); 255 list_add_tail(&rbdc->node, &rbd_client_list);
238 spin_unlock(&node_lock); 256 spin_unlock(&node_lock);
@@ -267,6 +285,59 @@ static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
267} 285}
268 286
269/* 287/*
288 * mount options
289 */
290enum {
291 Opt_notify_timeout,
292 Opt_last_int,
293 /* int args above */
294 Opt_last_string,
295 /* string args above */
296};
297
298static match_table_t rbdopt_tokens = {
299 {Opt_notify_timeout, "notify_timeout=%d"},
300 /* int args above */
301 /* string args above */
302 {-1, NULL}
303};
304
305static int parse_rbd_opts_token(char *c, void *private)
306{
307 struct rbd_options *rbdopt = private;
308 substring_t argstr[MAX_OPT_ARGS];
309 int token, intval, ret;
310
311 token = match_token((char *)c, rbdopt_tokens, argstr);
312 if (token < 0)
313 return -EINVAL;
314
315 if (token < Opt_last_int) {
316 ret = match_int(&argstr[0], &intval);
317 if (ret < 0) {
318 pr_err("bad mount option arg (not int) "
319 "at '%s'\n", c);
320 return ret;
321 }
322 dout("got int token %d val %d\n", token, intval);
323 } else if (token > Opt_last_int && token < Opt_last_string) {
324 dout("got string token %d val %s\n", token,
325 argstr[0].from);
326 } else {
327 dout("got token %d\n", token);
328 }
329
330 switch (token) {
331 case Opt_notify_timeout:
332 rbdopt->notify_timeout = intval;
333 break;
334 default:
335 BUG_ON(token);
336 }
337 return 0;
338}
339
340/*
270 * Get a ceph client with specific addr and configuration, if one does 341 * Get a ceph client with specific addr and configuration, if one does
271 * not exist create it. 342 * not exist create it.
272 */ 343 */
@@ -276,11 +347,18 @@ static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
276 struct rbd_client *rbdc; 347 struct rbd_client *rbdc;
277 struct ceph_options *opt; 348 struct ceph_options *opt;
278 int ret; 349 int ret;
350 struct rbd_options *rbd_opts;
351
352 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
353 if (!rbd_opts)
354 return -ENOMEM;
355
356 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
279 357
280 ret = ceph_parse_options(&opt, options, mon_addr, 358 ret = ceph_parse_options(&opt, options, mon_addr,
281 mon_addr + strlen(mon_addr), NULL, NULL); 359 mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
282 if (ret < 0) 360 if (ret < 0)
283 return ret; 361 goto done_err;
284 362
285 spin_lock(&node_lock); 363 spin_lock(&node_lock);
286 rbdc = __rbd_client_find(opt); 364 rbdc = __rbd_client_find(opt);
@@ -296,13 +374,18 @@ static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
296 } 374 }
297 spin_unlock(&node_lock); 375 spin_unlock(&node_lock);
298 376
299 rbdc = rbd_client_create(opt); 377 rbdc = rbd_client_create(opt, rbd_opts);
300 if (IS_ERR(rbdc)) 378 if (IS_ERR(rbdc)) {
301 return PTR_ERR(rbdc); 379 ret = PTR_ERR(rbdc);
380 goto done_err;
381 }
302 382
303 rbd_dev->rbd_client = rbdc; 383 rbd_dev->rbd_client = rbdc;
304 rbd_dev->client = rbdc->client; 384 rbd_dev->client = rbdc->client;
305 return 0; 385 return 0;
386done_err:
387 kfree(rbd_opts);
388 return ret;
306} 389}
307 390
308/* 391/*
@@ -318,6 +401,7 @@ static void rbd_client_release(struct kref *kref)
318 spin_unlock(&node_lock); 401 spin_unlock(&node_lock);
319 402
320 ceph_destroy_client(rbdc->client); 403 ceph_destroy_client(rbdc->client);
404 kfree(rbdc->rbd_opts);
321 kfree(rbdc); 405 kfree(rbdc);
322} 406}
323 407
@@ -666,7 +750,9 @@ static int rbd_do_request(struct request *rq,
666 struct ceph_osd_req_op *ops, 750 struct ceph_osd_req_op *ops,
667 int num_reply, 751 int num_reply,
668 void (*rbd_cb)(struct ceph_osd_request *req, 752 void (*rbd_cb)(struct ceph_osd_request *req,
669 struct ceph_msg *msg)) 753 struct ceph_msg *msg),
754 struct ceph_osd_request **linger_req,
755 u64 *ver)
670{ 756{
671 struct ceph_osd_request *req; 757 struct ceph_osd_request *req;
672 struct ceph_file_layout *layout; 758 struct ceph_file_layout *layout;
@@ -729,12 +815,20 @@ static int rbd_do_request(struct request *rq,
729 req->r_oid, req->r_oid_len); 815 req->r_oid, req->r_oid_len);
730 up_read(&header->snap_rwsem); 816 up_read(&header->snap_rwsem);
731 817
818 if (linger_req) {
819 ceph_osdc_set_request_linger(&dev->client->osdc, req);
820 *linger_req = req;
821 }
822
732 ret = ceph_osdc_start_request(&dev->client->osdc, req, false); 823 ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
733 if (ret < 0) 824 if (ret < 0)
734 goto done_err; 825 goto done_err;
735 826
736 if (!rbd_cb) { 827 if (!rbd_cb) {
737 ret = ceph_osdc_wait_request(&dev->client->osdc, req); 828 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
829 if (ver)
830 *ver = le64_to_cpu(req->r_reassert_version.version);
831 dout("reassert_ver=%lld\n", le64_to_cpu(req->r_reassert_version.version));
738 ceph_osdc_put_request(req); 832 ceph_osdc_put_request(req);
739 } 833 }
740 return ret; 834 return ret;
@@ -789,6 +883,11 @@ static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
789 kfree(req_data); 883 kfree(req_data);
790} 884}
791 885
886static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
887{
888 ceph_osdc_put_request(req);
889}
890
792/* 891/*
793 * Do a synchronous ceph osd operation 892 * Do a synchronous ceph osd operation
794 */ 893 */
@@ -801,7 +900,9 @@ static int rbd_req_sync_op(struct rbd_device *dev,
801 int num_reply, 900 int num_reply,
802 const char *obj, 901 const char *obj,
803 u64 ofs, u64 len, 902 u64 ofs, u64 len,
804 char *buf) 903 char *buf,
904 struct ceph_osd_request **linger_req,
905 u64 *ver)
805{ 906{
806 int ret; 907 int ret;
807 struct page **pages; 908 struct page **pages;
@@ -833,7 +934,8 @@ static int rbd_req_sync_op(struct rbd_device *dev,
833 flags, 934 flags,
834 ops, 935 ops,
835 2, 936 2,
836 NULL); 937 NULL,
938 linger_req, ver);
837 if (ret < 0) 939 if (ret < 0)
838 goto done_ops; 940 goto done_ops;
839 941
@@ -893,7 +995,7 @@ static int rbd_do_op(struct request *rq,
893 flags, 995 flags,
894 ops, 996 ops,
895 num_reply, 997 num_reply,
896 rbd_req_cb); 998 rbd_req_cb, 0, NULL);
897done: 999done:
898 kfree(seg_name); 1000 kfree(seg_name);
899 return ret; 1001 return ret;
@@ -940,18 +1042,174 @@ static int rbd_req_sync_read(struct rbd_device *dev,
940 u64 snapid, 1042 u64 snapid,
941 const char *obj, 1043 const char *obj,
942 u64 ofs, u64 len, 1044 u64 ofs, u64 len,
943 char *buf) 1045 char *buf,
1046 u64 *ver)
944{ 1047{
945 return rbd_req_sync_op(dev, NULL, 1048 return rbd_req_sync_op(dev, NULL,
946 (snapid ? snapid : CEPH_NOSNAP), 1049 (snapid ? snapid : CEPH_NOSNAP),
947 CEPH_OSD_OP_READ, 1050 CEPH_OSD_OP_READ,
948 CEPH_OSD_FLAG_READ, 1051 CEPH_OSD_FLAG_READ,
949 NULL, 1052 NULL,
950 1, obj, ofs, len, buf); 1053 1, obj, ofs, len, buf, NULL, ver);
951} 1054}
952 1055
953/* 1056/*
954 * Request sync osd read 1057 * Request sync osd watch
1058 */
1059static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1060 u64 ver,
1061 u64 notify_id,
1062 const char *obj)
1063{
1064 struct ceph_osd_req_op *ops;
1065 struct page **pages = NULL;
1066 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1067 if (ret < 0)
1068 return ret;
1069
1070 ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1071 ops[0].watch.cookie = notify_id;
1072 ops[0].watch.flag = 0;
1073
1074 ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1075 obj, 0, 0, NULL,
1076 pages, 0,
1077 CEPH_OSD_FLAG_READ,
1078 ops,
1079 1,
1080 rbd_simple_req_cb, 0, NULL);
1081
1082 rbd_destroy_ops(ops);
1083 return ret;
1084}
1085
1086static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1087{
1088 struct rbd_device *dev = (struct rbd_device *)data;
1089 if (!dev)
1090 return;
1091
1092 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1093 notify_id, (int)opcode);
1094 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1095 __rbd_update_snaps(dev);
1096 mutex_unlock(&ctl_mutex);
1097
1098 rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1099}
1100
1101/*
1102 * Request sync osd watch
1103 */
1104static int rbd_req_sync_watch(struct rbd_device *dev,
1105 const char *obj,
1106 u64 ver)
1107{
1108 struct ceph_osd_req_op *ops;
1109 struct ceph_osd_client *osdc = &dev->client->osdc;
1110
1111 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1112 if (ret < 0)
1113 return ret;
1114
1115 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1116 (void *)dev, &dev->watch_event);
1117 if (ret < 0)
1118 goto fail;
1119
1120 ops[0].watch.ver = cpu_to_le64(ver);
1121 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1122 ops[0].watch.flag = 1;
1123
1124 ret = rbd_req_sync_op(dev, NULL,
1125 CEPH_NOSNAP,
1126 0,
1127 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1128 ops,
1129 1, obj, 0, 0, NULL,
1130 &dev->watch_request, NULL);
1131
1132 if (ret < 0)
1133 goto fail_event;
1134
1135 rbd_destroy_ops(ops);
1136 return 0;
1137
1138fail_event:
1139 ceph_osdc_cancel_event(dev->watch_event);
1140 dev->watch_event = NULL;
1141fail:
1142 rbd_destroy_ops(ops);
1143 return ret;
1144}
1145
1146struct rbd_notify_info {
1147 struct rbd_device *dev;
1148};
1149
1150static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1151{
1152 struct rbd_device *dev = (struct rbd_device *)data;
1153 if (!dev)
1154 return;
1155
1156 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1157 notify_id, (int)opcode);
1158}
1159
1160/*
1161 * Request sync osd notify
1162 */
1163static int rbd_req_sync_notify(struct rbd_device *dev,
1164 const char *obj)
1165{
1166 struct ceph_osd_req_op *ops;
1167 struct ceph_osd_client *osdc = &dev->client->osdc;
1168 struct ceph_osd_event *event;
1169 struct rbd_notify_info info;
1170 int payload_len = sizeof(u32) + sizeof(u32);
1171 int ret;
1172
1173 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1174 if (ret < 0)
1175 return ret;
1176
1177 info.dev = dev;
1178
1179 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1180 (void *)&info, &event);
1181 if (ret < 0)
1182 goto fail;
1183
1184 ops[0].watch.ver = 1;
1185 ops[0].watch.flag = 1;
1186 ops[0].watch.cookie = event->cookie;
1187 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1188 ops[0].watch.timeout = 12;
1189
1190 ret = rbd_req_sync_op(dev, NULL,
1191 CEPH_NOSNAP,
1192 0,
1193 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1194 ops,
1195 1, obj, 0, 0, NULL, NULL, NULL);
1196 if (ret < 0)
1197 goto fail_event;
1198
1199 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1200 dout("ceph_osdc_wait_event returned %d\n", ret);
1201 rbd_destroy_ops(ops);
1202 return 0;
1203
1204fail_event:
1205 ceph_osdc_cancel_event(event);
1206fail:
1207 rbd_destroy_ops(ops);
1208 return ret;
1209}
1210
1211/*
1212 * Request sync osd rollback
955 */ 1213 */
956static int rbd_req_sync_rollback_obj(struct rbd_device *dev, 1214static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
957 u64 snapid, 1215 u64 snapid,
@@ -969,13 +1227,10 @@ static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
969 0, 1227 0,
970 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 1228 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
971 ops, 1229 ops,
972 1, obj, 0, 0, NULL); 1230 1, obj, 0, 0, NULL, NULL, NULL);
973 1231
974 rbd_destroy_ops(ops); 1232 rbd_destroy_ops(ops);
975 1233
976 if (ret < 0)
977 return ret;
978
979 return ret; 1234 return ret;
980} 1235}
981 1236
@@ -987,7 +1242,8 @@ static int rbd_req_sync_exec(struct rbd_device *dev,
987 const char *cls, 1242 const char *cls,
988 const char *method, 1243 const char *method,
989 const char *data, 1244 const char *data,
990 int len) 1245 int len,
1246 u64 *ver)
991{ 1247{
992 struct ceph_osd_req_op *ops; 1248 struct ceph_osd_req_op *ops;
993 int cls_len = strlen(cls); 1249 int cls_len = strlen(cls);
@@ -1010,7 +1266,7 @@ static int rbd_req_sync_exec(struct rbd_device *dev,
1010 0, 1266 0,
1011 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 1267 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1012 ops, 1268 ops,
1013 1, obj, 0, 0, NULL); 1269 1, obj, 0, 0, NULL, NULL, ver);
1014 1270
1015 rbd_destroy_ops(ops); 1271 rbd_destroy_ops(ops);
1016 1272
@@ -1156,6 +1412,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
1156 struct rbd_image_header_ondisk *dh; 1412 struct rbd_image_header_ondisk *dh;
1157 int snap_count = 0; 1413 int snap_count = 0;
1158 u64 snap_names_len = 0; 1414 u64 snap_names_len = 0;
1415 u64 ver;
1159 1416
1160 while (1) { 1417 while (1) {
1161 int len = sizeof(*dh) + 1418 int len = sizeof(*dh) +
@@ -1171,7 +1428,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
1171 NULL, CEPH_NOSNAP, 1428 NULL, CEPH_NOSNAP,
1172 rbd_dev->obj_md_name, 1429 rbd_dev->obj_md_name,
1173 0, len, 1430 0, len,
1174 (char *)dh); 1431 (char *)dh, &ver);
1175 if (rc < 0) 1432 if (rc < 0)
1176 goto out_dh; 1433 goto out_dh;
1177 1434
@@ -1188,6 +1445,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
1188 } 1445 }
1189 break; 1446 break;
1190 } 1447 }
1448 header->obj_version = ver;
1191 1449
1192out_dh: 1450out_dh:
1193 kfree(dh); 1451 kfree(dh);
@@ -1205,6 +1463,7 @@ static int rbd_header_add_snap(struct rbd_device *dev,
1205 u64 new_snapid; 1463 u64 new_snapid;
1206 int ret; 1464 int ret;
1207 void *data, *data_start, *data_end; 1465 void *data, *data_start, *data_end;
1466 u64 ver;
1208 1467
1209 /* we should create a snapshot only if we're pointing at the head */ 1468 /* we should create a snapshot only if we're pointing at the head */
1210 if (dev->cur_snap) 1469 if (dev->cur_snap)
@@ -1227,7 +1486,7 @@ static int rbd_header_add_snap(struct rbd_device *dev,
1227 ceph_encode_64_safe(&data, data_end, new_snapid, bad); 1486 ceph_encode_64_safe(&data, data_end, new_snapid, bad);
1228 1487
1229 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add", 1488 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1230 data_start, data - data_start); 1489 data_start, data - data_start, &ver);
1231 1490
1232 kfree(data_start); 1491 kfree(data_start);
1233 1492
@@ -1259,6 +1518,7 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1259 int ret; 1518 int ret;
1260 struct rbd_image_header h; 1519 struct rbd_image_header h;
1261 u64 snap_seq; 1520 u64 snap_seq;
1521 int follow_seq = 0;
1262 1522
1263 ret = rbd_read_header(rbd_dev, &h); 1523 ret = rbd_read_header(rbd_dev, &h);
1264 if (ret < 0) 1524 if (ret < 0)
@@ -1267,6 +1527,11 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1267 down_write(&rbd_dev->header.snap_rwsem); 1527 down_write(&rbd_dev->header.snap_rwsem);
1268 1528
1269 snap_seq = rbd_dev->header.snapc->seq; 1529 snap_seq = rbd_dev->header.snapc->seq;
1530 if (rbd_dev->header.total_snaps &&
1531 rbd_dev->header.snapc->snaps[0] == snap_seq)
1532 /* pointing at the head, will need to follow that
1533 if head moves */
1534 follow_seq = 1;
1270 1535
1271 kfree(rbd_dev->header.snapc); 1536 kfree(rbd_dev->header.snapc);
1272 kfree(rbd_dev->header.snap_names); 1537 kfree(rbd_dev->header.snap_names);
@@ -1277,7 +1542,10 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1277 rbd_dev->header.snap_names = h.snap_names; 1542 rbd_dev->header.snap_names = h.snap_names;
1278 rbd_dev->header.snap_names_len = h.snap_names_len; 1543 rbd_dev->header.snap_names_len = h.snap_names_len;
1279 rbd_dev->header.snap_sizes = h.snap_sizes; 1544 rbd_dev->header.snap_sizes = h.snap_sizes;
1280 rbd_dev->header.snapc->seq = snap_seq; 1545 if (follow_seq)
1546 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1547 else
1548 rbd_dev->header.snapc->seq = snap_seq;
1281 1549
1282 ret = __rbd_init_snaps_header(rbd_dev); 1550 ret = __rbd_init_snaps_header(rbd_dev);
1283 1551
@@ -1699,7 +1967,28 @@ static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
1699 device_unregister(&rbd_dev->dev); 1967 device_unregister(&rbd_dev->dev);
1700} 1968}
1701 1969
1702static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count) 1970static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
1971{
1972 int ret, rc;
1973
1974 do {
1975 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
1976 rbd_dev->header.obj_version);
1977 if (ret == -ERANGE) {
1978 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1979 rc = __rbd_update_snaps(rbd_dev);
1980 mutex_unlock(&ctl_mutex);
1981 if (rc < 0)
1982 return rc;
1983 }
1984 } while (ret == -ERANGE);
1985
1986 return ret;
1987}
1988
1989static ssize_t rbd_add(struct bus_type *bus,
1990 const char *buf,
1991 size_t count)
1703{ 1992{
1704 struct ceph_osd_client *osdc; 1993 struct ceph_osd_client *osdc;
1705 struct rbd_device *rbd_dev; 1994 struct rbd_device *rbd_dev;
@@ -1797,6 +2086,10 @@ static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count)
1797 if (rc) 2086 if (rc)
1798 goto err_out_bus; 2087 goto err_out_bus;
1799 2088
2089 rc = rbd_init_watch_dev(rbd_dev);
2090 if (rc)
2091 goto err_out_bus;
2092
1800 return count; 2093 return count;
1801 2094
1802err_out_bus: 2095err_out_bus:
@@ -1849,6 +2142,12 @@ static void rbd_dev_release(struct device *dev)
1849 struct rbd_device *rbd_dev = 2142 struct rbd_device *rbd_dev =
1850 container_of(dev, struct rbd_device, dev); 2143 container_of(dev, struct rbd_device, dev);
1851 2144
2145 if (rbd_dev->watch_request)
2146 ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
2147 rbd_dev->watch_request);
2148 if (rbd_dev->watch_event)
2149 ceph_osdc_cancel_event(rbd_dev->watch_event);
2150
1852 rbd_put_client(rbd_dev); 2151 rbd_put_client(rbd_dev);
1853 2152
1854 /* clean up and free blkdev */ 2153 /* clean up and free blkdev */
@@ -1914,14 +2213,24 @@ static ssize_t rbd_snap_add(struct device *dev,
1914 ret = rbd_header_add_snap(rbd_dev, 2213 ret = rbd_header_add_snap(rbd_dev,
1915 name, GFP_KERNEL); 2214 name, GFP_KERNEL);
1916 if (ret < 0) 2215 if (ret < 0)
1917 goto done_unlock; 2216 goto err_unlock;
1918 2217
1919 ret = __rbd_update_snaps(rbd_dev); 2218 ret = __rbd_update_snaps(rbd_dev);
1920 if (ret < 0) 2219 if (ret < 0)
1921 goto done_unlock; 2220 goto err_unlock;
2221
2222 /* shouldn't hold ctl_mutex when notifying.. notify might
2223 trigger a watch callback that would need to get that mutex */
2224 mutex_unlock(&ctl_mutex);
2225
2226 /* make a best effort, don't error if failed */
2227 rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
1922 2228
1923 ret = count; 2229 ret = count;
1924done_unlock: 2230 kfree(name);
2231 return ret;
2232
2233err_unlock:
1925 mutex_unlock(&ctl_mutex); 2234 mutex_unlock(&ctl_mutex);
1926 kfree(name); 2235 kfree(name);
1927 return ret; 2236 return ret;