aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2012-07-31 17:35:28 -0400
committerLinus Torvalds <torvalds@linux-foundation.org>2012-07-31 17:35:28 -0400
commitcc8362b1f6d724e46f515121d442779924b19fec (patch)
tree86fb5c3767e538ec9ded57dd7b3ce5d69dcde691
parent2e3ee613480563a6d5c01b57d342e65cc58c06df (diff)
parent1fe5e9932156f6122c3b1ff6ba7541c27c86718c (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph changes from Sage Weil: "Lots of stuff this time around: - lots of cleanup and refactoring in the libceph messenger code, and many hard to hit races and bugs closed as a result. - lots of cleanup and refactoring in the rbd code from Alex Elder, mostly in preparation for the layering functionality that will be coming in 3.7. - some misc rbd cleanups from Josh Durgin that are finally going upstream - support for CRUSH tunables (used by newer clusters to improve the data placement) - some cleanup in our use of d_parent that Al brought up a while back - a random collection of fixes across the tree There is another patch coming that fixes up our ->atomic_open() behavior, but I'm going to hammer on it a bit more before sending it." Fix up conflicts due to commits that were already committed earlier in drivers/block/rbd.c, net/ceph/{messenger.c, osd_client.c} * 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (132 commits) rbd: create rbd_refresh_helper() rbd: return obj version in __rbd_refresh_header() rbd: fixes in rbd_header_from_disk() rbd: always pass ops array to rbd_req_sync_op() rbd: pass null version pointer in add_snap() rbd: make rbd_create_rw_ops() return a pointer rbd: have __rbd_add_snap_dev() return a pointer libceph: recheck con state after allocating incoming message libceph: change ceph_con_in_msg_alloc convention to be less weird libceph: avoid dropping con mutex before fault libceph: verify state after retaking con lock after dispatch libceph: revoke mon_client messages on session restart libceph: fix handling of immediate socket connect failure ceph: update MAINTAINERS file libceph: be less chatty about stray replies libceph: clear all flags on con_close libceph: clean up con flags libceph: replace connection state bits with states libceph: drop unnecessary CLOSED check in socket state change callback libceph: close socket directly from ceph_con_close() ...
-rw-r--r--Documentation/ABI/testing/sysfs-bus-rbd10
-rw-r--r--MAINTAINERS13
-rw-r--r--drivers/block/rbd.c816
-rw-r--r--drivers/block/rbd_types.h1
-rw-r--r--fs/ceph/dir.c7
-rw-r--r--fs/ceph/mds_client.c23
-rw-r--r--fs/ceph/snap.c18
-rw-r--r--fs/ceph/super.c1
-rw-r--r--fs/ceph/super.h4
-rw-r--r--fs/ceph/xattr.c1
-rw-r--r--include/linux/ceph/ceph_features.h27
-rw-r--r--include/linux/ceph/ceph_fs.h14
-rw-r--r--include/linux/ceph/decode.h49
-rw-r--r--include/linux/ceph/libceph.h10
-rw-r--r--include/linux/ceph/messenger.h60
-rw-r--r--include/linux/ceph/mon_client.h2
-rw-r--r--include/linux/ceph/msgpool.h3
-rw-r--r--include/linux/crush/crush.h8
-rw-r--r--net/ceph/ceph_common.c25
-rw-r--r--net/ceph/crush/mapper.c13
-rw-r--r--net/ceph/messenger.c925
-rw-r--r--net/ceph/mon_client.c76
-rw-r--r--net/ceph/msgpool.c7
-rw-r--r--net/ceph/osd_client.c77
-rw-r--r--net/ceph/osdmap.c59
25 files changed, 1351 insertions, 898 deletions
diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd
index bcd88eb7ebcd..3c17b62899f6 100644
--- a/Documentation/ABI/testing/sysfs-bus-rbd
+++ b/Documentation/ABI/testing/sysfs-bus-rbd
@@ -35,8 +35,14 @@ name
35 35
36pool 36pool
37 37
38 The pool where this rbd image resides. The pool-name pair is unique 38 The name of the storage pool where this rbd image resides.
39 per rados system. 39 An rbd image name is unique within its pool.
40
41pool_id
42
43 The unique identifier for the rbd image's pool. This is
44 a permanent attribute of the pool. A pool's id will never
45 change.
40 46
41size 47size
42 48
diff --git a/MAINTAINERS b/MAINTAINERS
index fb036a062a5d..5b44872b64ec 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -1789,15 +1789,16 @@ F: arch/powerpc/oprofile/*cell*
1789F: arch/powerpc/platforms/cell/ 1789F: arch/powerpc/platforms/cell/
1790 1790
1791CEPH DISTRIBUTED FILE SYSTEM CLIENT 1791CEPH DISTRIBUTED FILE SYSTEM CLIENT
1792M: Sage Weil <sage@newdream.net> 1792M: Sage Weil <sage@inktank.com>
1793L: ceph-devel@vger.kernel.org 1793L: ceph-devel@vger.kernel.org
1794W: http://ceph.newdream.net/ 1794W: http://ceph.com/
1795T: git git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client.git 1795T: git git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client.git
1796S: Supported 1796S: Supported
1797F: Documentation/filesystems/ceph.txt 1797F: Documentation/filesystems/ceph.txt
1798F: fs/ceph 1798F: fs/ceph
1799F: net/ceph 1799F: net/ceph
1800F: include/linux/ceph 1800F: include/linux/ceph
1801F: include/linux/crush
1801 1802
1802CERTIFIED WIRELESS USB (WUSB) SUBSYSTEM: 1803CERTIFIED WIRELESS USB (WUSB) SUBSYSTEM:
1803L: linux-usb@vger.kernel.org 1804L: linux-usb@vger.kernel.org
@@ -5639,10 +5640,12 @@ S: Supported
5639F: arch/hexagon/ 5640F: arch/hexagon/
5640 5641
5641RADOS BLOCK DEVICE (RBD) 5642RADOS BLOCK DEVICE (RBD)
5642F: include/linux/qnxtypes.h 5643M: Yehuda Sadeh <yehuda@inktank.com>
5643M: Yehuda Sadeh <yehuda@hq.newdream.net> 5644M: Sage Weil <sage@inktank.com>
5644M: Sage Weil <sage@newdream.net> 5645M: Alex Elder <elder@inktank.com>
5645M: ceph-devel@vger.kernel.org 5646M: ceph-devel@vger.kernel.org
5647W: http://ceph.com/
5648T: git git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client.git
5646S: Supported 5649S: Supported
5647F: drivers/block/rbd.c 5650F: drivers/block/rbd.c
5648F: drivers/block/rbd_types.h 5651F: drivers/block/rbd_types.h
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 8f428a8ab003..9917943a3572 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -55,8 +55,6 @@
55 55
56#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */ 56#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
57 57
58#define RBD_MAX_MD_NAME_LEN (RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
59#define RBD_MAX_POOL_NAME_LEN 64
60#define RBD_MAX_SNAP_NAME_LEN 32 58#define RBD_MAX_SNAP_NAME_LEN 32
61#define RBD_MAX_OPT_LEN 1024 59#define RBD_MAX_OPT_LEN 1024
62 60
@@ -78,13 +76,12 @@
78 */ 76 */
79struct rbd_image_header { 77struct rbd_image_header {
80 u64 image_size; 78 u64 image_size;
81 char block_name[32]; 79 char *object_prefix;
82 __u8 obj_order; 80 __u8 obj_order;
83 __u8 crypt_type; 81 __u8 crypt_type;
84 __u8 comp_type; 82 __u8 comp_type;
85 struct ceph_snap_context *snapc; 83 struct ceph_snap_context *snapc;
86 size_t snap_names_len; 84 size_t snap_names_len;
87 u64 snap_seq;
88 u32 total_snaps; 85 u32 total_snaps;
89 86
90 char *snap_names; 87 char *snap_names;
@@ -150,7 +147,7 @@ struct rbd_snap {
150 * a single device 147 * a single device
151 */ 148 */
152struct rbd_device { 149struct rbd_device {
153 int id; /* blkdev unique id */ 150 int dev_id; /* blkdev unique id */
154 151
155 int major; /* blkdev assigned major */ 152 int major; /* blkdev assigned major */
156 struct gendisk *disk; /* blkdev's gendisk and rq */ 153 struct gendisk *disk; /* blkdev's gendisk and rq */
@@ -163,20 +160,24 @@ struct rbd_device {
163 spinlock_t lock; /* queue lock */ 160 spinlock_t lock; /* queue lock */
164 161
165 struct rbd_image_header header; 162 struct rbd_image_header header;
166 char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */ 163 char *image_name;
167 int obj_len; 164 size_t image_name_len;
168 char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */ 165 char *header_name;
169 char pool_name[RBD_MAX_POOL_NAME_LEN]; 166 char *pool_name;
170 int poolid; 167 int pool_id;
171 168
172 struct ceph_osd_event *watch_event; 169 struct ceph_osd_event *watch_event;
173 struct ceph_osd_request *watch_request; 170 struct ceph_osd_request *watch_request;
174 171
175 /* protects updating the header */ 172 /* protects updating the header */
176 struct rw_semaphore header_rwsem; 173 struct rw_semaphore header_rwsem;
177 char snap_name[RBD_MAX_SNAP_NAME_LEN]; 174 /* name of the snapshot this device reads from */
175 char *snap_name;
176 /* id of the snapshot this device reads from */
178 u64 snap_id; /* current snapshot id */ 177 u64 snap_id; /* current snapshot id */
179 int read_only; 178 /* whether the snap_id this device reads from still exists */
179 bool snap_exists;
180 int read_only;
180 181
181 struct list_head node; 182 struct list_head node;
182 183
@@ -201,8 +202,7 @@ static ssize_t rbd_snap_add(struct device *dev,
201 struct device_attribute *attr, 202 struct device_attribute *attr,
202 const char *buf, 203 const char *buf,
203 size_t count); 204 size_t count);
204static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev, 205static void __rbd_remove_snap_dev(struct rbd_snap *snap);
205 struct rbd_snap *snap);
206 206
207static ssize_t rbd_add(struct bus_type *bus, const char *buf, 207static ssize_t rbd_add(struct bus_type *bus, const char *buf,
208 size_t count); 208 size_t count);
@@ -240,7 +240,7 @@ static void rbd_put_dev(struct rbd_device *rbd_dev)
240 put_device(&rbd_dev->dev); 240 put_device(&rbd_dev->dev);
241} 241}
242 242
243static int __rbd_refresh_header(struct rbd_device *rbd_dev); 243static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
244 244
245static int rbd_open(struct block_device *bdev, fmode_t mode) 245static int rbd_open(struct block_device *bdev, fmode_t mode)
246{ 246{
@@ -273,9 +273,9 @@ static const struct block_device_operations rbd_bd_ops = {
273 273
274/* 274/*
275 * Initialize an rbd client instance. 275 * Initialize an rbd client instance.
276 * We own *opt. 276 * We own *ceph_opts.
277 */ 277 */
278static struct rbd_client *rbd_client_create(struct ceph_options *opt, 278static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
279 struct rbd_options *rbd_opts) 279 struct rbd_options *rbd_opts)
280{ 280{
281 struct rbd_client *rbdc; 281 struct rbd_client *rbdc;
@@ -291,10 +291,10 @@ static struct rbd_client *rbd_client_create(struct ceph_options *opt,
291 291
292 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 292 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
293 293
294 rbdc->client = ceph_create_client(opt, rbdc, 0, 0); 294 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
295 if (IS_ERR(rbdc->client)) 295 if (IS_ERR(rbdc->client))
296 goto out_mutex; 296 goto out_mutex;
297 opt = NULL; /* Now rbdc->client is responsible for opt */ 297 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
298 298
299 ret = ceph_open_session(rbdc->client); 299 ret = ceph_open_session(rbdc->client);
300 if (ret < 0) 300 if (ret < 0)
@@ -317,23 +317,23 @@ out_mutex:
317 mutex_unlock(&ctl_mutex); 317 mutex_unlock(&ctl_mutex);
318 kfree(rbdc); 318 kfree(rbdc);
319out_opt: 319out_opt:
320 if (opt) 320 if (ceph_opts)
321 ceph_destroy_options(opt); 321 ceph_destroy_options(ceph_opts);
322 return ERR_PTR(ret); 322 return ERR_PTR(ret);
323} 323}
324 324
325/* 325/*
326 * Find a ceph client with specific addr and configuration. 326 * Find a ceph client with specific addr and configuration.
327 */ 327 */
328static struct rbd_client *__rbd_client_find(struct ceph_options *opt) 328static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
329{ 329{
330 struct rbd_client *client_node; 330 struct rbd_client *client_node;
331 331
332 if (opt->flags & CEPH_OPT_NOSHARE) 332 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
333 return NULL; 333 return NULL;
334 334
335 list_for_each_entry(client_node, &rbd_client_list, node) 335 list_for_each_entry(client_node, &rbd_client_list, node)
336 if (ceph_compare_options(opt, client_node->client) == 0) 336 if (!ceph_compare_options(ceph_opts, client_node->client))
337 return client_node; 337 return client_node;
338 return NULL; 338 return NULL;
339} 339}
@@ -349,7 +349,7 @@ enum {
349 /* string args above */ 349 /* string args above */
350}; 350};
351 351
352static match_table_t rbdopt_tokens = { 352static match_table_t rbd_opts_tokens = {
353 {Opt_notify_timeout, "notify_timeout=%d"}, 353 {Opt_notify_timeout, "notify_timeout=%d"},
354 /* int args above */ 354 /* int args above */
355 /* string args above */ 355 /* string args above */
@@ -358,11 +358,11 @@ static match_table_t rbdopt_tokens = {
358 358
359static int parse_rbd_opts_token(char *c, void *private) 359static int parse_rbd_opts_token(char *c, void *private)
360{ 360{
361 struct rbd_options *rbdopt = private; 361 struct rbd_options *rbd_opts = private;
362 substring_t argstr[MAX_OPT_ARGS]; 362 substring_t argstr[MAX_OPT_ARGS];
363 int token, intval, ret; 363 int token, intval, ret;
364 364
365 token = match_token(c, rbdopt_tokens, argstr); 365 token = match_token(c, rbd_opts_tokens, argstr);
366 if (token < 0) 366 if (token < 0)
367 return -EINVAL; 367 return -EINVAL;
368 368
@@ -383,7 +383,7 @@ static int parse_rbd_opts_token(char *c, void *private)
383 383
384 switch (token) { 384 switch (token) {
385 case Opt_notify_timeout: 385 case Opt_notify_timeout:
386 rbdopt->notify_timeout = intval; 386 rbd_opts->notify_timeout = intval;
387 break; 387 break;
388 default: 388 default:
389 BUG_ON(token); 389 BUG_ON(token);
@@ -400,7 +400,7 @@ static struct rbd_client *rbd_get_client(const char *mon_addr,
400 char *options) 400 char *options)
401{ 401{
402 struct rbd_client *rbdc; 402 struct rbd_client *rbdc;
403 struct ceph_options *opt; 403 struct ceph_options *ceph_opts;
404 struct rbd_options *rbd_opts; 404 struct rbd_options *rbd_opts;
405 405
406 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL); 406 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
@@ -409,29 +409,29 @@ static struct rbd_client *rbd_get_client(const char *mon_addr,
409 409
410 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT; 410 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
411 411
412 opt = ceph_parse_options(options, mon_addr, 412 ceph_opts = ceph_parse_options(options, mon_addr,
413 mon_addr + mon_addr_len, 413 mon_addr + mon_addr_len,
414 parse_rbd_opts_token, rbd_opts); 414 parse_rbd_opts_token, rbd_opts);
415 if (IS_ERR(opt)) { 415 if (IS_ERR(ceph_opts)) {
416 kfree(rbd_opts); 416 kfree(rbd_opts);
417 return ERR_CAST(opt); 417 return ERR_CAST(ceph_opts);
418 } 418 }
419 419
420 spin_lock(&rbd_client_list_lock); 420 spin_lock(&rbd_client_list_lock);
421 rbdc = __rbd_client_find(opt); 421 rbdc = __rbd_client_find(ceph_opts);
422 if (rbdc) { 422 if (rbdc) {
423 /* using an existing client */ 423 /* using an existing client */
424 kref_get(&rbdc->kref); 424 kref_get(&rbdc->kref);
425 spin_unlock(&rbd_client_list_lock); 425 spin_unlock(&rbd_client_list_lock);
426 426
427 ceph_destroy_options(opt); 427 ceph_destroy_options(ceph_opts);
428 kfree(rbd_opts); 428 kfree(rbd_opts);
429 429
430 return rbdc; 430 return rbdc;
431 } 431 }
432 spin_unlock(&rbd_client_list_lock); 432 spin_unlock(&rbd_client_list_lock);
433 433
434 rbdc = rbd_client_create(opt, rbd_opts); 434 rbdc = rbd_client_create(ceph_opts, rbd_opts);
435 435
436 if (IS_ERR(rbdc)) 436 if (IS_ERR(rbdc))
437 kfree(rbd_opts); 437 kfree(rbd_opts);
@@ -480,46 +480,60 @@ static void rbd_coll_release(struct kref *kref)
480 kfree(coll); 480 kfree(coll);
481} 481}
482 482
483static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
484{
485 return !memcmp(&ondisk->text,
486 RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
487}
488
483/* 489/*
484 * Create a new header structure, translate header format from the on-disk 490 * Create a new header structure, translate header format from the on-disk
485 * header. 491 * header.
486 */ 492 */
487static int rbd_header_from_disk(struct rbd_image_header *header, 493static int rbd_header_from_disk(struct rbd_image_header *header,
488 struct rbd_image_header_ondisk *ondisk, 494 struct rbd_image_header_ondisk *ondisk,
489 u32 allocated_snaps, 495 u32 allocated_snaps)
490 gfp_t gfp_flags)
491{ 496{
492 u32 i, snap_count; 497 u32 snap_count;
493 498
494 if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT))) 499 if (!rbd_dev_ondisk_valid(ondisk))
495 return -ENXIO; 500 return -ENXIO;
496 501
497 snap_count = le32_to_cpu(ondisk->snap_count); 502 snap_count = le32_to_cpu(ondisk->snap_count);
498 if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context)) 503 if (snap_count > (SIZE_MAX - sizeof(struct ceph_snap_context))
499 / sizeof (*ondisk)) 504 / sizeof (u64))
500 return -EINVAL; 505 return -EINVAL;
501 header->snapc = kmalloc(sizeof(struct ceph_snap_context) + 506 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
502 snap_count * sizeof(u64), 507 snap_count * sizeof(u64),
503 gfp_flags); 508 GFP_KERNEL);
504 if (!header->snapc) 509 if (!header->snapc)
505 return -ENOMEM; 510 return -ENOMEM;
506 511
507 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
508 if (snap_count) { 512 if (snap_count) {
513 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
509 header->snap_names = kmalloc(header->snap_names_len, 514 header->snap_names = kmalloc(header->snap_names_len,
510 gfp_flags); 515 GFP_KERNEL);
511 if (!header->snap_names) 516 if (!header->snap_names)
512 goto err_snapc; 517 goto err_snapc;
513 header->snap_sizes = kmalloc(snap_count * sizeof(u64), 518 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
514 gfp_flags); 519 GFP_KERNEL);
515 if (!header->snap_sizes) 520 if (!header->snap_sizes)
516 goto err_names; 521 goto err_names;
517 } else { 522 } else {
523 WARN_ON(ondisk->snap_names_len);
524 header->snap_names_len = 0;
518 header->snap_names = NULL; 525 header->snap_names = NULL;
519 header->snap_sizes = NULL; 526 header->snap_sizes = NULL;
520 } 527 }
521 memcpy(header->block_name, ondisk->block_name, 528
529 header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
530 GFP_KERNEL);
531 if (!header->object_prefix)
532 goto err_sizes;
533
534 memcpy(header->object_prefix, ondisk->block_name,
522 sizeof(ondisk->block_name)); 535 sizeof(ondisk->block_name));
536 header->object_prefix[sizeof (ondisk->block_name)] = '\0';
523 537
524 header->image_size = le64_to_cpu(ondisk->image_size); 538 header->image_size = le64_to_cpu(ondisk->image_size);
525 header->obj_order = ondisk->options.order; 539 header->obj_order = ondisk->options.order;
@@ -527,11 +541,13 @@ static int rbd_header_from_disk(struct rbd_image_header *header,
527 header->comp_type = ondisk->options.comp_type; 541 header->comp_type = ondisk->options.comp_type;
528 542
529 atomic_set(&header->snapc->nref, 1); 543 atomic_set(&header->snapc->nref, 1);
530 header->snap_seq = le64_to_cpu(ondisk->snap_seq); 544 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
531 header->snapc->num_snaps = snap_count; 545 header->snapc->num_snaps = snap_count;
532 header->total_snaps = snap_count; 546 header->total_snaps = snap_count;
533 547
534 if (snap_count && allocated_snaps == snap_count) { 548 if (snap_count && allocated_snaps == snap_count) {
549 int i;
550
535 for (i = 0; i < snap_count; i++) { 551 for (i = 0; i < snap_count; i++) {
536 header->snapc->snaps[i] = 552 header->snapc->snaps[i] =
537 le64_to_cpu(ondisk->snaps[i].id); 553 le64_to_cpu(ondisk->snaps[i].id);
@@ -540,16 +556,22 @@ static int rbd_header_from_disk(struct rbd_image_header *header,
540 } 556 }
541 557
542 /* copy snapshot names */ 558 /* copy snapshot names */
543 memcpy(header->snap_names, &ondisk->snaps[i], 559 memcpy(header->snap_names, &ondisk->snaps[snap_count],
544 header->snap_names_len); 560 header->snap_names_len);
545 } 561 }
546 562
547 return 0; 563 return 0;
548 564
565err_sizes:
566 kfree(header->snap_sizes);
567 header->snap_sizes = NULL;
549err_names: 568err_names:
550 kfree(header->snap_names); 569 kfree(header->snap_names);
570 header->snap_names = NULL;
551err_snapc: 571err_snapc:
552 kfree(header->snapc); 572 kfree(header->snapc);
573 header->snapc = NULL;
574
553 return -ENOMEM; 575 return -ENOMEM;
554} 576}
555 577
@@ -575,52 +597,50 @@ static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
575 return -ENOENT; 597 return -ENOENT;
576} 598}
577 599
578static int rbd_header_set_snap(struct rbd_device *dev, u64 *size) 600static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
579{ 601{
580 struct rbd_image_header *header = &dev->header; 602 int ret;
581 struct ceph_snap_context *snapc = header->snapc;
582 int ret = -ENOENT;
583
584 BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
585 603
586 down_write(&dev->header_rwsem); 604 down_write(&rbd_dev->header_rwsem);
587 605
588 if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME, 606 if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
589 sizeof (RBD_SNAP_HEAD_NAME))) { 607 sizeof (RBD_SNAP_HEAD_NAME))) {
590 if (header->total_snaps) 608 rbd_dev->snap_id = CEPH_NOSNAP;
591 snapc->seq = header->snap_seq; 609 rbd_dev->snap_exists = false;
592 else 610 rbd_dev->read_only = 0;
593 snapc->seq = 0;
594 dev->snap_id = CEPH_NOSNAP;
595 dev->read_only = 0;
596 if (size) 611 if (size)
597 *size = header->image_size; 612 *size = rbd_dev->header.image_size;
598 } else { 613 } else {
599 ret = snap_by_name(header, dev->snap_name, &snapc->seq, size); 614 u64 snap_id = 0;
615
616 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
617 &snap_id, size);
600 if (ret < 0) 618 if (ret < 0)
601 goto done; 619 goto done;
602 dev->snap_id = snapc->seq; 620 rbd_dev->snap_id = snap_id;
603 dev->read_only = 1; 621 rbd_dev->snap_exists = true;
622 rbd_dev->read_only = 1;
604 } 623 }
605 624
606 ret = 0; 625 ret = 0;
607done: 626done:
608 up_write(&dev->header_rwsem); 627 up_write(&rbd_dev->header_rwsem);
609 return ret; 628 return ret;
610} 629}
611 630
612static void rbd_header_free(struct rbd_image_header *header) 631static void rbd_header_free(struct rbd_image_header *header)
613{ 632{
614 kfree(header->snapc); 633 kfree(header->object_prefix);
615 kfree(header->snap_names);
616 kfree(header->snap_sizes); 634 kfree(header->snap_sizes);
635 kfree(header->snap_names);
636 ceph_put_snap_context(header->snapc);
617} 637}
618 638
619/* 639/*
620 * get the actual striped segment name, offset and length 640 * get the actual striped segment name, offset and length
621 */ 641 */
622static u64 rbd_get_segment(struct rbd_image_header *header, 642static u64 rbd_get_segment(struct rbd_image_header *header,
623 const char *block_name, 643 const char *object_prefix,
624 u64 ofs, u64 len, 644 u64 ofs, u64 len,
625 char *seg_name, u64 *segofs) 645 char *seg_name, u64 *segofs)
626{ 646{
@@ -628,7 +648,7 @@ static u64 rbd_get_segment(struct rbd_image_header *header,
628 648
629 if (seg_name) 649 if (seg_name)
630 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN, 650 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
631 "%s.%012llx", block_name, seg); 651 "%s.%012llx", object_prefix, seg);
632 652
633 ofs = ofs & ((1 << header->obj_order) - 1); 653 ofs = ofs & ((1 << header->obj_order) - 1);
634 len = min_t(u64, len, (1 << header->obj_order) - ofs); 654 len = min_t(u64, len, (1 << header->obj_order) - ofs);
@@ -726,9 +746,8 @@ static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
726 * split_bio will BUG_ON if this is not the case 746 * split_bio will BUG_ON if this is not the case
727 */ 747 */
728 dout("bio_chain_clone split! total=%d remaining=%d" 748 dout("bio_chain_clone split! total=%d remaining=%d"
729 "bi_size=%d\n", 749 "bi_size=%u\n",
730 (int)total, (int)len-total, 750 total, len - total, old_chain->bi_size);
731 (int)old_chain->bi_size);
732 751
733 /* split the bio. We'll release it either in the next 752 /* split the bio. We'll release it either in the next
734 call, or it will have to be released outside */ 753 call, or it will have to be released outside */
@@ -777,22 +796,24 @@ err_out:
777/* 796/*
778 * helpers for osd request op vectors. 797 * helpers for osd request op vectors.
779 */ 798 */
780static int rbd_create_rw_ops(struct ceph_osd_req_op **ops, 799static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
781 int num_ops, 800 int opcode, u32 payload_len)
782 int opcode, 801{
783 u32 payload_len) 802 struct ceph_osd_req_op *ops;
784{ 803
785 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1), 804 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
786 GFP_NOIO); 805 if (!ops)
787 if (!*ops) 806 return NULL;
788 return -ENOMEM; 807
789 (*ops)[0].op = opcode; 808 ops[0].op = opcode;
809
790 /* 810 /*
791 * op extent offset and length will be set later on 811 * op extent offset and length will be set later on
792 * in calc_raw_layout() 812 * in calc_raw_layout()
793 */ 813 */
794 (*ops)[0].payload_len = payload_len; 814 ops[0].payload_len = payload_len;
795 return 0; 815
816 return ops;
796} 817}
797 818
798static void rbd_destroy_ops(struct ceph_osd_req_op *ops) 819static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
@@ -808,8 +829,8 @@ static void rbd_coll_end_req_index(struct request *rq,
808 struct request_queue *q; 829 struct request_queue *q;
809 int min, max, i; 830 int min, max, i;
810 831
811 dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n", 832 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
812 coll, index, ret, len); 833 coll, index, ret, (unsigned long long) len);
813 834
814 if (!rq) 835 if (!rq)
815 return; 836 return;
@@ -848,16 +869,15 @@ static void rbd_coll_end_req(struct rbd_request *req,
848 * Send ceph osd request 869 * Send ceph osd request
849 */ 870 */
850static int rbd_do_request(struct request *rq, 871static int rbd_do_request(struct request *rq,
851 struct rbd_device *dev, 872 struct rbd_device *rbd_dev,
852 struct ceph_snap_context *snapc, 873 struct ceph_snap_context *snapc,
853 u64 snapid, 874 u64 snapid,
854 const char *obj, u64 ofs, u64 len, 875 const char *object_name, u64 ofs, u64 len,
855 struct bio *bio, 876 struct bio *bio,
856 struct page **pages, 877 struct page **pages,
857 int num_pages, 878 int num_pages,
858 int flags, 879 int flags,
859 struct ceph_osd_req_op *ops, 880 struct ceph_osd_req_op *ops,
860 int num_reply,
861 struct rbd_req_coll *coll, 881 struct rbd_req_coll *coll,
862 int coll_index, 882 int coll_index,
863 void (*rbd_cb)(struct ceph_osd_request *req, 883 void (*rbd_cb)(struct ceph_osd_request *req,
@@ -887,15 +907,13 @@ static int rbd_do_request(struct request *rq,
887 req_data->coll_index = coll_index; 907 req_data->coll_index = coll_index;
888 } 908 }
889 909
890 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs); 910 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
891 911 (unsigned long long) ofs, (unsigned long long) len);
892 down_read(&dev->header_rwsem);
893 912
894 osdc = &dev->rbd_client->client->osdc; 913 osdc = &rbd_dev->rbd_client->client->osdc;
895 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops, 914 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
896 false, GFP_NOIO, pages, bio); 915 false, GFP_NOIO, pages, bio);
897 if (!req) { 916 if (!req) {
898 up_read(&dev->header_rwsem);
899 ret = -ENOMEM; 917 ret = -ENOMEM;
900 goto done_pages; 918 goto done_pages;
901 } 919 }
@@ -912,7 +930,7 @@ static int rbd_do_request(struct request *rq,
912 reqhead = req->r_request->front.iov_base; 930 reqhead = req->r_request->front.iov_base;
913 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP); 931 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
914 932
915 strncpy(req->r_oid, obj, sizeof(req->r_oid)); 933 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
916 req->r_oid_len = strlen(req->r_oid); 934 req->r_oid_len = strlen(req->r_oid);
917 935
918 layout = &req->r_file_layout; 936 layout = &req->r_file_layout;
@@ -920,7 +938,7 @@ static int rbd_do_request(struct request *rq,
920 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 938 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
921 layout->fl_stripe_count = cpu_to_le32(1); 939 layout->fl_stripe_count = cpu_to_le32(1);
922 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 940 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
923 layout->fl_pg_pool = cpu_to_le32(dev->poolid); 941 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
924 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno, 942 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
925 req, ops); 943 req, ops);
926 944
@@ -929,7 +947,6 @@ static int rbd_do_request(struct request *rq,
929 snapc, 947 snapc,
930 &mtime, 948 &mtime,
931 req->r_oid, req->r_oid_len); 949 req->r_oid, req->r_oid_len);
932 up_read(&dev->header_rwsem);
933 950
934 if (linger_req) { 951 if (linger_req) {
935 ceph_osdc_set_request_linger(osdc, req); 952 ceph_osdc_set_request_linger(osdc, req);
@@ -944,8 +961,9 @@ static int rbd_do_request(struct request *rq,
944 ret = ceph_osdc_wait_request(osdc, req); 961 ret = ceph_osdc_wait_request(osdc, req);
945 if (ver) 962 if (ver)
946 *ver = le64_to_cpu(req->r_reassert_version.version); 963 *ver = le64_to_cpu(req->r_reassert_version.version);
947 dout("reassert_ver=%lld\n", 964 dout("reassert_ver=%llu\n",
948 le64_to_cpu(req->r_reassert_version.version)); 965 (unsigned long long)
966 le64_to_cpu(req->r_reassert_version.version));
949 ceph_osdc_put_request(req); 967 ceph_osdc_put_request(req);
950 } 968 }
951 return ret; 969 return ret;
@@ -979,7 +997,8 @@ static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
979 bytes = le64_to_cpu(op->extent.length); 997 bytes = le64_to_cpu(op->extent.length);
980 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ); 998 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
981 999
982 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc); 1000 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1001 (unsigned long long) bytes, read_op, (int) rc);
983 1002
984 if (rc == -ENOENT && read_op) { 1003 if (rc == -ENOENT && read_op) {
985 zero_bio_chain(req_data->bio, 0); 1004 zero_bio_chain(req_data->bio, 0);
@@ -1006,14 +1025,12 @@ static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg
1006/* 1025/*
1007 * Do a synchronous ceph osd operation 1026 * Do a synchronous ceph osd operation
1008 */ 1027 */
1009static int rbd_req_sync_op(struct rbd_device *dev, 1028static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1010 struct ceph_snap_context *snapc, 1029 struct ceph_snap_context *snapc,
1011 u64 snapid, 1030 u64 snapid,
1012 int opcode,
1013 int flags, 1031 int flags,
1014 struct ceph_osd_req_op *orig_ops, 1032 struct ceph_osd_req_op *ops,
1015 int num_reply, 1033 const char *object_name,
1016 const char *obj,
1017 u64 ofs, u64 len, 1034 u64 ofs, u64 len,
1018 char *buf, 1035 char *buf,
1019 struct ceph_osd_request **linger_req, 1036 struct ceph_osd_request **linger_req,
@@ -1022,45 +1039,28 @@ static int rbd_req_sync_op(struct rbd_device *dev,
1022 int ret; 1039 int ret;
1023 struct page **pages; 1040 struct page **pages;
1024 int num_pages; 1041 int num_pages;
1025 struct ceph_osd_req_op *ops = orig_ops; 1042
1026 u32 payload_len; 1043 BUG_ON(ops == NULL);
1027 1044
1028 num_pages = calc_pages_for(ofs , len); 1045 num_pages = calc_pages_for(ofs , len);
1029 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); 1046 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1030 if (IS_ERR(pages)) 1047 if (IS_ERR(pages))
1031 return PTR_ERR(pages); 1048 return PTR_ERR(pages);
1032 1049
1033 if (!orig_ops) { 1050 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1034 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0); 1051 object_name, ofs, len, NULL,
1035 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1036 if (ret < 0)
1037 goto done;
1038
1039 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1040 ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1041 if (ret < 0)
1042 goto done_ops;
1043 }
1044 }
1045
1046 ret = rbd_do_request(NULL, dev, snapc, snapid,
1047 obj, ofs, len, NULL,
1048 pages, num_pages, 1052 pages, num_pages,
1049 flags, 1053 flags,
1050 ops, 1054 ops,
1051 2,
1052 NULL, 0, 1055 NULL, 0,
1053 NULL, 1056 NULL,
1054 linger_req, ver); 1057 linger_req, ver);
1055 if (ret < 0) 1058 if (ret < 0)
1056 goto done_ops; 1059 goto done;
1057 1060
1058 if ((flags & CEPH_OSD_FLAG_READ) && buf) 1061 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1059 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret); 1062 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1060 1063
1061done_ops:
1062 if (!orig_ops)
1063 rbd_destroy_ops(ops);
1064done: 1064done:
1065 ceph_release_page_vector(pages, num_pages); 1065 ceph_release_page_vector(pages, num_pages);
1066 return ret; 1066 return ret;
@@ -1070,10 +1070,10 @@ done:
1070 * Do an asynchronous ceph osd operation 1070 * Do an asynchronous ceph osd operation
1071 */ 1071 */
1072static int rbd_do_op(struct request *rq, 1072static int rbd_do_op(struct request *rq,
1073 struct rbd_device *rbd_dev , 1073 struct rbd_device *rbd_dev,
1074 struct ceph_snap_context *snapc, 1074 struct ceph_snap_context *snapc,
1075 u64 snapid, 1075 u64 snapid,
1076 int opcode, int flags, int num_reply, 1076 int opcode, int flags,
1077 u64 ofs, u64 len, 1077 u64 ofs, u64 len,
1078 struct bio *bio, 1078 struct bio *bio,
1079 struct rbd_req_coll *coll, 1079 struct rbd_req_coll *coll,
@@ -1091,14 +1091,15 @@ static int rbd_do_op(struct request *rq,
1091 return -ENOMEM; 1091 return -ENOMEM;
1092 1092
1093 seg_len = rbd_get_segment(&rbd_dev->header, 1093 seg_len = rbd_get_segment(&rbd_dev->header,
1094 rbd_dev->header.block_name, 1094 rbd_dev->header.object_prefix,
1095 ofs, len, 1095 ofs, len,
1096 seg_name, &seg_ofs); 1096 seg_name, &seg_ofs);
1097 1097
1098 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0); 1098 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1099 1099
1100 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len); 1100 ret = -ENOMEM;
1101 if (ret < 0) 1101 ops = rbd_create_rw_ops(1, opcode, payload_len);
1102 if (!ops)
1102 goto done; 1103 goto done;
1103 1104
1104 /* we've taken care of segment sizes earlier when we 1105 /* we've taken care of segment sizes earlier when we
@@ -1112,7 +1113,6 @@ static int rbd_do_op(struct request *rq,
1112 NULL, 0, 1113 NULL, 0,
1113 flags, 1114 flags,
1114 ops, 1115 ops,
1115 num_reply,
1116 coll, coll_index, 1116 coll, coll_index,
1117 rbd_req_cb, 0, NULL); 1117 rbd_req_cb, 0, NULL);
1118 1118
@@ -1136,7 +1136,6 @@ static int rbd_req_write(struct request *rq,
1136 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP, 1136 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1137 CEPH_OSD_OP_WRITE, 1137 CEPH_OSD_OP_WRITE,
1138 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 1138 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1139 2,
1140 ofs, len, bio, coll, coll_index); 1139 ofs, len, bio, coll, coll_index);
1141} 1140}
1142 1141
@@ -1155,55 +1154,58 @@ static int rbd_req_read(struct request *rq,
1155 snapid, 1154 snapid,
1156 CEPH_OSD_OP_READ, 1155 CEPH_OSD_OP_READ,
1157 CEPH_OSD_FLAG_READ, 1156 CEPH_OSD_FLAG_READ,
1158 2,
1159 ofs, len, bio, coll, coll_index); 1157 ofs, len, bio, coll, coll_index);
1160} 1158}
1161 1159
1162/* 1160/*
1163 * Request sync osd read 1161 * Request sync osd read
1164 */ 1162 */
1165static int rbd_req_sync_read(struct rbd_device *dev, 1163static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1166 struct ceph_snap_context *snapc,
1167 u64 snapid, 1164 u64 snapid,
1168 const char *obj, 1165 const char *object_name,
1169 u64 ofs, u64 len, 1166 u64 ofs, u64 len,
1170 char *buf, 1167 char *buf,
1171 u64 *ver) 1168 u64 *ver)
1172{ 1169{
1173 return rbd_req_sync_op(dev, NULL, 1170 struct ceph_osd_req_op *ops;
1171 int ret;
1172
1173 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1174 if (!ops)
1175 return -ENOMEM;
1176
1177 ret = rbd_req_sync_op(rbd_dev, NULL,
1174 snapid, 1178 snapid,
1175 CEPH_OSD_OP_READ,
1176 CEPH_OSD_FLAG_READ, 1179 CEPH_OSD_FLAG_READ,
1177 NULL, 1180 ops, object_name, ofs, len, buf, NULL, ver);
1178 1, obj, ofs, len, buf, NULL, ver); 1181 rbd_destroy_ops(ops);
1182
1183 return ret;
1179} 1184}
1180 1185
1181/* 1186/*
1182 * Request sync osd watch 1187 * Request sync osd watch
1183 */ 1188 */
1184static int rbd_req_sync_notify_ack(struct rbd_device *dev, 1189static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1185 u64 ver, 1190 u64 ver,
1186 u64 notify_id, 1191 u64 notify_id)
1187 const char *obj)
1188{ 1192{
1189 struct ceph_osd_req_op *ops; 1193 struct ceph_osd_req_op *ops;
1190 struct page **pages = NULL;
1191 int ret; 1194 int ret;
1192 1195
1193 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0); 1196 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1194 if (ret < 0) 1197 if (!ops)
1195 return ret; 1198 return -ENOMEM;
1196 1199
1197 ops[0].watch.ver = cpu_to_le64(dev->header.obj_version); 1200 ops[0].watch.ver = cpu_to_le64(ver);
1198 ops[0].watch.cookie = notify_id; 1201 ops[0].watch.cookie = notify_id;
1199 ops[0].watch.flag = 0; 1202 ops[0].watch.flag = 0;
1200 1203
1201 ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP, 1204 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1202 obj, 0, 0, NULL, 1205 rbd_dev->header_name, 0, 0, NULL,
1203 pages, 0, 1206 NULL, 0,
1204 CEPH_OSD_FLAG_READ, 1207 CEPH_OSD_FLAG_READ,
1205 ops, 1208 ops,
1206 1,
1207 NULL, 0, 1209 NULL, 0,
1208 rbd_simple_req_cb, 0, NULL); 1210 rbd_simple_req_cb, 0, NULL);
1209 1211
@@ -1213,54 +1215,53 @@ static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1213 1215
1214static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) 1216static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1215{ 1217{
1216 struct rbd_device *dev = (struct rbd_device *)data; 1218 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1219 u64 hver;
1217 int rc; 1220 int rc;
1218 1221
1219 if (!dev) 1222 if (!rbd_dev)
1220 return; 1223 return;
1221 1224
1222 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name, 1225 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1223 notify_id, (int)opcode); 1226 rbd_dev->header_name, (unsigned long long) notify_id,
1224 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 1227 (unsigned int) opcode);
1225 rc = __rbd_refresh_header(dev); 1228 rc = rbd_refresh_header(rbd_dev, &hver);
1226 mutex_unlock(&ctl_mutex);
1227 if (rc) 1229 if (rc)
1228 pr_warning(RBD_DRV_NAME "%d got notification but failed to " 1230 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1229 " update snaps: %d\n", dev->major, rc); 1231 " update snaps: %d\n", rbd_dev->major, rc);
1230 1232
1231 rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name); 1233 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1232} 1234}
1233 1235
1234/* 1236/*
1235 * Request sync osd watch 1237 * Request sync osd watch
1236 */ 1238 */
1237static int rbd_req_sync_watch(struct rbd_device *dev, 1239static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1238 const char *obj,
1239 u64 ver)
1240{ 1240{
1241 struct ceph_osd_req_op *ops; 1241 struct ceph_osd_req_op *ops;
1242 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc; 1242 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1243 int ret;
1243 1244
1244 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0); 1245 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1245 if (ret < 0) 1246 if (!ops)
1246 return ret; 1247 return -ENOMEM;
1247 1248
1248 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, 1249 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1249 (void *)dev, &dev->watch_event); 1250 (void *)rbd_dev, &rbd_dev->watch_event);
1250 if (ret < 0) 1251 if (ret < 0)
1251 goto fail; 1252 goto fail;
1252 1253
1253 ops[0].watch.ver = cpu_to_le64(ver); 1254 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1254 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie); 1255 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1255 ops[0].watch.flag = 1; 1256 ops[0].watch.flag = 1;
1256 1257
1257 ret = rbd_req_sync_op(dev, NULL, 1258 ret = rbd_req_sync_op(rbd_dev, NULL,
1258 CEPH_NOSNAP, 1259 CEPH_NOSNAP,
1259 0,
1260 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 1260 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1261 ops, 1261 ops,
1262 1, obj, 0, 0, NULL, 1262 rbd_dev->header_name,
1263 &dev->watch_request, NULL); 1263 0, 0, NULL,
1264 &rbd_dev->watch_request, NULL);
1264 1265
1265 if (ret < 0) 1266 if (ret < 0)
1266 goto fail_event; 1267 goto fail_event;
@@ -1269,8 +1270,8 @@ static int rbd_req_sync_watch(struct rbd_device *dev,
1269 return 0; 1270 return 0;
1270 1271
1271fail_event: 1272fail_event:
1272 ceph_osdc_cancel_event(dev->watch_event); 1273 ceph_osdc_cancel_event(rbd_dev->watch_event);
1273 dev->watch_event = NULL; 1274 rbd_dev->watch_event = NULL;
1274fail: 1275fail:
1275 rbd_destroy_ops(ops); 1276 rbd_destroy_ops(ops);
1276 return ret; 1277 return ret;
@@ -1279,64 +1280,65 @@ fail:
1279/* 1280/*
1280 * Request sync osd unwatch 1281 * Request sync osd unwatch
1281 */ 1282 */
1282static int rbd_req_sync_unwatch(struct rbd_device *dev, 1283static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1283 const char *obj)
1284{ 1284{
1285 struct ceph_osd_req_op *ops; 1285 struct ceph_osd_req_op *ops;
1286 int ret;
1286 1287
1287 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0); 1288 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1288 if (ret < 0) 1289 if (!ops)
1289 return ret; 1290 return -ENOMEM;
1290 1291
1291 ops[0].watch.ver = 0; 1292 ops[0].watch.ver = 0;
1292 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie); 1293 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1293 ops[0].watch.flag = 0; 1294 ops[0].watch.flag = 0;
1294 1295
1295 ret = rbd_req_sync_op(dev, NULL, 1296 ret = rbd_req_sync_op(rbd_dev, NULL,
1296 CEPH_NOSNAP, 1297 CEPH_NOSNAP,
1297 0,
1298 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 1298 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1299 ops, 1299 ops,
1300 1, obj, 0, 0, NULL, NULL, NULL); 1300 rbd_dev->header_name,
1301 0, 0, NULL, NULL, NULL);
1302
1301 1303
1302 rbd_destroy_ops(ops); 1304 rbd_destroy_ops(ops);
1303 ceph_osdc_cancel_event(dev->watch_event); 1305 ceph_osdc_cancel_event(rbd_dev->watch_event);
1304 dev->watch_event = NULL; 1306 rbd_dev->watch_event = NULL;
1305 return ret; 1307 return ret;
1306} 1308}
1307 1309
1308struct rbd_notify_info { 1310struct rbd_notify_info {
1309 struct rbd_device *dev; 1311 struct rbd_device *rbd_dev;
1310}; 1312};
1311 1313
1312static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data) 1314static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1313{ 1315{
1314 struct rbd_device *dev = (struct rbd_device *)data; 1316 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1315 if (!dev) 1317 if (!rbd_dev)
1316 return; 1318 return;
1317 1319
1318 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name, 1320 dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1319 notify_id, (int)opcode); 1321 rbd_dev->header_name, (unsigned long long) notify_id,
1322 (unsigned int) opcode);
1320} 1323}
1321 1324
1322/* 1325/*
1323 * Request sync osd notify 1326 * Request sync osd notify
1324 */ 1327 */
1325static int rbd_req_sync_notify(struct rbd_device *dev, 1328static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1326 const char *obj)
1327{ 1329{
1328 struct ceph_osd_req_op *ops; 1330 struct ceph_osd_req_op *ops;
1329 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc; 1331 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1330 struct ceph_osd_event *event; 1332 struct ceph_osd_event *event;
1331 struct rbd_notify_info info; 1333 struct rbd_notify_info info;
1332 int payload_len = sizeof(u32) + sizeof(u32); 1334 int payload_len = sizeof(u32) + sizeof(u32);
1333 int ret; 1335 int ret;
1334 1336
1335 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len); 1337 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1336 if (ret < 0) 1338 if (!ops)
1337 return ret; 1339 return -ENOMEM;
1338 1340
1339 info.dev = dev; 1341 info.rbd_dev = rbd_dev;
1340 1342
1341 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1, 1343 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1342 (void *)&info, &event); 1344 (void *)&info, &event);
@@ -1349,12 +1351,12 @@ static int rbd_req_sync_notify(struct rbd_device *dev,
1349 ops[0].watch.prot_ver = RADOS_NOTIFY_VER; 1351 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1350 ops[0].watch.timeout = 12; 1352 ops[0].watch.timeout = 12;
1351 1353
1352 ret = rbd_req_sync_op(dev, NULL, 1354 ret = rbd_req_sync_op(rbd_dev, NULL,
1353 CEPH_NOSNAP, 1355 CEPH_NOSNAP,
1354 0,
1355 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 1356 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1356 ops, 1357 ops,
1357 1, obj, 0, 0, NULL, NULL, NULL); 1358 rbd_dev->header_name,
1359 0, 0, NULL, NULL, NULL);
1358 if (ret < 0) 1360 if (ret < 0)
1359 goto fail_event; 1361 goto fail_event;
1360 1362
@@ -1373,36 +1375,37 @@ fail:
1373/* 1375/*
1374 * Request sync osd read 1376 * Request sync osd read
1375 */ 1377 */
1376static int rbd_req_sync_exec(struct rbd_device *dev, 1378static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1377 const char *obj, 1379 const char *object_name,
1378 const char *cls, 1380 const char *class_name,
1379 const char *method, 1381 const char *method_name,
1380 const char *data, 1382 const char *data,
1381 int len, 1383 int len,
1382 u64 *ver) 1384 u64 *ver)
1383{ 1385{
1384 struct ceph_osd_req_op *ops; 1386 struct ceph_osd_req_op *ops;
1385 int cls_len = strlen(cls); 1387 int class_name_len = strlen(class_name);
1386 int method_len = strlen(method); 1388 int method_name_len = strlen(method_name);
1387 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL, 1389 int ret;
1388 cls_len + method_len + len);
1389 if (ret < 0)
1390 return ret;
1391 1390
1392 ops[0].cls.class_name = cls; 1391 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1393 ops[0].cls.class_len = (__u8)cls_len; 1392 class_name_len + method_name_len + len);
1394 ops[0].cls.method_name = method; 1393 if (!ops)
1395 ops[0].cls.method_len = (__u8)method_len; 1394 return -ENOMEM;
1395
1396 ops[0].cls.class_name = class_name;
1397 ops[0].cls.class_len = (__u8) class_name_len;
1398 ops[0].cls.method_name = method_name;
1399 ops[0].cls.method_len = (__u8) method_name_len;
1396 ops[0].cls.argc = 0; 1400 ops[0].cls.argc = 0;
1397 ops[0].cls.indata = data; 1401 ops[0].cls.indata = data;
1398 ops[0].cls.indata_len = len; 1402 ops[0].cls.indata_len = len;
1399 1403
1400 ret = rbd_req_sync_op(dev, NULL, 1404 ret = rbd_req_sync_op(rbd_dev, NULL,
1401 CEPH_NOSNAP, 1405 CEPH_NOSNAP,
1402 0,
1403 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 1406 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1404 ops, 1407 ops,
1405 1, obj, 0, 0, NULL, NULL, ver); 1408 object_name, 0, 0, NULL, NULL, ver);
1406 1409
1407 rbd_destroy_ops(ops); 1410 rbd_destroy_ops(ops);
1408 1411
@@ -1437,10 +1440,12 @@ static void rbd_rq_fn(struct request_queue *q)
1437 struct bio *bio; 1440 struct bio *bio;
1438 struct bio *rq_bio, *next_bio = NULL; 1441 struct bio *rq_bio, *next_bio = NULL;
1439 bool do_write; 1442 bool do_write;
1440 int size, op_size = 0; 1443 unsigned int size;
1444 u64 op_size = 0;
1441 u64 ofs; 1445 u64 ofs;
1442 int num_segs, cur_seg = 0; 1446 int num_segs, cur_seg = 0;
1443 struct rbd_req_coll *coll; 1447 struct rbd_req_coll *coll;
1448 struct ceph_snap_context *snapc;
1444 1449
1445 /* peek at request from block layer */ 1450 /* peek at request from block layer */
1446 if (!rq) 1451 if (!rq)
@@ -1467,23 +1472,38 @@ static void rbd_rq_fn(struct request_queue *q)
1467 1472
1468 spin_unlock_irq(q->queue_lock); 1473 spin_unlock_irq(q->queue_lock);
1469 1474
1475 down_read(&rbd_dev->header_rwsem);
1476
1477 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1478 up_read(&rbd_dev->header_rwsem);
1479 dout("request for non-existent snapshot");
1480 spin_lock_irq(q->queue_lock);
1481 __blk_end_request_all(rq, -ENXIO);
1482 continue;
1483 }
1484
1485 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1486
1487 up_read(&rbd_dev->header_rwsem);
1488
1470 dout("%s 0x%x bytes at 0x%llx\n", 1489 dout("%s 0x%x bytes at 0x%llx\n",
1471 do_write ? "write" : "read", 1490 do_write ? "write" : "read",
1472 size, blk_rq_pos(rq) * SECTOR_SIZE); 1491 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1473 1492
1474 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size); 1493 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1475 coll = rbd_alloc_coll(num_segs); 1494 coll = rbd_alloc_coll(num_segs);
1476 if (!coll) { 1495 if (!coll) {
1477 spin_lock_irq(q->queue_lock); 1496 spin_lock_irq(q->queue_lock);
1478 __blk_end_request_all(rq, -ENOMEM); 1497 __blk_end_request_all(rq, -ENOMEM);
1498 ceph_put_snap_context(snapc);
1479 continue; 1499 continue;
1480 } 1500 }
1481 1501
1482 do { 1502 do {
1483 /* a bio clone to be passed down to OSD req */ 1503 /* a bio clone to be passed down to OSD req */
1484 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt); 1504 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1485 op_size = rbd_get_segment(&rbd_dev->header, 1505 op_size = rbd_get_segment(&rbd_dev->header,
1486 rbd_dev->header.block_name, 1506 rbd_dev->header.object_prefix,
1487 ofs, size, 1507 ofs, size,
1488 NULL, NULL); 1508 NULL, NULL);
1489 kref_get(&coll->kref); 1509 kref_get(&coll->kref);
@@ -1499,7 +1519,7 @@ static void rbd_rq_fn(struct request_queue *q)
1499 /* init OSD command: write or read */ 1519 /* init OSD command: write or read */
1500 if (do_write) 1520 if (do_write)
1501 rbd_req_write(rq, rbd_dev, 1521 rbd_req_write(rq, rbd_dev,
1502 rbd_dev->header.snapc, 1522 snapc,
1503 ofs, 1523 ofs,
1504 op_size, bio, 1524 op_size, bio,
1505 coll, cur_seg); 1525 coll, cur_seg);
@@ -1522,6 +1542,8 @@ next_seg:
1522 if (bp) 1542 if (bp)
1523 bio_pair_release(bp); 1543 bio_pair_release(bp);
1524 spin_lock_irq(q->queue_lock); 1544 spin_lock_irq(q->queue_lock);
1545
1546 ceph_put_snap_context(snapc);
1525 } 1547 }
1526} 1548}
1527 1549
@@ -1592,18 +1614,19 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
1592 return -ENOMEM; 1614 return -ENOMEM;
1593 1615
1594 rc = rbd_req_sync_read(rbd_dev, 1616 rc = rbd_req_sync_read(rbd_dev,
1595 NULL, CEPH_NOSNAP, 1617 CEPH_NOSNAP,
1596 rbd_dev->obj_md_name, 1618 rbd_dev->header_name,
1597 0, len, 1619 0, len,
1598 (char *)dh, &ver); 1620 (char *)dh, &ver);
1599 if (rc < 0) 1621 if (rc < 0)
1600 goto out_dh; 1622 goto out_dh;
1601 1623
1602 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL); 1624 rc = rbd_header_from_disk(header, dh, snap_count);
1603 if (rc < 0) { 1625 if (rc < 0) {
1604 if (rc == -ENXIO) 1626 if (rc == -ENXIO)
1605 pr_warning("unrecognized header format" 1627 pr_warning("unrecognized header format"
1606 " for image %s", rbd_dev->obj); 1628 " for image %s\n",
1629 rbd_dev->image_name);
1607 goto out_dh; 1630 goto out_dh;
1608 } 1631 }
1609 1632
@@ -1628,7 +1651,7 @@ out_dh:
1628/* 1651/*
1629 * create a snapshot 1652 * create a snapshot
1630 */ 1653 */
1631static int rbd_header_add_snap(struct rbd_device *dev, 1654static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1632 const char *snap_name, 1655 const char *snap_name,
1633 gfp_t gfp_flags) 1656 gfp_t gfp_flags)
1634{ 1657{
@@ -1636,16 +1659,15 @@ static int rbd_header_add_snap(struct rbd_device *dev,
1636 u64 new_snapid; 1659 u64 new_snapid;
1637 int ret; 1660 int ret;
1638 void *data, *p, *e; 1661 void *data, *p, *e;
1639 u64 ver;
1640 struct ceph_mon_client *monc; 1662 struct ceph_mon_client *monc;
1641 1663
1642 /* we should create a snapshot only if we're pointing at the head */ 1664 /* we should create a snapshot only if we're pointing at the head */
1643 if (dev->snap_id != CEPH_NOSNAP) 1665 if (rbd_dev->snap_id != CEPH_NOSNAP)
1644 return -EINVAL; 1666 return -EINVAL;
1645 1667
1646 monc = &dev->rbd_client->client->monc; 1668 monc = &rbd_dev->rbd_client->client->monc;
1647 ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid); 1669 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1648 dout("created snapid=%lld\n", new_snapid); 1670 dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1649 if (ret < 0) 1671 if (ret < 0)
1650 return ret; 1672 return ret;
1651 1673
@@ -1659,19 +1681,13 @@ static int rbd_header_add_snap(struct rbd_device *dev,
1659 ceph_encode_string_safe(&p, e, snap_name, name_len, bad); 1681 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1660 ceph_encode_64_safe(&p, e, new_snapid, bad); 1682 ceph_encode_64_safe(&p, e, new_snapid, bad);
1661 1683
1662 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add", 1684 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1663 data, p - data, &ver); 1685 "rbd", "snap_add",
1686 data, p - data, NULL);
1664 1687
1665 kfree(data); 1688 kfree(data);
1666 1689
1667 if (ret < 0) 1690 return ret < 0 ? ret : 0;
1668 return ret;
1669
1670 down_write(&dev->header_rwsem);
1671 dev->header.snapc->seq = new_snapid;
1672 up_write(&dev->header_rwsem);
1673
1674 return 0;
1675bad: 1691bad:
1676 return -ERANGE; 1692 return -ERANGE;
1677} 1693}
@@ -1679,52 +1695,52 @@ bad:
1679static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev) 1695static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1680{ 1696{
1681 struct rbd_snap *snap; 1697 struct rbd_snap *snap;
1698 struct rbd_snap *next;
1682 1699
1683 while (!list_empty(&rbd_dev->snaps)) { 1700 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1684 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node); 1701 __rbd_remove_snap_dev(snap);
1685 __rbd_remove_snap_dev(rbd_dev, snap);
1686 }
1687} 1702}
1688 1703
1689/* 1704/*
1690 * only read the first part of the ondisk header, without the snaps info 1705 * only read the first part of the ondisk header, without the snaps info
1691 */ 1706 */
1692static int __rbd_refresh_header(struct rbd_device *rbd_dev) 1707static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1693{ 1708{
1694 int ret; 1709 int ret;
1695 struct rbd_image_header h; 1710 struct rbd_image_header h;
1696 u64 snap_seq;
1697 int follow_seq = 0;
1698 1711
1699 ret = rbd_read_header(rbd_dev, &h); 1712 ret = rbd_read_header(rbd_dev, &h);
1700 if (ret < 0) 1713 if (ret < 0)
1701 return ret; 1714 return ret;
1702 1715
1703 /* resized? */
1704 set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);
1705
1706 down_write(&rbd_dev->header_rwsem); 1716 down_write(&rbd_dev->header_rwsem);
1707 1717
1708 snap_seq = rbd_dev->header.snapc->seq; 1718 /* resized? */
1709 if (rbd_dev->header.total_snaps && 1719 if (rbd_dev->snap_id == CEPH_NOSNAP) {
1710 rbd_dev->header.snapc->snaps[0] == snap_seq) 1720 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1711 /* pointing at the head, will need to follow that
1712 if head moves */
1713 follow_seq = 1;
1714 1721
1715 kfree(rbd_dev->header.snapc); 1722 dout("setting size to %llu sectors", (unsigned long long) size);
1716 kfree(rbd_dev->header.snap_names); 1723 set_capacity(rbd_dev->disk, size);
1724 }
1725
1726 /* rbd_dev->header.object_prefix shouldn't change */
1717 kfree(rbd_dev->header.snap_sizes); 1727 kfree(rbd_dev->header.snap_sizes);
1728 kfree(rbd_dev->header.snap_names);
1729 /* osd requests may still refer to snapc */
1730 ceph_put_snap_context(rbd_dev->header.snapc);
1718 1731
1732 if (hver)
1733 *hver = h.obj_version;
1734 rbd_dev->header.obj_version = h.obj_version;
1735 rbd_dev->header.image_size = h.image_size;
1719 rbd_dev->header.total_snaps = h.total_snaps; 1736 rbd_dev->header.total_snaps = h.total_snaps;
1720 rbd_dev->header.snapc = h.snapc; 1737 rbd_dev->header.snapc = h.snapc;
1721 rbd_dev->header.snap_names = h.snap_names; 1738 rbd_dev->header.snap_names = h.snap_names;
1722 rbd_dev->header.snap_names_len = h.snap_names_len; 1739 rbd_dev->header.snap_names_len = h.snap_names_len;
1723 rbd_dev->header.snap_sizes = h.snap_sizes; 1740 rbd_dev->header.snap_sizes = h.snap_sizes;
1724 if (follow_seq) 1741 /* Free the extra copy of the object prefix */
1725 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0]; 1742 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1726 else 1743 kfree(h.object_prefix);
1727 rbd_dev->header.snapc->seq = snap_seq;
1728 1744
1729 ret = __rbd_init_snaps_header(rbd_dev); 1745 ret = __rbd_init_snaps_header(rbd_dev);
1730 1746
@@ -1733,6 +1749,17 @@ static int __rbd_refresh_header(struct rbd_device *rbd_dev)
1733 return ret; 1749 return ret;
1734} 1750}
1735 1751
1752static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1753{
1754 int ret;
1755
1756 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1757 ret = __rbd_refresh_header(rbd_dev, hver);
1758 mutex_unlock(&ctl_mutex);
1759
1760 return ret;
1761}
1762
1736static int rbd_init_disk(struct rbd_device *rbd_dev) 1763static int rbd_init_disk(struct rbd_device *rbd_dev)
1737{ 1764{
1738 struct gendisk *disk; 1765 struct gendisk *disk;
@@ -1762,7 +1789,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
1762 goto out; 1789 goto out;
1763 1790
1764 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d", 1791 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1765 rbd_dev->id); 1792 rbd_dev->dev_id);
1766 disk->major = rbd_dev->major; 1793 disk->major = rbd_dev->major;
1767 disk->first_minor = 0; 1794 disk->first_minor = 0;
1768 disk->fops = &rbd_bd_ops; 1795 disk->fops = &rbd_bd_ops;
@@ -1819,8 +1846,13 @@ static ssize_t rbd_size_show(struct device *dev,
1819 struct device_attribute *attr, char *buf) 1846 struct device_attribute *attr, char *buf)
1820{ 1847{
1821 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 1848 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1849 sector_t size;
1850
1851 down_read(&rbd_dev->header_rwsem);
1852 size = get_capacity(rbd_dev->disk);
1853 up_read(&rbd_dev->header_rwsem);
1822 1854
1823 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size); 1855 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1824} 1856}
1825 1857
1826static ssize_t rbd_major_show(struct device *dev, 1858static ssize_t rbd_major_show(struct device *dev,
@@ -1848,12 +1880,20 @@ static ssize_t rbd_pool_show(struct device *dev,
1848 return sprintf(buf, "%s\n", rbd_dev->pool_name); 1880 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1849} 1881}
1850 1882
1883static ssize_t rbd_pool_id_show(struct device *dev,
1884 struct device_attribute *attr, char *buf)
1885{
1886 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1887
1888 return sprintf(buf, "%d\n", rbd_dev->pool_id);
1889}
1890
1851static ssize_t rbd_name_show(struct device *dev, 1891static ssize_t rbd_name_show(struct device *dev,
1852 struct device_attribute *attr, char *buf) 1892 struct device_attribute *attr, char *buf)
1853{ 1893{
1854 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 1894 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1855 1895
1856 return sprintf(buf, "%s\n", rbd_dev->obj); 1896 return sprintf(buf, "%s\n", rbd_dev->image_name);
1857} 1897}
1858 1898
1859static ssize_t rbd_snap_show(struct device *dev, 1899static ssize_t rbd_snap_show(struct device *dev,
@@ -1871,23 +1911,18 @@ static ssize_t rbd_image_refresh(struct device *dev,
1871 size_t size) 1911 size_t size)
1872{ 1912{
1873 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 1913 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1874 int rc; 1914 int ret;
1875 int ret = size;
1876
1877 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1878 1915
1879 rc = __rbd_refresh_header(rbd_dev); 1916 ret = rbd_refresh_header(rbd_dev, NULL);
1880 if (rc < 0)
1881 ret = rc;
1882 1917
1883 mutex_unlock(&ctl_mutex); 1918 return ret < 0 ? ret : size;
1884 return ret;
1885} 1919}
1886 1920
1887static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL); 1921static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1888static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL); 1922static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1889static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL); 1923static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1890static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL); 1924static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1925static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1891static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL); 1926static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1892static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh); 1927static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1893static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL); 1928static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
@@ -1898,6 +1933,7 @@ static struct attribute *rbd_attrs[] = {
1898 &dev_attr_major.attr, 1933 &dev_attr_major.attr,
1899 &dev_attr_client_id.attr, 1934 &dev_attr_client_id.attr,
1900 &dev_attr_pool.attr, 1935 &dev_attr_pool.attr,
1936 &dev_attr_pool_id.attr,
1901 &dev_attr_name.attr, 1937 &dev_attr_name.attr,
1902 &dev_attr_current_snap.attr, 1938 &dev_attr_current_snap.attr,
1903 &dev_attr_refresh.attr, 1939 &dev_attr_refresh.attr,
@@ -1977,15 +2013,13 @@ static struct device_type rbd_snap_device_type = {
1977 .release = rbd_snap_dev_release, 2013 .release = rbd_snap_dev_release,
1978}; 2014};
1979 2015
1980static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev, 2016static void __rbd_remove_snap_dev(struct rbd_snap *snap)
1981 struct rbd_snap *snap)
1982{ 2017{
1983 list_del(&snap->node); 2018 list_del(&snap->node);
1984 device_unregister(&snap->dev); 2019 device_unregister(&snap->dev);
1985} 2020}
1986 2021
1987static int rbd_register_snap_dev(struct rbd_device *rbd_dev, 2022static int rbd_register_snap_dev(struct rbd_snap *snap,
1988 struct rbd_snap *snap,
1989 struct device *parent) 2023 struct device *parent)
1990{ 2024{
1991 struct device *dev = &snap->dev; 2025 struct device *dev = &snap->dev;
@@ -2000,29 +2034,36 @@ static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
2000 return ret; 2034 return ret;
2001} 2035}
2002 2036
2003static int __rbd_add_snap_dev(struct rbd_device *rbd_dev, 2037static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2004 int i, const char *name, 2038 int i, const char *name)
2005 struct rbd_snap **snapp)
2006{ 2039{
2040 struct rbd_snap *snap;
2007 int ret; 2041 int ret;
2008 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL); 2042
2043 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2009 if (!snap) 2044 if (!snap)
2010 return -ENOMEM; 2045 return ERR_PTR(-ENOMEM);
2046
2047 ret = -ENOMEM;
2011 snap->name = kstrdup(name, GFP_KERNEL); 2048 snap->name = kstrdup(name, GFP_KERNEL);
2049 if (!snap->name)
2050 goto err;
2051
2012 snap->size = rbd_dev->header.snap_sizes[i]; 2052 snap->size = rbd_dev->header.snap_sizes[i];
2013 snap->id = rbd_dev->header.snapc->snaps[i]; 2053 snap->id = rbd_dev->header.snapc->snaps[i];
2014 if (device_is_registered(&rbd_dev->dev)) { 2054 if (device_is_registered(&rbd_dev->dev)) {
2015 ret = rbd_register_snap_dev(rbd_dev, snap, 2055 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2016 &rbd_dev->dev);
2017 if (ret < 0) 2056 if (ret < 0)
2018 goto err; 2057 goto err;
2019 } 2058 }
2020 *snapp = snap; 2059
2021 return 0; 2060 return snap;
2061
2022err: 2062err:
2023 kfree(snap->name); 2063 kfree(snap->name);
2024 kfree(snap); 2064 kfree(snap);
2025 return ret; 2065
2066 return ERR_PTR(ret);
2026} 2067}
2027 2068
2028/* 2069/*
@@ -2055,7 +2096,6 @@ static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2055 const char *name, *first_name; 2096 const char *name, *first_name;
2056 int i = rbd_dev->header.total_snaps; 2097 int i = rbd_dev->header.total_snaps;
2057 struct rbd_snap *snap, *old_snap = NULL; 2098 struct rbd_snap *snap, *old_snap = NULL;
2058 int ret;
2059 struct list_head *p, *n; 2099 struct list_head *p, *n;
2060 2100
2061 first_name = rbd_dev->header.snap_names; 2101 first_name = rbd_dev->header.snap_names;
@@ -2070,8 +2110,15 @@ static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2070 cur_id = rbd_dev->header.snapc->snaps[i - 1]; 2110 cur_id = rbd_dev->header.snapc->snaps[i - 1];
2071 2111
2072 if (!i || old_snap->id < cur_id) { 2112 if (!i || old_snap->id < cur_id) {
2073 /* old_snap->id was skipped, thus was removed */ 2113 /*
2074 __rbd_remove_snap_dev(rbd_dev, old_snap); 2114 * old_snap->id was skipped, thus was
2115 * removed. If this rbd_dev is mapped to
2116 * the removed snapshot, record that it no
2117 * longer exists, to prevent further I/O.
2118 */
2119 if (rbd_dev->snap_id == old_snap->id)
2120 rbd_dev->snap_exists = false;
2121 __rbd_remove_snap_dev(old_snap);
2075 continue; 2122 continue;
2076 } 2123 }
2077 if (old_snap->id == cur_id) { 2124 if (old_snap->id == cur_id) {
@@ -2091,9 +2138,9 @@ static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2091 if (cur_id >= old_snap->id) 2138 if (cur_id >= old_snap->id)
2092 break; 2139 break;
2093 /* a new snapshot */ 2140 /* a new snapshot */
2094 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap); 2141 snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2095 if (ret < 0) 2142 if (IS_ERR(snap))
2096 return ret; 2143 return PTR_ERR(snap);
2097 2144
2098 /* note that we add it backward so using n and not p */ 2145 /* note that we add it backward so using n and not p */
2099 list_add(&snap->node, n); 2146 list_add(&snap->node, n);
@@ -2107,9 +2154,9 @@ static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2107 WARN_ON(1); 2154 WARN_ON(1);
2108 return -EINVAL; 2155 return -EINVAL;
2109 } 2156 }
2110 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap); 2157 snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2111 if (ret < 0) 2158 if (IS_ERR(snap))
2112 return ret; 2159 return PTR_ERR(snap);
2113 list_add(&snap->node, &rbd_dev->snaps); 2160 list_add(&snap->node, &rbd_dev->snaps);
2114 } 2161 }
2115 2162
@@ -2129,14 +2176,13 @@ static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2129 dev->type = &rbd_device_type; 2176 dev->type = &rbd_device_type;
2130 dev->parent = &rbd_root_dev; 2177 dev->parent = &rbd_root_dev;
2131 dev->release = rbd_dev_release; 2178 dev->release = rbd_dev_release;
2132 dev_set_name(dev, "%d", rbd_dev->id); 2179 dev_set_name(dev, "%d", rbd_dev->dev_id);
2133 ret = device_register(dev); 2180 ret = device_register(dev);
2134 if (ret < 0) 2181 if (ret < 0)
2135 goto out; 2182 goto out;
2136 2183
2137 list_for_each_entry(snap, &rbd_dev->snaps, node) { 2184 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2138 ret = rbd_register_snap_dev(rbd_dev, snap, 2185 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2139 &rbd_dev->dev);
2140 if (ret < 0) 2186 if (ret < 0)
2141 break; 2187 break;
2142 } 2188 }
@@ -2155,12 +2201,9 @@ static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2155 int ret, rc; 2201 int ret, rc;
2156 2202
2157 do { 2203 do {
2158 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name, 2204 ret = rbd_req_sync_watch(rbd_dev);
2159 rbd_dev->header.obj_version);
2160 if (ret == -ERANGE) { 2205 if (ret == -ERANGE) {
2161 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 2206 rc = rbd_refresh_header(rbd_dev, NULL);
2162 rc = __rbd_refresh_header(rbd_dev);
2163 mutex_unlock(&ctl_mutex);
2164 if (rc < 0) 2207 if (rc < 0)
2165 return rc; 2208 return rc;
2166 } 2209 }
@@ -2177,7 +2220,7 @@ static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2177 */ 2220 */
2178static void rbd_id_get(struct rbd_device *rbd_dev) 2221static void rbd_id_get(struct rbd_device *rbd_dev)
2179{ 2222{
2180 rbd_dev->id = atomic64_inc_return(&rbd_id_max); 2223 rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
2181 2224
2182 spin_lock(&rbd_dev_list_lock); 2225 spin_lock(&rbd_dev_list_lock);
2183 list_add_tail(&rbd_dev->node, &rbd_dev_list); 2226 list_add_tail(&rbd_dev->node, &rbd_dev_list);
@@ -2191,7 +2234,7 @@ static void rbd_id_get(struct rbd_device *rbd_dev)
2191static void rbd_id_put(struct rbd_device *rbd_dev) 2234static void rbd_id_put(struct rbd_device *rbd_dev)
2192{ 2235{
2193 struct list_head *tmp; 2236 struct list_head *tmp;
2194 int rbd_id = rbd_dev->id; 2237 int rbd_id = rbd_dev->dev_id;
2195 int max_id; 2238 int max_id;
2196 2239
2197 BUG_ON(rbd_id < 1); 2240 BUG_ON(rbd_id < 1);
@@ -2282,19 +2325,58 @@ static inline size_t copy_token(const char **buf,
2282} 2325}
2283 2326
2284/* 2327/*
2285 * This fills in the pool_name, obj, obj_len, snap_name, obj_len, 2328 * Finds the next token in *buf, dynamically allocates a buffer big
2329 * enough to hold a copy of it, and copies the token into the new
2330 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2331 * that a duplicate buffer is created even for a zero-length token.
2332 *
2333 * Returns a pointer to the newly-allocated duplicate, or a null
2334 * pointer if memory for the duplicate was not available. If
2335 * the lenp argument is a non-null pointer, the length of the token
2336 * (not including the '\0') is returned in *lenp.
2337 *
2338 * If successful, the *buf pointer will be updated to point beyond
2339 * the end of the found token.
2340 *
2341 * Note: uses GFP_KERNEL for allocation.
2342 */
2343static inline char *dup_token(const char **buf, size_t *lenp)
2344{
2345 char *dup;
2346 size_t len;
2347
2348 len = next_token(buf);
2349 dup = kmalloc(len + 1, GFP_KERNEL);
2350 if (!dup)
2351 return NULL;
2352
2353 memcpy(dup, *buf, len);
2354 *(dup + len) = '\0';
2355 *buf += len;
2356
2357 if (lenp)
2358 *lenp = len;
2359
2360 return dup;
2361}
2362
2363/*
2364 * This fills in the pool_name, image_name, image_name_len, snap_name,
2286 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based 2365 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2287 * on the list of monitor addresses and other options provided via 2366 * on the list of monitor addresses and other options provided via
2288 * /sys/bus/rbd/add. 2367 * /sys/bus/rbd/add.
2368 *
2369 * Note: rbd_dev is assumed to have been initially zero-filled.
2289 */ 2370 */
2290static int rbd_add_parse_args(struct rbd_device *rbd_dev, 2371static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2291 const char *buf, 2372 const char *buf,
2292 const char **mon_addrs, 2373 const char **mon_addrs,
2293 size_t *mon_addrs_size, 2374 size_t *mon_addrs_size,
2294 char *options, 2375 char *options,
2295 size_t options_size) 2376 size_t options_size)
2296{ 2377{
2297 size_t len; 2378 size_t len;
2379 int ret;
2298 2380
2299 /* The first four tokens are required */ 2381 /* The first four tokens are required */
2300 2382
@@ -2310,56 +2392,74 @@ static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2310 if (!len || len >= options_size) 2392 if (!len || len >= options_size)
2311 return -EINVAL; 2393 return -EINVAL;
2312 2394
2313 len = copy_token(&buf, rbd_dev->pool_name, sizeof (rbd_dev->pool_name)); 2395 ret = -ENOMEM;
2314 if (!len || len >= sizeof (rbd_dev->pool_name)) 2396 rbd_dev->pool_name = dup_token(&buf, NULL);
2315 return -EINVAL; 2397 if (!rbd_dev->pool_name)
2316 2398 goto out_err;
2317 len = copy_token(&buf, rbd_dev->obj, sizeof (rbd_dev->obj));
2318 if (!len || len >= sizeof (rbd_dev->obj))
2319 return -EINVAL;
2320 2399
2321 /* We have the object length in hand, save it. */ 2400 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2401 if (!rbd_dev->image_name)
2402 goto out_err;
2322 2403
2323 rbd_dev->obj_len = len; 2404 /* Create the name of the header object */
2324 2405
2325 BUILD_BUG_ON(RBD_MAX_MD_NAME_LEN 2406 rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2326 < RBD_MAX_OBJ_NAME_LEN + sizeof (RBD_SUFFIX)); 2407 + sizeof (RBD_SUFFIX),
2327 sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX); 2408 GFP_KERNEL);
2409 if (!rbd_dev->header_name)
2410 goto out_err;
2411 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2328 2412
2329 /* 2413 /*
2330 * The snapshot name is optional, but it's an error if it's 2414 * The snapshot name is optional. If none is is supplied,
2331 * too long. If no snapshot is supplied, fill in the default. 2415 * we use the default value.
2332 */ 2416 */
2333 len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name)); 2417 rbd_dev->snap_name = dup_token(&buf, &len);
2334 if (!len) 2418 if (!rbd_dev->snap_name)
2419 goto out_err;
2420 if (!len) {
2421 /* Replace the empty name with the default */
2422 kfree(rbd_dev->snap_name);
2423 rbd_dev->snap_name
2424 = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2425 if (!rbd_dev->snap_name)
2426 goto out_err;
2427
2335 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME, 2428 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2336 sizeof (RBD_SNAP_HEAD_NAME)); 2429 sizeof (RBD_SNAP_HEAD_NAME));
2337 else if (len >= sizeof (rbd_dev->snap_name)) 2430 }
2338 return -EINVAL;
2339 2431
2340 return 0; 2432 return 0;
2433
2434out_err:
2435 kfree(rbd_dev->header_name);
2436 kfree(rbd_dev->image_name);
2437 kfree(rbd_dev->pool_name);
2438 rbd_dev->pool_name = NULL;
2439
2440 return ret;
2341} 2441}
2342 2442
2343static ssize_t rbd_add(struct bus_type *bus, 2443static ssize_t rbd_add(struct bus_type *bus,
2344 const char *buf, 2444 const char *buf,
2345 size_t count) 2445 size_t count)
2346{ 2446{
2347 struct rbd_device *rbd_dev; 2447 char *options;
2448 struct rbd_device *rbd_dev = NULL;
2348 const char *mon_addrs = NULL; 2449 const char *mon_addrs = NULL;
2349 size_t mon_addrs_size = 0; 2450 size_t mon_addrs_size = 0;
2350 char *options = NULL;
2351 struct ceph_osd_client *osdc; 2451 struct ceph_osd_client *osdc;
2352 int rc = -ENOMEM; 2452 int rc = -ENOMEM;
2353 2453
2354 if (!try_module_get(THIS_MODULE)) 2454 if (!try_module_get(THIS_MODULE))
2355 return -ENODEV; 2455 return -ENODEV;
2356 2456
2357 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2358 if (!rbd_dev)
2359 goto err_nomem;
2360 options = kmalloc(count, GFP_KERNEL); 2457 options = kmalloc(count, GFP_KERNEL);
2361 if (!options) 2458 if (!options)
2362 goto err_nomem; 2459 goto err_nomem;
2460 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2461 if (!rbd_dev)
2462 goto err_nomem;
2363 2463
2364 /* static rbd_device initialization */ 2464 /* static rbd_device initialization */
2365 spin_lock_init(&rbd_dev->lock); 2465 spin_lock_init(&rbd_dev->lock);
@@ -2367,15 +2467,13 @@ static ssize_t rbd_add(struct bus_type *bus,
2367 INIT_LIST_HEAD(&rbd_dev->snaps); 2467 INIT_LIST_HEAD(&rbd_dev->snaps);
2368 init_rwsem(&rbd_dev->header_rwsem); 2468 init_rwsem(&rbd_dev->header_rwsem);
2369 2469
2370 init_rwsem(&rbd_dev->header_rwsem);
2371
2372 /* generate unique id: find highest unique id, add one */ 2470 /* generate unique id: find highest unique id, add one */
2373 rbd_id_get(rbd_dev); 2471 rbd_id_get(rbd_dev);
2374 2472
2375 /* Fill in the device name, now that we have its id. */ 2473 /* Fill in the device name, now that we have its id. */
2376 BUILD_BUG_ON(DEV_NAME_LEN 2474 BUILD_BUG_ON(DEV_NAME_LEN
2377 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH); 2475 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2378 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id); 2476 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2379 2477
2380 /* parse add command */ 2478 /* parse add command */
2381 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size, 2479 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
@@ -2395,7 +2493,7 @@ static ssize_t rbd_add(struct bus_type *bus,
2395 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name); 2493 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2396 if (rc < 0) 2494 if (rc < 0)
2397 goto err_out_client; 2495 goto err_out_client;
2398 rbd_dev->poolid = rc; 2496 rbd_dev->pool_id = rc;
2399 2497
2400 /* register our block device */ 2498 /* register our block device */
2401 rc = register_blkdev(0, rbd_dev->name); 2499 rc = register_blkdev(0, rbd_dev->name);
@@ -2435,10 +2533,16 @@ err_out_blkdev:
2435err_out_client: 2533err_out_client:
2436 rbd_put_client(rbd_dev); 2534 rbd_put_client(rbd_dev);
2437err_put_id: 2535err_put_id:
2536 if (rbd_dev->pool_name) {
2537 kfree(rbd_dev->snap_name);
2538 kfree(rbd_dev->header_name);
2539 kfree(rbd_dev->image_name);
2540 kfree(rbd_dev->pool_name);
2541 }
2438 rbd_id_put(rbd_dev); 2542 rbd_id_put(rbd_dev);
2439err_nomem: 2543err_nomem:
2440 kfree(options);
2441 kfree(rbd_dev); 2544 kfree(rbd_dev);
2545 kfree(options);
2442 2546
2443 dout("Error adding device %s\n", buf); 2547 dout("Error adding device %s\n", buf);
2444 module_put(THIS_MODULE); 2548 module_put(THIS_MODULE);
@@ -2446,7 +2550,7 @@ err_nomem:
2446 return (ssize_t) rc; 2550 return (ssize_t) rc;
2447} 2551}
2448 2552
2449static struct rbd_device *__rbd_get_dev(unsigned long id) 2553static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2450{ 2554{
2451 struct list_head *tmp; 2555 struct list_head *tmp;
2452 struct rbd_device *rbd_dev; 2556 struct rbd_device *rbd_dev;
@@ -2454,7 +2558,7 @@ static struct rbd_device *__rbd_get_dev(unsigned long id)
2454 spin_lock(&rbd_dev_list_lock); 2558 spin_lock(&rbd_dev_list_lock);
2455 list_for_each(tmp, &rbd_dev_list) { 2559 list_for_each(tmp, &rbd_dev_list) {
2456 rbd_dev = list_entry(tmp, struct rbd_device, node); 2560 rbd_dev = list_entry(tmp, struct rbd_device, node);
2457 if (rbd_dev->id == id) { 2561 if (rbd_dev->dev_id == dev_id) {
2458 spin_unlock(&rbd_dev_list_lock); 2562 spin_unlock(&rbd_dev_list_lock);
2459 return rbd_dev; 2563 return rbd_dev;
2460 } 2564 }
@@ -2474,7 +2578,7 @@ static void rbd_dev_release(struct device *dev)
2474 rbd_dev->watch_request); 2578 rbd_dev->watch_request);
2475 } 2579 }
2476 if (rbd_dev->watch_event) 2580 if (rbd_dev->watch_event)
2477 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name); 2581 rbd_req_sync_unwatch(rbd_dev);
2478 2582
2479 rbd_put_client(rbd_dev); 2583 rbd_put_client(rbd_dev);
2480 2584
@@ -2483,6 +2587,10 @@ static void rbd_dev_release(struct device *dev)
2483 unregister_blkdev(rbd_dev->major, rbd_dev->name); 2587 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2484 2588
2485 /* done with the id, and with the rbd_dev */ 2589 /* done with the id, and with the rbd_dev */
2590 kfree(rbd_dev->snap_name);
2591 kfree(rbd_dev->header_name);
2592 kfree(rbd_dev->pool_name);
2593 kfree(rbd_dev->image_name);
2486 rbd_id_put(rbd_dev); 2594 rbd_id_put(rbd_dev);
2487 kfree(rbd_dev); 2595 kfree(rbd_dev);
2488 2596
@@ -2544,7 +2652,7 @@ static ssize_t rbd_snap_add(struct device *dev,
2544 if (ret < 0) 2652 if (ret < 0)
2545 goto err_unlock; 2653 goto err_unlock;
2546 2654
2547 ret = __rbd_refresh_header(rbd_dev); 2655 ret = __rbd_refresh_header(rbd_dev, NULL);
2548 if (ret < 0) 2656 if (ret < 0)
2549 goto err_unlock; 2657 goto err_unlock;
2550 2658
@@ -2553,7 +2661,7 @@ static ssize_t rbd_snap_add(struct device *dev,
2553 mutex_unlock(&ctl_mutex); 2661 mutex_unlock(&ctl_mutex);
2554 2662
2555 /* make a best effort, don't error if failed */ 2663 /* make a best effort, don't error if failed */
2556 rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name); 2664 rbd_req_sync_notify(rbd_dev);
2557 2665
2558 ret = count; 2666 ret = count;
2559 kfree(name); 2667 kfree(name);
diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h
index 950708688f17..0924e9e41a60 100644
--- a/drivers/block/rbd_types.h
+++ b/drivers/block/rbd_types.h
@@ -31,7 +31,6 @@
31#define RBD_MIN_OBJ_ORDER 16 31#define RBD_MIN_OBJ_ORDER 16
32#define RBD_MAX_OBJ_ORDER 30 32#define RBD_MAX_OBJ_ORDER 30
33 33
34#define RBD_MAX_OBJ_NAME_LEN 96
35#define RBD_MAX_SEG_NAME_LEN 128 34#define RBD_MAX_SEG_NAME_LEN 128
36 35
37#define RBD_COMP_NONE 0 36#define RBD_COMP_NONE 0
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 00894ff9246c..f391f1e75414 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -51,8 +51,7 @@ int ceph_init_dentry(struct dentry *dentry)
51 goto out_unlock; 51 goto out_unlock;
52 } 52 }
53 53
54 if (dentry->d_parent == NULL || /* nfs fh_to_dentry */ 54 if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP)
55 ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP)
56 d_set_d_op(dentry, &ceph_dentry_ops); 55 d_set_d_op(dentry, &ceph_dentry_ops);
57 else if (ceph_snap(dentry->d_parent->d_inode) == CEPH_SNAPDIR) 56 else if (ceph_snap(dentry->d_parent->d_inode) == CEPH_SNAPDIR)
58 d_set_d_op(dentry, &ceph_snapdir_dentry_ops); 57 d_set_d_op(dentry, &ceph_snapdir_dentry_ops);
@@ -79,7 +78,7 @@ struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry)
79 return NULL; 78 return NULL;
80 79
81 spin_lock(&dentry->d_lock); 80 spin_lock(&dentry->d_lock);
82 if (dentry->d_parent) { 81 if (!IS_ROOT(dentry)) {
83 inode = dentry->d_parent->d_inode; 82 inode = dentry->d_parent->d_inode;
84 ihold(inode); 83 ihold(inode);
85 } 84 }
@@ -1154,7 +1153,7 @@ static void ceph_d_prune(struct dentry *dentry)
1154 dout("ceph_d_prune %p\n", dentry); 1153 dout("ceph_d_prune %p\n", dentry);
1155 1154
1156 /* do we have a valid parent? */ 1155 /* do we have a valid parent? */
1157 if (!dentry->d_parent || IS_ROOT(dentry)) 1156 if (IS_ROOT(dentry))
1158 return; 1157 return;
1159 1158
1160 /* if we are not hashed, we don't affect D_COMPLETE */ 1159 /* if we are not hashed, we don't affect D_COMPLETE */
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 200bc87eceb1..a5a735422aa7 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -10,6 +10,7 @@
10#include "super.h" 10#include "super.h"
11#include "mds_client.h" 11#include "mds_client.h"
12 12
13#include <linux/ceph/ceph_features.h>
13#include <linux/ceph/messenger.h> 14#include <linux/ceph/messenger.h>
14#include <linux/ceph/decode.h> 15#include <linux/ceph/decode.h>
15#include <linux/ceph/pagelist.h> 16#include <linux/ceph/pagelist.h>
@@ -394,11 +395,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
394 s->s_seq = 0; 395 s->s_seq = 0;
395 mutex_init(&s->s_mutex); 396 mutex_init(&s->s_mutex);
396 397
397 ceph_con_init(mdsc->fsc->client->msgr, &s->s_con); 398 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
398 s->s_con.private = s;
399 s->s_con.ops = &mds_con_ops;
400 s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS;
401 s->s_con.peer_name.num = cpu_to_le64(mds);
402 399
403 spin_lock_init(&s->s_gen_ttl_lock); 400 spin_lock_init(&s->s_gen_ttl_lock);
404 s->s_cap_gen = 0; 401 s->s_cap_gen = 0;
@@ -440,7 +437,8 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
440 mdsc->sessions[mds] = s; 437 mdsc->sessions[mds] = s;
441 atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 438 atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */
442 439
443 ceph_con_open(&s->s_con, ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 440 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
441 ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
444 442
445 return s; 443 return s;
446 444
@@ -1472,11 +1470,6 @@ retry:
1472 else 1470 else
1473 len += 1 + temp->d_name.len; 1471 len += 1 + temp->d_name.len;
1474 temp = temp->d_parent; 1472 temp = temp->d_parent;
1475 if (temp == NULL) {
1476 rcu_read_unlock();
1477 pr_err("build_path corrupt dentry %p\n", dentry);
1478 return ERR_PTR(-EINVAL);
1479 }
1480 } 1473 }
1481 rcu_read_unlock(); 1474 rcu_read_unlock();
1482 if (len) 1475 if (len)
@@ -1513,12 +1506,6 @@ retry:
1513 if (pos) 1506 if (pos)
1514 path[--pos] = '/'; 1507 path[--pos] = '/';
1515 temp = temp->d_parent; 1508 temp = temp->d_parent;
1516 if (temp == NULL) {
1517 rcu_read_unlock();
1518 pr_err("build_path corrupt dentry\n");
1519 kfree(path);
1520 return ERR_PTR(-EINVAL);
1521 }
1522 } 1509 }
1523 rcu_read_unlock(); 1510 rcu_read_unlock();
1524 if (pos != 0 || read_seqretry(&rename_lock, seq)) { 1511 if (pos != 0 || read_seqretry(&rename_lock, seq)) {
@@ -2531,7 +2518,9 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2531 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 2518 session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2532 session->s_seq = 0; 2519 session->s_seq = 0;
2533 2520
2521 ceph_con_close(&session->s_con);
2534 ceph_con_open(&session->s_con, 2522 ceph_con_open(&session->s_con,
2523 CEPH_ENTITY_TYPE_MDS, mds,
2535 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 2524 ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2536 2525
2537 /* replay unsafe requests */ 2526 /* replay unsafe requests */
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index e5206fc76562..cbb2f54a3019 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -296,8 +296,7 @@ static int build_snap_context(struct ceph_snap_realm *realm)
296 struct ceph_snap_realm *parent = realm->parent; 296 struct ceph_snap_realm *parent = realm->parent;
297 struct ceph_snap_context *snapc; 297 struct ceph_snap_context *snapc;
298 int err = 0; 298 int err = 0;
299 int i; 299 u32 num = realm->num_prior_parent_snaps + realm->num_snaps;
300 int num = realm->num_prior_parent_snaps + realm->num_snaps;
301 300
302 /* 301 /*
303 * build parent context, if it hasn't been built. 302 * build parent context, if it hasn't been built.
@@ -321,11 +320,11 @@ static int build_snap_context(struct ceph_snap_realm *realm)
321 realm->cached_context->seq == realm->seq && 320 realm->cached_context->seq == realm->seq &&
322 (!parent || 321 (!parent ||
323 realm->cached_context->seq >= parent->cached_context->seq)) { 322 realm->cached_context->seq >= parent->cached_context->seq)) {
324 dout("build_snap_context %llx %p: %p seq %lld (%d snaps)" 323 dout("build_snap_context %llx %p: %p seq %lld (%u snaps)"
325 " (unchanged)\n", 324 " (unchanged)\n",
326 realm->ino, realm, realm->cached_context, 325 realm->ino, realm, realm->cached_context,
327 realm->cached_context->seq, 326 realm->cached_context->seq,
328 realm->cached_context->num_snaps); 327 (unsigned int) realm->cached_context->num_snaps);
329 return 0; 328 return 0;
330 } 329 }
331 330
@@ -342,6 +341,8 @@ static int build_snap_context(struct ceph_snap_realm *realm)
342 num = 0; 341 num = 0;
343 snapc->seq = realm->seq; 342 snapc->seq = realm->seq;
344 if (parent) { 343 if (parent) {
344 u32 i;
345
345 /* include any of parent's snaps occurring _after_ my 346 /* include any of parent's snaps occurring _after_ my
346 parent became my parent */ 347 parent became my parent */
347 for (i = 0; i < parent->cached_context->num_snaps; i++) 348 for (i = 0; i < parent->cached_context->num_snaps; i++)
@@ -361,8 +362,9 @@ static int build_snap_context(struct ceph_snap_realm *realm)
361 362
362 sort(snapc->snaps, num, sizeof(u64), cmpu64_rev, NULL); 363 sort(snapc->snaps, num, sizeof(u64), cmpu64_rev, NULL);
363 snapc->num_snaps = num; 364 snapc->num_snaps = num;
364 dout("build_snap_context %llx %p: %p seq %lld (%d snaps)\n", 365 dout("build_snap_context %llx %p: %p seq %lld (%u snaps)\n",
365 realm->ino, realm, snapc, snapc->seq, snapc->num_snaps); 366 realm->ino, realm, snapc, snapc->seq,
367 (unsigned int) snapc->num_snaps);
366 368
367 if (realm->cached_context) 369 if (realm->cached_context)
368 ceph_put_snap_context(realm->cached_context); 370 ceph_put_snap_context(realm->cached_context);
@@ -402,9 +404,9 @@ static void rebuild_snap_realms(struct ceph_snap_realm *realm)
402 * helper to allocate and decode an array of snapids. free prior 404 * helper to allocate and decode an array of snapids. free prior
403 * instance, if any. 405 * instance, if any.
404 */ 406 */
405static int dup_array(u64 **dst, __le64 *src, int num) 407static int dup_array(u64 **dst, __le64 *src, u32 num)
406{ 408{
407 int i; 409 u32 i;
408 410
409 kfree(*dst); 411 kfree(*dst);
410 if (num) { 412 if (num) {
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 7076109f014d..b982239f38f9 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -18,6 +18,7 @@
18#include "super.h" 18#include "super.h"
19#include "mds_client.h" 19#include "mds_client.h"
20 20
21#include <linux/ceph/ceph_features.h>
21#include <linux/ceph/decode.h> 22#include <linux/ceph/decode.h>
22#include <linux/ceph/mon_client.h> 23#include <linux/ceph/mon_client.h>
23#include <linux/ceph/auth.h> 24#include <linux/ceph/auth.h>
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index f4d5522cb619..ebc95cc652be 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -612,9 +612,9 @@ struct ceph_snap_realm {
612 u64 parent_since; /* snapid when our current parent became so */ 612 u64 parent_since; /* snapid when our current parent became so */
613 613
614 u64 *prior_parent_snaps; /* snaps inherited from any parents we */ 614 u64 *prior_parent_snaps; /* snaps inherited from any parents we */
615 int num_prior_parent_snaps; /* had prior to parent_since */ 615 u32 num_prior_parent_snaps; /* had prior to parent_since */
616 u64 *snaps; /* snaps specific to this realm */ 616 u64 *snaps; /* snaps specific to this realm */
617 int num_snaps; 617 u32 num_snaps;
618 618
619 struct ceph_snap_realm *parent; 619 struct ceph_snap_realm *parent;
620 struct list_head children; /* list of child realms */ 620 struct list_head children; /* list of child realms */
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 785cb3057c95..2c2ae5be9902 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -457,6 +457,7 @@ start:
457 for (i = 0; i < numattr; i++) 457 for (i = 0; i < numattr; i++)
458 kfree(xattrs[i]); 458 kfree(xattrs[i]);
459 kfree(xattrs); 459 kfree(xattrs);
460 xattrs = NULL;
460 goto start; 461 goto start;
461 } 462 }
462 err = -EIO; 463 err = -EIO;
diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h
new file mode 100644
index 000000000000..dad579b0c0e6
--- /dev/null
+++ b/include/linux/ceph/ceph_features.h
@@ -0,0 +1,27 @@
1#ifndef __CEPH_FEATURES
2#define __CEPH_FEATURES
3
4/*
5 * feature bits
6 */
7#define CEPH_FEATURE_UID (1<<0)
8#define CEPH_FEATURE_NOSRCADDR (1<<1)
9#define CEPH_FEATURE_MONCLOCKCHECK (1<<2)
10#define CEPH_FEATURE_FLOCK (1<<3)
11#define CEPH_FEATURE_SUBSCRIBE2 (1<<4)
12#define CEPH_FEATURE_MONNAMES (1<<5)
13#define CEPH_FEATURE_RECONNECT_SEQ (1<<6)
14#define CEPH_FEATURE_DIRLAYOUTHASH (1<<7)
15/* bits 8-17 defined by user-space; not supported yet here */
16#define CEPH_FEATURE_CRUSH_TUNABLES (1<<18)
17
18/*
19 * Features supported.
20 */
21#define CEPH_FEATURES_SUPPORTED_DEFAULT \
22 (CEPH_FEATURE_NOSRCADDR | \
23 CEPH_FEATURE_CRUSH_TUNABLES)
24
25#define CEPH_FEATURES_REQUIRED_DEFAULT \
26 (CEPH_FEATURE_NOSRCADDR)
27#endif
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index e81ab30d4896..d021610efd65 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -35,20 +35,6 @@
35/* arbitrary limit on max # of monitors (cluster of 3 is typical) */ 35/* arbitrary limit on max # of monitors (cluster of 3 is typical) */
36#define CEPH_MAX_MON 31 36#define CEPH_MAX_MON 31
37 37
38
39/*
40 * feature bits
41 */
42#define CEPH_FEATURE_UID (1<<0)
43#define CEPH_FEATURE_NOSRCADDR (1<<1)
44#define CEPH_FEATURE_MONCLOCKCHECK (1<<2)
45#define CEPH_FEATURE_FLOCK (1<<3)
46#define CEPH_FEATURE_SUBSCRIBE2 (1<<4)
47#define CEPH_FEATURE_MONNAMES (1<<5)
48#define CEPH_FEATURE_RECONNECT_SEQ (1<<6)
49#define CEPH_FEATURE_DIRLAYOUTHASH (1<<7)
50
51
52/* 38/*
53 * ceph_file_layout - describe data layout for a file/inode 39 * ceph_file_layout - describe data layout for a file/inode
54 */ 40 */
diff --git a/include/linux/ceph/decode.h b/include/linux/ceph/decode.h
index d8615dee5808..4bbf2db45f46 100644
--- a/include/linux/ceph/decode.h
+++ b/include/linux/ceph/decode.h
@@ -1,6 +1,7 @@
1#ifndef __CEPH_DECODE_H 1#ifndef __CEPH_DECODE_H
2#define __CEPH_DECODE_H 2#define __CEPH_DECODE_H
3 3
4#include <linux/err.h>
4#include <linux/bug.h> 5#include <linux/bug.h>
5#include <linux/time.h> 6#include <linux/time.h>
6#include <asm/unaligned.h> 7#include <asm/unaligned.h>
@@ -85,6 +86,52 @@ static inline int ceph_has_room(void **p, void *end, size_t n)
85 } while (0) 86 } while (0)
86 87
87/* 88/*
89 * Allocate a buffer big enough to hold the wire-encoded string, and
90 * decode the string into it. The resulting string will always be
91 * terminated with '\0'. If successful, *p will be advanced
92 * past the decoded data. Also, if lenp is not a null pointer, the
93 * length (not including the terminating '\0') will be recorded in
94 * *lenp. Note that a zero-length string is a valid return value.
95 *
96 * Returns a pointer to the newly-allocated string buffer, or a
97 * pointer-coded errno if an error occurs. Neither *p nor *lenp
98 * will have been updated if an error is returned.
99 *
100 * There are two possible failures:
101 * - converting the string would require accessing memory at or
102 * beyond the "end" pointer provided (-E
103 * - memory could not be allocated for the result
104 */
105static inline char *ceph_extract_encoded_string(void **p, void *end,
106 size_t *lenp, gfp_t gfp)
107{
108 u32 len;
109 void *sp = *p;
110 char *buf;
111
112 ceph_decode_32_safe(&sp, end, len, bad);
113 if (!ceph_has_room(&sp, end, len))
114 goto bad;
115
116 buf = kmalloc(len + 1, gfp);
117 if (!buf)
118 return ERR_PTR(-ENOMEM);
119
120 if (len)
121 memcpy(buf, sp, len);
122 buf[len] = '\0';
123
124 *p = (char *) *p + sizeof (u32) + len;
125 if (lenp)
126 *lenp = (size_t) len;
127
128 return buf;
129
130bad:
131 return ERR_PTR(-ERANGE);
132}
133
134/*
88 * struct ceph_timespec <-> struct timespec 135 * struct ceph_timespec <-> struct timespec
89 */ 136 */
90static inline void ceph_decode_timespec(struct timespec *ts, 137static inline void ceph_decode_timespec(struct timespec *ts,
@@ -151,7 +198,7 @@ static inline void ceph_encode_filepath(void **p, void *end,
151 u64 ino, const char *path) 198 u64 ino, const char *path)
152{ 199{
153 u32 len = path ? strlen(path) : 0; 200 u32 len = path ? strlen(path) : 0;
154 BUG_ON(*p + sizeof(ino) + sizeof(len) + len > end); 201 BUG_ON(*p + 1 + sizeof(ino) + sizeof(len) + len > end);
155 ceph_encode_8(p, 1); 202 ceph_encode_8(p, 1);
156 ceph_encode_64(p, ino); 203 ceph_encode_64(p, ino);
157 ceph_encode_32(p, len); 204 ceph_encode_32(p, len);
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index e71d683982a6..42624789b06f 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -23,12 +23,6 @@
23#include "ceph_fs.h" 23#include "ceph_fs.h"
24 24
25/* 25/*
26 * Supported features
27 */
28#define CEPH_FEATURE_SUPPORTED_DEFAULT CEPH_FEATURE_NOSRCADDR
29#define CEPH_FEATURE_REQUIRED_DEFAULT CEPH_FEATURE_NOSRCADDR
30
31/*
32 * mount options 26 * mount options
33 */ 27 */
34#define CEPH_OPT_FSID (1<<0) 28#define CEPH_OPT_FSID (1<<0)
@@ -132,7 +126,7 @@ struct ceph_client {
132 u32 supported_features; 126 u32 supported_features;
133 u32 required_features; 127 u32 required_features;
134 128
135 struct ceph_messenger *msgr; /* messenger instance */ 129 struct ceph_messenger msgr; /* messenger instance */
136 struct ceph_mon_client monc; 130 struct ceph_mon_client monc;
137 struct ceph_osd_client osdc; 131 struct ceph_osd_client osdc;
138 132
@@ -160,7 +154,7 @@ struct ceph_client {
160struct ceph_snap_context { 154struct ceph_snap_context {
161 atomic_t nref; 155 atomic_t nref;
162 u64 seq; 156 u64 seq;
163 int num_snaps; 157 u32 num_snaps;
164 u64 snaps[]; 158 u64 snaps[];
165}; 159};
166 160
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 44c87e731e9d..189ae0637634 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -31,9 +31,6 @@ struct ceph_connection_operations {
31 int (*verify_authorizer_reply) (struct ceph_connection *con, int len); 31 int (*verify_authorizer_reply) (struct ceph_connection *con, int len);
32 int (*invalidate_authorizer)(struct ceph_connection *con); 32 int (*invalidate_authorizer)(struct ceph_connection *con);
33 33
34 /* protocol version mismatch */
35 void (*bad_proto) (struct ceph_connection *con);
36
37 /* there was some error on the socket (disconnect, whatever) */ 34 /* there was some error on the socket (disconnect, whatever) */
38 void (*fault) (struct ceph_connection *con); 35 void (*fault) (struct ceph_connection *con);
39 36
@@ -53,6 +50,7 @@ struct ceph_messenger {
53 struct ceph_entity_inst inst; /* my name+address */ 50 struct ceph_entity_inst inst; /* my name+address */
54 struct ceph_entity_addr my_enc_addr; 51 struct ceph_entity_addr my_enc_addr;
55 52
53 atomic_t stopping;
56 bool nocrc; 54 bool nocrc;
57 55
58 /* 56 /*
@@ -80,7 +78,10 @@ struct ceph_msg {
80 unsigned nr_pages; /* size of page array */ 78 unsigned nr_pages; /* size of page array */
81 unsigned page_alignment; /* io offset in first page */ 79 unsigned page_alignment; /* io offset in first page */
82 struct ceph_pagelist *pagelist; /* instead of pages */ 80 struct ceph_pagelist *pagelist; /* instead of pages */
81
82 struct ceph_connection *con;
83 struct list_head list_head; 83 struct list_head list_head;
84
84 struct kref kref; 85 struct kref kref;
85 struct bio *bio; /* instead of pages/pagelist */ 86 struct bio *bio; /* instead of pages/pagelist */
86 struct bio *bio_iter; /* bio iterator */ 87 struct bio *bio_iter; /* bio iterator */
@@ -106,23 +107,6 @@ struct ceph_msg_pos {
106#define MAX_DELAY_INTERVAL (5 * 60 * HZ) 107#define MAX_DELAY_INTERVAL (5 * 60 * HZ)
107 108
108/* 109/*
109 * ceph_connection state bit flags
110 */
111#define LOSSYTX 0 /* we can close channel or drop messages on errors */
112#define CONNECTING 1
113#define NEGOTIATING 2
114#define KEEPALIVE_PENDING 3
115#define WRITE_PENDING 4 /* we have data ready to send */
116#define STANDBY 8 /* no outgoing messages, socket closed. we keep
117 * the ceph_connection around to maintain shared
118 * state with the peer. */
119#define CLOSED 10 /* we've closed the connection */
120#define SOCK_CLOSED 11 /* socket state changed to closed */
121#define OPENING 13 /* open connection w/ (possibly new) peer */
122#define DEAD 14 /* dead, about to kfree */
123#define BACKOFF 15
124
125/*
126 * A single connection with another host. 110 * A single connection with another host.
127 * 111 *
128 * We maintain a queue of outgoing messages, and some session state to 112 * We maintain a queue of outgoing messages, and some session state to
@@ -131,18 +115,22 @@ struct ceph_msg_pos {
131 */ 115 */
132struct ceph_connection { 116struct ceph_connection {
133 void *private; 117 void *private;
134 atomic_t nref;
135 118
136 const struct ceph_connection_operations *ops; 119 const struct ceph_connection_operations *ops;
137 120
138 struct ceph_messenger *msgr; 121 struct ceph_messenger *msgr;
122
123 atomic_t sock_state;
139 struct socket *sock; 124 struct socket *sock;
140 unsigned long state; /* connection state (see flags above) */ 125 struct ceph_entity_addr peer_addr; /* peer address */
126 struct ceph_entity_addr peer_addr_for_me;
127
128 unsigned long flags;
129 unsigned long state;
141 const char *error_msg; /* error message, if any */ 130 const char *error_msg; /* error message, if any */
142 131
143 struct ceph_entity_addr peer_addr; /* peer address */
144 struct ceph_entity_name peer_name; /* peer name */ 132 struct ceph_entity_name peer_name; /* peer name */
145 struct ceph_entity_addr peer_addr_for_me; 133
146 unsigned peer_features; 134 unsigned peer_features;
147 u32 connect_seq; /* identify the most recent connection 135 u32 connect_seq; /* identify the most recent connection
148 attempt for this connection, client */ 136 attempt for this connection, client */
@@ -207,24 +195,26 @@ extern int ceph_msgr_init(void);
207extern void ceph_msgr_exit(void); 195extern void ceph_msgr_exit(void);
208extern void ceph_msgr_flush(void); 196extern void ceph_msgr_flush(void);
209 197
210extern struct ceph_messenger *ceph_messenger_create( 198extern void ceph_messenger_init(struct ceph_messenger *msgr,
211 struct ceph_entity_addr *myaddr, 199 struct ceph_entity_addr *myaddr,
212 u32 features, u32 required); 200 u32 supported_features,
213extern void ceph_messenger_destroy(struct ceph_messenger *); 201 u32 required_features,
202 bool nocrc);
214 203
215extern void ceph_con_init(struct ceph_messenger *msgr, 204extern void ceph_con_init(struct ceph_connection *con, void *private,
216 struct ceph_connection *con); 205 const struct ceph_connection_operations *ops,
206 struct ceph_messenger *msgr);
217extern void ceph_con_open(struct ceph_connection *con, 207extern void ceph_con_open(struct ceph_connection *con,
208 __u8 entity_type, __u64 entity_num,
218 struct ceph_entity_addr *addr); 209 struct ceph_entity_addr *addr);
219extern bool ceph_con_opened(struct ceph_connection *con); 210extern bool ceph_con_opened(struct ceph_connection *con);
220extern void ceph_con_close(struct ceph_connection *con); 211extern void ceph_con_close(struct ceph_connection *con);
221extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg); 212extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
222extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg); 213
223extern void ceph_con_revoke_message(struct ceph_connection *con, 214extern void ceph_msg_revoke(struct ceph_msg *msg);
224 struct ceph_msg *msg); 215extern void ceph_msg_revoke_incoming(struct ceph_msg *msg);
216
225extern void ceph_con_keepalive(struct ceph_connection *con); 217extern void ceph_con_keepalive(struct ceph_connection *con);
226extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
227extern void ceph_con_put(struct ceph_connection *con);
228 218
229extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, 219extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
230 bool can_fail); 220 bool can_fail);
diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h
index 545f85917780..2113e3850a4e 100644
--- a/include/linux/ceph/mon_client.h
+++ b/include/linux/ceph/mon_client.h
@@ -70,7 +70,7 @@ struct ceph_mon_client {
70 bool hunting; 70 bool hunting;
71 int cur_mon; /* last monitor i contacted */ 71 int cur_mon; /* last monitor i contacted */
72 unsigned long sub_sent, sub_renew_after; 72 unsigned long sub_sent, sub_renew_after;
73 struct ceph_connection *con; 73 struct ceph_connection con;
74 bool have_fsid; 74 bool have_fsid;
75 75
76 /* pending generic requests */ 76 /* pending generic requests */
diff --git a/include/linux/ceph/msgpool.h b/include/linux/ceph/msgpool.h
index a362605f9368..09fa96b43436 100644
--- a/include/linux/ceph/msgpool.h
+++ b/include/linux/ceph/msgpool.h
@@ -11,10 +11,11 @@
11struct ceph_msgpool { 11struct ceph_msgpool {
12 const char *name; 12 const char *name;
13 mempool_t *pool; 13 mempool_t *pool;
14 int type; /* preallocated message type */
14 int front_len; /* preallocated payload size */ 15 int front_len; /* preallocated payload size */
15}; 16};
16 17
17extern int ceph_msgpool_init(struct ceph_msgpool *pool, 18extern int ceph_msgpool_init(struct ceph_msgpool *pool, int type,
18 int front_len, int size, bool blocking, 19 int front_len, int size, bool blocking,
19 const char *name); 20 const char *name);
20extern void ceph_msgpool_destroy(struct ceph_msgpool *pool); 21extern void ceph_msgpool_destroy(struct ceph_msgpool *pool);
diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h
index 7c4750811b96..25baa287cff7 100644
--- a/include/linux/crush/crush.h
+++ b/include/linux/crush/crush.h
@@ -154,6 +154,14 @@ struct crush_map {
154 __s32 max_buckets; 154 __s32 max_buckets;
155 __u32 max_rules; 155 __u32 max_rules;
156 __s32 max_devices; 156 __s32 max_devices;
157
158 /* choose local retries before re-descent */
159 __u32 choose_local_tries;
160 /* choose local attempts using a fallback permutation before
161 * re-descent */
162 __u32 choose_local_fallback_tries;
163 /* choose attempts before giving up */
164 __u32 choose_total_tries;
157}; 165};
158 166
159 167
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index ba4323bce0e9..69e38db28e5f 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -17,6 +17,7 @@
17#include <linux/string.h> 17#include <linux/string.h>
18 18
19 19
20#include <linux/ceph/ceph_features.h>
20#include <linux/ceph/libceph.h> 21#include <linux/ceph/libceph.h>
21#include <linux/ceph/debugfs.h> 22#include <linux/ceph/debugfs.h>
22#include <linux/ceph/decode.h> 23#include <linux/ceph/decode.h>
@@ -460,27 +461,23 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
460 client->auth_err = 0; 461 client->auth_err = 0;
461 462
462 client->extra_mon_dispatch = NULL; 463 client->extra_mon_dispatch = NULL;
463 client->supported_features = CEPH_FEATURE_SUPPORTED_DEFAULT | 464 client->supported_features = CEPH_FEATURES_SUPPORTED_DEFAULT |
464 supported_features; 465 supported_features;
465 client->required_features = CEPH_FEATURE_REQUIRED_DEFAULT | 466 client->required_features = CEPH_FEATURES_REQUIRED_DEFAULT |
466 required_features; 467 required_features;
467 468
468 /* msgr */ 469 /* msgr */
469 if (ceph_test_opt(client, MYIP)) 470 if (ceph_test_opt(client, MYIP))
470 myaddr = &client->options->my_addr; 471 myaddr = &client->options->my_addr;
471 client->msgr = ceph_messenger_create(myaddr, 472 ceph_messenger_init(&client->msgr, myaddr,
472 client->supported_features, 473 client->supported_features,
473 client->required_features); 474 client->required_features,
474 if (IS_ERR(client->msgr)) { 475 ceph_test_opt(client, NOCRC));
475 err = PTR_ERR(client->msgr);
476 goto fail;
477 }
478 client->msgr->nocrc = ceph_test_opt(client, NOCRC);
479 476
480 /* subsystems */ 477 /* subsystems */
481 err = ceph_monc_init(&client->monc, client); 478 err = ceph_monc_init(&client->monc, client);
482 if (err < 0) 479 if (err < 0)
483 goto fail_msgr; 480 goto fail;
484 err = ceph_osdc_init(&client->osdc, client); 481 err = ceph_osdc_init(&client->osdc, client);
485 if (err < 0) 482 if (err < 0)
486 goto fail_monc; 483 goto fail_monc;
@@ -489,8 +486,6 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
489 486
490fail_monc: 487fail_monc:
491 ceph_monc_stop(&client->monc); 488 ceph_monc_stop(&client->monc);
492fail_msgr:
493 ceph_messenger_destroy(client->msgr);
494fail: 489fail:
495 kfree(client); 490 kfree(client);
496 return ERR_PTR(err); 491 return ERR_PTR(err);
@@ -501,6 +496,8 @@ void ceph_destroy_client(struct ceph_client *client)
501{ 496{
502 dout("destroy_client %p\n", client); 497 dout("destroy_client %p\n", client);
503 498
499 atomic_set(&client->msgr.stopping, 1);
500
504 /* unmount */ 501 /* unmount */
505 ceph_osdc_stop(&client->osdc); 502 ceph_osdc_stop(&client->osdc);
506 503
@@ -508,8 +505,6 @@ void ceph_destroy_client(struct ceph_client *client)
508 505
509 ceph_debugfs_client_cleanup(client); 506 ceph_debugfs_client_cleanup(client);
510 507
511 ceph_messenger_destroy(client->msgr);
512
513 ceph_destroy_options(client->options); 508 ceph_destroy_options(client->options);
514 509
515 kfree(client); 510 kfree(client);
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index d7edc24333b8..35fce755ce10 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -306,7 +306,6 @@ static int crush_choose(const struct crush_map *map,
306 int item = 0; 306 int item = 0;
307 int itemtype; 307 int itemtype;
308 int collide, reject; 308 int collide, reject;
309 const unsigned int orig_tries = 5; /* attempts before we fall back to search */
310 309
311 dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "", 310 dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "",
312 bucket->id, x, outpos, numrep); 311 bucket->id, x, outpos, numrep);
@@ -351,8 +350,9 @@ static int crush_choose(const struct crush_map *map,
351 reject = 1; 350 reject = 1;
352 goto reject; 351 goto reject;
353 } 352 }
354 if (flocal >= (in->size>>1) && 353 if (map->choose_local_fallback_tries > 0 &&
355 flocal > orig_tries) 354 flocal >= (in->size>>1) &&
355 flocal > map->choose_local_fallback_tries)
356 item = bucket_perm_choose(in, x, r); 356 item = bucket_perm_choose(in, x, r);
357 else 357 else
358 item = crush_bucket_choose(in, x, r); 358 item = crush_bucket_choose(in, x, r);
@@ -422,13 +422,14 @@ reject:
422 ftotal++; 422 ftotal++;
423 flocal++; 423 flocal++;
424 424
425 if (collide && flocal < 3) 425 if (collide && flocal <= map->choose_local_tries)
426 /* retry locally a few times */ 426 /* retry locally a few times */
427 retry_bucket = 1; 427 retry_bucket = 1;
428 else if (flocal <= in->size + orig_tries) 428 else if (map->choose_local_fallback_tries > 0 &&
429 flocal <= in->size + map->choose_local_fallback_tries)
429 /* exhaustive bucket search */ 430 /* exhaustive bucket search */
430 retry_bucket = 1; 431 retry_bucket = 1;
431 else if (ftotal < 20) 432 else if (ftotal <= map->choose_total_tries)
432 /* then retry descent */ 433 /* then retry descent */
433 retry_descent = 1; 434 retry_descent = 1;
434 else 435 else
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 10255e81be79..b9796750034a 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -29,6 +29,74 @@
29 * the sender. 29 * the sender.
30 */ 30 */
31 31
32/*
33 * We track the state of the socket on a given connection using
34 * values defined below. The transition to a new socket state is
35 * handled by a function which verifies we aren't coming from an
36 * unexpected state.
37 *
38 * --------
39 * | NEW* | transient initial state
40 * --------
41 * | con_sock_state_init()
42 * v
43 * ----------
44 * | CLOSED | initialized, but no socket (and no
45 * ---------- TCP connection)
46 * ^ \
47 * | \ con_sock_state_connecting()
48 * | ----------------------
49 * | \
50 * + con_sock_state_closed() \
51 * |+--------------------------- \
52 * | \ \ \
53 * | ----------- \ \
54 * | | CLOSING | socket event; \ \
55 * | ----------- await close \ \
56 * | ^ \ |
57 * | | \ |
58 * | + con_sock_state_closing() \ |
59 * | / \ | |
60 * | / --------------- | |
61 * | / \ v v
62 * | / --------------
63 * | / -----------------| CONNECTING | socket created, TCP
64 * | | / -------------- connect initiated
65 * | | | con_sock_state_connected()
66 * | | v
67 * -------------
68 * | CONNECTED | TCP connection established
69 * -------------
70 *
71 * State values for ceph_connection->sock_state; NEW is assumed to be 0.
72 */
73
74#define CON_SOCK_STATE_NEW 0 /* -> CLOSED */
75#define CON_SOCK_STATE_CLOSED 1 /* -> CONNECTING */
76#define CON_SOCK_STATE_CONNECTING 2 /* -> CONNECTED or -> CLOSING */
77#define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */
78#define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */
79
80/*
81 * connection states
82 */
83#define CON_STATE_CLOSED 1 /* -> PREOPEN */
84#define CON_STATE_PREOPEN 2 /* -> CONNECTING, CLOSED */
85#define CON_STATE_CONNECTING 3 /* -> NEGOTIATING, CLOSED */
86#define CON_STATE_NEGOTIATING 4 /* -> OPEN, CLOSED */
87#define CON_STATE_OPEN 5 /* -> STANDBY, CLOSED */
88#define CON_STATE_STANDBY 6 /* -> PREOPEN, CLOSED */
89
90/*
91 * ceph_connection flag bits
92 */
93#define CON_FLAG_LOSSYTX 0 /* we can close channel or drop
94 * messages on errors */
95#define CON_FLAG_KEEPALIVE_PENDING 1 /* we need to send a keepalive */
96#define CON_FLAG_WRITE_PENDING 2 /* we have data ready to send */
97#define CON_FLAG_SOCK_CLOSED 3 /* socket state changed to closed */
98#define CON_FLAG_BACKOFF 4 /* need to retry queuing delayed work */
99
32/* static tag bytes (protocol control messages) */ 100/* static tag bytes (protocol control messages) */
33static char tag_msg = CEPH_MSGR_TAG_MSG; 101static char tag_msg = CEPH_MSGR_TAG_MSG;
34static char tag_ack = CEPH_MSGR_TAG_ACK; 102static char tag_ack = CEPH_MSGR_TAG_ACK;
@@ -147,72 +215,130 @@ void ceph_msgr_flush(void)
147} 215}
148EXPORT_SYMBOL(ceph_msgr_flush); 216EXPORT_SYMBOL(ceph_msgr_flush);
149 217
218/* Connection socket state transition functions */
219
220static void con_sock_state_init(struct ceph_connection *con)
221{
222 int old_state;
223
224 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
225 if (WARN_ON(old_state != CON_SOCK_STATE_NEW))
226 printk("%s: unexpected old state %d\n", __func__, old_state);
227 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
228 CON_SOCK_STATE_CLOSED);
229}
230
231static void con_sock_state_connecting(struct ceph_connection *con)
232{
233 int old_state;
234
235 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING);
236 if (WARN_ON(old_state != CON_SOCK_STATE_CLOSED))
237 printk("%s: unexpected old state %d\n", __func__, old_state);
238 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
239 CON_SOCK_STATE_CONNECTING);
240}
241
242static void con_sock_state_connected(struct ceph_connection *con)
243{
244 int old_state;
245
246 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED);
247 if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING))
248 printk("%s: unexpected old state %d\n", __func__, old_state);
249 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
250 CON_SOCK_STATE_CONNECTED);
251}
252
253static void con_sock_state_closing(struct ceph_connection *con)
254{
255 int old_state;
256
257 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING);
258 if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING &&
259 old_state != CON_SOCK_STATE_CONNECTED &&
260 old_state != CON_SOCK_STATE_CLOSING))
261 printk("%s: unexpected old state %d\n", __func__, old_state);
262 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
263 CON_SOCK_STATE_CLOSING);
264}
265
266static void con_sock_state_closed(struct ceph_connection *con)
267{
268 int old_state;
269
270 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
271 if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED &&
272 old_state != CON_SOCK_STATE_CLOSING &&
273 old_state != CON_SOCK_STATE_CONNECTING &&
274 old_state != CON_SOCK_STATE_CLOSED))
275 printk("%s: unexpected old state %d\n", __func__, old_state);
276 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
277 CON_SOCK_STATE_CLOSED);
278}
150 279
151/* 280/*
152 * socket callback functions 281 * socket callback functions
153 */ 282 */
154 283
155/* data available on socket, or listen socket received a connect */ 284/* data available on socket, or listen socket received a connect */
156static void ceph_data_ready(struct sock *sk, int count_unused) 285static void ceph_sock_data_ready(struct sock *sk, int count_unused)
157{ 286{
158 struct ceph_connection *con = sk->sk_user_data; 287 struct ceph_connection *con = sk->sk_user_data;
288 if (atomic_read(&con->msgr->stopping)) {
289 return;
290 }
159 291
160 if (sk->sk_state != TCP_CLOSE_WAIT) { 292 if (sk->sk_state != TCP_CLOSE_WAIT) {
161 dout("ceph_data_ready on %p state = %lu, queueing work\n", 293 dout("%s on %p state = %lu, queueing work\n", __func__,
162 con, con->state); 294 con, con->state);
163 queue_con(con); 295 queue_con(con);
164 } 296 }
165} 297}
166 298
167/* socket has buffer space for writing */ 299/* socket has buffer space for writing */
168static void ceph_write_space(struct sock *sk) 300static void ceph_sock_write_space(struct sock *sk)
169{ 301{
170 struct ceph_connection *con = sk->sk_user_data; 302 struct ceph_connection *con = sk->sk_user_data;
171 303
172 /* only queue to workqueue if there is data we want to write, 304 /* only queue to workqueue if there is data we want to write,
173 * and there is sufficient space in the socket buffer to accept 305 * and there is sufficient space in the socket buffer to accept
174 * more data. clear SOCK_NOSPACE so that ceph_write_space() 306 * more data. clear SOCK_NOSPACE so that ceph_sock_write_space()
175 * doesn't get called again until try_write() fills the socket 307 * doesn't get called again until try_write() fills the socket
176 * buffer. See net/ipv4/tcp_input.c:tcp_check_space() 308 * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
177 * and net/core/stream.c:sk_stream_write_space(). 309 * and net/core/stream.c:sk_stream_write_space().
178 */ 310 */
179 if (test_bit(WRITE_PENDING, &con->state)) { 311 if (test_bit(CON_FLAG_WRITE_PENDING, &con->flags)) {
180 if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { 312 if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
181 dout("ceph_write_space %p queueing write work\n", con); 313 dout("%s %p queueing write work\n", __func__, con);
182 clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); 314 clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
183 queue_con(con); 315 queue_con(con);
184 } 316 }
185 } else { 317 } else {
186 dout("ceph_write_space %p nothing to write\n", con); 318 dout("%s %p nothing to write\n", __func__, con);
187 } 319 }
188} 320}
189 321
190/* socket's state has changed */ 322/* socket's state has changed */
191static void ceph_state_change(struct sock *sk) 323static void ceph_sock_state_change(struct sock *sk)
192{ 324{
193 struct ceph_connection *con = sk->sk_user_data; 325 struct ceph_connection *con = sk->sk_user_data;
194 326
195 dout("ceph_state_change %p state = %lu sk_state = %u\n", 327 dout("%s %p state = %lu sk_state = %u\n", __func__,
196 con, con->state, sk->sk_state); 328 con, con->state, sk->sk_state);
197 329
198 if (test_bit(CLOSED, &con->state))
199 return;
200
201 switch (sk->sk_state) { 330 switch (sk->sk_state) {
202 case TCP_CLOSE: 331 case TCP_CLOSE:
203 dout("ceph_state_change TCP_CLOSE\n"); 332 dout("%s TCP_CLOSE\n", __func__);
204 case TCP_CLOSE_WAIT: 333 case TCP_CLOSE_WAIT:
205 dout("ceph_state_change TCP_CLOSE_WAIT\n"); 334 dout("%s TCP_CLOSE_WAIT\n", __func__);
206 if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) { 335 con_sock_state_closing(con);
207 if (test_bit(CONNECTING, &con->state)) 336 set_bit(CON_FLAG_SOCK_CLOSED, &con->flags);
208 con->error_msg = "connection failed"; 337 queue_con(con);
209 else
210 con->error_msg = "socket closed";
211 queue_con(con);
212 }
213 break; 338 break;
214 case TCP_ESTABLISHED: 339 case TCP_ESTABLISHED:
215 dout("ceph_state_change TCP_ESTABLISHED\n"); 340 dout("%s TCP_ESTABLISHED\n", __func__);
341 con_sock_state_connected(con);
216 queue_con(con); 342 queue_con(con);
217 break; 343 break;
218 default: /* Everything else is uninteresting */ 344 default: /* Everything else is uninteresting */
@@ -228,9 +354,9 @@ static void set_sock_callbacks(struct socket *sock,
228{ 354{
229 struct sock *sk = sock->sk; 355 struct sock *sk = sock->sk;
230 sk->sk_user_data = con; 356 sk->sk_user_data = con;
231 sk->sk_data_ready = ceph_data_ready; 357 sk->sk_data_ready = ceph_sock_data_ready;
232 sk->sk_write_space = ceph_write_space; 358 sk->sk_write_space = ceph_sock_write_space;
233 sk->sk_state_change = ceph_state_change; 359 sk->sk_state_change = ceph_sock_state_change;
234} 360}
235 361
236 362
@@ -262,6 +388,7 @@ static int ceph_tcp_connect(struct ceph_connection *con)
262 388
263 dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr)); 389 dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr));
264 390
391 con_sock_state_connecting(con);
265 ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr), 392 ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr),
266 O_NONBLOCK); 393 O_NONBLOCK);
267 if (ret == -EINPROGRESS) { 394 if (ret == -EINPROGRESS) {
@@ -277,7 +404,6 @@ static int ceph_tcp_connect(struct ceph_connection *con)
277 return ret; 404 return ret;
278 } 405 }
279 con->sock = sock; 406 con->sock = sock;
280
281 return 0; 407 return 0;
282} 408}
283 409
@@ -333,16 +459,24 @@ static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
333 */ 459 */
334static int con_close_socket(struct ceph_connection *con) 460static int con_close_socket(struct ceph_connection *con)
335{ 461{
336 int rc; 462 int rc = 0;
337 463
338 dout("con_close_socket on %p sock %p\n", con, con->sock); 464 dout("con_close_socket on %p sock %p\n", con, con->sock);
339 if (!con->sock) 465 if (con->sock) {
340 return 0; 466 rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
341 set_bit(SOCK_CLOSED, &con->state); 467 sock_release(con->sock);
342 rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); 468 con->sock = NULL;
343 sock_release(con->sock); 469 }
344 con->sock = NULL; 470
345 clear_bit(SOCK_CLOSED, &con->state); 471 /*
472 * Forcibly clear the SOCK_CLOSED flag. It gets set
473 * independent of the connection mutex, and we could have
474 * received a socket close event before we had the chance to
475 * shut the socket down.
476 */
477 clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags);
478
479 con_sock_state_closed(con);
346 return rc; 480 return rc;
347} 481}
348 482
@@ -353,6 +487,10 @@ static int con_close_socket(struct ceph_connection *con)
353static void ceph_msg_remove(struct ceph_msg *msg) 487static void ceph_msg_remove(struct ceph_msg *msg)
354{ 488{
355 list_del_init(&msg->list_head); 489 list_del_init(&msg->list_head);
490 BUG_ON(msg->con == NULL);
491 msg->con->ops->put(msg->con);
492 msg->con = NULL;
493
356 ceph_msg_put(msg); 494 ceph_msg_put(msg);
357} 495}
358static void ceph_msg_remove_list(struct list_head *head) 496static void ceph_msg_remove_list(struct list_head *head)
@@ -372,8 +510,11 @@ static void reset_connection(struct ceph_connection *con)
372 ceph_msg_remove_list(&con->out_sent); 510 ceph_msg_remove_list(&con->out_sent);
373 511
374 if (con->in_msg) { 512 if (con->in_msg) {
513 BUG_ON(con->in_msg->con != con);
514 con->in_msg->con = NULL;
375 ceph_msg_put(con->in_msg); 515 ceph_msg_put(con->in_msg);
376 con->in_msg = NULL; 516 con->in_msg = NULL;
517 con->ops->put(con);
377 } 518 }
378 519
379 con->connect_seq = 0; 520 con->connect_seq = 0;
@@ -391,32 +532,44 @@ static void reset_connection(struct ceph_connection *con)
391 */ 532 */
392void ceph_con_close(struct ceph_connection *con) 533void ceph_con_close(struct ceph_connection *con)
393{ 534{
535 mutex_lock(&con->mutex);
394 dout("con_close %p peer %s\n", con, 536 dout("con_close %p peer %s\n", con,
395 ceph_pr_addr(&con->peer_addr.in_addr)); 537 ceph_pr_addr(&con->peer_addr.in_addr));
396 set_bit(CLOSED, &con->state); /* in case there's queued work */ 538 con->state = CON_STATE_CLOSED;
397 clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ 539
398 clear_bit(LOSSYTX, &con->state); /* so we retry next connect */ 540 clear_bit(CON_FLAG_LOSSYTX, &con->flags); /* so we retry next connect */
399 clear_bit(KEEPALIVE_PENDING, &con->state); 541 clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags);
400 clear_bit(WRITE_PENDING, &con->state); 542 clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
401 mutex_lock(&con->mutex); 543 clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags);
544 clear_bit(CON_FLAG_BACKOFF, &con->flags);
545
402 reset_connection(con); 546 reset_connection(con);
403 con->peer_global_seq = 0; 547 con->peer_global_seq = 0;
404 cancel_delayed_work(&con->work); 548 cancel_delayed_work(&con->work);
549 con_close_socket(con);
405 mutex_unlock(&con->mutex); 550 mutex_unlock(&con->mutex);
406 queue_con(con);
407} 551}
408EXPORT_SYMBOL(ceph_con_close); 552EXPORT_SYMBOL(ceph_con_close);
409 553
410/* 554/*
411 * Reopen a closed connection, with a new peer address. 555 * Reopen a closed connection, with a new peer address.
412 */ 556 */
413void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr) 557void ceph_con_open(struct ceph_connection *con,
558 __u8 entity_type, __u64 entity_num,
559 struct ceph_entity_addr *addr)
414{ 560{
561 mutex_lock(&con->mutex);
415 dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); 562 dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr));
416 set_bit(OPENING, &con->state); 563
417 clear_bit(CLOSED, &con->state); 564 BUG_ON(con->state != CON_STATE_CLOSED);
565 con->state = CON_STATE_PREOPEN;
566
567 con->peer_name.type = (__u8) entity_type;
568 con->peer_name.num = cpu_to_le64(entity_num);
569
418 memcpy(&con->peer_addr, addr, sizeof(*addr)); 570 memcpy(&con->peer_addr, addr, sizeof(*addr));
419 con->delay = 0; /* reset backoff memory */ 571 con->delay = 0; /* reset backoff memory */
572 mutex_unlock(&con->mutex);
420 queue_con(con); 573 queue_con(con);
421} 574}
422EXPORT_SYMBOL(ceph_con_open); 575EXPORT_SYMBOL(ceph_con_open);
@@ -430,42 +583,26 @@ bool ceph_con_opened(struct ceph_connection *con)
430} 583}
431 584
432/* 585/*
433 * generic get/put
434 */
435struct ceph_connection *ceph_con_get(struct ceph_connection *con)
436{
437 int nref = __atomic_add_unless(&con->nref, 1, 0);
438
439 dout("con_get %p nref = %d -> %d\n", con, nref, nref + 1);
440
441 return nref ? con : NULL;
442}
443
444void ceph_con_put(struct ceph_connection *con)
445{
446 int nref = atomic_dec_return(&con->nref);
447
448 BUG_ON(nref < 0);
449 if (nref == 0) {
450 BUG_ON(con->sock);
451 kfree(con);
452 }
453 dout("con_put %p nref = %d -> %d\n", con, nref + 1, nref);
454}
455
456/*
457 * initialize a new connection. 586 * initialize a new connection.
458 */ 587 */
459void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con) 588void ceph_con_init(struct ceph_connection *con, void *private,
589 const struct ceph_connection_operations *ops,
590 struct ceph_messenger *msgr)
460{ 591{
461 dout("con_init %p\n", con); 592 dout("con_init %p\n", con);
462 memset(con, 0, sizeof(*con)); 593 memset(con, 0, sizeof(*con));
463 atomic_set(&con->nref, 1); 594 con->private = private;
595 con->ops = ops;
464 con->msgr = msgr; 596 con->msgr = msgr;
597
598 con_sock_state_init(con);
599
465 mutex_init(&con->mutex); 600 mutex_init(&con->mutex);
466 INIT_LIST_HEAD(&con->out_queue); 601 INIT_LIST_HEAD(&con->out_queue);
467 INIT_LIST_HEAD(&con->out_sent); 602 INIT_LIST_HEAD(&con->out_sent);
468 INIT_DELAYED_WORK(&con->work, con_work); 603 INIT_DELAYED_WORK(&con->work, con_work);
604
605 con->state = CON_STATE_CLOSED;
469} 606}
470EXPORT_SYMBOL(ceph_con_init); 607EXPORT_SYMBOL(ceph_con_init);
471 608
@@ -486,14 +623,14 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
486 return ret; 623 return ret;
487} 624}
488 625
489static void ceph_con_out_kvec_reset(struct ceph_connection *con) 626static void con_out_kvec_reset(struct ceph_connection *con)
490{ 627{
491 con->out_kvec_left = 0; 628 con->out_kvec_left = 0;
492 con->out_kvec_bytes = 0; 629 con->out_kvec_bytes = 0;
493 con->out_kvec_cur = &con->out_kvec[0]; 630 con->out_kvec_cur = &con->out_kvec[0];
494} 631}
495 632
496static void ceph_con_out_kvec_add(struct ceph_connection *con, 633static void con_out_kvec_add(struct ceph_connection *con,
497 size_t size, void *data) 634 size_t size, void *data)
498{ 635{
499 int index; 636 int index;
@@ -507,6 +644,53 @@ static void ceph_con_out_kvec_add(struct ceph_connection *con,
507 con->out_kvec_bytes += size; 644 con->out_kvec_bytes += size;
508} 645}
509 646
647#ifdef CONFIG_BLOCK
648static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
649{
650 if (!bio) {
651 *iter = NULL;
652 *seg = 0;
653 return;
654 }
655 *iter = bio;
656 *seg = bio->bi_idx;
657}
658
659static void iter_bio_next(struct bio **bio_iter, int *seg)
660{
661 if (*bio_iter == NULL)
662 return;
663
664 BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
665
666 (*seg)++;
667 if (*seg == (*bio_iter)->bi_vcnt)
668 init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
669}
670#endif
671
672static void prepare_write_message_data(struct ceph_connection *con)
673{
674 struct ceph_msg *msg = con->out_msg;
675
676 BUG_ON(!msg);
677 BUG_ON(!msg->hdr.data_len);
678
679 /* initialize page iterator */
680 con->out_msg_pos.page = 0;
681 if (msg->pages)
682 con->out_msg_pos.page_pos = msg->page_alignment;
683 else
684 con->out_msg_pos.page_pos = 0;
685#ifdef CONFIG_BLOCK
686 if (msg->bio)
687 init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
688#endif
689 con->out_msg_pos.data_pos = 0;
690 con->out_msg_pos.did_page_crc = false;
691 con->out_more = 1; /* data + footer will follow */
692}
693
510/* 694/*
511 * Prepare footer for currently outgoing message, and finish things 695 * Prepare footer for currently outgoing message, and finish things
512 * off. Assumes out_kvec* are already valid.. we just add on to the end. 696 * off. Assumes out_kvec* are already valid.. we just add on to the end.
@@ -516,6 +700,8 @@ static void prepare_write_message_footer(struct ceph_connection *con)
516 struct ceph_msg *m = con->out_msg; 700 struct ceph_msg *m = con->out_msg;
517 int v = con->out_kvec_left; 701 int v = con->out_kvec_left;
518 702
703 m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
704
519 dout("prepare_write_message_footer %p\n", con); 705 dout("prepare_write_message_footer %p\n", con);
520 con->out_kvec_is_msg = true; 706 con->out_kvec_is_msg = true;
521 con->out_kvec[v].iov_base = &m->footer; 707 con->out_kvec[v].iov_base = &m->footer;
@@ -534,7 +720,7 @@ static void prepare_write_message(struct ceph_connection *con)
534 struct ceph_msg *m; 720 struct ceph_msg *m;
535 u32 crc; 721 u32 crc;
536 722
537 ceph_con_out_kvec_reset(con); 723 con_out_kvec_reset(con);
538 con->out_kvec_is_msg = true; 724 con->out_kvec_is_msg = true;
539 con->out_msg_done = false; 725 con->out_msg_done = false;
540 726
@@ -542,14 +728,16 @@ static void prepare_write_message(struct ceph_connection *con)
542 * TCP packet that's a good thing. */ 728 * TCP packet that's a good thing. */
543 if (con->in_seq > con->in_seq_acked) { 729 if (con->in_seq > con->in_seq_acked) {
544 con->in_seq_acked = con->in_seq; 730 con->in_seq_acked = con->in_seq;
545 ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); 731 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
546 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 732 con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
547 ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack), 733 con_out_kvec_add(con, sizeof (con->out_temp_ack),
548 &con->out_temp_ack); 734 &con->out_temp_ack);
549 } 735 }
550 736
737 BUG_ON(list_empty(&con->out_queue));
551 m = list_first_entry(&con->out_queue, struct ceph_msg, list_head); 738 m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
552 con->out_msg = m; 739 con->out_msg = m;
740 BUG_ON(m->con != con);
553 741
554 /* put message on sent list */ 742 /* put message on sent list */
555 ceph_msg_get(m); 743 ceph_msg_get(m);
@@ -576,18 +764,18 @@ static void prepare_write_message(struct ceph_connection *con)
576 BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len); 764 BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
577 765
578 /* tag + hdr + front + middle */ 766 /* tag + hdr + front + middle */
579 ceph_con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); 767 con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
580 ceph_con_out_kvec_add(con, sizeof (m->hdr), &m->hdr); 768 con_out_kvec_add(con, sizeof (m->hdr), &m->hdr);
581 ceph_con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); 769 con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
582 770
583 if (m->middle) 771 if (m->middle)
584 ceph_con_out_kvec_add(con, m->middle->vec.iov_len, 772 con_out_kvec_add(con, m->middle->vec.iov_len,
585 m->middle->vec.iov_base); 773 m->middle->vec.iov_base);
586 774
587 /* fill in crc (except data pages), footer */ 775 /* fill in crc (except data pages), footer */
588 crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); 776 crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
589 con->out_msg->hdr.crc = cpu_to_le32(crc); 777 con->out_msg->hdr.crc = cpu_to_le32(crc);
590 con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE; 778 con->out_msg->footer.flags = 0;
591 779
592 crc = crc32c(0, m->front.iov_base, m->front.iov_len); 780 crc = crc32c(0, m->front.iov_base, m->front.iov_len);
593 con->out_msg->footer.front_crc = cpu_to_le32(crc); 781 con->out_msg->footer.front_crc = cpu_to_le32(crc);
@@ -597,28 +785,19 @@ static void prepare_write_message(struct ceph_connection *con)
597 con->out_msg->footer.middle_crc = cpu_to_le32(crc); 785 con->out_msg->footer.middle_crc = cpu_to_le32(crc);
598 } else 786 } else
599 con->out_msg->footer.middle_crc = 0; 787 con->out_msg->footer.middle_crc = 0;
600 con->out_msg->footer.data_crc = 0; 788 dout("%s front_crc %u middle_crc %u\n", __func__,
601 dout("prepare_write_message front_crc %u data_crc %u\n",
602 le32_to_cpu(con->out_msg->footer.front_crc), 789 le32_to_cpu(con->out_msg->footer.front_crc),
603 le32_to_cpu(con->out_msg->footer.middle_crc)); 790 le32_to_cpu(con->out_msg->footer.middle_crc));
604 791
605 /* is there a data payload? */ 792 /* is there a data payload? */
606 if (le32_to_cpu(m->hdr.data_len) > 0) { 793 con->out_msg->footer.data_crc = 0;
607 /* initialize page iterator */ 794 if (m->hdr.data_len)
608 con->out_msg_pos.page = 0; 795 prepare_write_message_data(con);
609 if (m->pages) 796 else
610 con->out_msg_pos.page_pos = m->page_alignment;
611 else
612 con->out_msg_pos.page_pos = 0;
613 con->out_msg_pos.data_pos = 0;
614 con->out_msg_pos.did_page_crc = false;
615 con->out_more = 1; /* data + footer will follow */
616 } else {
617 /* no, queue up footer too and be done */ 797 /* no, queue up footer too and be done */
618 prepare_write_message_footer(con); 798 prepare_write_message_footer(con);
619 }
620 799
621 set_bit(WRITE_PENDING, &con->state); 800 set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
622} 801}
623 802
624/* 803/*
@@ -630,16 +809,16 @@ static void prepare_write_ack(struct ceph_connection *con)
630 con->in_seq_acked, con->in_seq); 809 con->in_seq_acked, con->in_seq);
631 con->in_seq_acked = con->in_seq; 810 con->in_seq_acked = con->in_seq;
632 811
633 ceph_con_out_kvec_reset(con); 812 con_out_kvec_reset(con);
634 813
635 ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); 814 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
636 815
637 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 816 con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
638 ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack), 817 con_out_kvec_add(con, sizeof (con->out_temp_ack),
639 &con->out_temp_ack); 818 &con->out_temp_ack);
640 819
641 con->out_more = 1; /* more will follow.. eventually.. */ 820 con->out_more = 1; /* more will follow.. eventually.. */
642 set_bit(WRITE_PENDING, &con->state); 821 set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
643} 822}
644 823
645/* 824/*
@@ -648,9 +827,9 @@ static void prepare_write_ack(struct ceph_connection *con)
648static void prepare_write_keepalive(struct ceph_connection *con) 827static void prepare_write_keepalive(struct ceph_connection *con)
649{ 828{
650 dout("prepare_write_keepalive %p\n", con); 829 dout("prepare_write_keepalive %p\n", con);
651 ceph_con_out_kvec_reset(con); 830 con_out_kvec_reset(con);
652 ceph_con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); 831 con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
653 set_bit(WRITE_PENDING, &con->state); 832 set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
654} 833}
655 834
656/* 835/*
@@ -665,27 +844,21 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection
665 if (!con->ops->get_authorizer) { 844 if (!con->ops->get_authorizer) {
666 con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; 845 con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
667 con->out_connect.authorizer_len = 0; 846 con->out_connect.authorizer_len = 0;
668
669 return NULL; 847 return NULL;
670 } 848 }
671 849
672 /* Can't hold the mutex while getting authorizer */ 850 /* Can't hold the mutex while getting authorizer */
673
674 mutex_unlock(&con->mutex); 851 mutex_unlock(&con->mutex);
675
676 auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); 852 auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry);
677
678 mutex_lock(&con->mutex); 853 mutex_lock(&con->mutex);
679 854
680 if (IS_ERR(auth)) 855 if (IS_ERR(auth))
681 return auth; 856 return auth;
682 if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state)) 857 if (con->state != CON_STATE_NEGOTIATING)
683 return ERR_PTR(-EAGAIN); 858 return ERR_PTR(-EAGAIN);
684 859
685 con->auth_reply_buf = auth->authorizer_reply_buf; 860 con->auth_reply_buf = auth->authorizer_reply_buf;
686 con->auth_reply_buf_len = auth->authorizer_reply_buf_len; 861 con->auth_reply_buf_len = auth->authorizer_reply_buf_len;
687
688
689 return auth; 862 return auth;
690} 863}
691 864
@@ -694,12 +867,12 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection
694 */ 867 */
695static void prepare_write_banner(struct ceph_connection *con) 868static void prepare_write_banner(struct ceph_connection *con)
696{ 869{
697 ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); 870 con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
698 ceph_con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), 871 con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
699 &con->msgr->my_enc_addr); 872 &con->msgr->my_enc_addr);
700 873
701 con->out_more = 0; 874 con->out_more = 0;
702 set_bit(WRITE_PENDING, &con->state); 875 set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
703} 876}
704 877
705static int prepare_write_connect(struct ceph_connection *con) 878static int prepare_write_connect(struct ceph_connection *con)
@@ -742,14 +915,15 @@ static int prepare_write_connect(struct ceph_connection *con)
742 con->out_connect.authorizer_len = auth ? 915 con->out_connect.authorizer_len = auth ?
743 cpu_to_le32(auth->authorizer_buf_len) : 0; 916 cpu_to_le32(auth->authorizer_buf_len) : 0;
744 917
745 ceph_con_out_kvec_add(con, sizeof (con->out_connect), 918 con_out_kvec_reset(con);
919 con_out_kvec_add(con, sizeof (con->out_connect),
746 &con->out_connect); 920 &con->out_connect);
747 if (auth && auth->authorizer_buf_len) 921 if (auth && auth->authorizer_buf_len)
748 ceph_con_out_kvec_add(con, auth->authorizer_buf_len, 922 con_out_kvec_add(con, auth->authorizer_buf_len,
749 auth->authorizer_buf); 923 auth->authorizer_buf);
750 924
751 con->out_more = 0; 925 con->out_more = 0;
752 set_bit(WRITE_PENDING, &con->state); 926 set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
753 927
754 return 0; 928 return 0;
755} 929}
@@ -797,30 +971,34 @@ out:
797 return ret; /* done! */ 971 return ret; /* done! */
798} 972}
799 973
800#ifdef CONFIG_BLOCK 974static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
801static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg) 975 size_t len, size_t sent, bool in_trail)
802{ 976{
803 if (!bio) { 977 struct ceph_msg *msg = con->out_msg;
804 *iter = NULL;
805 *seg = 0;
806 return;
807 }
808 *iter = bio;
809 *seg = bio->bi_idx;
810}
811 978
812static void iter_bio_next(struct bio **bio_iter, int *seg) 979 BUG_ON(!msg);
813{ 980 BUG_ON(!sent);
814 if (*bio_iter == NULL)
815 return;
816 981
817 BUG_ON(*seg >= (*bio_iter)->bi_vcnt); 982 con->out_msg_pos.data_pos += sent;
983 con->out_msg_pos.page_pos += sent;
984 if (sent < len)
985 return;
818 986
819 (*seg)++; 987 BUG_ON(sent != len);
820 if (*seg == (*bio_iter)->bi_vcnt) 988 con->out_msg_pos.page_pos = 0;
821 init_bio_iter((*bio_iter)->bi_next, bio_iter, seg); 989 con->out_msg_pos.page++;
822} 990 con->out_msg_pos.did_page_crc = false;
991 if (in_trail)
992 list_move_tail(&page->lru,
993 &msg->trail->head);
994 else if (msg->pagelist)
995 list_move_tail(&page->lru,
996 &msg->pagelist->head);
997#ifdef CONFIG_BLOCK
998 else if (msg->bio)
999 iter_bio_next(&msg->bio_iter, &msg->bio_seg);
823#endif 1000#endif
1001}
824 1002
825/* 1003/*
826 * Write as much message data payload as we can. If we finish, queue 1004 * Write as much message data payload as we can. If we finish, queue
@@ -837,41 +1015,36 @@ static int write_partial_msg_pages(struct ceph_connection *con)
837 bool do_datacrc = !con->msgr->nocrc; 1015 bool do_datacrc = !con->msgr->nocrc;
838 int ret; 1016 int ret;
839 int total_max_write; 1017 int total_max_write;
840 int in_trail = 0; 1018 bool in_trail = false;
841 size_t trail_len = (msg->trail ? msg->trail->length : 0); 1019 const size_t trail_len = (msg->trail ? msg->trail->length : 0);
1020 const size_t trail_off = data_len - trail_len;
842 1021
843 dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n", 1022 dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
844 con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages, 1023 con, msg, con->out_msg_pos.page, msg->nr_pages,
845 con->out_msg_pos.page_pos); 1024 con->out_msg_pos.page_pos);
846 1025
847#ifdef CONFIG_BLOCK 1026 /*
848 if (msg->bio && !msg->bio_iter) 1027 * Iterate through each page that contains data to be
849 init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg); 1028 * written, and send as much as possible for each.
850#endif 1029 *
851 1030 * If we are calculating the data crc (the default), we will
1031 * need to map the page. If we have no pages, they have
1032 * been revoked, so use the zero page.
1033 */
852 while (data_len > con->out_msg_pos.data_pos) { 1034 while (data_len > con->out_msg_pos.data_pos) {
853 struct page *page = NULL; 1035 struct page *page = NULL;
854 int max_write = PAGE_SIZE; 1036 int max_write = PAGE_SIZE;
855 int bio_offset = 0; 1037 int bio_offset = 0;
856 1038
857 total_max_write = data_len - trail_len - 1039 in_trail = in_trail || con->out_msg_pos.data_pos >= trail_off;
858 con->out_msg_pos.data_pos; 1040 if (!in_trail)
859 1041 total_max_write = trail_off - con->out_msg_pos.data_pos;
860 /*
861 * if we are calculating the data crc (the default), we need
862 * to map the page. if our pages[] has been revoked, use the
863 * zero page.
864 */
865
866 /* have we reached the trail part of the data? */
867 if (con->out_msg_pos.data_pos >= data_len - trail_len) {
868 in_trail = 1;
869 1042
1043 if (in_trail) {
870 total_max_write = data_len - con->out_msg_pos.data_pos; 1044 total_max_write = data_len - con->out_msg_pos.data_pos;
871 1045
872 page = list_first_entry(&msg->trail->head, 1046 page = list_first_entry(&msg->trail->head,
873 struct page, lru); 1047 struct page, lru);
874 max_write = PAGE_SIZE;
875 } else if (msg->pages) { 1048 } else if (msg->pages) {
876 page = msg->pages[con->out_msg_pos.page]; 1049 page = msg->pages[con->out_msg_pos.page];
877 } else if (msg->pagelist) { 1050 } else if (msg->pagelist) {
@@ -894,15 +1067,14 @@ static int write_partial_msg_pages(struct ceph_connection *con)
894 1067
895 if (do_datacrc && !con->out_msg_pos.did_page_crc) { 1068 if (do_datacrc && !con->out_msg_pos.did_page_crc) {
896 void *base; 1069 void *base;
897 u32 crc; 1070 u32 crc = le32_to_cpu(msg->footer.data_crc);
898 u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
899 char *kaddr; 1071 char *kaddr;
900 1072
901 kaddr = kmap(page); 1073 kaddr = kmap(page);
902 BUG_ON(kaddr == NULL); 1074 BUG_ON(kaddr == NULL);
903 base = kaddr + con->out_msg_pos.page_pos + bio_offset; 1075 base = kaddr + con->out_msg_pos.page_pos + bio_offset;
904 crc = crc32c(tmpcrc, base, len); 1076 crc = crc32c(crc, base, len);
905 con->out_msg->footer.data_crc = cpu_to_le32(crc); 1077 msg->footer.data_crc = cpu_to_le32(crc);
906 con->out_msg_pos.did_page_crc = true; 1078 con->out_msg_pos.did_page_crc = true;
907 } 1079 }
908 ret = ceph_tcp_sendpage(con->sock, page, 1080 ret = ceph_tcp_sendpage(con->sock, page,
@@ -915,31 +1087,15 @@ static int write_partial_msg_pages(struct ceph_connection *con)
915 if (ret <= 0) 1087 if (ret <= 0)
916 goto out; 1088 goto out;
917 1089
918 con->out_msg_pos.data_pos += ret; 1090 out_msg_pos_next(con, page, len, (size_t) ret, in_trail);
919 con->out_msg_pos.page_pos += ret;
920 if (ret == len) {
921 con->out_msg_pos.page_pos = 0;
922 con->out_msg_pos.page++;
923 con->out_msg_pos.did_page_crc = false;
924 if (in_trail)
925 list_move_tail(&page->lru,
926 &msg->trail->head);
927 else if (msg->pagelist)
928 list_move_tail(&page->lru,
929 &msg->pagelist->head);
930#ifdef CONFIG_BLOCK
931 else if (msg->bio)
932 iter_bio_next(&msg->bio_iter, &msg->bio_seg);
933#endif
934 }
935 } 1091 }
936 1092
937 dout("write_partial_msg_pages %p msg %p done\n", con, msg); 1093 dout("write_partial_msg_pages %p msg %p done\n", con, msg);
938 1094
939 /* prepare and queue up footer, too */ 1095 /* prepare and queue up footer, too */
940 if (!do_datacrc) 1096 if (!do_datacrc)
941 con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; 1097 msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
942 ceph_con_out_kvec_reset(con); 1098 con_out_kvec_reset(con);
943 prepare_write_message_footer(con); 1099 prepare_write_message_footer(con);
944 ret = 1; 1100 ret = 1;
945out: 1101out:
@@ -1351,20 +1507,14 @@ static int process_banner(struct ceph_connection *con)
1351 ceph_pr_addr(&con->msgr->inst.addr.in_addr)); 1507 ceph_pr_addr(&con->msgr->inst.addr.in_addr));
1352 } 1508 }
1353 1509
1354 set_bit(NEGOTIATING, &con->state);
1355 prepare_read_connect(con);
1356 return 0; 1510 return 0;
1357} 1511}
1358 1512
1359static void fail_protocol(struct ceph_connection *con) 1513static void fail_protocol(struct ceph_connection *con)
1360{ 1514{
1361 reset_connection(con); 1515 reset_connection(con);
1362 set_bit(CLOSED, &con->state); /* in case there's queued work */ 1516 BUG_ON(con->state != CON_STATE_NEGOTIATING);
1363 1517 con->state = CON_STATE_CLOSED;
1364 mutex_unlock(&con->mutex);
1365 if (con->ops->bad_proto)
1366 con->ops->bad_proto(con);
1367 mutex_lock(&con->mutex);
1368} 1518}
1369 1519
1370static int process_connect(struct ceph_connection *con) 1520static int process_connect(struct ceph_connection *con)
@@ -1407,7 +1557,6 @@ static int process_connect(struct ceph_connection *con)
1407 return -1; 1557 return -1;
1408 } 1558 }
1409 con->auth_retry = 1; 1559 con->auth_retry = 1;
1410 ceph_con_out_kvec_reset(con);
1411 ret = prepare_write_connect(con); 1560 ret = prepare_write_connect(con);
1412 if (ret < 0) 1561 if (ret < 0)
1413 return ret; 1562 return ret;
@@ -1428,7 +1577,6 @@ static int process_connect(struct ceph_connection *con)
1428 ENTITY_NAME(con->peer_name), 1577 ENTITY_NAME(con->peer_name),
1429 ceph_pr_addr(&con->peer_addr.in_addr)); 1578 ceph_pr_addr(&con->peer_addr.in_addr));
1430 reset_connection(con); 1579 reset_connection(con);
1431 ceph_con_out_kvec_reset(con);
1432 ret = prepare_write_connect(con); 1580 ret = prepare_write_connect(con);
1433 if (ret < 0) 1581 if (ret < 0)
1434 return ret; 1582 return ret;
@@ -1440,8 +1588,7 @@ static int process_connect(struct ceph_connection *con)
1440 if (con->ops->peer_reset) 1588 if (con->ops->peer_reset)
1441 con->ops->peer_reset(con); 1589 con->ops->peer_reset(con);
1442 mutex_lock(&con->mutex); 1590 mutex_lock(&con->mutex);
1443 if (test_bit(CLOSED, &con->state) || 1591 if (con->state != CON_STATE_NEGOTIATING)
1444 test_bit(OPENING, &con->state))
1445 return -EAGAIN; 1592 return -EAGAIN;
1446 break; 1593 break;
1447 1594
@@ -1454,7 +1601,6 @@ static int process_connect(struct ceph_connection *con)
1454 le32_to_cpu(con->out_connect.connect_seq), 1601 le32_to_cpu(con->out_connect.connect_seq),
1455 le32_to_cpu(con->in_reply.connect_seq)); 1602 le32_to_cpu(con->in_reply.connect_seq));
1456 con->connect_seq = le32_to_cpu(con->in_reply.connect_seq); 1603 con->connect_seq = le32_to_cpu(con->in_reply.connect_seq);
1457 ceph_con_out_kvec_reset(con);
1458 ret = prepare_write_connect(con); 1604 ret = prepare_write_connect(con);
1459 if (ret < 0) 1605 if (ret < 0)
1460 return ret; 1606 return ret;
@@ -1471,7 +1617,6 @@ static int process_connect(struct ceph_connection *con)
1471 le32_to_cpu(con->in_reply.global_seq)); 1617 le32_to_cpu(con->in_reply.global_seq));
1472 get_global_seq(con->msgr, 1618 get_global_seq(con->msgr,
1473 le32_to_cpu(con->in_reply.global_seq)); 1619 le32_to_cpu(con->in_reply.global_seq));
1474 ceph_con_out_kvec_reset(con);
1475 ret = prepare_write_connect(con); 1620 ret = prepare_write_connect(con);
1476 if (ret < 0) 1621 if (ret < 0)
1477 return ret; 1622 return ret;
@@ -1489,7 +1634,10 @@ static int process_connect(struct ceph_connection *con)
1489 fail_protocol(con); 1634 fail_protocol(con);
1490 return -1; 1635 return -1;
1491 } 1636 }
1492 clear_bit(CONNECTING, &con->state); 1637
1638 BUG_ON(con->state != CON_STATE_NEGOTIATING);
1639 con->state = CON_STATE_OPEN;
1640
1493 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); 1641 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
1494 con->connect_seq++; 1642 con->connect_seq++;
1495 con->peer_features = server_feat; 1643 con->peer_features = server_feat;
@@ -1501,7 +1649,9 @@ static int process_connect(struct ceph_connection *con)
1501 le32_to_cpu(con->in_reply.connect_seq)); 1649 le32_to_cpu(con->in_reply.connect_seq));
1502 1650
1503 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) 1651 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
1504 set_bit(LOSSYTX, &con->state); 1652 set_bit(CON_FLAG_LOSSYTX, &con->flags);
1653
1654 con->delay = 0; /* reset backoff memory */
1505 1655
1506 prepare_read_tag(con); 1656 prepare_read_tag(con);
1507 break; 1657 break;
@@ -1587,10 +1737,7 @@ static int read_partial_message_section(struct ceph_connection *con,
1587 return 1; 1737 return 1;
1588} 1738}
1589 1739
1590static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, 1740static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
1591 struct ceph_msg_header *hdr,
1592 int *skip);
1593
1594 1741
1595static int read_partial_message_pages(struct ceph_connection *con, 1742static int read_partial_message_pages(struct ceph_connection *con,
1596 struct page **pages, 1743 struct page **pages,
@@ -1633,9 +1780,6 @@ static int read_partial_message_bio(struct ceph_connection *con,
1633 void *p; 1780 void *p;
1634 int ret, left; 1781 int ret, left;
1635 1782
1636 if (IS_ERR(bv))
1637 return PTR_ERR(bv);
1638
1639 left = min((int)(data_len - con->in_msg_pos.data_pos), 1783 left = min((int)(data_len - con->in_msg_pos.data_pos),
1640 (int)(bv->bv_len - con->in_msg_pos.page_pos)); 1784 (int)(bv->bv_len - con->in_msg_pos.page_pos));
1641 1785
@@ -1672,7 +1816,6 @@ static int read_partial_message(struct ceph_connection *con)
1672 int ret; 1816 int ret;
1673 unsigned int front_len, middle_len, data_len; 1817 unsigned int front_len, middle_len, data_len;
1674 bool do_datacrc = !con->msgr->nocrc; 1818 bool do_datacrc = !con->msgr->nocrc;
1675 int skip;
1676 u64 seq; 1819 u64 seq;
1677 u32 crc; 1820 u32 crc;
1678 1821
@@ -1723,10 +1866,13 @@ static int read_partial_message(struct ceph_connection *con)
1723 1866
1724 /* allocate message? */ 1867 /* allocate message? */
1725 if (!con->in_msg) { 1868 if (!con->in_msg) {
1869 int skip = 0;
1870
1726 dout("got hdr type %d front %d data %d\n", con->in_hdr.type, 1871 dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
1727 con->in_hdr.front_len, con->in_hdr.data_len); 1872 con->in_hdr.front_len, con->in_hdr.data_len);
1728 skip = 0; 1873 ret = ceph_con_in_msg_alloc(con, &skip);
1729 con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); 1874 if (ret < 0)
1875 return ret;
1730 if (skip) { 1876 if (skip) {
1731 /* skip this message */ 1877 /* skip this message */
1732 dout("alloc_msg said skip message\n"); 1878 dout("alloc_msg said skip message\n");
@@ -1737,11 +1883,9 @@ static int read_partial_message(struct ceph_connection *con)
1737 con->in_seq++; 1883 con->in_seq++;
1738 return 0; 1884 return 0;
1739 } 1885 }
1740 if (!con->in_msg) { 1886
1741 con->error_msg = 1887 BUG_ON(!con->in_msg);
1742 "error allocating memory for incoming message"; 1888 BUG_ON(con->in_msg->con != con);
1743 return -ENOMEM;
1744 }
1745 m = con->in_msg; 1889 m = con->in_msg;
1746 m->front.iov_len = 0; /* haven't read it yet */ 1890 m->front.iov_len = 0; /* haven't read it yet */
1747 if (m->middle) 1891 if (m->middle)
@@ -1753,6 +1897,11 @@ static int read_partial_message(struct ceph_connection *con)
1753 else 1897 else
1754 con->in_msg_pos.page_pos = 0; 1898 con->in_msg_pos.page_pos = 0;
1755 con->in_msg_pos.data_pos = 0; 1899 con->in_msg_pos.data_pos = 0;
1900
1901#ifdef CONFIG_BLOCK
1902 if (m->bio)
1903 init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
1904#endif
1756 } 1905 }
1757 1906
1758 /* front */ 1907 /* front */
@@ -1769,10 +1918,6 @@ static int read_partial_message(struct ceph_connection *con)
1769 if (ret <= 0) 1918 if (ret <= 0)
1770 return ret; 1919 return ret;
1771 } 1920 }
1772#ifdef CONFIG_BLOCK
1773 if (m->bio && !m->bio_iter)
1774 init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
1775#endif
1776 1921
1777 /* (page) data */ 1922 /* (page) data */
1778 while (con->in_msg_pos.data_pos < data_len) { 1923 while (con->in_msg_pos.data_pos < data_len) {
@@ -1783,7 +1928,7 @@ static int read_partial_message(struct ceph_connection *con)
1783 return ret; 1928 return ret;
1784#ifdef CONFIG_BLOCK 1929#ifdef CONFIG_BLOCK
1785 } else if (m->bio) { 1930 } else if (m->bio) {
1786 1931 BUG_ON(!m->bio_iter);
1787 ret = read_partial_message_bio(con, 1932 ret = read_partial_message_bio(con,
1788 &m->bio_iter, &m->bio_seg, 1933 &m->bio_iter, &m->bio_seg,
1789 data_len, do_datacrc); 1934 data_len, do_datacrc);
@@ -1837,8 +1982,11 @@ static void process_message(struct ceph_connection *con)
1837{ 1982{
1838 struct ceph_msg *msg; 1983 struct ceph_msg *msg;
1839 1984
1985 BUG_ON(con->in_msg->con != con);
1986 con->in_msg->con = NULL;
1840 msg = con->in_msg; 1987 msg = con->in_msg;
1841 con->in_msg = NULL; 1988 con->in_msg = NULL;
1989 con->ops->put(con);
1842 1990
1843 /* if first message, set peer_name */ 1991 /* if first message, set peer_name */
1844 if (con->peer_name.type == 0) 1992 if (con->peer_name.type == 0)
@@ -1858,7 +2006,6 @@ static void process_message(struct ceph_connection *con)
1858 con->ops->dispatch(con, msg); 2006 con->ops->dispatch(con, msg);
1859 2007
1860 mutex_lock(&con->mutex); 2008 mutex_lock(&con->mutex);
1861 prepare_read_tag(con);
1862} 2009}
1863 2010
1864 2011
@@ -1870,22 +2017,19 @@ static int try_write(struct ceph_connection *con)
1870{ 2017{
1871 int ret = 1; 2018 int ret = 1;
1872 2019
1873 dout("try_write start %p state %lu nref %d\n", con, con->state, 2020 dout("try_write start %p state %lu\n", con, con->state);
1874 atomic_read(&con->nref));
1875 2021
1876more: 2022more:
1877 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); 2023 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
1878 2024
1879 /* open the socket first? */ 2025 /* open the socket first? */
1880 if (con->sock == NULL) { 2026 if (con->state == CON_STATE_PREOPEN) {
1881 ceph_con_out_kvec_reset(con); 2027 BUG_ON(con->sock);
2028 con->state = CON_STATE_CONNECTING;
2029
2030 con_out_kvec_reset(con);
1882 prepare_write_banner(con); 2031 prepare_write_banner(con);
1883 ret = prepare_write_connect(con);
1884 if (ret < 0)
1885 goto out;
1886 prepare_read_banner(con); 2032 prepare_read_banner(con);
1887 set_bit(CONNECTING, &con->state);
1888 clear_bit(NEGOTIATING, &con->state);
1889 2033
1890 BUG_ON(con->in_msg); 2034 BUG_ON(con->in_msg);
1891 con->in_tag = CEPH_MSGR_TAG_READY; 2035 con->in_tag = CEPH_MSGR_TAG_READY;
@@ -1932,7 +2076,7 @@ more_kvec:
1932 } 2076 }
1933 2077
1934do_next: 2078do_next:
1935 if (!test_bit(CONNECTING, &con->state)) { 2079 if (con->state == CON_STATE_OPEN) {
1936 /* is anything else pending? */ 2080 /* is anything else pending? */
1937 if (!list_empty(&con->out_queue)) { 2081 if (!list_empty(&con->out_queue)) {
1938 prepare_write_message(con); 2082 prepare_write_message(con);
@@ -1942,14 +2086,15 @@ do_next:
1942 prepare_write_ack(con); 2086 prepare_write_ack(con);
1943 goto more; 2087 goto more;
1944 } 2088 }
1945 if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) { 2089 if (test_and_clear_bit(CON_FLAG_KEEPALIVE_PENDING,
2090 &con->flags)) {
1946 prepare_write_keepalive(con); 2091 prepare_write_keepalive(con);
1947 goto more; 2092 goto more;
1948 } 2093 }
1949 } 2094 }
1950 2095
1951 /* Nothing to do! */ 2096 /* Nothing to do! */
1952 clear_bit(WRITE_PENDING, &con->state); 2097 clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
1953 dout("try_write nothing else to write.\n"); 2098 dout("try_write nothing else to write.\n");
1954 ret = 0; 2099 ret = 0;
1955out: 2100out:
@@ -1966,38 +2111,42 @@ static int try_read(struct ceph_connection *con)
1966{ 2111{
1967 int ret = -1; 2112 int ret = -1;
1968 2113
1969 if (!con->sock) 2114more:
1970 return 0; 2115 dout("try_read start on %p state %lu\n", con, con->state);
1971 2116 if (con->state != CON_STATE_CONNECTING &&
1972 if (test_bit(STANDBY, &con->state)) 2117 con->state != CON_STATE_NEGOTIATING &&
2118 con->state != CON_STATE_OPEN)
1973 return 0; 2119 return 0;
1974 2120
1975 dout("try_read start on %p\n", con); 2121 BUG_ON(!con->sock);
1976 2122
1977more:
1978 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, 2123 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
1979 con->in_base_pos); 2124 con->in_base_pos);
1980 2125
1981 /* 2126 if (con->state == CON_STATE_CONNECTING) {
1982 * process_connect and process_message drop and re-take 2127 dout("try_read connecting\n");
1983 * con->mutex. make sure we handle a racing close or reopen. 2128 ret = read_partial_banner(con);
1984 */ 2129 if (ret <= 0)
1985 if (test_bit(CLOSED, &con->state) || 2130 goto out;
1986 test_bit(OPENING, &con->state)) { 2131 ret = process_banner(con);
1987 ret = -EAGAIN; 2132 if (ret < 0)
2133 goto out;
2134
2135 BUG_ON(con->state != CON_STATE_CONNECTING);
2136 con->state = CON_STATE_NEGOTIATING;
2137
2138 /* Banner is good, exchange connection info */
2139 ret = prepare_write_connect(con);
2140 if (ret < 0)
2141 goto out;
2142 prepare_read_connect(con);
2143
2144 /* Send connection info before awaiting response */
1988 goto out; 2145 goto out;
1989 } 2146 }
1990 2147
1991 if (test_bit(CONNECTING, &con->state)) { 2148 if (con->state == CON_STATE_NEGOTIATING) {
1992 if (!test_bit(NEGOTIATING, &con->state)) { 2149 dout("try_read negotiating\n");
1993 dout("try_read connecting\n");
1994 ret = read_partial_banner(con);
1995 if (ret <= 0)
1996 goto out;
1997 ret = process_banner(con);
1998 if (ret < 0)
1999 goto out;
2000 }
2001 ret = read_partial_connect(con); 2150 ret = read_partial_connect(con);
2002 if (ret <= 0) 2151 if (ret <= 0)
2003 goto out; 2152 goto out;
@@ -2007,6 +2156,8 @@ more:
2007 goto more; 2156 goto more;
2008 } 2157 }
2009 2158
2159 BUG_ON(con->state != CON_STATE_OPEN);
2160
2010 if (con->in_base_pos < 0) { 2161 if (con->in_base_pos < 0) {
2011 /* 2162 /*
2012 * skipping + discarding content. 2163 * skipping + discarding content.
@@ -2040,7 +2191,8 @@ more:
2040 prepare_read_ack(con); 2191 prepare_read_ack(con);
2041 break; 2192 break;
2042 case CEPH_MSGR_TAG_CLOSE: 2193 case CEPH_MSGR_TAG_CLOSE:
2043 set_bit(CLOSED, &con->state); /* fixme */ 2194 con_close_socket(con);
2195 con->state = CON_STATE_CLOSED;
2044 goto out; 2196 goto out;
2045 default: 2197 default:
2046 goto bad_tag; 2198 goto bad_tag;
@@ -2063,6 +2215,8 @@ more:
2063 if (con->in_tag == CEPH_MSGR_TAG_READY) 2215 if (con->in_tag == CEPH_MSGR_TAG_READY)
2064 goto more; 2216 goto more;
2065 process_message(con); 2217 process_message(con);
2218 if (con->state == CON_STATE_OPEN)
2219 prepare_read_tag(con);
2066 goto more; 2220 goto more;
2067 } 2221 }
2068 if (con->in_tag == CEPH_MSGR_TAG_ACK) { 2222 if (con->in_tag == CEPH_MSGR_TAG_ACK) {
@@ -2091,12 +2245,6 @@ bad_tag:
2091 */ 2245 */
2092static void queue_con(struct ceph_connection *con) 2246static void queue_con(struct ceph_connection *con)
2093{ 2247{
2094 if (test_bit(DEAD, &con->state)) {
2095 dout("queue_con %p ignoring: DEAD\n",
2096 con);
2097 return;
2098 }
2099
2100 if (!con->ops->get(con)) { 2248 if (!con->ops->get(con)) {
2101 dout("queue_con %p ref count 0\n", con); 2249 dout("queue_con %p ref count 0\n", con);
2102 return; 2250 return;
@@ -2121,7 +2269,26 @@ static void con_work(struct work_struct *work)
2121 2269
2122 mutex_lock(&con->mutex); 2270 mutex_lock(&con->mutex);
2123restart: 2271restart:
2124 if (test_and_clear_bit(BACKOFF, &con->state)) { 2272 if (test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) {
2273 switch (con->state) {
2274 case CON_STATE_CONNECTING:
2275 con->error_msg = "connection failed";
2276 break;
2277 case CON_STATE_NEGOTIATING:
2278 con->error_msg = "negotiation failed";
2279 break;
2280 case CON_STATE_OPEN:
2281 con->error_msg = "socket closed";
2282 break;
2283 default:
2284 dout("unrecognized con state %d\n", (int)con->state);
2285 con->error_msg = "unrecognized con state";
2286 BUG();
2287 }
2288 goto fault;
2289 }
2290
2291 if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) {
2125 dout("con_work %p backing off\n", con); 2292 dout("con_work %p backing off\n", con);
2126 if (queue_delayed_work(ceph_msgr_wq, &con->work, 2293 if (queue_delayed_work(ceph_msgr_wq, &con->work,
2127 round_jiffies_relative(con->delay))) { 2294 round_jiffies_relative(con->delay))) {
@@ -2135,35 +2302,35 @@ restart:
2135 } 2302 }
2136 } 2303 }
2137 2304
2138 if (test_bit(STANDBY, &con->state)) { 2305 if (con->state == CON_STATE_STANDBY) {
2139 dout("con_work %p STANDBY\n", con); 2306 dout("con_work %p STANDBY\n", con);
2140 goto done; 2307 goto done;
2141 } 2308 }
2142 if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ 2309 if (con->state == CON_STATE_CLOSED) {
2143 dout("con_work CLOSED\n"); 2310 dout("con_work %p CLOSED\n", con);
2144 con_close_socket(con); 2311 BUG_ON(con->sock);
2145 goto done; 2312 goto done;
2146 } 2313 }
2147 if (test_and_clear_bit(OPENING, &con->state)) { 2314 if (con->state == CON_STATE_PREOPEN) {
2148 /* reopen w/ new peer */
2149 dout("con_work OPENING\n"); 2315 dout("con_work OPENING\n");
2150 con_close_socket(con); 2316 BUG_ON(con->sock);
2151 } 2317 }
2152 2318
2153 if (test_and_clear_bit(SOCK_CLOSED, &con->state))
2154 goto fault;
2155
2156 ret = try_read(con); 2319 ret = try_read(con);
2157 if (ret == -EAGAIN) 2320 if (ret == -EAGAIN)
2158 goto restart; 2321 goto restart;
2159 if (ret < 0) 2322 if (ret < 0) {
2323 con->error_msg = "socket error on read";
2160 goto fault; 2324 goto fault;
2325 }
2161 2326
2162 ret = try_write(con); 2327 ret = try_write(con);
2163 if (ret == -EAGAIN) 2328 if (ret == -EAGAIN)
2164 goto restart; 2329 goto restart;
2165 if (ret < 0) 2330 if (ret < 0) {
2331 con->error_msg = "socket error on write";
2166 goto fault; 2332 goto fault;
2333 }
2167 2334
2168done: 2335done:
2169 mutex_unlock(&con->mutex); 2336 mutex_unlock(&con->mutex);
@@ -2172,7 +2339,6 @@ done_unlocked:
2172 return; 2339 return;
2173 2340
2174fault: 2341fault:
2175 mutex_unlock(&con->mutex);
2176 ceph_fault(con); /* error/fault path */ 2342 ceph_fault(con); /* error/fault path */
2177 goto done_unlocked; 2343 goto done_unlocked;
2178} 2344}
@@ -2183,26 +2349,31 @@ fault:
2183 * exponential backoff 2349 * exponential backoff
2184 */ 2350 */
2185static void ceph_fault(struct ceph_connection *con) 2351static void ceph_fault(struct ceph_connection *con)
2352 __releases(con->mutex)
2186{ 2353{
2187 pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), 2354 pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
2188 ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); 2355 ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
2189 dout("fault %p state %lu to peer %s\n", 2356 dout("fault %p state %lu to peer %s\n",
2190 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); 2357 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
2191 2358
2192 if (test_bit(LOSSYTX, &con->state)) { 2359 BUG_ON(con->state != CON_STATE_CONNECTING &&
2193 dout("fault on LOSSYTX channel\n"); 2360 con->state != CON_STATE_NEGOTIATING &&
2194 goto out; 2361 con->state != CON_STATE_OPEN);
2195 }
2196
2197 mutex_lock(&con->mutex);
2198 if (test_bit(CLOSED, &con->state))
2199 goto out_unlock;
2200 2362
2201 con_close_socket(con); 2363 con_close_socket(con);
2202 2364
2365 if (test_bit(CON_FLAG_LOSSYTX, &con->flags)) {
2366 dout("fault on LOSSYTX channel, marking CLOSED\n");
2367 con->state = CON_STATE_CLOSED;
2368 goto out_unlock;
2369 }
2370
2203 if (con->in_msg) { 2371 if (con->in_msg) {
2372 BUG_ON(con->in_msg->con != con);
2373 con->in_msg->con = NULL;
2204 ceph_msg_put(con->in_msg); 2374 ceph_msg_put(con->in_msg);
2205 con->in_msg = NULL; 2375 con->in_msg = NULL;
2376 con->ops->put(con);
2206 } 2377 }
2207 2378
2208 /* Requeue anything that hasn't been acked */ 2379 /* Requeue anything that hasn't been acked */
@@ -2211,12 +2382,13 @@ static void ceph_fault(struct ceph_connection *con)
2211 /* If there are no messages queued or keepalive pending, place 2382 /* If there are no messages queued or keepalive pending, place
2212 * the connection in a STANDBY state */ 2383 * the connection in a STANDBY state */
2213 if (list_empty(&con->out_queue) && 2384 if (list_empty(&con->out_queue) &&
2214 !test_bit(KEEPALIVE_PENDING, &con->state)) { 2385 !test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)) {
2215 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); 2386 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
2216 clear_bit(WRITE_PENDING, &con->state); 2387 clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
2217 set_bit(STANDBY, &con->state); 2388 con->state = CON_STATE_STANDBY;
2218 } else { 2389 } else {
2219 /* retry after a delay. */ 2390 /* retry after a delay. */
2391 con->state = CON_STATE_PREOPEN;
2220 if (con->delay == 0) 2392 if (con->delay == 0)
2221 con->delay = BASE_DELAY_INTERVAL; 2393 con->delay = BASE_DELAY_INTERVAL;
2222 else if (con->delay < MAX_DELAY_INTERVAL) 2394 else if (con->delay < MAX_DELAY_INTERVAL)
@@ -2237,13 +2409,12 @@ static void ceph_fault(struct ceph_connection *con)
2237 * that when con_work restarts we schedule the 2409 * that when con_work restarts we schedule the
2238 * delay then. 2410 * delay then.
2239 */ 2411 */
2240 set_bit(BACKOFF, &con->state); 2412 set_bit(CON_FLAG_BACKOFF, &con->flags);
2241 } 2413 }
2242 } 2414 }
2243 2415
2244out_unlock: 2416out_unlock:
2245 mutex_unlock(&con->mutex); 2417 mutex_unlock(&con->mutex);
2246out:
2247 /* 2418 /*
2248 * in case we faulted due to authentication, invalidate our 2419 * in case we faulted due to authentication, invalidate our
2249 * current tickets so that we can get new ones. 2420 * current tickets so that we can get new ones.
@@ -2260,18 +2431,14 @@ out:
2260 2431
2261 2432
2262/* 2433/*
2263 * create a new messenger instance 2434 * initialize a new messenger instance
2264 */ 2435 */
2265struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr, 2436void ceph_messenger_init(struct ceph_messenger *msgr,
2266 u32 supported_features, 2437 struct ceph_entity_addr *myaddr,
2267 u32 required_features) 2438 u32 supported_features,
2439 u32 required_features,
2440 bool nocrc)
2268{ 2441{
2269 struct ceph_messenger *msgr;
2270
2271 msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
2272 if (msgr == NULL)
2273 return ERR_PTR(-ENOMEM);
2274
2275 msgr->supported_features = supported_features; 2442 msgr->supported_features = supported_features;
2276 msgr->required_features = required_features; 2443 msgr->required_features = required_features;
2277 2444
@@ -2284,30 +2451,23 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr,
2284 msgr->inst.addr.type = 0; 2451 msgr->inst.addr.type = 0;
2285 get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); 2452 get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
2286 encode_my_addr(msgr); 2453 encode_my_addr(msgr);
2454 msgr->nocrc = nocrc;
2287 2455
2288 dout("messenger_create %p\n", msgr); 2456 atomic_set(&msgr->stopping, 0);
2289 return msgr;
2290}
2291EXPORT_SYMBOL(ceph_messenger_create);
2292 2457
2293void ceph_messenger_destroy(struct ceph_messenger *msgr) 2458 dout("%s %p\n", __func__, msgr);
2294{
2295 dout("destroy %p\n", msgr);
2296 kfree(msgr);
2297 dout("destroyed messenger %p\n", msgr);
2298} 2459}
2299EXPORT_SYMBOL(ceph_messenger_destroy); 2460EXPORT_SYMBOL(ceph_messenger_init);
2300 2461
2301static void clear_standby(struct ceph_connection *con) 2462static void clear_standby(struct ceph_connection *con)
2302{ 2463{
2303 /* come back from STANDBY? */ 2464 /* come back from STANDBY? */
2304 if (test_and_clear_bit(STANDBY, &con->state)) { 2465 if (con->state == CON_STATE_STANDBY) {
2305 mutex_lock(&con->mutex);
2306 dout("clear_standby %p and ++connect_seq\n", con); 2466 dout("clear_standby %p and ++connect_seq\n", con);
2467 con->state = CON_STATE_PREOPEN;
2307 con->connect_seq++; 2468 con->connect_seq++;
2308 WARN_ON(test_bit(WRITE_PENDING, &con->state)); 2469 WARN_ON(test_bit(CON_FLAG_WRITE_PENDING, &con->flags));
2309 WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state)); 2470 WARN_ON(test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags));
2310 mutex_unlock(&con->mutex);
2311 } 2471 }
2312} 2472}
2313 2473
@@ -2316,21 +2476,24 @@ static void clear_standby(struct ceph_connection *con)
2316 */ 2476 */
2317void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) 2477void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
2318{ 2478{
2319 if (test_bit(CLOSED, &con->state)) {
2320 dout("con_send %p closed, dropping %p\n", con, msg);
2321 ceph_msg_put(msg);
2322 return;
2323 }
2324
2325 /* set src+dst */ 2479 /* set src+dst */
2326 msg->hdr.src = con->msgr->inst.name; 2480 msg->hdr.src = con->msgr->inst.name;
2327
2328 BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); 2481 BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
2329
2330 msg->needs_out_seq = true; 2482 msg->needs_out_seq = true;
2331 2483
2332 /* queue */
2333 mutex_lock(&con->mutex); 2484 mutex_lock(&con->mutex);
2485
2486 if (con->state == CON_STATE_CLOSED) {
2487 dout("con_send %p closed, dropping %p\n", con, msg);
2488 ceph_msg_put(msg);
2489 mutex_unlock(&con->mutex);
2490 return;
2491 }
2492
2493 BUG_ON(msg->con != NULL);
2494 msg->con = con->ops->get(con);
2495 BUG_ON(msg->con == NULL);
2496
2334 BUG_ON(!list_empty(&msg->list_head)); 2497 BUG_ON(!list_empty(&msg->list_head));
2335 list_add_tail(&msg->list_head, &con->out_queue); 2498 list_add_tail(&msg->list_head, &con->out_queue);
2336 dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, 2499 dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
@@ -2339,12 +2502,13 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
2339 le32_to_cpu(msg->hdr.front_len), 2502 le32_to_cpu(msg->hdr.front_len),
2340 le32_to_cpu(msg->hdr.middle_len), 2503 le32_to_cpu(msg->hdr.middle_len),
2341 le32_to_cpu(msg->hdr.data_len)); 2504 le32_to_cpu(msg->hdr.data_len));
2505
2506 clear_standby(con);
2342 mutex_unlock(&con->mutex); 2507 mutex_unlock(&con->mutex);
2343 2508
2344 /* if there wasn't anything waiting to send before, queue 2509 /* if there wasn't anything waiting to send before, queue
2345 * new work */ 2510 * new work */
2346 clear_standby(con); 2511 if (test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0)
2347 if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
2348 queue_con(con); 2512 queue_con(con);
2349} 2513}
2350EXPORT_SYMBOL(ceph_con_send); 2514EXPORT_SYMBOL(ceph_con_send);
@@ -2352,24 +2516,34 @@ EXPORT_SYMBOL(ceph_con_send);
2352/* 2516/*
2353 * Revoke a message that was previously queued for send 2517 * Revoke a message that was previously queued for send
2354 */ 2518 */
2355void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) 2519void ceph_msg_revoke(struct ceph_msg *msg)
2356{ 2520{
2521 struct ceph_connection *con = msg->con;
2522
2523 if (!con)
2524 return; /* Message not in our possession */
2525
2357 mutex_lock(&con->mutex); 2526 mutex_lock(&con->mutex);
2358 if (!list_empty(&msg->list_head)) { 2527 if (!list_empty(&msg->list_head)) {
2359 dout("con_revoke %p msg %p - was on queue\n", con, msg); 2528 dout("%s %p msg %p - was on queue\n", __func__, con, msg);
2360 list_del_init(&msg->list_head); 2529 list_del_init(&msg->list_head);
2361 ceph_msg_put(msg); 2530 BUG_ON(msg->con == NULL);
2531 msg->con->ops->put(msg->con);
2532 msg->con = NULL;
2362 msg->hdr.seq = 0; 2533 msg->hdr.seq = 0;
2534
2535 ceph_msg_put(msg);
2363 } 2536 }
2364 if (con->out_msg == msg) { 2537 if (con->out_msg == msg) {
2365 dout("con_revoke %p msg %p - was sending\n", con, msg); 2538 dout("%s %p msg %p - was sending\n", __func__, con, msg);
2366 con->out_msg = NULL; 2539 con->out_msg = NULL;
2367 if (con->out_kvec_is_msg) { 2540 if (con->out_kvec_is_msg) {
2368 con->out_skip = con->out_kvec_bytes; 2541 con->out_skip = con->out_kvec_bytes;
2369 con->out_kvec_is_msg = false; 2542 con->out_kvec_is_msg = false;
2370 } 2543 }
2371 ceph_msg_put(msg);
2372 msg->hdr.seq = 0; 2544 msg->hdr.seq = 0;
2545
2546 ceph_msg_put(msg);
2373 } 2547 }
2374 mutex_unlock(&con->mutex); 2548 mutex_unlock(&con->mutex);
2375} 2549}
@@ -2377,17 +2551,27 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
2377/* 2551/*
2378 * Revoke a message that we may be reading data into 2552 * Revoke a message that we may be reading data into
2379 */ 2553 */
2380void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) 2554void ceph_msg_revoke_incoming(struct ceph_msg *msg)
2381{ 2555{
2556 struct ceph_connection *con;
2557
2558 BUG_ON(msg == NULL);
2559 if (!msg->con) {
2560 dout("%s msg %p null con\n", __func__, msg);
2561
2562 return; /* Message not in our possession */
2563 }
2564
2565 con = msg->con;
2382 mutex_lock(&con->mutex); 2566 mutex_lock(&con->mutex);
2383 if (con->in_msg && con->in_msg == msg) { 2567 if (con->in_msg == msg) {
2384 unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); 2568 unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
2385 unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len); 2569 unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
2386 unsigned int data_len = le32_to_cpu(con->in_hdr.data_len); 2570 unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
2387 2571
2388 /* skip rest of message */ 2572 /* skip rest of message */
2389 dout("con_revoke_pages %p msg %p revoked\n", con, msg); 2573 dout("%s %p msg %p revoked\n", __func__, con, msg);
2390 con->in_base_pos = con->in_base_pos - 2574 con->in_base_pos = con->in_base_pos -
2391 sizeof(struct ceph_msg_header) - 2575 sizeof(struct ceph_msg_header) -
2392 front_len - 2576 front_len -
2393 middle_len - 2577 middle_len -
@@ -2398,8 +2582,8 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
2398 con->in_tag = CEPH_MSGR_TAG_READY; 2582 con->in_tag = CEPH_MSGR_TAG_READY;
2399 con->in_seq++; 2583 con->in_seq++;
2400 } else { 2584 } else {
2401 dout("con_revoke_pages %p msg %p pages %p no-op\n", 2585 dout("%s %p in_msg %p msg %p no-op\n",
2402 con, con->in_msg, msg); 2586 __func__, con, con->in_msg, msg);
2403 } 2587 }
2404 mutex_unlock(&con->mutex); 2588 mutex_unlock(&con->mutex);
2405} 2589}
@@ -2410,9 +2594,11 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
2410void ceph_con_keepalive(struct ceph_connection *con) 2594void ceph_con_keepalive(struct ceph_connection *con)
2411{ 2595{
2412 dout("con_keepalive %p\n", con); 2596 dout("con_keepalive %p\n", con);
2597 mutex_lock(&con->mutex);
2413 clear_standby(con); 2598 clear_standby(con);
2414 if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 && 2599 mutex_unlock(&con->mutex);
2415 test_and_set_bit(WRITE_PENDING, &con->state) == 0) 2600 if (test_and_set_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags) == 0 &&
2601 test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0)
2416 queue_con(con); 2602 queue_con(con);
2417} 2603}
2418EXPORT_SYMBOL(ceph_con_keepalive); 2604EXPORT_SYMBOL(ceph_con_keepalive);
@@ -2431,6 +2617,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
2431 if (m == NULL) 2617 if (m == NULL)
2432 goto out; 2618 goto out;
2433 kref_init(&m->kref); 2619 kref_init(&m->kref);
2620
2621 m->con = NULL;
2434 INIT_LIST_HEAD(&m->list_head); 2622 INIT_LIST_HEAD(&m->list_head);
2435 2623
2436 m->hdr.tid = 0; 2624 m->hdr.tid = 0;
@@ -2526,46 +2714,77 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
2526} 2714}
2527 2715
2528/* 2716/*
2529 * Generic message allocator, for incoming messages. 2717 * Allocate a message for receiving an incoming message on a
2718 * connection, and save the result in con->in_msg. Uses the
2719 * connection's private alloc_msg op if available.
2720 *
2721 * Returns 0 on success, or a negative error code.
2722 *
2723 * On success, if we set *skip = 1:
2724 * - the next message should be skipped and ignored.
2725 * - con->in_msg == NULL
2726 * or if we set *skip = 0:
2727 * - con->in_msg is non-null.
2728 * On error (ENOMEM, EAGAIN, ...),
2729 * - con->in_msg == NULL
2530 */ 2730 */
2531static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, 2731static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
2532 struct ceph_msg_header *hdr,
2533 int *skip)
2534{ 2732{
2733 struct ceph_msg_header *hdr = &con->in_hdr;
2535 int type = le16_to_cpu(hdr->type); 2734 int type = le16_to_cpu(hdr->type);
2536 int front_len = le32_to_cpu(hdr->front_len); 2735 int front_len = le32_to_cpu(hdr->front_len);
2537 int middle_len = le32_to_cpu(hdr->middle_len); 2736 int middle_len = le32_to_cpu(hdr->middle_len);
2538 struct ceph_msg *msg = NULL; 2737 int ret = 0;
2539 int ret; 2738
2739 BUG_ON(con->in_msg != NULL);
2540 2740
2541 if (con->ops->alloc_msg) { 2741 if (con->ops->alloc_msg) {
2742 struct ceph_msg *msg;
2743
2542 mutex_unlock(&con->mutex); 2744 mutex_unlock(&con->mutex);
2543 msg = con->ops->alloc_msg(con, hdr, skip); 2745 msg = con->ops->alloc_msg(con, hdr, skip);
2544 mutex_lock(&con->mutex); 2746 mutex_lock(&con->mutex);
2545 if (!msg || *skip) 2747 if (con->state != CON_STATE_OPEN) {
2546 return NULL; 2748 ceph_msg_put(msg);
2749 return -EAGAIN;
2750 }
2751 con->in_msg = msg;
2752 if (con->in_msg) {
2753 con->in_msg->con = con->ops->get(con);
2754 BUG_ON(con->in_msg->con == NULL);
2755 }
2756 if (*skip) {
2757 con->in_msg = NULL;
2758 return 0;
2759 }
2760 if (!con->in_msg) {
2761 con->error_msg =
2762 "error allocating memory for incoming message";
2763 return -ENOMEM;
2764 }
2547 } 2765 }
2548 if (!msg) { 2766 if (!con->in_msg) {
2549 *skip = 0; 2767 con->in_msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
2550 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 2768 if (!con->in_msg) {
2551 if (!msg) {
2552 pr_err("unable to allocate msg type %d len %d\n", 2769 pr_err("unable to allocate msg type %d len %d\n",
2553 type, front_len); 2770 type, front_len);
2554 return NULL; 2771 return -ENOMEM;
2555 } 2772 }
2556 msg->page_alignment = le16_to_cpu(hdr->data_off); 2773 con->in_msg->con = con->ops->get(con);
2774 BUG_ON(con->in_msg->con == NULL);
2775 con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
2557 } 2776 }
2558 memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); 2777 memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
2559 2778
2560 if (middle_len && !msg->middle) { 2779 if (middle_len && !con->in_msg->middle) {
2561 ret = ceph_alloc_middle(con, msg); 2780 ret = ceph_alloc_middle(con, con->in_msg);
2562 if (ret < 0) { 2781 if (ret < 0) {
2563 ceph_msg_put(msg); 2782 ceph_msg_put(con->in_msg);
2564 return NULL; 2783 con->in_msg = NULL;
2565 } 2784 }
2566 } 2785 }
2567 2786
2568 return msg; 2787 return ret;
2569} 2788}
2570 2789
2571 2790
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index d0649a9655be..105d533b55f3 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -106,9 +106,9 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
106 monc->pending_auth = 1; 106 monc->pending_auth = 1;
107 monc->m_auth->front.iov_len = len; 107 monc->m_auth->front.iov_len = len;
108 monc->m_auth->hdr.front_len = cpu_to_le32(len); 108 monc->m_auth->hdr.front_len = cpu_to_le32(len);
109 ceph_con_revoke(monc->con, monc->m_auth); 109 ceph_msg_revoke(monc->m_auth);
110 ceph_msg_get(monc->m_auth); /* keep our ref */ 110 ceph_msg_get(monc->m_auth); /* keep our ref */
111 ceph_con_send(monc->con, monc->m_auth); 111 ceph_con_send(&monc->con, monc->m_auth);
112} 112}
113 113
114/* 114/*
@@ -117,8 +117,11 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
117static void __close_session(struct ceph_mon_client *monc) 117static void __close_session(struct ceph_mon_client *monc)
118{ 118{
119 dout("__close_session closing mon%d\n", monc->cur_mon); 119 dout("__close_session closing mon%d\n", monc->cur_mon);
120 ceph_con_revoke(monc->con, monc->m_auth); 120 ceph_msg_revoke(monc->m_auth);
121 ceph_con_close(monc->con); 121 ceph_msg_revoke_incoming(monc->m_auth_reply);
122 ceph_msg_revoke(monc->m_subscribe);
123 ceph_msg_revoke_incoming(monc->m_subscribe_ack);
124 ceph_con_close(&monc->con);
122 monc->cur_mon = -1; 125 monc->cur_mon = -1;
123 monc->pending_auth = 0; 126 monc->pending_auth = 0;
124 ceph_auth_reset(monc->auth); 127 ceph_auth_reset(monc->auth);
@@ -142,9 +145,8 @@ static int __open_session(struct ceph_mon_client *monc)
142 monc->want_next_osdmap = !!monc->want_next_osdmap; 145 monc->want_next_osdmap = !!monc->want_next_osdmap;
143 146
144 dout("open_session mon%d opening\n", monc->cur_mon); 147 dout("open_session mon%d opening\n", monc->cur_mon);
145 monc->con->peer_name.type = CEPH_ENTITY_TYPE_MON; 148 ceph_con_open(&monc->con,
146 monc->con->peer_name.num = cpu_to_le64(monc->cur_mon); 149 CEPH_ENTITY_TYPE_MON, monc->cur_mon,
147 ceph_con_open(monc->con,
148 &monc->monmap->mon_inst[monc->cur_mon].addr); 150 &monc->monmap->mon_inst[monc->cur_mon].addr);
149 151
150 /* initiatiate authentication handshake */ 152 /* initiatiate authentication handshake */
@@ -226,8 +228,8 @@ static void __send_subscribe(struct ceph_mon_client *monc)
226 228
227 msg->front.iov_len = p - msg->front.iov_base; 229 msg->front.iov_len = p - msg->front.iov_base;
228 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 230 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
229 ceph_con_revoke(monc->con, msg); 231 ceph_msg_revoke(msg);
230 ceph_con_send(monc->con, ceph_msg_get(msg)); 232 ceph_con_send(&monc->con, ceph_msg_get(msg));
231 233
232 monc->sub_sent = jiffies | 1; /* never 0 */ 234 monc->sub_sent = jiffies | 1; /* never 0 */
233 } 235 }
@@ -247,7 +249,7 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc,
247 if (monc->hunting) { 249 if (monc->hunting) {
248 pr_info("mon%d %s session established\n", 250 pr_info("mon%d %s session established\n",
249 monc->cur_mon, 251 monc->cur_mon,
250 ceph_pr_addr(&monc->con->peer_addr.in_addr)); 252 ceph_pr_addr(&monc->con.peer_addr.in_addr));
251 monc->hunting = false; 253 monc->hunting = false;
252 } 254 }
253 dout("handle_subscribe_ack after %d seconds\n", seconds); 255 dout("handle_subscribe_ack after %d seconds\n", seconds);
@@ -439,6 +441,7 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
439 m = NULL; 441 m = NULL;
440 } else { 442 } else {
441 dout("get_generic_reply %lld got %p\n", tid, req->reply); 443 dout("get_generic_reply %lld got %p\n", tid, req->reply);
444 *skip = 0;
442 m = ceph_msg_get(req->reply); 445 m = ceph_msg_get(req->reply);
443 /* 446 /*
444 * we don't need to track the connection reading into 447 * we don't need to track the connection reading into
@@ -461,7 +464,7 @@ static int do_generic_request(struct ceph_mon_client *monc,
461 req->request->hdr.tid = cpu_to_le64(req->tid); 464 req->request->hdr.tid = cpu_to_le64(req->tid);
462 __insert_generic_request(monc, req); 465 __insert_generic_request(monc, req);
463 monc->num_generic_requests++; 466 monc->num_generic_requests++;
464 ceph_con_send(monc->con, ceph_msg_get(req->request)); 467 ceph_con_send(&monc->con, ceph_msg_get(req->request));
465 mutex_unlock(&monc->mutex); 468 mutex_unlock(&monc->mutex);
466 469
467 err = wait_for_completion_interruptible(&req->completion); 470 err = wait_for_completion_interruptible(&req->completion);
@@ -684,8 +687,9 @@ static void __resend_generic_request(struct ceph_mon_client *monc)
684 687
685 for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) { 688 for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
686 req = rb_entry(p, struct ceph_mon_generic_request, node); 689 req = rb_entry(p, struct ceph_mon_generic_request, node);
687 ceph_con_revoke(monc->con, req->request); 690 ceph_msg_revoke(req->request);
688 ceph_con_send(monc->con, ceph_msg_get(req->request)); 691 ceph_msg_revoke_incoming(req->reply);
692 ceph_con_send(&monc->con, ceph_msg_get(req->request));
689 } 693 }
690} 694}
691 695
@@ -705,7 +709,7 @@ static void delayed_work(struct work_struct *work)
705 __close_session(monc); 709 __close_session(monc);
706 __open_session(monc); /* continue hunting */ 710 __open_session(monc); /* continue hunting */
707 } else { 711 } else {
708 ceph_con_keepalive(monc->con); 712 ceph_con_keepalive(&monc->con);
709 713
710 __validate_auth(monc); 714 __validate_auth(monc);
711 715
@@ -760,19 +764,12 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
760 goto out; 764 goto out;
761 765
762 /* connection */ 766 /* connection */
763 monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
764 if (!monc->con)
765 goto out_monmap;
766 ceph_con_init(monc->client->msgr, monc->con);
767 monc->con->private = monc;
768 monc->con->ops = &mon_con_ops;
769
770 /* authentication */ 767 /* authentication */
771 monc->auth = ceph_auth_init(cl->options->name, 768 monc->auth = ceph_auth_init(cl->options->name,
772 cl->options->key); 769 cl->options->key);
773 if (IS_ERR(monc->auth)) { 770 if (IS_ERR(monc->auth)) {
774 err = PTR_ERR(monc->auth); 771 err = PTR_ERR(monc->auth);
775 goto out_con; 772 goto out_monmap;
776 } 773 }
777 monc->auth->want_keys = 774 monc->auth->want_keys =
778 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON | 775 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
@@ -801,6 +798,9 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
801 if (!monc->m_auth) 798 if (!monc->m_auth)
802 goto out_auth_reply; 799 goto out_auth_reply;
803 800
801 ceph_con_init(&monc->con, monc, &mon_con_ops,
802 &monc->client->msgr);
803
804 monc->cur_mon = -1; 804 monc->cur_mon = -1;
805 monc->hunting = true; 805 monc->hunting = true;
806 monc->sub_renew_after = jiffies; 806 monc->sub_renew_after = jiffies;
@@ -824,8 +824,6 @@ out_subscribe_ack:
824 ceph_msg_put(monc->m_subscribe_ack); 824 ceph_msg_put(monc->m_subscribe_ack);
825out_auth: 825out_auth:
826 ceph_auth_destroy(monc->auth); 826 ceph_auth_destroy(monc->auth);
827out_con:
828 monc->con->ops->put(monc->con);
829out_monmap: 827out_monmap:
830 kfree(monc->monmap); 828 kfree(monc->monmap);
831out: 829out:
@@ -841,10 +839,6 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
841 mutex_lock(&monc->mutex); 839 mutex_lock(&monc->mutex);
842 __close_session(monc); 840 __close_session(monc);
843 841
844 monc->con->private = NULL;
845 monc->con->ops->put(monc->con);
846 monc->con = NULL;
847
848 mutex_unlock(&monc->mutex); 842 mutex_unlock(&monc->mutex);
849 843
850 /* 844 /*
@@ -888,8 +882,8 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
888 } else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) { 882 } else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) {
889 dout("authenticated, starting session\n"); 883 dout("authenticated, starting session\n");
890 884
891 monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT; 885 monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
892 monc->client->msgr->inst.name.num = 886 monc->client->msgr.inst.name.num =
893 cpu_to_le64(monc->auth->global_id); 887 cpu_to_le64(monc->auth->global_id);
894 888
895 __send_subscribe(monc); 889 __send_subscribe(monc);
@@ -1000,6 +994,8 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
1000 case CEPH_MSG_MDS_MAP: 994 case CEPH_MSG_MDS_MAP:
1001 case CEPH_MSG_OSD_MAP: 995 case CEPH_MSG_OSD_MAP:
1002 m = ceph_msg_new(type, front_len, GFP_NOFS, false); 996 m = ceph_msg_new(type, front_len, GFP_NOFS, false);
997 if (!m)
998 return NULL; /* ENOMEM--return skip == 0 */
1003 break; 999 break;
1004 } 1000 }
1005 1001
@@ -1029,7 +1025,7 @@ static void mon_fault(struct ceph_connection *con)
1029 if (!monc->hunting) 1025 if (!monc->hunting)
1030 pr_info("mon%d %s session lost, " 1026 pr_info("mon%d %s session lost, "
1031 "hunting for new mon\n", monc->cur_mon, 1027 "hunting for new mon\n", monc->cur_mon,
1032 ceph_pr_addr(&monc->con->peer_addr.in_addr)); 1028 ceph_pr_addr(&monc->con.peer_addr.in_addr));
1033 1029
1034 __close_session(monc); 1030 __close_session(monc);
1035 if (!monc->hunting) { 1031 if (!monc->hunting) {
@@ -1044,9 +1040,23 @@ out:
1044 mutex_unlock(&monc->mutex); 1040 mutex_unlock(&monc->mutex);
1045} 1041}
1046 1042
1043/*
1044 * We can ignore refcounting on the connection struct, as all references
1045 * will come from the messenger workqueue, which is drained prior to
1046 * mon_client destruction.
1047 */
1048static struct ceph_connection *con_get(struct ceph_connection *con)
1049{
1050 return con;
1051}
1052
1053static void con_put(struct ceph_connection *con)
1054{
1055}
1056
1047static const struct ceph_connection_operations mon_con_ops = { 1057static const struct ceph_connection_operations mon_con_ops = {
1048 .get = ceph_con_get, 1058 .get = con_get,
1049 .put = ceph_con_put, 1059 .put = con_put,
1050 .dispatch = dispatch, 1060 .dispatch = dispatch,
1051 .fault = mon_fault, 1061 .fault = mon_fault,
1052 .alloc_msg = mon_alloc_msg, 1062 .alloc_msg = mon_alloc_msg,
diff --git a/net/ceph/msgpool.c b/net/ceph/msgpool.c
index 11d5f4196a73..ddec1c10ac80 100644
--- a/net/ceph/msgpool.c
+++ b/net/ceph/msgpool.c
@@ -12,7 +12,7 @@ static void *msgpool_alloc(gfp_t gfp_mask, void *arg)
12 struct ceph_msgpool *pool = arg; 12 struct ceph_msgpool *pool = arg;
13 struct ceph_msg *msg; 13 struct ceph_msg *msg;
14 14
15 msg = ceph_msg_new(0, pool->front_len, gfp_mask, true); 15 msg = ceph_msg_new(pool->type, pool->front_len, gfp_mask, true);
16 if (!msg) { 16 if (!msg) {
17 dout("msgpool_alloc %s failed\n", pool->name); 17 dout("msgpool_alloc %s failed\n", pool->name);
18 } else { 18 } else {
@@ -32,10 +32,11 @@ static void msgpool_free(void *element, void *arg)
32 ceph_msg_put(msg); 32 ceph_msg_put(msg);
33} 33}
34 34
35int ceph_msgpool_init(struct ceph_msgpool *pool, 35int ceph_msgpool_init(struct ceph_msgpool *pool, int type,
36 int front_len, int size, bool blocking, const char *name) 36 int front_len, int size, bool blocking, const char *name)
37{ 37{
38 dout("msgpool %s init\n", name); 38 dout("msgpool %s init\n", name);
39 pool->type = type;
39 pool->front_len = front_len; 40 pool->front_len = front_len;
40 pool->pool = mempool_create(size, msgpool_alloc, msgpool_free, pool); 41 pool->pool = mempool_create(size, msgpool_alloc, msgpool_free, pool);
41 if (!pool->pool) 42 if (!pool->pool)
@@ -61,7 +62,7 @@ struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool,
61 WARN_ON(1); 62 WARN_ON(1);
62 63
63 /* try to alloc a fresh message */ 64 /* try to alloc a fresh message */
64 return ceph_msg_new(0, front_len, GFP_NOFS, false); 65 return ceph_msg_new(pool->type, front_len, GFP_NOFS, false);
65 } 66 }
66 67
67 msg = mempool_alloc(pool->pool, GFP_NOFS); 68 msg = mempool_alloc(pool->pool, GFP_NOFS);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index ca59e66c9787..42119c05e82c 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -140,10 +140,9 @@ void ceph_osdc_release_request(struct kref *kref)
140 if (req->r_request) 140 if (req->r_request)
141 ceph_msg_put(req->r_request); 141 ceph_msg_put(req->r_request);
142 if (req->r_con_filling_msg) { 142 if (req->r_con_filling_msg) {
143 dout("release_request revoking pages %p from con %p\n", 143 dout("%s revoking pages %p from con %p\n", __func__,
144 req->r_pages, req->r_con_filling_msg); 144 req->r_pages, req->r_con_filling_msg);
145 ceph_con_revoke_message(req->r_con_filling_msg, 145 ceph_msg_revoke_incoming(req->r_reply);
146 req->r_reply);
147 req->r_con_filling_msg->ops->put(req->r_con_filling_msg); 146 req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
148 } 147 }
149 if (req->r_reply) 148 if (req->r_reply)
@@ -214,10 +213,13 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
214 kref_init(&req->r_kref); 213 kref_init(&req->r_kref);
215 init_completion(&req->r_completion); 214 init_completion(&req->r_completion);
216 init_completion(&req->r_safe_completion); 215 init_completion(&req->r_safe_completion);
216 rb_init_node(&req->r_node);
217 INIT_LIST_HEAD(&req->r_unsafe_item); 217 INIT_LIST_HEAD(&req->r_unsafe_item);
218 INIT_LIST_HEAD(&req->r_linger_item); 218 INIT_LIST_HEAD(&req->r_linger_item);
219 INIT_LIST_HEAD(&req->r_linger_osd); 219 INIT_LIST_HEAD(&req->r_linger_osd);
220 INIT_LIST_HEAD(&req->r_req_lru_item); 220 INIT_LIST_HEAD(&req->r_req_lru_item);
221 INIT_LIST_HEAD(&req->r_osd_item);
222
221 req->r_flags = flags; 223 req->r_flags = flags;
222 224
223 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); 225 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
@@ -243,6 +245,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
243 } 245 }
244 ceph_pagelist_init(req->r_trail); 246 ceph_pagelist_init(req->r_trail);
245 } 247 }
248
246 /* create request message; allow space for oid */ 249 /* create request message; allow space for oid */
247 msg_size += MAX_OBJ_NAME_SIZE; 250 msg_size += MAX_OBJ_NAME_SIZE;
248 if (snapc) 251 if (snapc)
@@ -256,7 +259,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
256 return NULL; 259 return NULL;
257 } 260 }
258 261
259 msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
260 memset(msg->front.iov_base, 0, msg->front.iov_len); 262 memset(msg->front.iov_base, 0, msg->front.iov_len);
261 263
262 req->r_request = msg; 264 req->r_request = msg;
@@ -624,7 +626,7 @@ static void osd_reset(struct ceph_connection *con)
624/* 626/*
625 * Track open sessions with osds. 627 * Track open sessions with osds.
626 */ 628 */
627static struct ceph_osd *create_osd(struct ceph_osd_client *osdc) 629static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
628{ 630{
629 struct ceph_osd *osd; 631 struct ceph_osd *osd;
630 632
@@ -634,15 +636,13 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
634 636
635 atomic_set(&osd->o_ref, 1); 637 atomic_set(&osd->o_ref, 1);
636 osd->o_osdc = osdc; 638 osd->o_osdc = osdc;
639 osd->o_osd = onum;
637 INIT_LIST_HEAD(&osd->o_requests); 640 INIT_LIST_HEAD(&osd->o_requests);
638 INIT_LIST_HEAD(&osd->o_linger_requests); 641 INIT_LIST_HEAD(&osd->o_linger_requests);
639 INIT_LIST_HEAD(&osd->o_osd_lru); 642 INIT_LIST_HEAD(&osd->o_osd_lru);
640 osd->o_incarnation = 1; 643 osd->o_incarnation = 1;
641 644
642 ceph_con_init(osdc->client->msgr, &osd->o_con); 645 ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
643 osd->o_con.private = osd;
644 osd->o_con.ops = &osd_con_ops;
645 osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
646 646
647 INIT_LIST_HEAD(&osd->o_keepalive_item); 647 INIT_LIST_HEAD(&osd->o_keepalive_item);
648 return osd; 648 return osd;
@@ -688,7 +688,7 @@ static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
688 688
689static void remove_all_osds(struct ceph_osd_client *osdc) 689static void remove_all_osds(struct ceph_osd_client *osdc)
690{ 690{
691 dout("__remove_old_osds %p\n", osdc); 691 dout("%s %p\n", __func__, osdc);
692 mutex_lock(&osdc->request_mutex); 692 mutex_lock(&osdc->request_mutex);
693 while (!RB_EMPTY_ROOT(&osdc->osds)) { 693 while (!RB_EMPTY_ROOT(&osdc->osds)) {
694 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), 694 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
@@ -752,7 +752,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
752 ret = -EAGAIN; 752 ret = -EAGAIN;
753 } else { 753 } else {
754 ceph_con_close(&osd->o_con); 754 ceph_con_close(&osd->o_con);
755 ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]); 755 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
756 &osdc->osdmap->osd_addr[osd->o_osd]);
756 osd->o_incarnation++; 757 osd->o_incarnation++;
757 } 758 }
758 return ret; 759 return ret;
@@ -853,7 +854,7 @@ static void __unregister_request(struct ceph_osd_client *osdc,
853 854
854 if (req->r_osd) { 855 if (req->r_osd) {
855 /* make sure the original request isn't in flight. */ 856 /* make sure the original request isn't in flight. */
856 ceph_con_revoke(&req->r_osd->o_con, req->r_request); 857 ceph_msg_revoke(req->r_request);
857 858
858 list_del_init(&req->r_osd_item); 859 list_del_init(&req->r_osd_item);
859 if (list_empty(&req->r_osd->o_requests) && 860 if (list_empty(&req->r_osd->o_requests) &&
@@ -880,7 +881,7 @@ static void __unregister_request(struct ceph_osd_client *osdc,
880static void __cancel_request(struct ceph_osd_request *req) 881static void __cancel_request(struct ceph_osd_request *req)
881{ 882{
882 if (req->r_sent && req->r_osd) { 883 if (req->r_sent && req->r_osd) {
883 ceph_con_revoke(&req->r_osd->o_con, req->r_request); 884 ceph_msg_revoke(req->r_request);
884 req->r_sent = 0; 885 req->r_sent = 0;
885 } 886 }
886} 887}
@@ -890,7 +891,9 @@ static void __register_linger_request(struct ceph_osd_client *osdc,
890{ 891{
891 dout("__register_linger_request %p\n", req); 892 dout("__register_linger_request %p\n", req);
892 list_add_tail(&req->r_linger_item, &osdc->req_linger); 893 list_add_tail(&req->r_linger_item, &osdc->req_linger);
893 list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests); 894 if (req->r_osd)
895 list_add_tail(&req->r_linger_osd,
896 &req->r_osd->o_linger_requests);
894} 897}
895 898
896static void __unregister_linger_request(struct ceph_osd_client *osdc, 899static void __unregister_linger_request(struct ceph_osd_client *osdc,
@@ -998,18 +1001,18 @@ static int __map_request(struct ceph_osd_client *osdc,
998 req->r_osd = __lookup_osd(osdc, o); 1001 req->r_osd = __lookup_osd(osdc, o);
999 if (!req->r_osd && o >= 0) { 1002 if (!req->r_osd && o >= 0) {
1000 err = -ENOMEM; 1003 err = -ENOMEM;
1001 req->r_osd = create_osd(osdc); 1004 req->r_osd = create_osd(osdc, o);
1002 if (!req->r_osd) { 1005 if (!req->r_osd) {
1003 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1006 list_move(&req->r_req_lru_item, &osdc->req_notarget);
1004 goto out; 1007 goto out;
1005 } 1008 }
1006 1009
1007 dout("map_request osd %p is osd%d\n", req->r_osd, o); 1010 dout("map_request osd %p is osd%d\n", req->r_osd, o);
1008 req->r_osd->o_osd = o;
1009 req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
1010 __insert_osd(osdc, req->r_osd); 1011 __insert_osd(osdc, req->r_osd);
1011 1012
1012 ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]); 1013 ceph_con_open(&req->r_osd->o_con,
1014 CEPH_ENTITY_TYPE_OSD, o,
1015 &osdc->osdmap->osd_addr[o]);
1013 } 1016 }
1014 1017
1015 if (req->r_osd) { 1018 if (req->r_osd) {
@@ -1304,8 +1307,9 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1304 1307
1305 dout("kick_requests %s\n", force_resend ? " (force resend)" : ""); 1308 dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
1306 mutex_lock(&osdc->request_mutex); 1309 mutex_lock(&osdc->request_mutex);
1307 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { 1310 for (p = rb_first(&osdc->requests); p; ) {
1308 req = rb_entry(p, struct ceph_osd_request, r_node); 1311 req = rb_entry(p, struct ceph_osd_request, r_node);
1312 p = rb_next(p);
1309 err = __map_request(osdc, req, force_resend); 1313 err = __map_request(osdc, req, force_resend);
1310 if (err < 0) 1314 if (err < 0)
1311 continue; /* error */ 1315 continue; /* error */
@@ -1313,10 +1317,23 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1313 dout("%p tid %llu maps to no osd\n", req, req->r_tid); 1317 dout("%p tid %llu maps to no osd\n", req, req->r_tid);
1314 needmap++; /* request a newer map */ 1318 needmap++; /* request a newer map */
1315 } else if (err > 0) { 1319 } else if (err > 0) {
1316 dout("%p tid %llu requeued on osd%d\n", req, req->r_tid, 1320 if (!req->r_linger) {
1317 req->r_osd ? req->r_osd->o_osd : -1); 1321 dout("%p tid %llu requeued on osd%d\n", req,
1318 if (!req->r_linger) 1322 req->r_tid,
1323 req->r_osd ? req->r_osd->o_osd : -1);
1319 req->r_flags |= CEPH_OSD_FLAG_RETRY; 1324 req->r_flags |= CEPH_OSD_FLAG_RETRY;
1325 }
1326 }
1327 if (req->r_linger && list_empty(&req->r_linger_item)) {
1328 /*
1329 * register as a linger so that we will
1330 * re-submit below and get a new tid
1331 */
1332 dout("%p tid %llu restart on osd%d\n",
1333 req, req->r_tid,
1334 req->r_osd ? req->r_osd->o_osd : -1);
1335 __register_linger_request(osdc, req);
1336 __unregister_request(osdc, req);
1320 } 1337 }
1321 } 1338 }
1322 1339
@@ -1391,7 +1408,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1391 epoch, maplen); 1408 epoch, maplen);
1392 newmap = osdmap_apply_incremental(&p, next, 1409 newmap = osdmap_apply_incremental(&p, next,
1393 osdc->osdmap, 1410 osdc->osdmap,
1394 osdc->client->msgr); 1411 &osdc->client->msgr);
1395 if (IS_ERR(newmap)) { 1412 if (IS_ERR(newmap)) {
1396 err = PTR_ERR(newmap); 1413 err = PTR_ERR(newmap);
1397 goto bad; 1414 goto bad;
@@ -1839,11 +1856,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1839 if (!osdc->req_mempool) 1856 if (!osdc->req_mempool)
1840 goto out; 1857 goto out;
1841 1858
1842 err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true, 1859 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
1860 OSD_OP_FRONT_LEN, 10, true,
1843 "osd_op"); 1861 "osd_op");
1844 if (err < 0) 1862 if (err < 0)
1845 goto out_mempool; 1863 goto out_mempool;
1846 err = ceph_msgpool_init(&osdc->msgpool_op_reply, 1864 err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
1847 OSD_OPREPLY_FRONT_LEN, 10, true, 1865 OSD_OPREPLY_FRONT_LEN, 10, true,
1848 "osd_op_reply"); 1866 "osd_op_reply");
1849 if (err < 0) 1867 if (err < 0)
@@ -2019,15 +2037,15 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
2019 if (!req) { 2037 if (!req) {
2020 *skip = 1; 2038 *skip = 1;
2021 m = NULL; 2039 m = NULL;
2022 pr_info("get_reply unknown tid %llu from osd%d\n", tid, 2040 dout("get_reply unknown tid %llu from osd%d\n", tid,
2023 osd->o_osd); 2041 osd->o_osd);
2024 goto out; 2042 goto out;
2025 } 2043 }
2026 2044
2027 if (req->r_con_filling_msg) { 2045 if (req->r_con_filling_msg) {
2028 dout("get_reply revoking msg %p from old con %p\n", 2046 dout("%s revoking msg %p from old con %p\n", __func__,
2029 req->r_reply, req->r_con_filling_msg); 2047 req->r_reply, req->r_con_filling_msg);
2030 ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply); 2048 ceph_msg_revoke_incoming(req->r_reply);
2031 req->r_con_filling_msg->ops->put(req->r_con_filling_msg); 2049 req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
2032 req->r_con_filling_msg = NULL; 2050 req->r_con_filling_msg = NULL;
2033 } 2051 }
@@ -2080,6 +2098,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
2080 int type = le16_to_cpu(hdr->type); 2098 int type = le16_to_cpu(hdr->type);
2081 int front = le32_to_cpu(hdr->front_len); 2099 int front = le32_to_cpu(hdr->front_len);
2082 2100
2101 *skip = 0;
2083 switch (type) { 2102 switch (type) {
2084 case CEPH_MSG_OSD_MAP: 2103 case CEPH_MSG_OSD_MAP:
2085 case CEPH_MSG_WATCH_NOTIFY: 2104 case CEPH_MSG_WATCH_NOTIFY:
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 81e3b84a77ef..3124b71a8883 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -135,6 +135,21 @@ bad:
135 return -EINVAL; 135 return -EINVAL;
136} 136}
137 137
138static int skip_name_map(void **p, void *end)
139{
140 int len;
141 ceph_decode_32_safe(p, end, len ,bad);
142 while (len--) {
143 int strlen;
144 *p += sizeof(u32);
145 ceph_decode_32_safe(p, end, strlen, bad);
146 *p += strlen;
147}
148 return 0;
149bad:
150 return -EINVAL;
151}
152
138static struct crush_map *crush_decode(void *pbyval, void *end) 153static struct crush_map *crush_decode(void *pbyval, void *end)
139{ 154{
140 struct crush_map *c; 155 struct crush_map *c;
@@ -143,6 +158,7 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
143 void **p = &pbyval; 158 void **p = &pbyval;
144 void *start = pbyval; 159 void *start = pbyval;
145 u32 magic; 160 u32 magic;
161 u32 num_name_maps;
146 162
147 dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p)); 163 dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p));
148 164
@@ -150,6 +166,11 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
150 if (c == NULL) 166 if (c == NULL)
151 return ERR_PTR(-ENOMEM); 167 return ERR_PTR(-ENOMEM);
152 168
169 /* set tunables to default values */
170 c->choose_local_tries = 2;
171 c->choose_local_fallback_tries = 5;
172 c->choose_total_tries = 19;
173
153 ceph_decode_need(p, end, 4*sizeof(u32), bad); 174 ceph_decode_need(p, end, 4*sizeof(u32), bad);
154 magic = ceph_decode_32(p); 175 magic = ceph_decode_32(p);
155 if (magic != CRUSH_MAGIC) { 176 if (magic != CRUSH_MAGIC) {
@@ -297,7 +318,25 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
297 } 318 }
298 319
299 /* ignore trailing name maps. */ 320 /* ignore trailing name maps. */
321 for (num_name_maps = 0; num_name_maps < 3; num_name_maps++) {
322 err = skip_name_map(p, end);
323 if (err < 0)
324 goto done;
325 }
326
327 /* tunables */
328 ceph_decode_need(p, end, 3*sizeof(u32), done);
329 c->choose_local_tries = ceph_decode_32(p);
330 c->choose_local_fallback_tries = ceph_decode_32(p);
331 c->choose_total_tries = ceph_decode_32(p);
332 dout("crush decode tunable choose_local_tries = %d",
333 c->choose_local_tries);
334 dout("crush decode tunable choose_local_fallback_tries = %d",
335 c->choose_local_fallback_tries);
336 dout("crush decode tunable choose_total_tries = %d",
337 c->choose_total_tries);
300 338
339done:
301 dout("crush_decode success\n"); 340 dout("crush_decode success\n");
302 return c; 341 return c;
303 342
@@ -488,15 +527,16 @@ static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
488 ceph_decode_32_safe(p, end, pool, bad); 527 ceph_decode_32_safe(p, end, pool, bad);
489 ceph_decode_32_safe(p, end, len, bad); 528 ceph_decode_32_safe(p, end, len, bad);
490 dout(" pool %d len %d\n", pool, len); 529 dout(" pool %d len %d\n", pool, len);
530 ceph_decode_need(p, end, len, bad);
491 pi = __lookup_pg_pool(&map->pg_pools, pool); 531 pi = __lookup_pg_pool(&map->pg_pools, pool);
492 if (pi) { 532 if (pi) {
533 char *name = kstrndup(*p, len, GFP_NOFS);
534
535 if (!name)
536 return -ENOMEM;
493 kfree(pi->name); 537 kfree(pi->name);
494 pi->name = kmalloc(len + 1, GFP_NOFS); 538 pi->name = name;
495 if (pi->name) { 539 dout(" name is %s\n", pi->name);
496 memcpy(pi->name, *p, len);
497 pi->name[len] = '\0';
498 dout(" name is %s\n", pi->name);
499 }
500 } 540 }
501 *p += len; 541 *p += len;
502 } 542 }
@@ -666,6 +706,9 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
666 ceph_decode_need(p, end, sizeof(u32) + sizeof(u64), bad); 706 ceph_decode_need(p, end, sizeof(u32) + sizeof(u64), bad);
667 ceph_decode_copy(p, &pgid, sizeof(pgid)); 707 ceph_decode_copy(p, &pgid, sizeof(pgid));
668 n = ceph_decode_32(p); 708 n = ceph_decode_32(p);
709 err = -EINVAL;
710 if (n > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
711 goto bad;
669 ceph_decode_need(p, end, n * sizeof(u32), bad); 712 ceph_decode_need(p, end, n * sizeof(u32), bad);
670 err = -ENOMEM; 713 err = -ENOMEM;
671 pg = kmalloc(sizeof(*pg) + n*sizeof(u32), GFP_NOFS); 714 pg = kmalloc(sizeof(*pg) + n*sizeof(u32), GFP_NOFS);
@@ -889,6 +932,10 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
889 (void) __remove_pg_mapping(&map->pg_temp, pgid); 932 (void) __remove_pg_mapping(&map->pg_temp, pgid);
890 933
891 /* insert */ 934 /* insert */
935 if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) {
936 err = -EINVAL;
937 goto bad;
938 }
892 pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS); 939 pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS);
893 if (!pg) { 940 if (!pg) {
894 err = -ENOMEM; 941 err = -ENOMEM;