author     Linus Torvalds <torvalds@linux-foundation.org>  2012-03-28 13:01:29 -0400
committer  Linus Torvalds <torvalds@linux-foundation.org>  2012-03-28 13:01:29 -0400
commit     56b59b429b4c26e5e730bc8c3d837de9f7d0a966 (patch)
tree       191bf87e438a3985ccb7e3c5382fab8d31f94edb
parent     9a7259d5c8978bbeb5fdcf64b168f8470d8208a6 (diff)
parent     c666601a935b94cc0f3310339411b6940de751ba (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates for 3.4-rc1 from Sage Weil:
 "Alex has been busy.  There is a range of rbd and libceph cleanups, especially surrounding device setup and teardown, and a few critical fixes in that code.  There are more cleanups in the messenger code, virtual xattrs, a fix for CRC calculation/checks, and lots of other miscellaneous stuff.

  There's a patch from Amon Ott to make inos behave a bit better on 32-bit boxes, some decode check fixes from Xi Wang, a network throttling fix from Jim Schutt, and a couple of RBD fixes from Josh Durgin.

  No new functionality, just a lot of cleanup and bug fixing."

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (65 commits)
  rbd: move snap_rwsem to the device, rename to header_rwsem
  ceph: fix three bugs, two in ceph_vxattrcb_file_layout()
  libceph: isolate kmap() call in write_partial_msg_pages()
  libceph: rename "page_shift" variable to something sensible
  libceph: get rid of zero_page_address
  libceph: only call kernel_sendpage() via helper
  libceph: use kernel_sendpage() for sending zeroes
  libceph: fix inverted crc option logic
  libceph: some simple changes
  libceph: small refactor in write_partial_kvec()
  libceph: do crc calculations outside loop
  libceph: separate CRC calculation from byte swapping
  libceph: use "do" in CRC-related Boolean variables
  ceph: ensure Boolean options support both senses
  libceph: a few small changes
  libceph: make ceph_tcp_connect() return int
  libceph: encapsulate some messenger cleanup code
  libceph: make ceph_msgr_wq private
  libceph: encapsulate connection kvec operations
  libceph: move prepare_write_banner()
  ...
-rw-r--r--  drivers/block/rbd.c             730
-rw-r--r--  drivers/block/rbd_types.h         4
-rw-r--r--  fs/ceph/inode.c                  11
-rw-r--r--  fs/ceph/mds_client.c              7
-rw-r--r--  fs/ceph/snap.c                    2
-rw-r--r--  fs/ceph/super.c                  19
-rw-r--r--  fs/ceph/super.h                   4
-rw-r--r--  fs/ceph/xattr.c                 202
-rw-r--r--  include/linux/ceph/libceph.h      2
-rw-r--r--  include/linux/ceph/messenger.h    5
-rw-r--r--  net/ceph/ceph_common.c           26
-rw-r--r--  net/ceph/messenger.c            456
-rw-r--r--  net/ceph/osdmap.c                 3
13 files changed, 868 insertions(+), 603 deletions(-)
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index a6278e7e61a0..013c7a549fb6 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -41,19 +41,35 @@
41 41
42#include "rbd_types.h" 42#include "rbd_types.h"
43 43
44#define DRV_NAME "rbd" 44/*
45#define DRV_NAME_LONG "rbd (rados block device)" 45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
49 */
50#define SECTOR_SHIFT 9
51#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
52
53#define RBD_DRV_NAME "rbd"
54#define RBD_DRV_NAME_LONG "rbd (rados block device)"
46 55
47#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */ 56#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
48 57
49#define RBD_MAX_MD_NAME_LEN (96 + sizeof(RBD_SUFFIX)) 58#define RBD_MAX_MD_NAME_LEN (RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
50#define RBD_MAX_POOL_NAME_LEN 64 59#define RBD_MAX_POOL_NAME_LEN 64
51#define RBD_MAX_SNAP_NAME_LEN 32 60#define RBD_MAX_SNAP_NAME_LEN 32
52#define RBD_MAX_OPT_LEN 1024 61#define RBD_MAX_OPT_LEN 1024
53 62
54#define RBD_SNAP_HEAD_NAME "-" 63#define RBD_SNAP_HEAD_NAME "-"
55 64
65/*
66 * An RBD device name will be "rbd#", where the "rbd" comes from
67 * RBD_DRV_NAME above, and # is a unique integer identifier.
68 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
69 * enough to hold all possible device names.
70 */
56#define DEV_NAME_LEN 32 71#define DEV_NAME_LEN 32
72#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
57 73
58#define RBD_NOTIFY_TIMEOUT_DEFAULT 10 74#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
59 75
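
As an aside on the MAX_INT_FORMAT_WIDTH definition added above: each byte of an int contributes at most log10(256), roughly 2.41, decimal digits, so allowing 5/2 characters per byte overestimates the digit count and the trailing +1 leaves room for a sign. A minimal user-space sketch of that bound (the snprintf check is purely illustrative, not part of the patch):

#include <limits.h>
#include <stdio.h>

/* Same bound as the patch: ~2.5 decimal digits per byte, plus one for a sign. */
#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)

int main(void)
{
	char buf[MAX_INT_FORMAT_WIDTH + 1];	/* +1 for the trailing NUL */
	int used = snprintf(buf, sizeof buf, "%d", INT_MIN);

	/* With 32-bit int the bound is 11, and "-2147483648" needs exactly 11. */
	printf("bound=%zu, worst case uses %d characters\n",
	       (size_t) MAX_INT_FORMAT_WIDTH, used);
	return 0;
}
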
@@ -66,7 +82,6 @@ struct rbd_image_header {
66 __u8 obj_order; 82 __u8 obj_order;
67 __u8 crypt_type; 83 __u8 crypt_type;
68 __u8 comp_type; 84 __u8 comp_type;
69 struct rw_semaphore snap_rwsem;
70 struct ceph_snap_context *snapc; 85 struct ceph_snap_context *snapc;
71 size_t snap_names_len; 86 size_t snap_names_len;
72 u64 snap_seq; 87 u64 snap_seq;
@@ -83,7 +98,7 @@ struct rbd_options {
83}; 98};
84 99
85/* 100/*
86 * an instance of the client. multiple devices may share a client. 101 * an instance of the client. multiple devices may share an rbd client.
87 */ 102 */
88struct rbd_client { 103struct rbd_client {
89 struct ceph_client *client; 104 struct ceph_client *client;
@@ -92,20 +107,9 @@ struct rbd_client {
92 struct list_head node; 107 struct list_head node;
93}; 108};
94 109
95struct rbd_req_coll;
96
97/* 110/*
98 * a single io request 111 * a request completion status
99 */ 112 */
100struct rbd_request {
101 struct request *rq; /* blk layer request */
102 struct bio *bio; /* cloned bio */
103 struct page **pages; /* list of used pages */
104 u64 len;
105 int coll_index;
106 struct rbd_req_coll *coll;
107};
108
109struct rbd_req_status { 113struct rbd_req_status {
110 int done; 114 int done;
111 int rc; 115 int rc;
@@ -122,6 +126,18 @@ struct rbd_req_coll {
122 struct rbd_req_status status[0]; 126 struct rbd_req_status status[0];
123}; 127};
124 128
129/*
130 * a single io request
131 */
132struct rbd_request {
133 struct request *rq; /* blk layer request */
134 struct bio *bio; /* cloned bio */
135 struct page **pages; /* list of used pages */
136 u64 len;
137 int coll_index;
138 struct rbd_req_coll *coll;
139};
140
125struct rbd_snap { 141struct rbd_snap {
126 struct device dev; 142 struct device dev;
127 const char *name; 143 const char *name;
@@ -140,7 +156,6 @@ struct rbd_device {
140 struct gendisk *disk; /* blkdev's gendisk and rq */ 156 struct gendisk *disk; /* blkdev's gendisk and rq */
141 struct request_queue *q; 157 struct request_queue *q;
142 158
143 struct ceph_client *client;
144 struct rbd_client *rbd_client; 159 struct rbd_client *rbd_client;
145 160
146 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */ 161 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
@@ -157,6 +172,8 @@ struct rbd_device {
157 struct ceph_osd_event *watch_event; 172 struct ceph_osd_event *watch_event;
158 struct ceph_osd_request *watch_request; 173 struct ceph_osd_request *watch_request;
159 174
175 /* protects updating the header */
176 struct rw_semaphore header_rwsem;
160 char snap_name[RBD_MAX_SNAP_NAME_LEN]; 177 char snap_name[RBD_MAX_SNAP_NAME_LEN];
161 u32 cur_snap; /* index+1 of current snapshot within snap context 178 u32 cur_snap; /* index+1 of current snapshot within snap context
162 0 - for the head */ 179 0 - for the head */
@@ -171,15 +188,13 @@ struct rbd_device {
171 struct device dev; 188 struct device dev;
172}; 189};
173 190
174static struct bus_type rbd_bus_type = {
175 .name = "rbd",
176};
177
178static spinlock_t node_lock; /* protects client get/put */
179
180static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */ 191static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
192
181static LIST_HEAD(rbd_dev_list); /* devices */ 193static LIST_HEAD(rbd_dev_list); /* devices */
182static LIST_HEAD(rbd_client_list); /* clients */ 194static DEFINE_SPINLOCK(rbd_dev_list_lock);
195
196static LIST_HEAD(rbd_client_list); /* clients */
197static DEFINE_SPINLOCK(rbd_client_list_lock);
183 198
184static int __rbd_init_snaps_header(struct rbd_device *rbd_dev); 199static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
185static void rbd_dev_release(struct device *dev); 200static void rbd_dev_release(struct device *dev);
@@ -190,12 +205,32 @@ static ssize_t rbd_snap_add(struct device *dev,
190static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev, 205static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
191 struct rbd_snap *snap); 206 struct rbd_snap *snap);
192 207
208static ssize_t rbd_add(struct bus_type *bus, const char *buf,
209 size_t count);
210static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
211 size_t count);
193 212
194static struct rbd_device *dev_to_rbd(struct device *dev) 213static struct bus_attribute rbd_bus_attrs[] = {
214 __ATTR(add, S_IWUSR, NULL, rbd_add),
215 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
216 __ATTR_NULL
217};
218
219static struct bus_type rbd_bus_type = {
220 .name = "rbd",
221 .bus_attrs = rbd_bus_attrs,
222};
223
224static void rbd_root_dev_release(struct device *dev)
195{ 225{
196 return container_of(dev, struct rbd_device, dev);
197} 226}
198 227
228static struct device rbd_root_dev = {
229 .init_name = "rbd",
230 .release = rbd_root_dev_release,
231};
232
233
199static struct device *rbd_get_dev(struct rbd_device *rbd_dev) 234static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
200{ 235{
201 return get_device(&rbd_dev->dev); 236 return get_device(&rbd_dev->dev);
@@ -210,8 +245,7 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev);
210 245
211static int rbd_open(struct block_device *bdev, fmode_t mode) 246static int rbd_open(struct block_device *bdev, fmode_t mode)
212{ 247{
213 struct gendisk *disk = bdev->bd_disk; 248 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
214 struct rbd_device *rbd_dev = disk->private_data;
215 249
216 rbd_get_dev(rbd_dev); 250 rbd_get_dev(rbd_dev);
217 251
@@ -256,9 +290,11 @@ static struct rbd_client *rbd_client_create(struct ceph_options *opt,
256 kref_init(&rbdc->kref); 290 kref_init(&rbdc->kref);
257 INIT_LIST_HEAD(&rbdc->node); 291 INIT_LIST_HEAD(&rbdc->node);
258 292
293 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
294
259 rbdc->client = ceph_create_client(opt, rbdc, 0, 0); 295 rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
260 if (IS_ERR(rbdc->client)) 296 if (IS_ERR(rbdc->client))
261 goto out_rbdc; 297 goto out_mutex;
262 opt = NULL; /* Now rbdc->client is responsible for opt */ 298 opt = NULL; /* Now rbdc->client is responsible for opt */
263 299
264 ret = ceph_open_session(rbdc->client); 300 ret = ceph_open_session(rbdc->client);
@@ -267,16 +303,19 @@ static struct rbd_client *rbd_client_create(struct ceph_options *opt,
267 303
268 rbdc->rbd_opts = rbd_opts; 304 rbdc->rbd_opts = rbd_opts;
269 305
270 spin_lock(&node_lock); 306 spin_lock(&rbd_client_list_lock);
271 list_add_tail(&rbdc->node, &rbd_client_list); 307 list_add_tail(&rbdc->node, &rbd_client_list);
272 spin_unlock(&node_lock); 308 spin_unlock(&rbd_client_list_lock);
309
310 mutex_unlock(&ctl_mutex);
273 311
274 dout("rbd_client_create created %p\n", rbdc); 312 dout("rbd_client_create created %p\n", rbdc);
275 return rbdc; 313 return rbdc;
276 314
277out_err: 315out_err:
278 ceph_destroy_client(rbdc->client); 316 ceph_destroy_client(rbdc->client);
279out_rbdc: 317out_mutex:
318 mutex_unlock(&ctl_mutex);
280 kfree(rbdc); 319 kfree(rbdc);
281out_opt: 320out_opt:
282 if (opt) 321 if (opt)
@@ -324,7 +363,7 @@ static int parse_rbd_opts_token(char *c, void *private)
324 substring_t argstr[MAX_OPT_ARGS]; 363 substring_t argstr[MAX_OPT_ARGS];
325 int token, intval, ret; 364 int token, intval, ret;
326 365
327 token = match_token((char *)c, rbdopt_tokens, argstr); 366 token = match_token(c, rbdopt_tokens, argstr);
328 if (token < 0) 367 if (token < 0)
329 return -EINVAL; 368 return -EINVAL;
330 369
@@ -357,58 +396,54 @@ static int parse_rbd_opts_token(char *c, void *private)
357 * Get a ceph client with specific addr and configuration, if one does 396 * Get a ceph client with specific addr and configuration, if one does
358 * not exist create it. 397 * not exist create it.
359 */ 398 */
360static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr, 399static struct rbd_client *rbd_get_client(const char *mon_addr,
361 char *options) 400 size_t mon_addr_len,
401 char *options)
362{ 402{
363 struct rbd_client *rbdc; 403 struct rbd_client *rbdc;
364 struct ceph_options *opt; 404 struct ceph_options *opt;
365 int ret;
366 struct rbd_options *rbd_opts; 405 struct rbd_options *rbd_opts;
367 406
368 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL); 407 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
369 if (!rbd_opts) 408 if (!rbd_opts)
370 return -ENOMEM; 409 return ERR_PTR(-ENOMEM);
371 410
372 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT; 411 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
373 412
374 ret = ceph_parse_options(&opt, options, mon_addr, 413 opt = ceph_parse_options(options, mon_addr,
375 mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts); 414 mon_addr + mon_addr_len,
376 if (ret < 0) 415 parse_rbd_opts_token, rbd_opts);
377 goto done_err; 416 if (IS_ERR(opt)) {
417 kfree(rbd_opts);
418 return ERR_CAST(opt);
419 }
378 420
379 spin_lock(&node_lock); 421 spin_lock(&rbd_client_list_lock);
380 rbdc = __rbd_client_find(opt); 422 rbdc = __rbd_client_find(opt);
381 if (rbdc) { 423 if (rbdc) {
424 /* using an existing client */
425 kref_get(&rbdc->kref);
426 spin_unlock(&rbd_client_list_lock);
427
382 ceph_destroy_options(opt); 428 ceph_destroy_options(opt);
383 kfree(rbd_opts); 429 kfree(rbd_opts);
384 430
385 /* using an existing client */ 431 return rbdc;
386 kref_get(&rbdc->kref);
387 rbd_dev->rbd_client = rbdc;
388 rbd_dev->client = rbdc->client;
389 spin_unlock(&node_lock);
390 return 0;
391 } 432 }
392 spin_unlock(&node_lock); 433 spin_unlock(&rbd_client_list_lock);
393 434
394 rbdc = rbd_client_create(opt, rbd_opts); 435 rbdc = rbd_client_create(opt, rbd_opts);
395 if (IS_ERR(rbdc)) {
396 ret = PTR_ERR(rbdc);
397 goto done_err;
398 }
399 436
400 rbd_dev->rbd_client = rbdc; 437 if (IS_ERR(rbdc))
401 rbd_dev->client = rbdc->client; 438 kfree(rbd_opts);
402 return 0; 439
403done_err: 440 return rbdc;
404 kfree(rbd_opts);
405 return ret;
406} 441}
407 442
408/* 443/*
409 * Destroy ceph client 444 * Destroy ceph client
410 * 445 *
411 * Caller must hold node_lock. 446 * Caller must hold rbd_client_list_lock.
412 */ 447 */
413static void rbd_client_release(struct kref *kref) 448static void rbd_client_release(struct kref *kref)
414{ 449{
@@ -428,11 +463,10 @@ static void rbd_client_release(struct kref *kref)
428 */ 463 */
429static void rbd_put_client(struct rbd_device *rbd_dev) 464static void rbd_put_client(struct rbd_device *rbd_dev)
430{ 465{
431 spin_lock(&node_lock); 466 spin_lock(&rbd_client_list_lock);
432 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release); 467 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
433 spin_unlock(&node_lock); 468 spin_unlock(&rbd_client_list_lock);
434 rbd_dev->rbd_client = NULL; 469 rbd_dev->rbd_client = NULL;
435 rbd_dev->client = NULL;
436} 470}
437 471
438/* 472/*
@@ -457,21 +491,19 @@ static int rbd_header_from_disk(struct rbd_image_header *header,
457 gfp_t gfp_flags) 491 gfp_t gfp_flags)
458{ 492{
459 int i; 493 int i;
460 u32 snap_count = le32_to_cpu(ondisk->snap_count); 494 u32 snap_count;
461 int ret = -ENOMEM;
462 495
463 if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT))) { 496 if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
464 return -ENXIO; 497 return -ENXIO;
465 }
466 498
467 init_rwsem(&header->snap_rwsem); 499 snap_count = le32_to_cpu(ondisk->snap_count);
468 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
469 header->snapc = kmalloc(sizeof(struct ceph_snap_context) + 500 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
470 snap_count * 501 snap_count * sizeof (*ondisk),
471 sizeof(struct rbd_image_snap_ondisk),
472 gfp_flags); 502 gfp_flags);
473 if (!header->snapc) 503 if (!header->snapc)
474 return -ENOMEM; 504 return -ENOMEM;
505
506 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
475 if (snap_count) { 507 if (snap_count) {
476 header->snap_names = kmalloc(header->snap_names_len, 508 header->snap_names = kmalloc(header->snap_names_len,
477 GFP_KERNEL); 509 GFP_KERNEL);
@@ -498,8 +530,7 @@ static int rbd_header_from_disk(struct rbd_image_header *header,
498 header->snapc->num_snaps = snap_count; 530 header->snapc->num_snaps = snap_count;
499 header->total_snaps = snap_count; 531 header->total_snaps = snap_count;
500 532
501 if (snap_count && 533 if (snap_count && allocated_snaps == snap_count) {
502 allocated_snaps == snap_count) {
503 for (i = 0; i < snap_count; i++) { 534 for (i = 0; i < snap_count; i++) {
504 header->snapc->snaps[i] = 535 header->snapc->snaps[i] =
505 le64_to_cpu(ondisk->snaps[i].id); 536 le64_to_cpu(ondisk->snaps[i].id);
@@ -518,7 +549,7 @@ err_names:
518 kfree(header->snap_names); 549 kfree(header->snap_names);
519err_snapc: 550err_snapc:
520 kfree(header->snapc); 551 kfree(header->snapc);
521 return ret; 552 return -ENOMEM;
522} 553}
523 554
524static int snap_index(struct rbd_image_header *header, int snap_num) 555static int snap_index(struct rbd_image_header *header, int snap_num)
@@ -542,35 +573,34 @@ static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
542 int i; 573 int i;
543 char *p = header->snap_names; 574 char *p = header->snap_names;
544 575
545 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) { 576 for (i = 0; i < header->total_snaps; i++) {
546 if (strcmp(snap_name, p) == 0) 577 if (!strcmp(snap_name, p)) {
547 break;
548 }
549 if (i == header->total_snaps)
550 return -ENOENT;
551 if (seq)
552 *seq = header->snapc->snaps[i];
553 578
554 if (size) 579 /* Found it. Pass back its id and/or size */
555 *size = header->snap_sizes[i];
556 580
557 return i; 581 if (seq)
582 *seq = header->snapc->snaps[i];
583 if (size)
584 *size = header->snap_sizes[i];
585 return i;
586 }
587 p += strlen(p) + 1; /* Skip ahead to the next name */
588 }
589 return -ENOENT;
558} 590}
559 591
560static int rbd_header_set_snap(struct rbd_device *dev, 592static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
561 const char *snap_name,
562 u64 *size)
563{ 593{
564 struct rbd_image_header *header = &dev->header; 594 struct rbd_image_header *header = &dev->header;
565 struct ceph_snap_context *snapc = header->snapc; 595 struct ceph_snap_context *snapc = header->snapc;
566 int ret = -ENOENT; 596 int ret = -ENOENT;
567 597
568 down_write(&header->snap_rwsem); 598 BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
569 599
570 if (!snap_name || 600 down_write(&dev->header_rwsem);
571 !*snap_name || 601
572 strcmp(snap_name, "-") == 0 || 602 if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
573 strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) { 603 sizeof (RBD_SNAP_HEAD_NAME))) {
574 if (header->total_snaps) 604 if (header->total_snaps)
575 snapc->seq = header->snap_seq; 605 snapc->seq = header->snap_seq;
576 else 606 else
@@ -580,7 +610,7 @@ static int rbd_header_set_snap(struct rbd_device *dev,
580 if (size) 610 if (size)
581 *size = header->image_size; 611 *size = header->image_size;
582 } else { 612 } else {
583 ret = snap_by_name(header, snap_name, &snapc->seq, size); 613 ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
584 if (ret < 0) 614 if (ret < 0)
585 goto done; 615 goto done;
586 616
@@ -590,7 +620,7 @@ static int rbd_header_set_snap(struct rbd_device *dev,
590 620
591 ret = 0; 621 ret = 0;
592done: 622done:
593 up_write(&header->snap_rwsem); 623 up_write(&dev->header_rwsem);
594 return ret; 624 return ret;
595} 625}
596 626
@@ -717,7 +747,7 @@ static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
717 747
718 /* split the bio. We'll release it either in the next 748 /* split the bio. We'll release it either in the next
719 call, or it will have to be released outside */ 749 call, or it will have to be released outside */
720 bp = bio_split(old_chain, (len - total) / 512ULL); 750 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
721 if (!bp) 751 if (!bp)
722 goto err_out; 752 goto err_out;
723 753
@@ -857,7 +887,7 @@ static int rbd_do_request(struct request *rq,
857 struct timespec mtime = CURRENT_TIME; 887 struct timespec mtime = CURRENT_TIME;
858 struct rbd_request *req_data; 888 struct rbd_request *req_data;
859 struct ceph_osd_request_head *reqhead; 889 struct ceph_osd_request_head *reqhead;
860 struct rbd_image_header *header = &dev->header; 890 struct ceph_osd_client *osdc;
861 891
862 req_data = kzalloc(sizeof(*req_data), GFP_NOIO); 892 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
863 if (!req_data) { 893 if (!req_data) {
@@ -874,15 +904,13 @@ static int rbd_do_request(struct request *rq,
874 904
875 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs); 905 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
876 906
877 down_read(&header->snap_rwsem); 907 down_read(&dev->header_rwsem);
878 908
879 req = ceph_osdc_alloc_request(&dev->client->osdc, flags, 909 osdc = &dev->rbd_client->client->osdc;
880 snapc, 910 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
881 ops, 911 false, GFP_NOIO, pages, bio);
882 false,
883 GFP_NOIO, pages, bio);
884 if (!req) { 912 if (!req) {
885 up_read(&header->snap_rwsem); 913 up_read(&dev->header_rwsem);
886 ret = -ENOMEM; 914 ret = -ENOMEM;
887 goto done_pages; 915 goto done_pages;
888 } 916 }
@@ -909,27 +937,27 @@ static int rbd_do_request(struct request *rq,
909 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 937 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
910 layout->fl_pg_preferred = cpu_to_le32(-1); 938 layout->fl_pg_preferred = cpu_to_le32(-1);
911 layout->fl_pg_pool = cpu_to_le32(dev->poolid); 939 layout->fl_pg_pool = cpu_to_le32(dev->poolid);
912 ceph_calc_raw_layout(&dev->client->osdc, layout, snapid, 940 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
913 ofs, &len, &bno, req, ops); 941 req, ops);
914 942
915 ceph_osdc_build_request(req, ofs, &len, 943 ceph_osdc_build_request(req, ofs, &len,
916 ops, 944 ops,
917 snapc, 945 snapc,
918 &mtime, 946 &mtime,
919 req->r_oid, req->r_oid_len); 947 req->r_oid, req->r_oid_len);
920 up_read(&header->snap_rwsem); 948 up_read(&dev->header_rwsem);
921 949
922 if (linger_req) { 950 if (linger_req) {
923 ceph_osdc_set_request_linger(&dev->client->osdc, req); 951 ceph_osdc_set_request_linger(osdc, req);
924 *linger_req = req; 952 *linger_req = req;
925 } 953 }
926 954
927 ret = ceph_osdc_start_request(&dev->client->osdc, req, false); 955 ret = ceph_osdc_start_request(osdc, req, false);
928 if (ret < 0) 956 if (ret < 0)
929 goto done_err; 957 goto done_err;
930 958
931 if (!rbd_cb) { 959 if (!rbd_cb) {
932 ret = ceph_osdc_wait_request(&dev->client->osdc, req); 960 ret = ceph_osdc_wait_request(osdc, req);
933 if (ver) 961 if (ver)
934 *ver = le64_to_cpu(req->r_reassert_version.version); 962 *ver = le64_to_cpu(req->r_reassert_version.version);
935 dout("reassert_ver=%lld\n", 963 dout("reassert_ver=%lld\n",
@@ -1213,8 +1241,8 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1213 rc = __rbd_update_snaps(dev); 1241 rc = __rbd_update_snaps(dev);
1214 mutex_unlock(&ctl_mutex); 1242 mutex_unlock(&ctl_mutex);
1215 if (rc) 1243 if (rc)
1216 pr_warning(DRV_NAME "%d got notification but failed to update" 1244 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1217 " snaps: %d\n", dev->major, rc); 1245 " update snaps: %d\n", dev->major, rc);
1218 1246
1219 rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name); 1247 rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1220} 1248}
@@ -1227,7 +1255,7 @@ static int rbd_req_sync_watch(struct rbd_device *dev,
1227 u64 ver) 1255 u64 ver)
1228{ 1256{
1229 struct ceph_osd_req_op *ops; 1257 struct ceph_osd_req_op *ops;
1230 struct ceph_osd_client *osdc = &dev->client->osdc; 1258 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1231 1259
1232 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0); 1260 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1233 if (ret < 0) 1261 if (ret < 0)
@@ -1314,7 +1342,7 @@ static int rbd_req_sync_notify(struct rbd_device *dev,
1314 const char *obj) 1342 const char *obj)
1315{ 1343{
1316 struct ceph_osd_req_op *ops; 1344 struct ceph_osd_req_op *ops;
1317 struct ceph_osd_client *osdc = &dev->client->osdc; 1345 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1318 struct ceph_osd_event *event; 1346 struct ceph_osd_event *event;
1319 struct rbd_notify_info info; 1347 struct rbd_notify_info info;
1320 int payload_len = sizeof(u32) + sizeof(u32); 1348 int payload_len = sizeof(u32) + sizeof(u32);
@@ -1421,9 +1449,7 @@ static void rbd_rq_fn(struct request_queue *q)
1421 struct request *rq; 1449 struct request *rq;
1422 struct bio_pair *bp = NULL; 1450 struct bio_pair *bp = NULL;
1423 1451
1424 rq = blk_fetch_request(q); 1452 while ((rq = blk_fetch_request(q))) {
1425
1426 while (1) {
1427 struct bio *bio; 1453 struct bio *bio;
1428 struct bio *rq_bio, *next_bio = NULL; 1454 struct bio *rq_bio, *next_bio = NULL;
1429 bool do_write; 1455 bool do_write;
@@ -1441,32 +1467,32 @@ static void rbd_rq_fn(struct request_queue *q)
1441 /* filter out block requests we don't understand */ 1467 /* filter out block requests we don't understand */
1442 if ((rq->cmd_type != REQ_TYPE_FS)) { 1468 if ((rq->cmd_type != REQ_TYPE_FS)) {
1443 __blk_end_request_all(rq, 0); 1469 __blk_end_request_all(rq, 0);
1444 goto next; 1470 continue;
1445 } 1471 }
1446 1472
1447 /* deduce our operation (read, write) */ 1473 /* deduce our operation (read, write) */
1448 do_write = (rq_data_dir(rq) == WRITE); 1474 do_write = (rq_data_dir(rq) == WRITE);
1449 1475
1450 size = blk_rq_bytes(rq); 1476 size = blk_rq_bytes(rq);
1451 ofs = blk_rq_pos(rq) * 512ULL; 1477 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1452 rq_bio = rq->bio; 1478 rq_bio = rq->bio;
1453 if (do_write && rbd_dev->read_only) { 1479 if (do_write && rbd_dev->read_only) {
1454 __blk_end_request_all(rq, -EROFS); 1480 __blk_end_request_all(rq, -EROFS);
1455 goto next; 1481 continue;
1456 } 1482 }
1457 1483
1458 spin_unlock_irq(q->queue_lock); 1484 spin_unlock_irq(q->queue_lock);
1459 1485
1460 dout("%s 0x%x bytes at 0x%llx\n", 1486 dout("%s 0x%x bytes at 0x%llx\n",
1461 do_write ? "write" : "read", 1487 do_write ? "write" : "read",
1462 size, blk_rq_pos(rq) * 512ULL); 1488 size, blk_rq_pos(rq) * SECTOR_SIZE);
1463 1489
1464 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size); 1490 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1465 coll = rbd_alloc_coll(num_segs); 1491 coll = rbd_alloc_coll(num_segs);
1466 if (!coll) { 1492 if (!coll) {
1467 spin_lock_irq(q->queue_lock); 1493 spin_lock_irq(q->queue_lock);
1468 __blk_end_request_all(rq, -ENOMEM); 1494 __blk_end_request_all(rq, -ENOMEM);
1469 goto next; 1495 continue;
1470 } 1496 }
1471 1497
1472 do { 1498 do {
@@ -1512,8 +1538,6 @@ next_seg:
1512 if (bp) 1538 if (bp)
1513 bio_pair_release(bp); 1539 bio_pair_release(bp);
1514 spin_lock_irq(q->queue_lock); 1540 spin_lock_irq(q->queue_lock);
1515next:
1516 rq = blk_fetch_request(q);
1517 } 1541 }
1518} 1542}
1519 1543
@@ -1526,13 +1550,17 @@ static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1526 struct bio_vec *bvec) 1550 struct bio_vec *bvec)
1527{ 1551{
1528 struct rbd_device *rbd_dev = q->queuedata; 1552 struct rbd_device *rbd_dev = q->queuedata;
1529 unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9); 1553 unsigned int chunk_sectors;
1530 sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev); 1554 sector_t sector;
1531 unsigned int bio_sectors = bmd->bi_size >> 9; 1555 unsigned int bio_sectors;
1532 int max; 1556 int max;
1533 1557
1558 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1559 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1560 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1561
1534 max = (chunk_sectors - ((sector & (chunk_sectors - 1)) 1562 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1535 + bio_sectors)) << 9; 1563 + bio_sectors)) << SECTOR_SHIFT;
1536 if (max < 0) 1564 if (max < 0)
1537 max = 0; /* bio_add cannot handle a negative return */ 1565 max = 0; /* bio_add cannot handle a negative return */
1538 if (max <= bvec->bv_len && bio_sectors == 0) 1566 if (max <= bvec->bv_len && bio_sectors == 0)
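
A note on the rbd_merge_bvec() arithmetic in the hunk above: because chunk_sectors is a power of two, sector & (chunk_sectors - 1) gives the offset within the current object, and the rest of the expression is how many bytes remain before the object boundary. A small worked sketch, assuming 4 MB objects (obj_order = 22) and made-up request values:

#include <stdio.h>

#define SECTOR_SHIFT	9

int main(void)
{
	unsigned int obj_order = 22;		/* assumption: 4 MB rbd objects */
	unsigned int chunk_sectors = 1u << (obj_order - SECTOR_SHIFT);	/* 8192 sectors */
	unsigned int sector = 10000;		/* hypothetical starting sector */
	unsigned int bio_sectors = 8;		/* sectors already queued in the bio */
	unsigned int in_chunk = sector & (chunk_sectors - 1);	/* offset inside the object */
	int max = (int)(chunk_sectors - (in_chunk + bio_sectors)) << SECTOR_SHIFT;

	if (max < 0)
		max = 0;	/* mirrors the driver: bio_add cannot handle a negative return */
	printf("room left in this object: %d bytes\n", max);	/* 6376 sectors -> 3264512 bytes */
	return 0;
}
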
@@ -1565,15 +1593,16 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
1565 ssize_t rc; 1593 ssize_t rc;
1566 struct rbd_image_header_ondisk *dh; 1594 struct rbd_image_header_ondisk *dh;
1567 int snap_count = 0; 1595 int snap_count = 0;
1568 u64 snap_names_len = 0;
1569 u64 ver; 1596 u64 ver;
1597 size_t len;
1570 1598
1599 /*
1600 * First reads the fixed-size header to determine the number
1601 * of snapshots, then re-reads it, along with all snapshot
1602 * records as well as their stored names.
1603 */
1604 len = sizeof (*dh);
1571 while (1) { 1605 while (1) {
1572 int len = sizeof(*dh) +
1573 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1574 snap_names_len;
1575
1576 rc = -ENOMEM;
1577 dh = kmalloc(len, GFP_KERNEL); 1606 dh = kmalloc(len, GFP_KERNEL);
1578 if (!dh) 1607 if (!dh)
1579 return -ENOMEM; 1608 return -ENOMEM;
@@ -1588,21 +1617,22 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
1588 1617
1589 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL); 1618 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1590 if (rc < 0) { 1619 if (rc < 0) {
1591 if (rc == -ENXIO) { 1620 if (rc == -ENXIO)
1592 pr_warning("unrecognized header format" 1621 pr_warning("unrecognized header format"
1593 " for image %s", rbd_dev->obj); 1622 " for image %s", rbd_dev->obj);
1594 }
1595 goto out_dh; 1623 goto out_dh;
1596 } 1624 }
1597 1625
1598 if (snap_count != header->total_snaps) { 1626 if (snap_count == header->total_snaps)
1599 snap_count = header->total_snaps; 1627 break;
1600 snap_names_len = header->snap_names_len; 1628
1601 rbd_header_free(header); 1629 snap_count = header->total_snaps;
1602 kfree(dh); 1630 len = sizeof (*dh) +
1603 continue; 1631 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1604 } 1632 header->snap_names_len;
1605 break; 1633
1634 rbd_header_free(header);
1635 kfree(dh);
1606 } 1636 }
1607 header->obj_version = ver; 1637 header->obj_version = ver;
1608 1638
@@ -1623,13 +1653,14 @@ static int rbd_header_add_snap(struct rbd_device *dev,
1623 int ret; 1653 int ret;
1624 void *data, *p, *e; 1654 void *data, *p, *e;
1625 u64 ver; 1655 u64 ver;
1656 struct ceph_mon_client *monc;
1626 1657
1627 /* we should create a snapshot only if we're pointing at the head */ 1658 /* we should create a snapshot only if we're pointing at the head */
1628 if (dev->cur_snap) 1659 if (dev->cur_snap)
1629 return -EINVAL; 1660 return -EINVAL;
1630 1661
1631 ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid, 1662 monc = &dev->rbd_client->client->monc;
1632 &new_snapid); 1663 ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
1633 dout("created snapid=%lld\n", new_snapid); 1664 dout("created snapid=%lld\n", new_snapid);
1634 if (ret < 0) 1665 if (ret < 0)
1635 return ret; 1666 return ret;
@@ -1684,9 +1715,9 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1684 return ret; 1715 return ret;
1685 1716
1686 /* resized? */ 1717 /* resized? */
1687 set_capacity(rbd_dev->disk, h.image_size / 512ULL); 1718 set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);
1688 1719
1689 down_write(&rbd_dev->header.snap_rwsem); 1720 down_write(&rbd_dev->header_rwsem);
1690 1721
1691 snap_seq = rbd_dev->header.snapc->seq; 1722 snap_seq = rbd_dev->header.snapc->seq;
1692 if (rbd_dev->header.total_snaps && 1723 if (rbd_dev->header.total_snaps &&
@@ -1711,7 +1742,7 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1711 1742
1712 ret = __rbd_init_snaps_header(rbd_dev); 1743 ret = __rbd_init_snaps_header(rbd_dev);
1713 1744
1714 up_write(&rbd_dev->header.snap_rwsem); 1745 up_write(&rbd_dev->header_rwsem);
1715 1746
1716 return ret; 1747 return ret;
1717} 1748}
@@ -1721,6 +1752,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
1721 struct gendisk *disk; 1752 struct gendisk *disk;
1722 struct request_queue *q; 1753 struct request_queue *q;
1723 int rc; 1754 int rc;
1755 u64 segment_size;
1724 u64 total_size = 0; 1756 u64 total_size = 0;
1725 1757
1726 /* contact OSD, request size info about the object being mapped */ 1758 /* contact OSD, request size info about the object being mapped */
@@ -1733,7 +1765,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
1733 if (rc) 1765 if (rc)
1734 return rc; 1766 return rc;
1735 1767
1736 rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size); 1768 rc = rbd_header_set_snap(rbd_dev, &total_size);
1737 if (rc) 1769 if (rc)
1738 return rc; 1770 return rc;
1739 1771
@@ -1743,7 +1775,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
1743 if (!disk) 1775 if (!disk)
1744 goto out; 1776 goto out;
1745 1777
1746 snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d", 1778 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1747 rbd_dev->id); 1779 rbd_dev->id);
1748 disk->major = rbd_dev->major; 1780 disk->major = rbd_dev->major;
1749 disk->first_minor = 0; 1781 disk->first_minor = 0;
@@ -1756,11 +1788,15 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
1756 if (!q) 1788 if (!q)
1757 goto out_disk; 1789 goto out_disk;
1758 1790
1791 /* We use the default size, but let's be explicit about it. */
1792 blk_queue_physical_block_size(q, SECTOR_SIZE);
1793
1759 /* set io sizes to object size */ 1794 /* set io sizes to object size */
1760 blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL); 1795 segment_size = rbd_obj_bytes(&rbd_dev->header);
1761 blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header)); 1796 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1762 blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header)); 1797 blk_queue_max_segment_size(q, segment_size);
1763 blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header)); 1798 blk_queue_io_min(q, segment_size);
1799 blk_queue_io_opt(q, segment_size);
1764 1800
1765 blk_queue_merge_bvec(q, rbd_merge_bvec); 1801 blk_queue_merge_bvec(q, rbd_merge_bvec);
1766 disk->queue = q; 1802 disk->queue = q;
@@ -1771,7 +1807,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
1771 rbd_dev->q = q; 1807 rbd_dev->q = q;
1772 1808
1773 /* finally, announce the disk to the world */ 1809 /* finally, announce the disk to the world */
1774 set_capacity(disk, total_size / 512ULL); 1810 set_capacity(disk, total_size / SECTOR_SIZE);
1775 add_disk(disk); 1811 add_disk(disk);
1776 1812
1777 pr_info("%s: added with size 0x%llx\n", 1813 pr_info("%s: added with size 0x%llx\n",
@@ -1788,10 +1824,15 @@ out:
1788 sysfs 1824 sysfs
1789*/ 1825*/
1790 1826
1827static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1828{
1829 return container_of(dev, struct rbd_device, dev);
1830}
1831
1791static ssize_t rbd_size_show(struct device *dev, 1832static ssize_t rbd_size_show(struct device *dev,
1792 struct device_attribute *attr, char *buf) 1833 struct device_attribute *attr, char *buf)
1793{ 1834{
1794 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1835 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1795 1836
1796 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size); 1837 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1797} 1838}
@@ -1799,7 +1840,7 @@ static ssize_t rbd_size_show(struct device *dev,
1799static ssize_t rbd_major_show(struct device *dev, 1840static ssize_t rbd_major_show(struct device *dev,
1800 struct device_attribute *attr, char *buf) 1841 struct device_attribute *attr, char *buf)
1801{ 1842{
1802 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1843 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1803 1844
1804 return sprintf(buf, "%d\n", rbd_dev->major); 1845 return sprintf(buf, "%d\n", rbd_dev->major);
1805} 1846}
@@ -1807,15 +1848,16 @@ static ssize_t rbd_major_show(struct device *dev,
1807static ssize_t rbd_client_id_show(struct device *dev, 1848static ssize_t rbd_client_id_show(struct device *dev,
1808 struct device_attribute *attr, char *buf) 1849 struct device_attribute *attr, char *buf)
1809{ 1850{
1810 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1851 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1811 1852
1812 return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client)); 1853 return sprintf(buf, "client%lld\n",
1854 ceph_client_id(rbd_dev->rbd_client->client));
1813} 1855}
1814 1856
1815static ssize_t rbd_pool_show(struct device *dev, 1857static ssize_t rbd_pool_show(struct device *dev,
1816 struct device_attribute *attr, char *buf) 1858 struct device_attribute *attr, char *buf)
1817{ 1859{
1818 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1860 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1819 1861
1820 return sprintf(buf, "%s\n", rbd_dev->pool_name); 1862 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1821} 1863}
@@ -1823,7 +1865,7 @@ static ssize_t rbd_pool_show(struct device *dev,
1823static ssize_t rbd_name_show(struct device *dev, 1865static ssize_t rbd_name_show(struct device *dev,
1824 struct device_attribute *attr, char *buf) 1866 struct device_attribute *attr, char *buf)
1825{ 1867{
1826 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1868 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1827 1869
1828 return sprintf(buf, "%s\n", rbd_dev->obj); 1870 return sprintf(buf, "%s\n", rbd_dev->obj);
1829} 1871}
@@ -1832,7 +1874,7 @@ static ssize_t rbd_snap_show(struct device *dev,
1832 struct device_attribute *attr, 1874 struct device_attribute *attr,
1833 char *buf) 1875 char *buf)
1834{ 1876{
1835 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1877 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1836 1878
1837 return sprintf(buf, "%s\n", rbd_dev->snap_name); 1879 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1838} 1880}
@@ -1842,7 +1884,7 @@ static ssize_t rbd_image_refresh(struct device *dev,
1842 const char *buf, 1884 const char *buf,
1843 size_t size) 1885 size_t size)
1844{ 1886{
1845 struct rbd_device *rbd_dev = dev_to_rbd(dev); 1887 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1846 int rc; 1888 int rc;
1847 int ret = size; 1889 int ret = size;
1848 1890
@@ -1907,7 +1949,7 @@ static ssize_t rbd_snap_size_show(struct device *dev,
1907{ 1949{
1908 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 1950 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1909 1951
1910 return sprintf(buf, "%lld\n", (long long)snap->size); 1952 return sprintf(buf, "%zd\n", snap->size);
1911} 1953}
1912 1954
1913static ssize_t rbd_snap_id_show(struct device *dev, 1955static ssize_t rbd_snap_id_show(struct device *dev,
@@ -1916,7 +1958,7 @@ static ssize_t rbd_snap_id_show(struct device *dev,
1916{ 1958{
1917 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 1959 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1918 1960
1919 return sprintf(buf, "%lld\n", (long long)snap->id); 1961 return sprintf(buf, "%llu\n", (unsigned long long) snap->id);
1920} 1962}
1921 1963
1922static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL); 1964static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
@@ -2088,19 +2130,9 @@ static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2088 return 0; 2130 return 0;
2089} 2131}
2090 2132
2091
2092static void rbd_root_dev_release(struct device *dev)
2093{
2094}
2095
2096static struct device rbd_root_dev = {
2097 .init_name = "rbd",
2098 .release = rbd_root_dev_release,
2099};
2100
2101static int rbd_bus_add_dev(struct rbd_device *rbd_dev) 2133static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2102{ 2134{
2103 int ret = -ENOMEM; 2135 int ret;
2104 struct device *dev; 2136 struct device *dev;
2105 struct rbd_snap *snap; 2137 struct rbd_snap *snap;
2106 2138
@@ -2114,7 +2146,7 @@ static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2114 dev_set_name(dev, "%d", rbd_dev->id); 2146 dev_set_name(dev, "%d", rbd_dev->id);
2115 ret = device_register(dev); 2147 ret = device_register(dev);
2116 if (ret < 0) 2148 if (ret < 0)
2117 goto done_free; 2149 goto out;
2118 2150
2119 list_for_each_entry(snap, &rbd_dev->snaps, node) { 2151 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2120 ret = rbd_register_snap_dev(rbd_dev, snap, 2152 ret = rbd_register_snap_dev(rbd_dev, snap,
@@ -2122,10 +2154,7 @@ static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2122 if (ret < 0) 2154 if (ret < 0)
2123 break; 2155 break;
2124 } 2156 }
2125 2157out:
2126 mutex_unlock(&ctl_mutex);
2127 return 0;
2128done_free:
2129 mutex_unlock(&ctl_mutex); 2158 mutex_unlock(&ctl_mutex);
2130 return ret; 2159 return ret;
2131} 2160}
@@ -2154,104 +2183,250 @@ static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2154 return ret; 2183 return ret;
2155} 2184}
2156 2185
2186static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2187
2188/*
2189 * Get a unique rbd identifier for the given new rbd_dev, and add
2190 * the rbd_dev to the global list. The minimum rbd id is 1.
2191 */
2192static void rbd_id_get(struct rbd_device *rbd_dev)
2193{
2194 rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2195
2196 spin_lock(&rbd_dev_list_lock);
2197 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2198 spin_unlock(&rbd_dev_list_lock);
2199}
2200
2201/*
2202 * Remove an rbd_dev from the global list, and record that its
2203 * identifier is no longer in use.
2204 */
2205static void rbd_id_put(struct rbd_device *rbd_dev)
2206{
2207 struct list_head *tmp;
2208 int rbd_id = rbd_dev->id;
2209 int max_id;
2210
2211 BUG_ON(rbd_id < 1);
2212
2213 spin_lock(&rbd_dev_list_lock);
2214 list_del_init(&rbd_dev->node);
2215
2216 /*
2217 * If the id being "put" is not the current maximum, there
2218 * is nothing special we need to do.
2219 */
2220 if (rbd_id != atomic64_read(&rbd_id_max)) {
2221 spin_unlock(&rbd_dev_list_lock);
2222 return;
2223 }
2224
2225 /*
2226 * We need to update the current maximum id. Search the
2227 * list to find out what it is. We're more likely to find
2228 * the maximum at the end, so search the list backward.
2229 */
2230 max_id = 0;
2231 list_for_each_prev(tmp, &rbd_dev_list) {
2232 struct rbd_device *rbd_dev;
2233
2234 rbd_dev = list_entry(tmp, struct rbd_device, node);
2235 if (rbd_id > max_id)
2236 max_id = rbd_id;
2237 }
2238 spin_unlock(&rbd_dev_list_lock);
2239
2240 /*
2241 * The max id could have been updated by rbd_id_get(), in
2242 * which case it now accurately reflects the new maximum.
2243 * Be careful not to overwrite the maximum value in that
2244 * case.
2245 */
2246 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2247}
2248
2249/*
2250 * Skips over white space at *buf, and updates *buf to point to the
2251 * first found non-space character (if any). Returns the length of
2252 * the token (string of non-white space characters) found. Note
2253 * that *buf must be terminated with '\0'.
2254 */
2255static inline size_t next_token(const char **buf)
2256{
2257 /*
2258 * These are the characters that produce nonzero for
2259 * isspace() in the "C" and "POSIX" locales.
2260 */
2261 const char *spaces = " \f\n\r\t\v";
2262
2263 *buf += strspn(*buf, spaces); /* Find start of token */
2264
2265 return strcspn(*buf, spaces); /* Return token length */
2266}
2267
2268/*
2269 * Finds the next token in *buf, and if the provided token buffer is
2270 * big enough, copies the found token into it. The result, if
2271 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2272 * must be terminated with '\0' on entry.
2273 *
2274 * Returns the length of the token found (not including the '\0').
2275 * Return value will be 0 if no token is found, and it will be >=
2276 * token_size if the token would not fit.
2277 *
2278 * The *buf pointer will be updated to point beyond the end of the
2279 * found token. Note that this occurs even if the token buffer is
2280 * too small to hold it.
2281 */
2282static inline size_t copy_token(const char **buf,
2283 char *token,
2284 size_t token_size)
2285{
2286 size_t len;
2287
2288 len = next_token(buf);
2289 if (len < token_size) {
2290 memcpy(token, *buf, len);
2291 *(token + len) = '\0';
2292 }
2293 *buf += len;
2294
2295 return len;
2296}
2297
2298/*
2299 * This fills in the pool_name, obj, obj_len, snap_name, obj_len,
2300 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2301 * on the list of monitor addresses and other options provided via
2302 * /sys/bus/rbd/add.
2303 */
2304static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2305 const char *buf,
2306 const char **mon_addrs,
2307 size_t *mon_addrs_size,
2308 char *options,
2309 size_t options_size)
2310{
2311 size_t len;
2312
2313 /* The first four tokens are required */
2314
2315 len = next_token(&buf);
2316 if (!len)
2317 return -EINVAL;
2318 *mon_addrs_size = len + 1;
2319 *mon_addrs = buf;
2320
2321 buf += len;
2322
2323 len = copy_token(&buf, options, options_size);
2324 if (!len || len >= options_size)
2325 return -EINVAL;
2326
2327 len = copy_token(&buf, rbd_dev->pool_name, sizeof (rbd_dev->pool_name));
2328 if (!len || len >= sizeof (rbd_dev->pool_name))
2329 return -EINVAL;
2330
2331 len = copy_token(&buf, rbd_dev->obj, sizeof (rbd_dev->obj));
2332 if (!len || len >= sizeof (rbd_dev->obj))
2333 return -EINVAL;
2334
2335 /* We have the object length in hand, save it. */
2336
2337 rbd_dev->obj_len = len;
2338
2339 BUILD_BUG_ON(RBD_MAX_MD_NAME_LEN
2340 < RBD_MAX_OBJ_NAME_LEN + sizeof (RBD_SUFFIX));
2341 sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
2342
2343 /*
2344 * The snapshot name is optional, but it's an error if it's
2345 * too long. If no snapshot is supplied, fill in the default.
2346 */
2347 len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name));
2348 if (!len)
2349 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2350 sizeof (RBD_SNAP_HEAD_NAME));
2351 else if (len >= sizeof (rbd_dev->snap_name))
2352 return -EINVAL;
2353
2354 return 0;
2355}
2356
2157static ssize_t rbd_add(struct bus_type *bus, 2357static ssize_t rbd_add(struct bus_type *bus,
2158 const char *buf, 2358 const char *buf,
2159 size_t count) 2359 size_t count)
2160{ 2360{
2161 struct ceph_osd_client *osdc;
2162 struct rbd_device *rbd_dev; 2361 struct rbd_device *rbd_dev;
2163 ssize_t rc = -ENOMEM; 2362 const char *mon_addrs = NULL;
2164 int irc, new_id = 0; 2363 size_t mon_addrs_size = 0;
2165 struct list_head *tmp; 2364 char *options = NULL;
2166 char *mon_dev_name; 2365 struct ceph_osd_client *osdc;
2167 char *options; 2366 int rc = -ENOMEM;
2168 2367
2169 if (!try_module_get(THIS_MODULE)) 2368 if (!try_module_get(THIS_MODULE))
2170 return -ENODEV; 2369 return -ENODEV;
2171 2370
2172 mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2173 if (!mon_dev_name)
2174 goto err_out_mod;
2175
2176 options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2177 if (!options)
2178 goto err_mon_dev;
2179
2180 /* new rbd_device object */
2181 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL); 2371 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2182 if (!rbd_dev) 2372 if (!rbd_dev)
2183 goto err_out_opt; 2373 goto err_nomem;
2374 options = kmalloc(count, GFP_KERNEL);
2375 if (!options)
2376 goto err_nomem;
2184 2377
2185 /* static rbd_device initialization */ 2378 /* static rbd_device initialization */
2186 spin_lock_init(&rbd_dev->lock); 2379 spin_lock_init(&rbd_dev->lock);
2187 INIT_LIST_HEAD(&rbd_dev->node); 2380 INIT_LIST_HEAD(&rbd_dev->node);
2188 INIT_LIST_HEAD(&rbd_dev->snaps); 2381 INIT_LIST_HEAD(&rbd_dev->snaps);
2382 init_rwsem(&rbd_dev->header_rwsem);
2189 2383
2190 init_rwsem(&rbd_dev->header.snap_rwsem); 2384 init_rwsem(&rbd_dev->header_rwsem);
2191 2385
2192 /* generate unique id: find highest unique id, add one */ 2386 /* generate unique id: find highest unique id, add one */
2193 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 2387 rbd_id_get(rbd_dev);
2194
2195 list_for_each(tmp, &rbd_dev_list) {
2196 struct rbd_device *rbd_dev;
2197 2388
2198 rbd_dev = list_entry(tmp, struct rbd_device, node); 2389 /* Fill in the device name, now that we have its id. */
2199 if (rbd_dev->id >= new_id) 2390 BUILD_BUG_ON(DEV_NAME_LEN
2200 new_id = rbd_dev->id + 1; 2391 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2201 } 2392 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2202
2203 rbd_dev->id = new_id;
2204
2205 /* add to global list */
2206 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2207 2393
2208 /* parse add command */ 2394 /* parse add command */
2209 if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s " 2395 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2210 "%" __stringify(RBD_MAX_OPT_LEN) "s " 2396 options, count);
2211 "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s " 2397 if (rc)
2212 "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s" 2398 goto err_put_id;
2213 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2214 mon_dev_name, options, rbd_dev->pool_name,
2215 rbd_dev->obj, rbd_dev->snap_name) < 4) {
2216 rc = -EINVAL;
2217 goto err_out_slot;
2218 }
2219
2220 if (rbd_dev->snap_name[0] == 0)
2221 rbd_dev->snap_name[0] = '-';
2222
2223 rbd_dev->obj_len = strlen(rbd_dev->obj);
2224 snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2225 rbd_dev->obj, RBD_SUFFIX);
2226
2227 /* initialize rest of new object */
2228 snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
2229 rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2230 if (rc < 0)
2231 goto err_out_slot;
2232 2399
2233 mutex_unlock(&ctl_mutex); 2400 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2401 options);
2402 if (IS_ERR(rbd_dev->rbd_client)) {
2403 rc = PTR_ERR(rbd_dev->rbd_client);
2404 goto err_put_id;
2405 }
2234 2406
2235 /* pick the pool */ 2407 /* pick the pool */
2236 osdc = &rbd_dev->client->osdc; 2408 osdc = &rbd_dev->rbd_client->client->osdc;
2237 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name); 2409 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2238 if (rc < 0) 2410 if (rc < 0)
2239 goto err_out_client; 2411 goto err_out_client;
2240 rbd_dev->poolid = rc; 2412 rbd_dev->poolid = rc;
2241 2413
2242 /* register our block device */ 2414 /* register our block device */
2243 irc = register_blkdev(0, rbd_dev->name); 2415 rc = register_blkdev(0, rbd_dev->name);
2244 if (irc < 0) { 2416 if (rc < 0)
2245 rc = irc;
2246 goto err_out_client; 2417 goto err_out_client;
2247 } 2418 rbd_dev->major = rc;
2248 rbd_dev->major = irc;
2249 2419
2250 rc = rbd_bus_add_dev(rbd_dev); 2420 rc = rbd_bus_add_dev(rbd_dev);
2251 if (rc) 2421 if (rc)
2252 goto err_out_blkdev; 2422 goto err_out_blkdev;
2253 2423
2254 /* set up and announce blkdev mapping */ 2424 /*
2425 * At this point cleanup in the event of an error is the job
2426 * of the sysfs code (initiated by rbd_bus_del_dev()).
2427 *
2428 * Set up and announce blkdev mapping.
2429 */
2255 rc = rbd_init_disk(rbd_dev); 2430 rc = rbd_init_disk(rbd_dev);
2256 if (rc) 2431 if (rc)
2257 goto err_out_bus; 2432 goto err_out_bus;
@@ -2263,35 +2438,26 @@ static ssize_t rbd_add(struct bus_type *bus,
2263 return count; 2438 return count;
2264 2439
2265err_out_bus: 2440err_out_bus:
2266 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2267 list_del_init(&rbd_dev->node);
2268 mutex_unlock(&ctl_mutex);
2269
2270 /* this will also clean up rest of rbd_dev stuff */ 2441 /* this will also clean up rest of rbd_dev stuff */
2271 2442
2272 rbd_bus_del_dev(rbd_dev); 2443 rbd_bus_del_dev(rbd_dev);
2273 kfree(options); 2444 kfree(options);
2274 kfree(mon_dev_name);
2275 return rc; 2445 return rc;
2276 2446
2277err_out_blkdev: 2447err_out_blkdev:
2278 unregister_blkdev(rbd_dev->major, rbd_dev->name); 2448 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2279err_out_client: 2449err_out_client:
2280 rbd_put_client(rbd_dev); 2450 rbd_put_client(rbd_dev);
2281 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 2451err_put_id:
2282err_out_slot: 2452 rbd_id_put(rbd_dev);
2283 list_del_init(&rbd_dev->node); 2453err_nomem:
2284 mutex_unlock(&ctl_mutex);
2285
2286 kfree(rbd_dev);
2287err_out_opt:
2288 kfree(options); 2454 kfree(options);
2289err_mon_dev: 2455 kfree(rbd_dev);
2290 kfree(mon_dev_name); 2456
2291err_out_mod:
2292 dout("Error adding device %s\n", buf); 2457 dout("Error adding device %s\n", buf);
2293 module_put(THIS_MODULE); 2458 module_put(THIS_MODULE);
2294 return rc; 2459
2460 return (ssize_t) rc;
2295} 2461}
2296 2462
2297static struct rbd_device *__rbd_get_dev(unsigned long id) 2463static struct rbd_device *__rbd_get_dev(unsigned long id)
@@ -2299,22 +2465,28 @@ static struct rbd_device *__rbd_get_dev(unsigned long id)
2299 struct list_head *tmp; 2465 struct list_head *tmp;
2300 struct rbd_device *rbd_dev; 2466 struct rbd_device *rbd_dev;
2301 2467
2468 spin_lock(&rbd_dev_list_lock);
2302 list_for_each(tmp, &rbd_dev_list) { 2469 list_for_each(tmp, &rbd_dev_list) {
2303 rbd_dev = list_entry(tmp, struct rbd_device, node); 2470 rbd_dev = list_entry(tmp, struct rbd_device, node);
2304 if (rbd_dev->id == id) 2471 if (rbd_dev->id == id) {
2472 spin_unlock(&rbd_dev_list_lock);
2305 return rbd_dev; 2473 return rbd_dev;
2474 }
2306 } 2475 }
2476 spin_unlock(&rbd_dev_list_lock);
2307 return NULL; 2477 return NULL;
2308} 2478}
2309 2479
2310static void rbd_dev_release(struct device *dev) 2480static void rbd_dev_release(struct device *dev)
2311{ 2481{
2312 struct rbd_device *rbd_dev = 2482 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2313 container_of(dev, struct rbd_device, dev);
2314 2483
2315 if (rbd_dev->watch_request) 2484 if (rbd_dev->watch_request) {
2316 ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc, 2485 struct ceph_client *client = rbd_dev->rbd_client->client;
2486
2487 ceph_osdc_unregister_linger_request(&client->osdc,
2317 rbd_dev->watch_request); 2488 rbd_dev->watch_request);
2489 }
2318 if (rbd_dev->watch_event) 2490 if (rbd_dev->watch_event)
2319 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name); 2491 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2320 2492
@@ -2323,6 +2495,9 @@ static void rbd_dev_release(struct device *dev)
2323 /* clean up and free blkdev */ 2495 /* clean up and free blkdev */
2324 rbd_free_disk(rbd_dev); 2496 rbd_free_disk(rbd_dev);
2325 unregister_blkdev(rbd_dev->major, rbd_dev->name); 2497 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2498
2499 /* done with the id, and with the rbd_dev */
2500 rbd_id_put(rbd_dev);
2326 kfree(rbd_dev); 2501 kfree(rbd_dev);
2327 2502
2328 /* release module ref */ 2503 /* release module ref */
@@ -2355,8 +2530,6 @@ static ssize_t rbd_remove(struct bus_type *bus,
2355 goto done; 2530 goto done;
2356 } 2531 }
2357 2532
2358 list_del_init(&rbd_dev->node);
2359
2360 __rbd_remove_all_snaps(rbd_dev); 2533 __rbd_remove_all_snaps(rbd_dev);
2361 rbd_bus_del_dev(rbd_dev); 2534 rbd_bus_del_dev(rbd_dev);
2362 2535
@@ -2370,7 +2543,7 @@ static ssize_t rbd_snap_add(struct device *dev,
2370 const char *buf, 2543 const char *buf,
2371 size_t count) 2544 size_t count)
2372{ 2545{
2373 struct rbd_device *rbd_dev = dev_to_rbd(dev); 2546 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2374 int ret; 2547 int ret;
2375 char *name = kmalloc(count + 1, GFP_KERNEL); 2548 char *name = kmalloc(count + 1, GFP_KERNEL);
2376 if (!name) 2549 if (!name)
@@ -2406,12 +2579,6 @@ err_unlock:
2406 return ret; 2579 return ret;
2407} 2580}
2408 2581
2409static struct bus_attribute rbd_bus_attrs[] = {
2410 __ATTR(add, S_IWUSR, NULL, rbd_add),
2411 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
2412 __ATTR_NULL
2413};
2414
2415/* 2582/*
2416 * create control files in sysfs 2583 * create control files in sysfs
2417 * /sys/bus/rbd/... 2584 * /sys/bus/rbd/...
@@ -2420,21 +2587,21 @@ static int rbd_sysfs_init(void)
2420{ 2587{
2421 int ret; 2588 int ret;
2422 2589
2423 rbd_bus_type.bus_attrs = rbd_bus_attrs; 2590 ret = device_register(&rbd_root_dev);
2424 2591 if (ret < 0)
2425 ret = bus_register(&rbd_bus_type);
2426 if (ret < 0)
2427 return ret; 2592 return ret;
2428 2593
2429 ret = device_register(&rbd_root_dev); 2594 ret = bus_register(&rbd_bus_type);
2595 if (ret < 0)
2596 device_unregister(&rbd_root_dev);
2430 2597
2431 return ret; 2598 return ret;
2432} 2599}
2433 2600
2434static void rbd_sysfs_cleanup(void) 2601static void rbd_sysfs_cleanup(void)
2435{ 2602{
2436 device_unregister(&rbd_root_dev);
2437 bus_unregister(&rbd_bus_type); 2603 bus_unregister(&rbd_bus_type);
2604 device_unregister(&rbd_root_dev);
2438} 2605}
2439 2606
2440int __init rbd_init(void) 2607int __init rbd_init(void)
@@ -2444,8 +2611,7 @@ int __init rbd_init(void)
2444 rc = rbd_sysfs_init(); 2611 rc = rbd_sysfs_init();
2445 if (rc) 2612 if (rc)
2446 return rc; 2613 return rc;
2447 spin_lock_init(&node_lock); 2614 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2448 pr_info("loaded " DRV_NAME_LONG "\n");
2449 return 0; 2615 return 0;
2450} 2616}
2451 2617
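
The reworked rbd_sysfs_init()/rbd_sysfs_cleanup() above registers the root device before the bus, unregisters the root device again if bus registration fails, and tears the pair down in reverse order on cleanup. A rough user-space sketch of that register-then-unwind pattern (the register_*/unregister_* helpers below are made-up stand-ins for device_register()/bus_register(), not kernel APIs):

    #include <stdio.h>

    /* Hypothetical stand-ins for device_register()/bus_register(). */
    static int  register_root(void)   { puts("root registered");   return 0; }
    static void unregister_root(void) { puts("root unregistered"); }
    static int  register_bus(void)    { puts("bus registered");    return 0; }
    static void unregister_bus(void)  { puts("bus unregistered");  }

    static int subsys_init(void)
    {
        int ret;

        ret = register_root();          /* register the prerequisite first */
        if (ret < 0)
            return ret;

        ret = register_bus();           /* then the dependent object */
        if (ret < 0)
            unregister_root();          /* unwind on failure */

        return ret;
    }

    static void subsys_cleanup(void)
    {
        unregister_bus();               /* reverse order of init */
        unregister_root();
    }

    int main(void)
    {
        if (subsys_init() == 0)
            subsys_cleanup();
        return 0;
    }
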
diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h
index fc6c678aa2cb..950708688f17 100644
--- a/drivers/block/rbd_types.h
+++ b/drivers/block/rbd_types.h
@@ -41,10 +41,6 @@
41#define RBD_HEADER_SIGNATURE "RBD" 41#define RBD_HEADER_SIGNATURE "RBD"
42#define RBD_HEADER_VERSION "001.005" 42#define RBD_HEADER_VERSION "001.005"
43 43
44struct rbd_info {
45 __le64 max_id;
46} __attribute__ ((packed));
47
48struct rbd_image_snap_ondisk { 44struct rbd_image_snap_ondisk {
49 __le64 id; 45 __le64 id;
50 __le64 image_size; 46 __le64 image_size;
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 2c489378b4cd..9fff9f3b17e4 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -677,18 +677,19 @@ static int fill_inode(struct inode *inode,
677 case S_IFLNK: 677 case S_IFLNK:
678 inode->i_op = &ceph_symlink_iops; 678 inode->i_op = &ceph_symlink_iops;
679 if (!ci->i_symlink) { 679 if (!ci->i_symlink) {
680 int symlen = iinfo->symlink_len; 680 u32 symlen = iinfo->symlink_len;
681 char *sym; 681 char *sym;
682 682
683 BUG_ON(symlen != inode->i_size);
684 spin_unlock(&ci->i_ceph_lock); 683 spin_unlock(&ci->i_ceph_lock);
685 684
685 err = -EINVAL;
686 if (WARN_ON(symlen != inode->i_size))
687 goto out;
688
686 err = -ENOMEM; 689 err = -ENOMEM;
687 sym = kmalloc(symlen+1, GFP_NOFS); 690 sym = kstrndup(iinfo->symlink, symlen, GFP_NOFS);
688 if (!sym) 691 if (!sym)
689 goto out; 692 goto out;
690 memcpy(sym, iinfo->symlink, symlen);
691 sym[symlen] = 0;
692 693
693 spin_lock(&ci->i_ceph_lock); 694 spin_lock(&ci->i_ceph_lock);
694 if (!ci->i_symlink) 695 if (!ci->i_symlink)
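
The fill_inode() symlink hunk above drops the BUG_ON in favour of a WARN_ON plus -EINVAL, and replaces the open-coded kmalloc/memcpy/NUL-terminate sequence with kstrndup(). A minimal user-space analogue of that bounded, always-terminated copy (dup_bounded() is a hypothetical helper, not a kernel API):

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    /*
     * Bounded duplicate of a possibly non-NUL-terminated buffer, similar in
     * spirit to the kernel's kstrndup(): copy at most len bytes and always
     * NUL-terminate the result.
     */
    static char *dup_bounded(const char *src, size_t len)
    {
        char *dst = malloc(len + 1);

        if (!dst)
            return NULL;
        memcpy(dst, src, len);
        dst[len] = '\0';
        return dst;
    }

    int main(void)
    {
        const char target[] = { '/', 't', 'm', 'p', '/', 'x' };  /* no NUL */
        char *sym = dup_bounded(target, sizeof(target));

        if (sym) {
            printf("symlink target: %s\n", sym);
            free(sym);
        }
        return 0;
    }
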
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 866e8d7ca37d..89971e137aab 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -402,7 +402,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
402 402
403 spin_lock_init(&s->s_gen_ttl_lock); 403 spin_lock_init(&s->s_gen_ttl_lock);
404 s->s_cap_gen = 0; 404 s->s_cap_gen = 0;
405 s->s_cap_ttl = 0; 405 s->s_cap_ttl = jiffies - 1;
406 406
407 spin_lock_init(&s->s_cap_lock); 407 spin_lock_init(&s->s_cap_lock);
408 s->s_renew_requested = 0; 408 s->s_renew_requested = 0;
@@ -1083,8 +1083,7 @@ static void renewed_caps(struct ceph_mds_client *mdsc,
1083 int wake = 0; 1083 int wake = 0;
1084 1084
1085 spin_lock(&session->s_cap_lock); 1085 spin_lock(&session->s_cap_lock);
1086 was_stale = is_renew && (session->s_cap_ttl == 0 || 1086 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1087 time_after_eq(jiffies, session->s_cap_ttl));
1088 1087
1089 session->s_cap_ttl = session->s_renew_requested + 1088 session->s_cap_ttl = session->s_renew_requested +
1090 mdsc->mdsmap->m_session_timeout*HZ; 1089 mdsc->mdsmap->m_session_timeout*HZ;
@@ -2332,7 +2331,7 @@ static void handle_session(struct ceph_mds_session *session,
2332 session->s_mds); 2331 session->s_mds);
2333 spin_lock(&session->s_gen_ttl_lock); 2332 spin_lock(&session->s_gen_ttl_lock);
2334 session->s_cap_gen++; 2333 session->s_cap_gen++;
2335 session->s_cap_ttl = 0; 2334 session->s_cap_ttl = jiffies - 1;
2336 spin_unlock(&session->s_gen_ttl_lock); 2335 spin_unlock(&session->s_gen_ttl_lock);
2337 send_renew_caps(mdsc, session); 2336 send_renew_caps(mdsc, session);
2338 break; 2337 break;
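
Both mds_client.c hunks above stop using 0 as the "stale" value for s_cap_ttl and instead store jiffies - 1, so a single wrap-safe time_after_eq() test covers both the "never renewed" and "expired" cases. A small sketch of the same idea, with ticks_after_eq() standing in for the kernel's time_after_eq() macro:

    #include <stdio.h>

    typedef unsigned long jiffies_t;

    /*
     * Wrap-safe comparison in the style of the kernel's time_after_eq():
     * true if a is at or after b, even if the counter has wrapped.
     */
    static int ticks_after_eq(jiffies_t a, jiffies_t b)
    {
        return (long)(a - b) >= 0;
    }

    int main(void)
    {
        jiffies_t now = (jiffies_t)-5;      /* near the wrap point */
        jiffies_t ttl = now - 1;            /* "already expired" sentinel */

        printf("stale=%d\n", ticks_after_eq(now, ttl));   /* 1: expired */

        ttl = now + 100;                    /* wraps past zero */
        printf("stale=%d\n", ticks_after_eq(now, ttl));   /* 0: still valid */
        return 0;
    }
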
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index a559c80f127a..f04c0961f993 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -331,7 +331,7 @@ static int build_snap_context(struct ceph_snap_realm *realm)
331 331
332 /* alloc new snap context */ 332 /* alloc new snap context */
333 err = -ENOMEM; 333 err = -ENOMEM;
334 if (num > ULONG_MAX / sizeof(u64) - sizeof(*snapc)) 334 if (num > (ULONG_MAX - sizeof(*snapc)) / sizeof(u64))
335 goto fail; 335 goto fail;
336 snapc = kzalloc(sizeof(*snapc) + num*sizeof(u64), GFP_NOFS); 336 snapc = kzalloc(sizeof(*snapc) + num*sizeof(u64), GFP_NOFS);
337 if (!snapc) 337 if (!snapc)
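
The build_snap_context() hunk above moves the subtraction inside the overflow guard so the bound matches the real allocation size, sizeof(*snapc) + num * sizeof(u64). A user-space sketch of the same pre-allocation overflow check, with SIZE_MAX playing the role of ULONG_MAX and an invented struct name:

    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>

    struct snap_context_like {
        uint32_t nr_snaps;
        /* followed by nr_snaps 64-bit snapshot ids */
    };

    /* Allocate header plus num trailing u64s, refusing sizes that overflow. */
    static void *alloc_snaps(size_t num)
    {
        if (num > (SIZE_MAX - sizeof(struct snap_context_like)) / sizeof(uint64_t))
            return NULL;        /* header + num * 8 would overflow size_t */

        return calloc(1, sizeof(struct snap_context_like) + num * sizeof(uint64_t));
    }

    int main(void)
    {
        void *ok  = alloc_snaps(16);
        void *bad = alloc_snaps(SIZE_MAX / 2);

        printf("ok=%p bad=%p\n", ok, bad);
        free(ok);
        return 0;
    }
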
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 256f85221926..1e67dd7305a4 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -130,10 +130,12 @@ enum {
130 Opt_nodirstat, 130 Opt_nodirstat,
131 Opt_rbytes, 131 Opt_rbytes,
132 Opt_norbytes, 132 Opt_norbytes,
133 Opt_asyncreaddir,
133 Opt_noasyncreaddir, 134 Opt_noasyncreaddir,
134 Opt_dcache, 135 Opt_dcache,
135 Opt_nodcache, 136 Opt_nodcache,
136 Opt_ino32, 137 Opt_ino32,
138 Opt_noino32,
137}; 139};
138 140
139static match_table_t fsopt_tokens = { 141static match_table_t fsopt_tokens = {
@@ -153,10 +155,12 @@ static match_table_t fsopt_tokens = {
153 {Opt_nodirstat, "nodirstat"}, 155 {Opt_nodirstat, "nodirstat"},
154 {Opt_rbytes, "rbytes"}, 156 {Opt_rbytes, "rbytes"},
155 {Opt_norbytes, "norbytes"}, 157 {Opt_norbytes, "norbytes"},
158 {Opt_asyncreaddir, "asyncreaddir"},
156 {Opt_noasyncreaddir, "noasyncreaddir"}, 159 {Opt_noasyncreaddir, "noasyncreaddir"},
157 {Opt_dcache, "dcache"}, 160 {Opt_dcache, "dcache"},
158 {Opt_nodcache, "nodcache"}, 161 {Opt_nodcache, "nodcache"},
159 {Opt_ino32, "ino32"}, 162 {Opt_ino32, "ino32"},
163 {Opt_noino32, "noino32"},
160 {-1, NULL} 164 {-1, NULL}
161}; 165};
162 166
@@ -232,6 +236,9 @@ static int parse_fsopt_token(char *c, void *private)
232 case Opt_norbytes: 236 case Opt_norbytes:
233 fsopt->flags &= ~CEPH_MOUNT_OPT_RBYTES; 237 fsopt->flags &= ~CEPH_MOUNT_OPT_RBYTES;
234 break; 238 break;
239 case Opt_asyncreaddir:
240 fsopt->flags &= ~CEPH_MOUNT_OPT_NOASYNCREADDIR;
241 break;
235 case Opt_noasyncreaddir: 242 case Opt_noasyncreaddir:
236 fsopt->flags |= CEPH_MOUNT_OPT_NOASYNCREADDIR; 243 fsopt->flags |= CEPH_MOUNT_OPT_NOASYNCREADDIR;
237 break; 244 break;
@@ -244,6 +251,9 @@ static int parse_fsopt_token(char *c, void *private)
244 case Opt_ino32: 251 case Opt_ino32:
245 fsopt->flags |= CEPH_MOUNT_OPT_INO32; 252 fsopt->flags |= CEPH_MOUNT_OPT_INO32;
246 break; 253 break;
254 case Opt_noino32:
255 fsopt->flags &= ~CEPH_MOUNT_OPT_INO32;
256 break;
247 default: 257 default:
248 BUG_ON(token); 258 BUG_ON(token);
249 } 259 }
@@ -334,10 +344,12 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
334 *path += 2; 344 *path += 2;
335 dout("server path '%s'\n", *path); 345 dout("server path '%s'\n", *path);
336 346
337 err = ceph_parse_options(popt, options, dev_name, dev_name_end, 347 *popt = ceph_parse_options(options, dev_name, dev_name_end,
338 parse_fsopt_token, (void *)fsopt); 348 parse_fsopt_token, (void *)fsopt);
339 if (err) 349 if (IS_ERR(*popt)) {
350 err = PTR_ERR(*popt);
340 goto out; 351 goto out;
352 }
341 353
342 /* success */ 354 /* success */
343 *pfsopt = fsopt; 355 *pfsopt = fsopt;
@@ -926,6 +938,7 @@ static int __init init_ceph(void)
926 if (ret) 938 if (ret)
927 goto out; 939 goto out;
928 940
941 ceph_xattr_init();
929 ret = register_filesystem(&ceph_fs_type); 942 ret = register_filesystem(&ceph_fs_type);
930 if (ret) 943 if (ret)
931 goto out_icache; 944 goto out_icache;
@@ -935,6 +948,7 @@ static int __init init_ceph(void)
935 return 0; 948 return 0;
936 949
937out_icache: 950out_icache:
951 ceph_xattr_exit();
938 destroy_caches(); 952 destroy_caches();
939out: 953out:
940 return ret; 954 return ret;
@@ -944,6 +958,7 @@ static void __exit exit_ceph(void)
944{ 958{
945 dout("exit_ceph\n"); 959 dout("exit_ceph\n");
946 unregister_filesystem(&ceph_fs_type); 960 unregister_filesystem(&ceph_fs_type);
961 ceph_xattr_exit();
947 destroy_caches(); 962 destroy_caches();
948} 963}
949 964
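
The super.c changes above give every Boolean mount option both senses: asyncreaddir clears the flag that noasyncreaddir sets, and noino32 clears what ino32 sets. A compact sketch of that set/clear pairing over a flags word (the option and flag names below are illustrative only):

    #include <stdio.h>
    #include <string.h>

    #define OPT_NOASYNCREADDIR  0x1
    #define OPT_INO32           0x2

    struct mount_opts { unsigned flags; };

    /* Apply one token, supporting both the positive and negative spelling. */
    static int apply_token(struct mount_opts *o, const char *tok)
    {
        if (!strcmp(tok, "asyncreaddir"))
            o->flags &= ~OPT_NOASYNCREADDIR;
        else if (!strcmp(tok, "noasyncreaddir"))
            o->flags |= OPT_NOASYNCREADDIR;
        else if (!strcmp(tok, "ino32"))
            o->flags |= OPT_INO32;
        else if (!strcmp(tok, "noino32"))
            o->flags &= ~OPT_INO32;
        else
            return -1;
        return 0;
    }

    int main(void)
    {
        struct mount_opts o = { .flags = 0 };

        apply_token(&o, "noasyncreaddir");
        apply_token(&o, "ino32");
        apply_token(&o, "asyncreaddir");   /* undoes the earlier negative */
        printf("flags=%#x\n", o.flags);    /* only OPT_INO32 remains */
        return 0;
    }
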
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 1421f3d875a2..fc35036d258d 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -367,7 +367,7 @@ static inline u32 ceph_ino_to_ino32(__u64 vino)
367 u32 ino = vino & 0xffffffff; 367 u32 ino = vino & 0xffffffff;
368 ino ^= vino >> 32; 368 ino ^= vino >> 32;
369 if (!ino) 369 if (!ino)
370 ino = 1; 370 ino = 2;
371 return ino; 371 return ino;
372} 372}
373 373
@@ -733,6 +733,8 @@ extern ssize_t ceph_listxattr(struct dentry *, char *, size_t);
733extern int ceph_removexattr(struct dentry *, const char *); 733extern int ceph_removexattr(struct dentry *, const char *);
734extern void __ceph_build_xattrs_blob(struct ceph_inode_info *ci); 734extern void __ceph_build_xattrs_blob(struct ceph_inode_info *ci);
735extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci); 735extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci);
736extern void __init ceph_xattr_init(void);
737extern void ceph_xattr_exit(void);
736 738
737/* caps.c */ 739/* caps.c */
738extern const char *ceph_cap_string(int c); 740extern const char *ceph_cap_string(int c);
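
The ceph_ino_to_ino32() hunk above changes the fallback, when the folded hash collapses to zero, from 1 to 2, presumably to keep the result clear of the reserved root inode number as well as of 0; the header also gains the ceph_xattr_init()/ceph_xattr_exit() declarations used by super.c. A hedged user-space sketch of the folding helper:

    #include <stdint.h>
    #include <stdio.h>

    /*
     * Fold a 64-bit inode number into 32 bits by XORing the halves, steering
     * clear of values a filesystem may treat specially (0 is invalid; very
     * low numbers such as 1 are often reserved, e.g. for the root inode).
     */
    static uint32_t ino_to_ino32(uint64_t vino)
    {
        uint32_t ino = (uint32_t)vino ^ (uint32_t)(vino >> 32);

        if (!ino)
            ino = 2;        /* never return 0, and skip the reserved 1 */
        return ino;
    }

    int main(void)
    {
        printf("%u\n", ino_to_ino32(0x123456780000ULL));
        printf("%u\n", ino_to_ino32(0x1234567812345678ULL)); /* halves equal -> 2 */
        return 0;
    }
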
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index a76f697303d9..35b86331d8a5 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -8,9 +8,12 @@
8#include <linux/xattr.h> 8#include <linux/xattr.h>
9#include <linux/slab.h> 9#include <linux/slab.h>
10 10
11#define XATTR_CEPH_PREFIX "ceph."
12#define XATTR_CEPH_PREFIX_LEN (sizeof (XATTR_CEPH_PREFIX) - 1)
13
11static bool ceph_is_valid_xattr(const char *name) 14static bool ceph_is_valid_xattr(const char *name)
12{ 15{
13 return !strncmp(name, "ceph.", 5) || 16 return !strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN) ||
14 !strncmp(name, XATTR_SECURITY_PREFIX, 17 !strncmp(name, XATTR_SECURITY_PREFIX,
15 XATTR_SECURITY_PREFIX_LEN) || 18 XATTR_SECURITY_PREFIX_LEN) ||
16 !strncmp(name, XATTR_TRUSTED_PREFIX, XATTR_TRUSTED_PREFIX_LEN) || 19 !strncmp(name, XATTR_TRUSTED_PREFIX, XATTR_TRUSTED_PREFIX_LEN) ||
@@ -21,79 +24,91 @@ static bool ceph_is_valid_xattr(const char *name)
21 * These define virtual xattrs exposing the recursive directory 24 * These define virtual xattrs exposing the recursive directory
22 * statistics and layout metadata. 25 * statistics and layout metadata.
23 */ 26 */
24struct ceph_vxattr_cb { 27struct ceph_vxattr {
25 bool readonly;
26 char *name; 28 char *name;
29 size_t name_size; /* strlen(name) + 1 (for '\0') */
27 size_t (*getxattr_cb)(struct ceph_inode_info *ci, char *val, 30 size_t (*getxattr_cb)(struct ceph_inode_info *ci, char *val,
28 size_t size); 31 size_t size);
32 bool readonly;
29}; 33};
30 34
31/* directories */ 35/* directories */
32 36
33static size_t ceph_vxattrcb_entries(struct ceph_inode_info *ci, char *val, 37static size_t ceph_vxattrcb_dir_entries(struct ceph_inode_info *ci, char *val,
34 size_t size) 38 size_t size)
35{ 39{
36 return snprintf(val, size, "%lld", ci->i_files + ci->i_subdirs); 40 return snprintf(val, size, "%lld", ci->i_files + ci->i_subdirs);
37} 41}
38 42
39static size_t ceph_vxattrcb_files(struct ceph_inode_info *ci, char *val, 43static size_t ceph_vxattrcb_dir_files(struct ceph_inode_info *ci, char *val,
40 size_t size) 44 size_t size)
41{ 45{
42 return snprintf(val, size, "%lld", ci->i_files); 46 return snprintf(val, size, "%lld", ci->i_files);
43} 47}
44 48
45static size_t ceph_vxattrcb_subdirs(struct ceph_inode_info *ci, char *val, 49static size_t ceph_vxattrcb_dir_subdirs(struct ceph_inode_info *ci, char *val,
46 size_t size) 50 size_t size)
47{ 51{
48 return snprintf(val, size, "%lld", ci->i_subdirs); 52 return snprintf(val, size, "%lld", ci->i_subdirs);
49} 53}
50 54
51static size_t ceph_vxattrcb_rentries(struct ceph_inode_info *ci, char *val, 55static size_t ceph_vxattrcb_dir_rentries(struct ceph_inode_info *ci, char *val,
52 size_t size) 56 size_t size)
53{ 57{
54 return snprintf(val, size, "%lld", ci->i_rfiles + ci->i_rsubdirs); 58 return snprintf(val, size, "%lld", ci->i_rfiles + ci->i_rsubdirs);
55} 59}
56 60
57static size_t ceph_vxattrcb_rfiles(struct ceph_inode_info *ci, char *val, 61static size_t ceph_vxattrcb_dir_rfiles(struct ceph_inode_info *ci, char *val,
58 size_t size) 62 size_t size)
59{ 63{
60 return snprintf(val, size, "%lld", ci->i_rfiles); 64 return snprintf(val, size, "%lld", ci->i_rfiles);
61} 65}
62 66
63static size_t ceph_vxattrcb_rsubdirs(struct ceph_inode_info *ci, char *val, 67static size_t ceph_vxattrcb_dir_rsubdirs(struct ceph_inode_info *ci, char *val,
64 size_t size) 68 size_t size)
65{ 69{
66 return snprintf(val, size, "%lld", ci->i_rsubdirs); 70 return snprintf(val, size, "%lld", ci->i_rsubdirs);
67} 71}
68 72
69static size_t ceph_vxattrcb_rbytes(struct ceph_inode_info *ci, char *val, 73static size_t ceph_vxattrcb_dir_rbytes(struct ceph_inode_info *ci, char *val,
70 size_t size) 74 size_t size)
71{ 75{
72 return snprintf(val, size, "%lld", ci->i_rbytes); 76 return snprintf(val, size, "%lld", ci->i_rbytes);
73} 77}
74 78
75static size_t ceph_vxattrcb_rctime(struct ceph_inode_info *ci, char *val, 79static size_t ceph_vxattrcb_dir_rctime(struct ceph_inode_info *ci, char *val,
76 size_t size) 80 size_t size)
77{ 81{
78 return snprintf(val, size, "%ld.%ld", (long)ci->i_rctime.tv_sec, 82 return snprintf(val, size, "%ld.09%ld", (long)ci->i_rctime.tv_sec,
79 (long)ci->i_rctime.tv_nsec); 83 (long)ci->i_rctime.tv_nsec);
80} 84}
81 85
82static struct ceph_vxattr_cb ceph_dir_vxattrs[] = { 86#define CEPH_XATTR_NAME(_type, _name) XATTR_CEPH_PREFIX #_type "." #_name
83 { true, "ceph.dir.entries", ceph_vxattrcb_entries}, 87
84 { true, "ceph.dir.files", ceph_vxattrcb_files}, 88#define XATTR_NAME_CEPH(_type, _name) \
85 { true, "ceph.dir.subdirs", ceph_vxattrcb_subdirs}, 89 { \
86 { true, "ceph.dir.rentries", ceph_vxattrcb_rentries}, 90 .name = CEPH_XATTR_NAME(_type, _name), \
87 { true, "ceph.dir.rfiles", ceph_vxattrcb_rfiles}, 91 .name_size = sizeof (CEPH_XATTR_NAME(_type, _name)), \
88 { true, "ceph.dir.rsubdirs", ceph_vxattrcb_rsubdirs}, 92 .getxattr_cb = ceph_vxattrcb_ ## _type ## _ ## _name, \
89 { true, "ceph.dir.rbytes", ceph_vxattrcb_rbytes}, 93 .readonly = true, \
90 { true, "ceph.dir.rctime", ceph_vxattrcb_rctime}, 94 }
91 { true, NULL, NULL } 95
96static struct ceph_vxattr ceph_dir_vxattrs[] = {
97 XATTR_NAME_CEPH(dir, entries),
98 XATTR_NAME_CEPH(dir, files),
99 XATTR_NAME_CEPH(dir, subdirs),
100 XATTR_NAME_CEPH(dir, rentries),
101 XATTR_NAME_CEPH(dir, rfiles),
102 XATTR_NAME_CEPH(dir, rsubdirs),
103 XATTR_NAME_CEPH(dir, rbytes),
104 XATTR_NAME_CEPH(dir, rctime),
105 { 0 } /* Required table terminator */
92}; 106};
107static size_t ceph_dir_vxattrs_name_size; /* total size of all names */
93 108
94/* files */ 109/* files */
95 110
96static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val, 111static size_t ceph_vxattrcb_file_layout(struct ceph_inode_info *ci, char *val,
97 size_t size) 112 size_t size)
98{ 113{
99 int ret; 114 int ret;
@@ -103,21 +118,32 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
103 (unsigned long long)ceph_file_layout_su(ci->i_layout), 118 (unsigned long long)ceph_file_layout_su(ci->i_layout),
104 (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout), 119 (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout),
105 (unsigned long long)ceph_file_layout_object_size(ci->i_layout)); 120 (unsigned long long)ceph_file_layout_object_size(ci->i_layout));
106 if (ceph_file_layout_pg_preferred(ci->i_layout)) 121
107 ret += snprintf(val + ret, size, "preferred_osd=%lld\n", 122 if (ceph_file_layout_pg_preferred(ci->i_layout) >= 0) {
123 val += ret;
124 size -= ret;
125 ret += snprintf(val, size, "preferred_osd=%lld\n",
108 (unsigned long long)ceph_file_layout_pg_preferred( 126 (unsigned long long)ceph_file_layout_pg_preferred(
109 ci->i_layout)); 127 ci->i_layout));
128 }
129
110 return ret; 130 return ret;
111} 131}
112 132
113static struct ceph_vxattr_cb ceph_file_vxattrs[] = { 133static struct ceph_vxattr ceph_file_vxattrs[] = {
114 { true, "ceph.file.layout", ceph_vxattrcb_layout}, 134 XATTR_NAME_CEPH(file, layout),
115 /* The following extended attribute name is deprecated */ 135 /* The following extended attribute name is deprecated */
116 { true, "ceph.layout", ceph_vxattrcb_layout}, 136 {
117 { true, NULL, NULL } 137 .name = XATTR_CEPH_PREFIX "layout",
138 .name_size = sizeof (XATTR_CEPH_PREFIX "layout"),
139 .getxattr_cb = ceph_vxattrcb_file_layout,
140 .readonly = true,
141 },
142 { 0 } /* Required table terminator */
118}; 143};
144static size_t ceph_file_vxattrs_name_size; /* total size of all names */
119 145
120static struct ceph_vxattr_cb *ceph_inode_vxattrs(struct inode *inode) 146static struct ceph_vxattr *ceph_inode_vxattrs(struct inode *inode)
121{ 147{
122 if (S_ISDIR(inode->i_mode)) 148 if (S_ISDIR(inode->i_mode))
123 return ceph_dir_vxattrs; 149 return ceph_dir_vxattrs;
@@ -126,14 +152,59 @@ static struct ceph_vxattr_cb *ceph_inode_vxattrs(struct inode *inode)
126 return NULL; 152 return NULL;
127} 153}
128 154
129static struct ceph_vxattr_cb *ceph_match_vxattr(struct ceph_vxattr_cb *vxattr, 155static size_t ceph_vxattrs_name_size(struct ceph_vxattr *vxattrs)
156{
157 if (vxattrs == ceph_dir_vxattrs)
158 return ceph_dir_vxattrs_name_size;
159 if (vxattrs == ceph_file_vxattrs)
160 return ceph_file_vxattrs_name_size;
161 BUG();
162
163 return 0;
164}
165
166/*
167 * Compute the aggregate size (including terminating '\0') of all
168 * virtual extended attribute names in the given vxattr table.
169 */
170static size_t __init vxattrs_name_size(struct ceph_vxattr *vxattrs)
171{
172 struct ceph_vxattr *vxattr;
173 size_t size = 0;
174
175 for (vxattr = vxattrs; vxattr->name; vxattr++)
176 size += vxattr->name_size;
177
178 return size;
179}
180
181/* Routines called at initialization and exit time */
182
183void __init ceph_xattr_init(void)
184{
185 ceph_dir_vxattrs_name_size = vxattrs_name_size(ceph_dir_vxattrs);
186 ceph_file_vxattrs_name_size = vxattrs_name_size(ceph_file_vxattrs);
187}
188
189void ceph_xattr_exit(void)
190{
191 ceph_dir_vxattrs_name_size = 0;
192 ceph_file_vxattrs_name_size = 0;
193}
194
195static struct ceph_vxattr *ceph_match_vxattr(struct inode *inode,
130 const char *name) 196 const char *name)
131{ 197{
132 do { 198 struct ceph_vxattr *vxattr = ceph_inode_vxattrs(inode);
133 if (strcmp(vxattr->name, name) == 0) 199
134 return vxattr; 200 if (vxattr) {
135 vxattr++; 201 while (vxattr->name) {
136 } while (vxattr->name); 202 if (!strcmp(vxattr->name, name))
203 return vxattr;
204 vxattr++;
205 }
206 }
207
137 return NULL; 208 return NULL;
138} 209}
139 210
@@ -502,17 +573,15 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value,
502{ 573{
503 struct inode *inode = dentry->d_inode; 574 struct inode *inode = dentry->d_inode;
504 struct ceph_inode_info *ci = ceph_inode(inode); 575 struct ceph_inode_info *ci = ceph_inode(inode);
505 struct ceph_vxattr_cb *vxattrs = ceph_inode_vxattrs(inode);
506 int err; 576 int err;
507 struct ceph_inode_xattr *xattr; 577 struct ceph_inode_xattr *xattr;
508 struct ceph_vxattr_cb *vxattr = NULL; 578 struct ceph_vxattr *vxattr = NULL;
509 579
510 if (!ceph_is_valid_xattr(name)) 580 if (!ceph_is_valid_xattr(name))
511 return -ENODATA; 581 return -ENODATA;
512 582
513 /* let's see if a virtual xattr was requested */ 583 /* let's see if a virtual xattr was requested */
514 if (vxattrs) 584 vxattr = ceph_match_vxattr(inode, name);
515 vxattr = ceph_match_vxattr(vxattrs, name);
516 585
517 spin_lock(&ci->i_ceph_lock); 586 spin_lock(&ci->i_ceph_lock);
518 dout("getxattr %p ver=%lld index_ver=%lld\n", inode, 587 dout("getxattr %p ver=%lld index_ver=%lld\n", inode,
@@ -568,7 +637,7 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
568{ 637{
569 struct inode *inode = dentry->d_inode; 638 struct inode *inode = dentry->d_inode;
570 struct ceph_inode_info *ci = ceph_inode(inode); 639 struct ceph_inode_info *ci = ceph_inode(inode);
571 struct ceph_vxattr_cb *vxattrs = ceph_inode_vxattrs(inode); 640 struct ceph_vxattr *vxattrs = ceph_inode_vxattrs(inode);
572 u32 vir_namelen = 0; 641 u32 vir_namelen = 0;
573 u32 namelen; 642 u32 namelen;
574 int err; 643 int err;
@@ -596,11 +665,12 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
596 goto out; 665 goto out;
597 666
598list_xattr: 667list_xattr:
599 vir_namelen = 0; 668 /*
600 /* include virtual dir xattrs */ 669 * Start with virtual dir xattr names (if any) (including
601 if (vxattrs) 670 * terminating '\0' characters for each).
602 for (i = 0; vxattrs[i].name; i++) 671 */
603 vir_namelen += strlen(vxattrs[i].name) + 1; 672 vir_namelen = ceph_vxattrs_name_size(vxattrs);
673
604 /* adding 1 byte per each variable due to the null termination */ 674 /* adding 1 byte per each variable due to the null termination */
605 namelen = vir_namelen + ci->i_xattrs.names_size + ci->i_xattrs.count; 675 namelen = vir_namelen + ci->i_xattrs.names_size + ci->i_xattrs.count;
606 err = -ERANGE; 676 err = -ERANGE;
@@ -698,17 +768,17 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
698 const void *value, size_t size, int flags) 768 const void *value, size_t size, int flags)
699{ 769{
700 struct inode *inode = dentry->d_inode; 770 struct inode *inode = dentry->d_inode;
771 struct ceph_vxattr *vxattr;
701 struct ceph_inode_info *ci = ceph_inode(inode); 772 struct ceph_inode_info *ci = ceph_inode(inode);
702 struct ceph_vxattr_cb *vxattrs = ceph_inode_vxattrs(inode); 773 int issued;
703 int err; 774 int err;
775 int dirty;
704 int name_len = strlen(name); 776 int name_len = strlen(name);
705 int val_len = size; 777 int val_len = size;
706 char *newname = NULL; 778 char *newname = NULL;
707 char *newval = NULL; 779 char *newval = NULL;
708 struct ceph_inode_xattr *xattr = NULL; 780 struct ceph_inode_xattr *xattr = NULL;
709 int issued;
710 int required_blob_size; 781 int required_blob_size;
711 int dirty;
712 782
713 if (ceph_snap(inode) != CEPH_NOSNAP) 783 if (ceph_snap(inode) != CEPH_NOSNAP)
714 return -EROFS; 784 return -EROFS;
@@ -716,12 +786,9 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
716 if (!ceph_is_valid_xattr(name)) 786 if (!ceph_is_valid_xattr(name))
717 return -EOPNOTSUPP; 787 return -EOPNOTSUPP;
718 788
719 if (vxattrs) { 789 vxattr = ceph_match_vxattr(inode, name);
720 struct ceph_vxattr_cb *vxattr = 790 if (vxattr && vxattr->readonly)
721 ceph_match_vxattr(vxattrs, name); 791 return -EOPNOTSUPP;
722 if (vxattr && vxattr->readonly)
723 return -EOPNOTSUPP;
724 }
725 792
726 /* preallocate memory for xattr name, value, index node */ 793 /* preallocate memory for xattr name, value, index node */
727 err = -ENOMEM; 794 err = -ENOMEM;
@@ -730,11 +797,9 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
730 goto out; 797 goto out;
731 798
732 if (val_len) { 799 if (val_len) {
733 newval = kmalloc(val_len + 1, GFP_NOFS); 800 newval = kmemdup(value, val_len, GFP_NOFS);
734 if (!newval) 801 if (!newval)
735 goto out; 802 goto out;
736 memcpy(newval, value, val_len);
737 newval[val_len] = '\0';
738 } 803 }
739 804
740 xattr = kmalloc(sizeof(struct ceph_inode_xattr), GFP_NOFS); 805 xattr = kmalloc(sizeof(struct ceph_inode_xattr), GFP_NOFS);
@@ -744,6 +809,7 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
744 spin_lock(&ci->i_ceph_lock); 809 spin_lock(&ci->i_ceph_lock);
745retry: 810retry:
746 issued = __ceph_caps_issued(ci, NULL); 811 issued = __ceph_caps_issued(ci, NULL);
812 dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued));
747 if (!(issued & CEPH_CAP_XATTR_EXCL)) 813 if (!(issued & CEPH_CAP_XATTR_EXCL))
748 goto do_sync; 814 goto do_sync;
749 __build_xattrs(inode); 815 __build_xattrs(inode);
@@ -752,7 +818,7 @@ retry:
752 818
753 if (!ci->i_xattrs.prealloc_blob || 819 if (!ci->i_xattrs.prealloc_blob ||
754 required_blob_size > ci->i_xattrs.prealloc_blob->alloc_len) { 820 required_blob_size > ci->i_xattrs.prealloc_blob->alloc_len) {
755 struct ceph_buffer *blob = NULL; 821 struct ceph_buffer *blob;
756 822
757 spin_unlock(&ci->i_ceph_lock); 823 spin_unlock(&ci->i_ceph_lock);
758 dout(" preaallocating new blob size=%d\n", required_blob_size); 824 dout(" preaallocating new blob size=%d\n", required_blob_size);
@@ -766,12 +832,13 @@ retry:
766 goto retry; 832 goto retry;
767 } 833 }
768 834
769 dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued));
770 err = __set_xattr(ci, newname, name_len, newval, 835 err = __set_xattr(ci, newname, name_len, newval,
771 val_len, 1, 1, 1, &xattr); 836 val_len, 1, 1, 1, &xattr);
837
772 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL); 838 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL);
773 ci->i_xattrs.dirty = true; 839 ci->i_xattrs.dirty = true;
774 inode->i_ctime = CURRENT_TIME; 840 inode->i_ctime = CURRENT_TIME;
841
775 spin_unlock(&ci->i_ceph_lock); 842 spin_unlock(&ci->i_ceph_lock);
776 if (dirty) 843 if (dirty)
777 __mark_inode_dirty(inode, dirty); 844 __mark_inode_dirty(inode, dirty);
@@ -816,8 +883,8 @@ static int ceph_send_removexattr(struct dentry *dentry, const char *name)
816int ceph_removexattr(struct dentry *dentry, const char *name) 883int ceph_removexattr(struct dentry *dentry, const char *name)
817{ 884{
818 struct inode *inode = dentry->d_inode; 885 struct inode *inode = dentry->d_inode;
886 struct ceph_vxattr *vxattr;
819 struct ceph_inode_info *ci = ceph_inode(inode); 887 struct ceph_inode_info *ci = ceph_inode(inode);
820 struct ceph_vxattr_cb *vxattrs = ceph_inode_vxattrs(inode);
821 int issued; 888 int issued;
822 int err; 889 int err;
823 int required_blob_size; 890 int required_blob_size;
@@ -829,22 +896,19 @@ int ceph_removexattr(struct dentry *dentry, const char *name)
829 if (!ceph_is_valid_xattr(name)) 896 if (!ceph_is_valid_xattr(name))
830 return -EOPNOTSUPP; 897 return -EOPNOTSUPP;
831 898
832 if (vxattrs) { 899 vxattr = ceph_match_vxattr(inode, name);
833 struct ceph_vxattr_cb *vxattr = 900 if (vxattr && vxattr->readonly)
834 ceph_match_vxattr(vxattrs, name); 901 return -EOPNOTSUPP;
835 if (vxattr && vxattr->readonly)
836 return -EOPNOTSUPP;
837 }
838 902
839 err = -ENOMEM; 903 err = -ENOMEM;
840 spin_lock(&ci->i_ceph_lock); 904 spin_lock(&ci->i_ceph_lock);
841 __build_xattrs(inode);
842retry: 905retry:
843 issued = __ceph_caps_issued(ci, NULL); 906 issued = __ceph_caps_issued(ci, NULL);
844 dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued)); 907 dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued));
845 908
846 if (!(issued & CEPH_CAP_XATTR_EXCL)) 909 if (!(issued & CEPH_CAP_XATTR_EXCL))
847 goto do_sync; 910 goto do_sync;
911 __build_xattrs(inode);
848 912
849 required_blob_size = __get_required_blob_size(ci, 0, 0); 913 required_blob_size = __get_required_blob_size(ci, 0, 0);
850 914
@@ -865,10 +929,10 @@ retry:
865 } 929 }
866 930
867 err = __remove_xattr_by_name(ceph_inode(inode), name); 931 err = __remove_xattr_by_name(ceph_inode(inode), name);
932
868 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL); 933 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL);
869 ci->i_xattrs.dirty = true; 934 ci->i_xattrs.dirty = true;
870 inode->i_ctime = CURRENT_TIME; 935 inode->i_ctime = CURRENT_TIME;
871
872 spin_unlock(&ci->i_ceph_lock); 936 spin_unlock(&ci->i_ceph_lock);
873 if (dirty) 937 if (dirty)
874 __mark_inode_dirty(inode, dirty); 938 __mark_inode_dirty(inode, dirty);
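
The xattr.c rework above replaces the hand-written vxattr entries with a macro-built, zero-terminated table, records each name's size (strlen + 1) in the entry, and sums those sizes once at ceph_xattr_init() so ceph_listxattr() no longer walks the table to size its buffer. A rough sketch of the same table-plus-precompute pattern, with invented names under an "example." prefix:

    #include <stddef.h>
    #include <stdio.h>

    struct vxattr {
        const char *name;
        size_t name_size;       /* strlen(name) + 1, for listxattr sizing */
    };

    #define VXATTR_PREFIX "example."
    #define VXATTR_ENTRY(_n) { .name = VXATTR_PREFIX #_n, \
                               .name_size = sizeof(VXATTR_PREFIX #_n) }

    static const struct vxattr dir_vxattrs[] = {
        VXATTR_ENTRY(entries),
        VXATTR_ENTRY(files),
        VXATTR_ENTRY(rbytes),
        { 0 }                   /* required table terminator */
    };

    /* Sum of all name sizes; computed once instead of on every listing. */
    static size_t vxattrs_name_size(const struct vxattr *tbl)
    {
        size_t size = 0;

        for (; tbl->name; tbl++)
            size += tbl->name_size;
        return size;
    }

    int main(void)
    {
        printf("total name bytes: %zu\n", vxattrs_name_size(dir_vxattrs));
        return 0;
    }
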
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index e8cf0ccd1a8d..e71d683982a6 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -208,7 +208,7 @@ extern struct kmem_cache *ceph_cap_cachep;
208extern struct kmem_cache *ceph_dentry_cachep; 208extern struct kmem_cache *ceph_dentry_cachep;
209extern struct kmem_cache *ceph_file_cachep; 209extern struct kmem_cache *ceph_file_cachep;
210 210
211extern int ceph_parse_options(struct ceph_options **popt, char *options, 211extern struct ceph_options *ceph_parse_options(char *options,
212 const char *dev_name, const char *dev_name_end, 212 const char *dev_name, const char *dev_name_end,
213 int (*parse_extra_token)(char *c, void *private), 213 int (*parse_extra_token)(char *c, void *private),
214 void *private); 214 void *private);
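
ceph_parse_options() now returns the options structure directly and encodes failures as an ERR_PTR value instead of using an int return plus an output parameter (see the matching ceph_common.c hunk further down). A user-space imitation of that idiom; it leans on the usual assumption that small negative errno values, cast to a pointer, never collide with valid addresses, and all names below are invented:

    #include <errno.h>
    #include <stdio.h>
    #include <stdlib.h>

    /* Minimal imitation of the kernel's ERR_PTR()/IS_ERR()/PTR_ERR() idiom. */
    #define MAX_ERRNO 4095
    static inline void *err_ptr(long err)      { return (void *)err; }
    static inline long  ptr_err(const void *p) { return (long)p; }
    static inline int   is_err(const void *p)
    {
        return (unsigned long)p >= (unsigned long)-MAX_ERRNO;
    }

    struct options { int debug; };

    static struct options *parse_options(const char *s)
    {
        struct options *opt = calloc(1, sizeof(*opt));

        if (!opt)
            return err_ptr(-ENOMEM);
        if (!s) {               /* reject bad input via an encoded errno */
            free(opt);
            return err_ptr(-EINVAL);
        }
        opt->debug = (s[0] == 'd');
        return opt;
    }

    int main(void)
    {
        struct options *opt = parse_options(NULL);

        if (is_err(opt))
            printf("parse failed: %ld\n", ptr_err(opt));

        opt = parse_options("debug");
        if (!is_err(opt)) {
            printf("debug=%d\n", opt->debug);
            free(opt);
        }
        return 0;
    }
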
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index ffbeb2c217b4..3bff047f6b0f 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -14,8 +14,6 @@
14struct ceph_msg; 14struct ceph_msg;
15struct ceph_connection; 15struct ceph_connection;
16 16
17extern struct workqueue_struct *ceph_msgr_wq; /* receive work queue */
18
19/* 17/*
20 * Ceph defines these callbacks for handling connection events. 18 * Ceph defines these callbacks for handling connection events.
21 */ 19 */
@@ -54,7 +52,6 @@ struct ceph_connection_operations {
54struct ceph_messenger { 52struct ceph_messenger {
55 struct ceph_entity_inst inst; /* my name+address */ 53 struct ceph_entity_inst inst; /* my name+address */
56 struct ceph_entity_addr my_enc_addr; 54 struct ceph_entity_addr my_enc_addr;
57 struct page *zero_page; /* used in certain error cases */
58 55
59 bool nocrc; 56 bool nocrc;
60 57
@@ -101,7 +98,7 @@ struct ceph_msg {
101struct ceph_msg_pos { 98struct ceph_msg_pos {
102 int page, page_pos; /* which page; offset in page */ 99 int page, page_pos; /* which page; offset in page */
103 int data_pos; /* offset in data payload */ 100 int data_pos; /* offset in data payload */
104 int did_page_crc; /* true if we've calculated crc for current page */ 101 bool did_page_crc; /* true if we've calculated crc for current page */
105}; 102};
106 103
107/* ceph connection fault delay defaults, for exponential backoff */ 104/* ceph connection fault delay defaults, for exponential backoff */
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 761ad9d6cc3b..cc913193d992 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -201,7 +201,9 @@ enum {
201 Opt_ip, 201 Opt_ip,
202 Opt_last_string, 202 Opt_last_string,
203 /* string args above */ 203 /* string args above */
204 Opt_share,
204 Opt_noshare, 205 Opt_noshare,
206 Opt_crc,
205 Opt_nocrc, 207 Opt_nocrc,
206}; 208};
207 209
@@ -217,7 +219,9 @@ static match_table_t opt_tokens = {
217 {Opt_key, "key=%s"}, 219 {Opt_key, "key=%s"},
218 {Opt_ip, "ip=%s"}, 220 {Opt_ip, "ip=%s"},
219 /* string args above */ 221 /* string args above */
222 {Opt_share, "share"},
220 {Opt_noshare, "noshare"}, 223 {Opt_noshare, "noshare"},
224 {Opt_crc, "crc"},
221 {Opt_nocrc, "nocrc"}, 225 {Opt_nocrc, "nocrc"},
222 {-1, NULL} 226 {-1, NULL}
223}; 227};
@@ -277,10 +281,11 @@ out:
277 return err; 281 return err;
278} 282}
279 283
280int ceph_parse_options(struct ceph_options **popt, char *options, 284struct ceph_options *
281 const char *dev_name, const char *dev_name_end, 285ceph_parse_options(char *options, const char *dev_name,
282 int (*parse_extra_token)(char *c, void *private), 286 const char *dev_name_end,
283 void *private) 287 int (*parse_extra_token)(char *c, void *private),
288 void *private)
284{ 289{
285 struct ceph_options *opt; 290 struct ceph_options *opt;
286 const char *c; 291 const char *c;
@@ -289,7 +294,7 @@ int ceph_parse_options(struct ceph_options **popt, char *options,
289 294
290 opt = kzalloc(sizeof(*opt), GFP_KERNEL); 295 opt = kzalloc(sizeof(*opt), GFP_KERNEL);
291 if (!opt) 296 if (!opt)
292 return err; 297 return ERR_PTR(-ENOMEM);
293 opt->mon_addr = kcalloc(CEPH_MAX_MON, sizeof(*opt->mon_addr), 298 opt->mon_addr = kcalloc(CEPH_MAX_MON, sizeof(*opt->mon_addr),
294 GFP_KERNEL); 299 GFP_KERNEL);
295 if (!opt->mon_addr) 300 if (!opt->mon_addr)
@@ -398,10 +403,16 @@ int ceph_parse_options(struct ceph_options **popt, char *options,
398 opt->mount_timeout = intval; 403 opt->mount_timeout = intval;
399 break; 404 break;
400 405
406 case Opt_share:
407 opt->flags &= ~CEPH_OPT_NOSHARE;
408 break;
401 case Opt_noshare: 409 case Opt_noshare:
402 opt->flags |= CEPH_OPT_NOSHARE; 410 opt->flags |= CEPH_OPT_NOSHARE;
403 break; 411 break;
404 412
413 case Opt_crc:
414 opt->flags &= ~CEPH_OPT_NOCRC;
415 break;
405 case Opt_nocrc: 416 case Opt_nocrc:
406 opt->flags |= CEPH_OPT_NOCRC; 417 opt->flags |= CEPH_OPT_NOCRC;
407 break; 418 break;
@@ -412,12 +423,11 @@ int ceph_parse_options(struct ceph_options **popt, char *options,
412 } 423 }
413 424
414 /* success */ 425 /* success */
415 *popt = opt; 426 return opt;
416 return 0;
417 427
418out: 428out:
419 ceph_destroy_options(opt); 429 ceph_destroy_options(opt);
420 return err; 430 return ERR_PTR(err);
421} 431}
422EXPORT_SYMBOL(ceph_parse_options); 432EXPORT_SYMBOL(ceph_parse_options);
423 433
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index ad5b70801f37..f0993af2ae4d 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -38,48 +38,54 @@ static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
38static struct lock_class_key socket_class; 38static struct lock_class_key socket_class;
39#endif 39#endif
40 40
41/*
42 * When skipping (ignoring) a block of input we read it into a "skip
43 * buffer," which is this many bytes in size.
44 */
45#define SKIP_BUF_SIZE 1024
41 46
42static void queue_con(struct ceph_connection *con); 47static void queue_con(struct ceph_connection *con);
43static void con_work(struct work_struct *); 48static void con_work(struct work_struct *);
44static void ceph_fault(struct ceph_connection *con); 49static void ceph_fault(struct ceph_connection *con);
45 50
46/* 51/*
47 * nicely render a sockaddr as a string. 52 * Nicely render a sockaddr as a string. An array of formatted
53 * strings is used, to approximate reentrancy.
48 */ 54 */
49#define MAX_ADDR_STR 20 55#define ADDR_STR_COUNT_LOG 5 /* log2(# address strings in array) */
50#define MAX_ADDR_STR_LEN 60 56#define ADDR_STR_COUNT (1 << ADDR_STR_COUNT_LOG)
51static char addr_str[MAX_ADDR_STR][MAX_ADDR_STR_LEN]; 57#define ADDR_STR_COUNT_MASK (ADDR_STR_COUNT - 1)
52static DEFINE_SPINLOCK(addr_str_lock); 58#define MAX_ADDR_STR_LEN 64 /* 54 is enough */
53static int last_addr_str; 59
60static char addr_str[ADDR_STR_COUNT][MAX_ADDR_STR_LEN];
61static atomic_t addr_str_seq = ATOMIC_INIT(0);
62
63static struct page *zero_page; /* used in certain error cases */
54 64
55const char *ceph_pr_addr(const struct sockaddr_storage *ss) 65const char *ceph_pr_addr(const struct sockaddr_storage *ss)
56{ 66{
57 int i; 67 int i;
58 char *s; 68 char *s;
59 struct sockaddr_in *in4 = (void *)ss; 69 struct sockaddr_in *in4 = (struct sockaddr_in *) ss;
60 struct sockaddr_in6 *in6 = (void *)ss; 70 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) ss;
61 71
62 spin_lock(&addr_str_lock); 72 i = atomic_inc_return(&addr_str_seq) & ADDR_STR_COUNT_MASK;
63 i = last_addr_str++;
64 if (last_addr_str == MAX_ADDR_STR)
65 last_addr_str = 0;
66 spin_unlock(&addr_str_lock);
67 s = addr_str[i]; 73 s = addr_str[i];
68 74
69 switch (ss->ss_family) { 75 switch (ss->ss_family) {
70 case AF_INET: 76 case AF_INET:
71 snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%u", &in4->sin_addr, 77 snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%hu", &in4->sin_addr,
72 (unsigned int)ntohs(in4->sin_port)); 78 ntohs(in4->sin_port));
73 break; 79 break;
74 80
75 case AF_INET6: 81 case AF_INET6:
76 snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%u", &in6->sin6_addr, 82 snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%hu", &in6->sin6_addr,
77 (unsigned int)ntohs(in6->sin6_port)); 83 ntohs(in6->sin6_port));
78 break; 84 break;
79 85
80 default: 86 default:
81 snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %d)", 87 snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %hu)",
82 (int)ss->ss_family); 88 ss->ss_family);
83 } 89 }
84 90
85 return s; 91 return s;
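
ceph_pr_addr() above drops the spinlock-protected index into its static string array and instead masks an atomically incremented sequence number by a power-of-two buffer count. A C11 sketch of the same lock-free ring of scratch buffers (buffer count, size, and names are arbitrary):

    #include <stdatomic.h>
    #include <stdio.h>

    #define STR_COUNT_LOG  5
    #define STR_COUNT      (1 << STR_COUNT_LOG)
    #define STR_COUNT_MASK (STR_COUNT - 1)
    #define STR_LEN        64

    static char bufs[STR_COUNT][STR_LEN];
    static atomic_uint seq;

    /*
     * Hand out slots from a small ring of static buffers.  Callers get a
     * private buffer without taking a lock, as long as no more than
     * STR_COUNT results are held on to at once.
     */
    static char *next_buf(void)
    {
        unsigned i = atomic_fetch_add(&seq, 1) & STR_COUNT_MASK;

        return bufs[i];
    }

    int main(void)
    {
        char *a = next_buf();
        char *b = next_buf();

        snprintf(a, STR_LEN, "10.0.0.1:%d", 6789);
        snprintf(b, STR_LEN, "10.0.0.2:%d", 6789);
        printf("%s %s\n", a, b);
        return 0;
    }
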
@@ -95,22 +101,43 @@ static void encode_my_addr(struct ceph_messenger *msgr)
95/* 101/*
96 * work queue for all reading and writing to/from the socket. 102 * work queue for all reading and writing to/from the socket.
97 */ 103 */
98struct workqueue_struct *ceph_msgr_wq; 104static struct workqueue_struct *ceph_msgr_wq;
105
106void _ceph_msgr_exit(void)
107{
108 if (ceph_msgr_wq) {
109 destroy_workqueue(ceph_msgr_wq);
110 ceph_msgr_wq = NULL;
111 }
112
113 BUG_ON(zero_page == NULL);
114 kunmap(zero_page);
115 page_cache_release(zero_page);
116 zero_page = NULL;
117}
99 118
100int ceph_msgr_init(void) 119int ceph_msgr_init(void)
101{ 120{
121 BUG_ON(zero_page != NULL);
122 zero_page = ZERO_PAGE(0);
123 page_cache_get(zero_page);
124
102 ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0); 125 ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
103 if (!ceph_msgr_wq) { 126 if (ceph_msgr_wq)
104 pr_err("msgr_init failed to create workqueue\n"); 127 return 0;
105 return -ENOMEM; 128
106 } 129 pr_err("msgr_init failed to create workqueue\n");
107 return 0; 130 _ceph_msgr_exit();
131
132 return -ENOMEM;
108} 133}
109EXPORT_SYMBOL(ceph_msgr_init); 134EXPORT_SYMBOL(ceph_msgr_init);
110 135
111void ceph_msgr_exit(void) 136void ceph_msgr_exit(void)
112{ 137{
113 destroy_workqueue(ceph_msgr_wq); 138 BUG_ON(ceph_msgr_wq == NULL);
139
140 _ceph_msgr_exit();
114} 141}
115EXPORT_SYMBOL(ceph_msgr_exit); 142EXPORT_SYMBOL(ceph_msgr_exit);
116 143
@@ -128,8 +155,8 @@ EXPORT_SYMBOL(ceph_msgr_flush);
128/* data available on socket, or listen socket received a connect */ 155/* data available on socket, or listen socket received a connect */
129static void ceph_data_ready(struct sock *sk, int count_unused) 156static void ceph_data_ready(struct sock *sk, int count_unused)
130{ 157{
131 struct ceph_connection *con = 158 struct ceph_connection *con = sk->sk_user_data;
132 (struct ceph_connection *)sk->sk_user_data; 159
133 if (sk->sk_state != TCP_CLOSE_WAIT) { 160 if (sk->sk_state != TCP_CLOSE_WAIT) {
134 dout("ceph_data_ready on %p state = %lu, queueing work\n", 161 dout("ceph_data_ready on %p state = %lu, queueing work\n",
135 con, con->state); 162 con, con->state);
@@ -140,26 +167,30 @@ static void ceph_data_ready(struct sock *sk, int count_unused)
140/* socket has buffer space for writing */ 167/* socket has buffer space for writing */
141static void ceph_write_space(struct sock *sk) 168static void ceph_write_space(struct sock *sk)
142{ 169{
143 struct ceph_connection *con = 170 struct ceph_connection *con = sk->sk_user_data;
144 (struct ceph_connection *)sk->sk_user_data;
145 171
146 /* only queue to workqueue if there is data we want to write. */ 172 /* only queue to workqueue if there is data we want to write,
173 * and there is sufficient space in the socket buffer to accept
174 * more data. clear SOCK_NOSPACE so that ceph_write_space()
175 * doesn't get called again until try_write() fills the socket
176 * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
177 * and net/core/stream.c:sk_stream_write_space().
178 */
147 if (test_bit(WRITE_PENDING, &con->state)) { 179 if (test_bit(WRITE_PENDING, &con->state)) {
148 dout("ceph_write_space %p queueing write work\n", con); 180 if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
149 queue_con(con); 181 dout("ceph_write_space %p queueing write work\n", con);
182 clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
183 queue_con(con);
184 }
150 } else { 185 } else {
151 dout("ceph_write_space %p nothing to write\n", con); 186 dout("ceph_write_space %p nothing to write\n", con);
152 } 187 }
153
154 /* since we have our own write_space, clear the SOCK_NOSPACE flag */
155 clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
156} 188}
157 189
158/* socket's state has changed */ 190/* socket's state has changed */
159static void ceph_state_change(struct sock *sk) 191static void ceph_state_change(struct sock *sk)
160{ 192{
161 struct ceph_connection *con = 193 struct ceph_connection *con = sk->sk_user_data;
162 (struct ceph_connection *)sk->sk_user_data;
163 194
164 dout("ceph_state_change %p state = %lu sk_state = %u\n", 195 dout("ceph_state_change %p state = %lu sk_state = %u\n",
165 con, con->state, sk->sk_state); 196 con, con->state, sk->sk_state);
@@ -184,6 +215,8 @@ static void ceph_state_change(struct sock *sk)
184 dout("ceph_state_change TCP_ESTABLISHED\n"); 215 dout("ceph_state_change TCP_ESTABLISHED\n");
185 queue_con(con); 216 queue_con(con);
186 break; 217 break;
218 default: /* Everything else is uninteresting */
219 break;
187 } 220 }
188} 221}
189 222
@@ -194,7 +227,7 @@ static void set_sock_callbacks(struct socket *sock,
194 struct ceph_connection *con) 227 struct ceph_connection *con)
195{ 228{
196 struct sock *sk = sock->sk; 229 struct sock *sk = sock->sk;
197 sk->sk_user_data = (void *)con; 230 sk->sk_user_data = con;
198 sk->sk_data_ready = ceph_data_ready; 231 sk->sk_data_ready = ceph_data_ready;
199 sk->sk_write_space = ceph_write_space; 232 sk->sk_write_space = ceph_write_space;
200 sk->sk_state_change = ceph_state_change; 233 sk->sk_state_change = ceph_state_change;
@@ -208,7 +241,7 @@ static void set_sock_callbacks(struct socket *sock,
208/* 241/*
209 * initiate connection to a remote socket. 242 * initiate connection to a remote socket.
210 */ 243 */
211static struct socket *ceph_tcp_connect(struct ceph_connection *con) 244static int ceph_tcp_connect(struct ceph_connection *con)
212{ 245{
213 struct sockaddr_storage *paddr = &con->peer_addr.in_addr; 246 struct sockaddr_storage *paddr = &con->peer_addr.in_addr;
214 struct socket *sock; 247 struct socket *sock;
@@ -218,8 +251,7 @@ static struct socket *ceph_tcp_connect(struct ceph_connection *con)
218 ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM, 251 ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM,
219 IPPROTO_TCP, &sock); 252 IPPROTO_TCP, &sock);
220 if (ret) 253 if (ret)
221 return ERR_PTR(ret); 254 return ret;
222 con->sock = sock;
223 sock->sk->sk_allocation = GFP_NOFS; 255 sock->sk->sk_allocation = GFP_NOFS;
224 256
225#ifdef CONFIG_LOCKDEP 257#ifdef CONFIG_LOCKDEP
@@ -236,19 +268,17 @@ static struct socket *ceph_tcp_connect(struct ceph_connection *con)
236 dout("connect %s EINPROGRESS sk_state = %u\n", 268 dout("connect %s EINPROGRESS sk_state = %u\n",
237 ceph_pr_addr(&con->peer_addr.in_addr), 269 ceph_pr_addr(&con->peer_addr.in_addr),
238 sock->sk->sk_state); 270 sock->sk->sk_state);
239 ret = 0; 271 } else if (ret < 0) {
240 }
241 if (ret < 0) {
242 pr_err("connect %s error %d\n", 272 pr_err("connect %s error %d\n",
243 ceph_pr_addr(&con->peer_addr.in_addr), ret); 273 ceph_pr_addr(&con->peer_addr.in_addr), ret);
244 sock_release(sock); 274 sock_release(sock);
245 con->sock = NULL;
246 con->error_msg = "connect error"; 275 con->error_msg = "connect error";
276
277 return ret;
247 } 278 }
279 con->sock = sock;
248 280
249 if (ret < 0) 281 return 0;
250 return ERR_PTR(ret);
251 return sock;
252} 282}
253 283
254static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) 284static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
@@ -284,6 +314,19 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
284 return r; 314 return r;
285} 315}
286 316
317static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
318 int offset, size_t size, int more)
319{
320 int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR);
321 int ret;
322
323 ret = kernel_sendpage(sock, page, offset, size, flags);
324 if (ret == -EAGAIN)
325 ret = 0;
326
327 return ret;
328}
329
287 330
288/* 331/*
289 * Shutdown/close the socket for the given connection. 332 * Shutdown/close the socket for the given connection.
@@ -391,22 +434,23 @@ bool ceph_con_opened(struct ceph_connection *con)
391 */ 434 */
392struct ceph_connection *ceph_con_get(struct ceph_connection *con) 435struct ceph_connection *ceph_con_get(struct ceph_connection *con)
393{ 436{
394 dout("con_get %p nref = %d -> %d\n", con, 437 int nref = __atomic_add_unless(&con->nref, 1, 0);
395 atomic_read(&con->nref), atomic_read(&con->nref) + 1); 438
396 if (atomic_inc_not_zero(&con->nref)) 439 dout("con_get %p nref = %d -> %d\n", con, nref, nref + 1);
397 return con; 440
398 return NULL; 441 return nref ? con : NULL;
399} 442}
400 443
401void ceph_con_put(struct ceph_connection *con) 444void ceph_con_put(struct ceph_connection *con)
402{ 445{
403 dout("con_put %p nref = %d -> %d\n", con, 446 int nref = atomic_dec_return(&con->nref);
404 atomic_read(&con->nref), atomic_read(&con->nref) - 1); 447
405 BUG_ON(atomic_read(&con->nref) == 0); 448 BUG_ON(nref < 0);
406 if (atomic_dec_and_test(&con->nref)) { 449 if (nref == 0) {
407 BUG_ON(con->sock); 450 BUG_ON(con->sock);
408 kfree(con); 451 kfree(con);
409 } 452 }
453 dout("con_put %p nref = %d -> %d\n", con, nref + 1, nref);
410} 454}
411 455
412/* 456/*
@@ -442,14 +486,35 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
442 return ret; 486 return ret;
443} 487}
444 488
489static void ceph_con_out_kvec_reset(struct ceph_connection *con)
490{
491 con->out_kvec_left = 0;
492 con->out_kvec_bytes = 0;
493 con->out_kvec_cur = &con->out_kvec[0];
494}
495
496static void ceph_con_out_kvec_add(struct ceph_connection *con,
497 size_t size, void *data)
498{
499 int index;
500
501 index = con->out_kvec_left;
502 BUG_ON(index >= ARRAY_SIZE(con->out_kvec));
503
504 con->out_kvec[index].iov_len = size;
505 con->out_kvec[index].iov_base = data;
506 con->out_kvec_left++;
507 con->out_kvec_bytes += size;
508}
445 509
446/* 510/*
447 * Prepare footer for currently outgoing message, and finish things 511 * Prepare footer for currently outgoing message, and finish things
448 * off. Assumes out_kvec* are already valid.. we just add on to the end. 512 * off. Assumes out_kvec* are already valid.. we just add on to the end.
449 */ 513 */
450static void prepare_write_message_footer(struct ceph_connection *con, int v) 514static void prepare_write_message_footer(struct ceph_connection *con)
451{ 515{
452 struct ceph_msg *m = con->out_msg; 516 struct ceph_msg *m = con->out_msg;
517 int v = con->out_kvec_left;
453 518
454 dout("prepare_write_message_footer %p\n", con); 519 dout("prepare_write_message_footer %p\n", con);
455 con->out_kvec_is_msg = true; 520 con->out_kvec_is_msg = true;
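
The new ceph_con_out_kvec_reset()/ceph_con_out_kvec_add() helpers above centralize the index, count, and byte bookkeeping that each prepare_write_*() function previously open-coded. A rough user-space sketch of such an accumulator, with struct iovec standing in for the kernel kvec and all names invented:

    #include <assert.h>
    #include <stdio.h>
    #include <string.h>
    #include <sys/uio.h>

    #define OUT_KVEC_MAX 8

    struct out_state {
        struct iovec vec[OUT_KVEC_MAX];
        struct iovec *cur;
        int left;               /* entries filled but not yet sent */
        size_t bytes;           /* total bytes queued */
    };

    /* Reset the accumulator, like ceph_con_out_kvec_reset(). */
    static void out_reset(struct out_state *o)
    {
        o->left = 0;
        o->bytes = 0;
        o->cur = &o->vec[0];
    }

    /* Append one element, like ceph_con_out_kvec_add(). */
    static void out_add(struct out_state *o, size_t size, void *data)
    {
        assert(o->left < OUT_KVEC_MAX);         /* mirrors the BUG_ON() */

        o->vec[o->left].iov_base = data;
        o->vec[o->left].iov_len = size;
        o->left++;
        o->bytes += size;
    }

    int main(void)
    {
        struct out_state o;
        char tag = 'M';
        char hdr[16] = "header";

        out_reset(&o);
        out_add(&o, sizeof(tag), &tag);
        out_add(&o, strlen(hdr), hdr);
        printf("queued %d entries, %zu bytes\n", o.left, o.bytes);
        return 0;
    }
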
@@ -467,9 +532,9 @@ static void prepare_write_message_footer(struct ceph_connection *con, int v)
467static void prepare_write_message(struct ceph_connection *con) 532static void prepare_write_message(struct ceph_connection *con)
468{ 533{
469 struct ceph_msg *m; 534 struct ceph_msg *m;
470 int v = 0; 535 u32 crc;
471 536
472 con->out_kvec_bytes = 0; 537 ceph_con_out_kvec_reset(con);
473 con->out_kvec_is_msg = true; 538 con->out_kvec_is_msg = true;
474 con->out_msg_done = false; 539 con->out_msg_done = false;
475 540
@@ -477,16 +542,13 @@ static void prepare_write_message(struct ceph_connection *con)
477 * TCP packet that's a good thing. */ 542 * TCP packet that's a good thing. */
478 if (con->in_seq > con->in_seq_acked) { 543 if (con->in_seq > con->in_seq_acked) {
479 con->in_seq_acked = con->in_seq; 544 con->in_seq_acked = con->in_seq;
480 con->out_kvec[v].iov_base = &tag_ack; 545 ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
481 con->out_kvec[v++].iov_len = 1;
482 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 546 con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
483 con->out_kvec[v].iov_base = &con->out_temp_ack; 547 ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack),
484 con->out_kvec[v++].iov_len = sizeof(con->out_temp_ack); 548 &con->out_temp_ack);
485 con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
486 } 549 }
487 550
488 m = list_first_entry(&con->out_queue, 551 m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
489 struct ceph_msg, list_head);
490 con->out_msg = m; 552 con->out_msg = m;
491 553
492 /* put message on sent list */ 554 /* put message on sent list */
@@ -510,30 +572,26 @@ static void prepare_write_message(struct ceph_connection *con)
510 BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len); 572 BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
511 573
512 /* tag + hdr + front + middle */ 574 /* tag + hdr + front + middle */
513 con->out_kvec[v].iov_base = &tag_msg; 575 ceph_con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
514 con->out_kvec[v++].iov_len = 1; 576 ceph_con_out_kvec_add(con, sizeof (m->hdr), &m->hdr);
515 con->out_kvec[v].iov_base = &m->hdr; 577 ceph_con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
516 con->out_kvec[v++].iov_len = sizeof(m->hdr); 578
517 con->out_kvec[v++] = m->front;
518 if (m->middle) 579 if (m->middle)
519 con->out_kvec[v++] = m->middle->vec; 580 ceph_con_out_kvec_add(con, m->middle->vec.iov_len,
520 con->out_kvec_left = v; 581 m->middle->vec.iov_base);
521 con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len +
522 (m->middle ? m->middle->vec.iov_len : 0);
523 con->out_kvec_cur = con->out_kvec;
524 582
525 /* fill in crc (except data pages), footer */ 583 /* fill in crc (except data pages), footer */
526 con->out_msg->hdr.crc = 584 crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
527 cpu_to_le32(crc32c(0, (void *)&m->hdr, 585 con->out_msg->hdr.crc = cpu_to_le32(crc);
528 sizeof(m->hdr) - sizeof(m->hdr.crc)));
529 con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE; 586 con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE;
530 con->out_msg->footer.front_crc = 587
531 cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len)); 588 crc = crc32c(0, m->front.iov_base, m->front.iov_len);
532 if (m->middle) 589 con->out_msg->footer.front_crc = cpu_to_le32(crc);
533 con->out_msg->footer.middle_crc = 590 if (m->middle) {
534 cpu_to_le32(crc32c(0, m->middle->vec.iov_base, 591 crc = crc32c(0, m->middle->vec.iov_base,
535 m->middle->vec.iov_len)); 592 m->middle->vec.iov_len);
536 else 593 con->out_msg->footer.middle_crc = cpu_to_le32(crc);
594 } else
537 con->out_msg->footer.middle_crc = 0; 595 con->out_msg->footer.middle_crc = 0;
538 con->out_msg->footer.data_crc = 0; 596 con->out_msg->footer.data_crc = 0;
539 dout("prepare_write_message front_crc %u data_crc %u\n", 597 dout("prepare_write_message front_crc %u data_crc %u\n",
@@ -549,11 +607,11 @@ static void prepare_write_message(struct ceph_connection *con)
549 else 607 else
550 con->out_msg_pos.page_pos = 0; 608 con->out_msg_pos.page_pos = 0;
551 con->out_msg_pos.data_pos = 0; 609 con->out_msg_pos.data_pos = 0;
552 con->out_msg_pos.did_page_crc = 0; 610 con->out_msg_pos.did_page_crc = false;
553 con->out_more = 1; /* data + footer will follow */ 611 con->out_more = 1; /* data + footer will follow */
554 } else { 612 } else {
555 /* no, queue up footer too and be done */ 613 /* no, queue up footer too and be done */
556 prepare_write_message_footer(con, v); 614 prepare_write_message_footer(con);
557 } 615 }
558 616
559 set_bit(WRITE_PENDING, &con->state); 617 set_bit(WRITE_PENDING, &con->state);
@@ -568,14 +626,14 @@ static void prepare_write_ack(struct ceph_connection *con)
568 con->in_seq_acked, con->in_seq); 626 con->in_seq_acked, con->in_seq);
569 con->in_seq_acked = con->in_seq; 627 con->in_seq_acked = con->in_seq;
570 628
571 con->out_kvec[0].iov_base = &tag_ack; 629 ceph_con_out_kvec_reset(con);
572 con->out_kvec[0].iov_len = 1; 630
631 ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
632
573 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 633 con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
574 con->out_kvec[1].iov_base = &con->out_temp_ack; 634 ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack),
575 con->out_kvec[1].iov_len = sizeof(con->out_temp_ack); 635 &con->out_temp_ack);
576 con->out_kvec_left = 2; 636
577 con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
578 con->out_kvec_cur = con->out_kvec;
579 con->out_more = 1; /* more will follow.. eventually.. */ 637 con->out_more = 1; /* more will follow.. eventually.. */
580 set_bit(WRITE_PENDING, &con->state); 638 set_bit(WRITE_PENDING, &con->state);
581} 639}
@@ -586,11 +644,8 @@ static void prepare_write_ack(struct ceph_connection *con)
586static void prepare_write_keepalive(struct ceph_connection *con) 644static void prepare_write_keepalive(struct ceph_connection *con)
587{ 645{
588 dout("prepare_write_keepalive %p\n", con); 646 dout("prepare_write_keepalive %p\n", con);
589 con->out_kvec[0].iov_base = &tag_keepalive; 647 ceph_con_out_kvec_reset(con);
590 con->out_kvec[0].iov_len = 1; 648 ceph_con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
591 con->out_kvec_left = 1;
592 con->out_kvec_bytes = 1;
593 con->out_kvec_cur = con->out_kvec;
594 set_bit(WRITE_PENDING, &con->state); 649 set_bit(WRITE_PENDING, &con->state);
595} 650}
596 651
@@ -619,12 +674,9 @@ static int prepare_connect_authorizer(struct ceph_connection *con)
619 con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol); 674 con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
620 con->out_connect.authorizer_len = cpu_to_le32(auth_len); 675 con->out_connect.authorizer_len = cpu_to_le32(auth_len);
621 676
622 if (auth_len) { 677 if (auth_len)
623 con->out_kvec[con->out_kvec_left].iov_base = auth_buf; 678 ceph_con_out_kvec_add(con, auth_len, auth_buf);
624 con->out_kvec[con->out_kvec_left].iov_len = auth_len; 679
625 con->out_kvec_left++;
626 con->out_kvec_bytes += auth_len;
627 }
628 return 0; 680 return 0;
629} 681}
630 682
@@ -634,22 +686,18 @@ static int prepare_connect_authorizer(struct ceph_connection *con)
634static void prepare_write_banner(struct ceph_messenger *msgr, 686static void prepare_write_banner(struct ceph_messenger *msgr,
635 struct ceph_connection *con) 687 struct ceph_connection *con)
636{ 688{
637 int len = strlen(CEPH_BANNER); 689 ceph_con_out_kvec_reset(con);
690 ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
691 ceph_con_out_kvec_add(con, sizeof (msgr->my_enc_addr),
692 &msgr->my_enc_addr);
638 693
639 con->out_kvec[0].iov_base = CEPH_BANNER;
640 con->out_kvec[0].iov_len = len;
641 con->out_kvec[1].iov_base = &msgr->my_enc_addr;
642 con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr);
643 con->out_kvec_left = 2;
644 con->out_kvec_bytes = len + sizeof(msgr->my_enc_addr);
645 con->out_kvec_cur = con->out_kvec;
646 con->out_more = 0; 694 con->out_more = 0;
647 set_bit(WRITE_PENDING, &con->state); 695 set_bit(WRITE_PENDING, &con->state);
648} 696}
649 697
650static int prepare_write_connect(struct ceph_messenger *msgr, 698static int prepare_write_connect(struct ceph_messenger *msgr,
651 struct ceph_connection *con, 699 struct ceph_connection *con,
652 int after_banner) 700 int include_banner)
653{ 701{
654 unsigned global_seq = get_global_seq(con->msgr, 0); 702 unsigned global_seq = get_global_seq(con->msgr, 0);
655 int proto; 703 int proto;
@@ -678,22 +726,18 @@ static int prepare_write_connect(struct ceph_messenger *msgr,
678 con->out_connect.protocol_version = cpu_to_le32(proto); 726 con->out_connect.protocol_version = cpu_to_le32(proto);
679 con->out_connect.flags = 0; 727 con->out_connect.flags = 0;
680 728
681 if (!after_banner) { 729 if (include_banner)
682 con->out_kvec_left = 0; 730 prepare_write_banner(msgr, con);
683 con->out_kvec_bytes = 0; 731 else
684 } 732 ceph_con_out_kvec_reset(con);
685 con->out_kvec[con->out_kvec_left].iov_base = &con->out_connect; 733 ceph_con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect);
686 con->out_kvec[con->out_kvec_left].iov_len = sizeof(con->out_connect); 734
687 con->out_kvec_left++;
688 con->out_kvec_bytes += sizeof(con->out_connect);
689 con->out_kvec_cur = con->out_kvec;
690 con->out_more = 0; 735 con->out_more = 0;
691 set_bit(WRITE_PENDING, &con->state); 736 set_bit(WRITE_PENDING, &con->state);
692 737
693 return prepare_connect_authorizer(con); 738 return prepare_connect_authorizer(con);
694} 739}
695 740
696
697/* 741/*
698 * write as much of pending kvecs to the socket as we can. 742 * write as much of pending kvecs to the socket as we can.
699 * 1 -> done 743 * 1 -> done
@@ -714,17 +758,18 @@ static int write_partial_kvec(struct ceph_connection *con)
714 con->out_kvec_bytes -= ret; 758 con->out_kvec_bytes -= ret;
715 if (con->out_kvec_bytes == 0) 759 if (con->out_kvec_bytes == 0)
716 break; /* done */ 760 break; /* done */
717 while (ret > 0) { 761
718 if (ret >= con->out_kvec_cur->iov_len) { 762 /* account for full iov entries consumed */
719 ret -= con->out_kvec_cur->iov_len; 763 while (ret >= con->out_kvec_cur->iov_len) {
720 con->out_kvec_cur++; 764 BUG_ON(!con->out_kvec_left);
721 con->out_kvec_left--; 765 ret -= con->out_kvec_cur->iov_len;
722 } else { 766 con->out_kvec_cur++;
723 con->out_kvec_cur->iov_len -= ret; 767 con->out_kvec_left--;
724 con->out_kvec_cur->iov_base += ret; 768 }
725 ret = 0; 769 /* and for a partially-consumed entry */
726 break; 770 if (ret) {
727 } 771 con->out_kvec_cur->iov_len -= ret;
772 con->out_kvec_cur->iov_base += ret;
728 } 773 }
729 } 774 }
730 con->out_kvec_left = 0; 775 con->out_kvec_left = 0;
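
The reworked consumption loop in write_partial_kvec() above first retires fully-sent kvec entries and only then trims the partially-sent one, asserting it never runs off the end of the array. A simplified user-space sketch of that cursor advance (advance_iovec() is a made-up helper operating on struct iovec):

    #include <stdio.h>
    #include <string.h>
    #include <sys/uio.h>

    /*
     * After a short write of `ret` bytes, advance the iovec cursor: drop
     * fully-sent entries, then trim the partially-sent one.
     */
    static void advance_iovec(struct iovec **cur, int *left, size_t ret)
    {
        while (*left && ret >= (*cur)->iov_len) {       /* whole entries */
            ret -= (*cur)->iov_len;
            (*cur)++;
            (*left)--;
        }
        if (ret) {                                      /* partial entry */
            (*cur)->iov_len -= ret;
            (*cur)->iov_base = (char *)(*cur)->iov_base + ret;
        }
    }

    int main(void)
    {
        char a[] = "hello", b[] = "world";
        struct iovec vec[2] = {
            { .iov_base = a, .iov_len = 5 },
            { .iov_base = b, .iov_len = 5 },
        };
        struct iovec *cur = vec;
        int left = 2;

        advance_iovec(&cur, &left, 7);   /* 5 bytes of a, 2 bytes of b sent */
        printf("left=%d next=%.*s\n", left, (int)cur->iov_len,
               (char *)cur->iov_base);   /* prints: left=1 next=rld */
        return 0;
    }
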
@@ -773,7 +818,7 @@ static int write_partial_msg_pages(struct ceph_connection *con)
773 struct ceph_msg *msg = con->out_msg; 818 struct ceph_msg *msg = con->out_msg;
774 unsigned data_len = le32_to_cpu(msg->hdr.data_len); 819 unsigned data_len = le32_to_cpu(msg->hdr.data_len);
775 size_t len; 820 size_t len;
776 int crc = con->msgr->nocrc; 821 bool do_datacrc = !con->msgr->nocrc;
777 int ret; 822 int ret;
778 int total_max_write; 823 int total_max_write;
779 int in_trail = 0; 824 int in_trail = 0;
@@ -790,9 +835,8 @@ static int write_partial_msg_pages(struct ceph_connection *con)
790 835
791 while (data_len > con->out_msg_pos.data_pos) { 836 while (data_len > con->out_msg_pos.data_pos) {
792 struct page *page = NULL; 837 struct page *page = NULL;
793 void *kaddr = NULL;
794 int max_write = PAGE_SIZE; 838 int max_write = PAGE_SIZE;
795 int page_shift = 0; 839 int bio_offset = 0;
796 840
797 total_max_write = data_len - trail_len - 841 total_max_write = data_len - trail_len -
798 con->out_msg_pos.data_pos; 842 con->out_msg_pos.data_pos;
@@ -811,58 +855,47 @@ static int write_partial_msg_pages(struct ceph_connection *con)
811 855
812 page = list_first_entry(&msg->trail->head, 856 page = list_first_entry(&msg->trail->head,
813 struct page, lru); 857 struct page, lru);
814 if (crc)
815 kaddr = kmap(page);
816 max_write = PAGE_SIZE; 858 max_write = PAGE_SIZE;
817 } else if (msg->pages) { 859 } else if (msg->pages) {
818 page = msg->pages[con->out_msg_pos.page]; 860 page = msg->pages[con->out_msg_pos.page];
819 if (crc)
820 kaddr = kmap(page);
821 } else if (msg->pagelist) { 861 } else if (msg->pagelist) {
822 page = list_first_entry(&msg->pagelist->head, 862 page = list_first_entry(&msg->pagelist->head,
823 struct page, lru); 863 struct page, lru);
824 if (crc)
825 kaddr = kmap(page);
826#ifdef CONFIG_BLOCK 864#ifdef CONFIG_BLOCK
827 } else if (msg->bio) { 865 } else if (msg->bio) {
828 struct bio_vec *bv; 866 struct bio_vec *bv;
829 867
830 bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg); 868 bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
831 page = bv->bv_page; 869 page = bv->bv_page;
832 page_shift = bv->bv_offset; 870 bio_offset = bv->bv_offset;
833 if (crc)
834 kaddr = kmap(page) + page_shift;
835 max_write = bv->bv_len; 871 max_write = bv->bv_len;
836#endif 872#endif
837 } else { 873 } else {
838 page = con->msgr->zero_page; 874 page = zero_page;
839 if (crc)
840 kaddr = page_address(con->msgr->zero_page);
841 } 875 }
842 len = min_t(int, max_write - con->out_msg_pos.page_pos, 876 len = min_t(int, max_write - con->out_msg_pos.page_pos,
843 total_max_write); 877 total_max_write);
844 878
845 if (crc && !con->out_msg_pos.did_page_crc) { 879 if (do_datacrc && !con->out_msg_pos.did_page_crc) {
846 void *base = kaddr + con->out_msg_pos.page_pos; 880 void *base;
881 u32 crc;
847 u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc); 882 u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
883 char *kaddr;
848 884
885 kaddr = kmap(page);
849 BUG_ON(kaddr == NULL); 886 BUG_ON(kaddr == NULL);
850 con->out_msg->footer.data_crc = 887 base = kaddr + con->out_msg_pos.page_pos + bio_offset;
851 cpu_to_le32(crc32c(tmpcrc, base, len)); 888 crc = crc32c(tmpcrc, base, len);
852 con->out_msg_pos.did_page_crc = 1; 889 con->out_msg->footer.data_crc = cpu_to_le32(crc);
890 con->out_msg_pos.did_page_crc = true;
853 } 891 }
854 ret = kernel_sendpage(con->sock, page, 892 ret = ceph_tcp_sendpage(con->sock, page,
855 con->out_msg_pos.page_pos + page_shift, 893 con->out_msg_pos.page_pos + bio_offset,
856 len, 894 len, 1);
857 MSG_DONTWAIT | MSG_NOSIGNAL | 895
858 MSG_MORE); 896 if (do_datacrc)
859
860 if (crc &&
861 (msg->pages || msg->pagelist || msg->bio || in_trail))
862 kunmap(page); 897 kunmap(page);
863 898
864 if (ret == -EAGAIN)
865 ret = 0;
866 if (ret <= 0) 899 if (ret <= 0)
867 goto out; 900 goto out;
868 901
@@ -871,7 +904,7 @@ static int write_partial_msg_pages(struct ceph_connection *con)
871 if (ret == len) { 904 if (ret == len) {
872 con->out_msg_pos.page_pos = 0; 905 con->out_msg_pos.page_pos = 0;
873 con->out_msg_pos.page++; 906 con->out_msg_pos.page++;
874 con->out_msg_pos.did_page_crc = 0; 907 con->out_msg_pos.did_page_crc = false;
875 if (in_trail) 908 if (in_trail)
876 list_move_tail(&page->lru, 909 list_move_tail(&page->lru,
877 &msg->trail->head); 910 &msg->trail->head);
@@ -888,12 +921,10 @@ static int write_partial_msg_pages(struct ceph_connection *con)
888 dout("write_partial_msg_pages %p msg %p done\n", con, msg); 921 dout("write_partial_msg_pages %p msg %p done\n", con, msg);
889 922
890 /* prepare and queue up footer, too */ 923 /* prepare and queue up footer, too */
891 if (!crc) 924 if (!do_datacrc)
892 con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; 925 con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
893 con->out_kvec_bytes = 0; 926 ceph_con_out_kvec_reset(con);
894 con->out_kvec_left = 0; 927 prepare_write_message_footer(con);
895 con->out_kvec_cur = con->out_kvec;
896 prepare_write_message_footer(con, 0);
897 ret = 1; 928 ret = 1;
898out: 929out:
899 return ret; 930 return ret;
@@ -907,12 +938,9 @@ static int write_partial_skip(struct ceph_connection *con)
907 int ret; 938 int ret;
908 939
909 while (con->out_skip > 0) { 940 while (con->out_skip > 0) {
910 struct kvec iov = { 941 size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE);
911 .iov_base = page_address(con->msgr->zero_page),
912 .iov_len = min(con->out_skip, (int)PAGE_CACHE_SIZE)
913 };
914 942
915 ret = ceph_tcp_sendmsg(con->sock, &iov, 1, iov.iov_len, 1); 943 ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, 1);
916 if (ret <= 0) 944 if (ret <= 0)
917 goto out; 945 goto out;
918 con->out_skip -= ret; 946 con->out_skip -= ret;
@@ -1085,8 +1113,8 @@ static void addr_set_port(struct sockaddr_storage *ss, int p)
1085static int ceph_pton(const char *str, size_t len, struct sockaddr_storage *ss, 1113static int ceph_pton(const char *str, size_t len, struct sockaddr_storage *ss,
1086 char delim, const char **ipend) 1114 char delim, const char **ipend)
1087{ 1115{
1088 struct sockaddr_in *in4 = (void *)ss; 1116 struct sockaddr_in *in4 = (struct sockaddr_in *) ss;
1089 struct sockaddr_in6 *in6 = (void *)ss; 1117 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) ss;
1090 1118
1091 memset(ss, 0, sizeof(*ss)); 1119 memset(ss, 0, sizeof(*ss));
1092 1120
@@ -1512,10 +1540,9 @@ static int read_partial_message_section(struct ceph_connection *con,
1512 if (ret <= 0) 1540 if (ret <= 0)
1513 return ret; 1541 return ret;
1514 section->iov_len += ret; 1542 section->iov_len += ret;
1515 if (section->iov_len == sec_len)
1516 *crc = crc32c(0, section->iov_base,
1517 section->iov_len);
1518 } 1543 }
1544 if (section->iov_len == sec_len)
1545 *crc = crc32c(0, section->iov_base, section->iov_len);
1519 1546
1520 return 1; 1547 return 1;
1521} 1548}
@@ -1527,7 +1554,7 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
1527 1554
1528static int read_partial_message_pages(struct ceph_connection *con, 1555static int read_partial_message_pages(struct ceph_connection *con,
1529 struct page **pages, 1556 struct page **pages,
1530 unsigned data_len, int datacrc) 1557 unsigned data_len, bool do_datacrc)
1531{ 1558{
1532 void *p; 1559 void *p;
1533 int ret; 1560 int ret;
@@ -1540,7 +1567,7 @@ static int read_partial_message_pages(struct ceph_connection *con,
1540 p = kmap(pages[con->in_msg_pos.page]); 1567 p = kmap(pages[con->in_msg_pos.page]);
1541 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, 1568 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
1542 left); 1569 left);
1543 if (ret > 0 && datacrc) 1570 if (ret > 0 && do_datacrc)
1544 con->in_data_crc = 1571 con->in_data_crc =
1545 crc32c(con->in_data_crc, 1572 crc32c(con->in_data_crc,
1546 p + con->in_msg_pos.page_pos, ret); 1573 p + con->in_msg_pos.page_pos, ret);
@@ -1560,7 +1587,7 @@ static int read_partial_message_pages(struct ceph_connection *con,
1560#ifdef CONFIG_BLOCK 1587#ifdef CONFIG_BLOCK
1561static int read_partial_message_bio(struct ceph_connection *con, 1588static int read_partial_message_bio(struct ceph_connection *con,
1562 struct bio **bio_iter, int *bio_seg, 1589 struct bio **bio_iter, int *bio_seg,
1563 unsigned data_len, int datacrc) 1590 unsigned data_len, bool do_datacrc)
1564{ 1591{
1565 struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg); 1592 struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
1566 void *p; 1593 void *p;
@@ -1576,7 +1603,7 @@ static int read_partial_message_bio(struct ceph_connection *con,
1576 1603
1577 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, 1604 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
1578 left); 1605 left);
1579 if (ret > 0 && datacrc) 1606 if (ret > 0 && do_datacrc)
1580 con->in_data_crc = 1607 con->in_data_crc =
1581 crc32c(con->in_data_crc, 1608 crc32c(con->in_data_crc,
1582 p + con->in_msg_pos.page_pos, ret); 1609 p + con->in_msg_pos.page_pos, ret);
@@ -1603,9 +1630,10 @@ static int read_partial_message(struct ceph_connection *con)
1603 int ret; 1630 int ret;
1604 int to, left; 1631 int to, left;
1605 unsigned front_len, middle_len, data_len; 1632 unsigned front_len, middle_len, data_len;
1606 int datacrc = con->msgr->nocrc; 1633 bool do_datacrc = !con->msgr->nocrc;
1607 int skip; 1634 int skip;
1608 u64 seq; 1635 u64 seq;
1636 u32 crc;
1609 1637
1610 dout("read_partial_message con %p msg %p\n", con, m); 1638 dout("read_partial_message con %p msg %p\n", con, m);
1611 1639
@@ -1618,17 +1646,16 @@ static int read_partial_message(struct ceph_connection *con)
1618 if (ret <= 0) 1646 if (ret <= 0)
1619 return ret; 1647 return ret;
1620 con->in_base_pos += ret; 1648 con->in_base_pos += ret;
1621 if (con->in_base_pos == sizeof(con->in_hdr)) {
1622 u32 crc = crc32c(0, (void *)&con->in_hdr,
1623 sizeof(con->in_hdr) - sizeof(con->in_hdr.crc));
1624 if (crc != le32_to_cpu(con->in_hdr.crc)) {
1625 pr_err("read_partial_message bad hdr "
1626 " crc %u != expected %u\n",
1627 crc, con->in_hdr.crc);
1628 return -EBADMSG;
1629 }
1630 }
1631 } 1649 }
1650
1651 crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc));
1652 if (cpu_to_le32(crc) != con->in_hdr.crc) {
1653 pr_err("read_partial_message bad hdr "
1654 " crc %u != expected %u\n",
1655 crc, con->in_hdr.crc);
1656 return -EBADMSG;
1657 }
1658
1632 front_len = le32_to_cpu(con->in_hdr.front_len); 1659 front_len = le32_to_cpu(con->in_hdr.front_len);
1633 if (front_len > CEPH_MSG_MAX_FRONT_LEN) 1660 if (front_len > CEPH_MSG_MAX_FRONT_LEN)
1634 return -EIO; 1661 return -EIO;
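The header validation above uses a compact idiom: the CRC covers every header field up to, but not including, the trailing crc member (hence the offsetof()), and the comparison is done in wire byte order rather than byte-swapping the stored value first. A userspace sketch of that idiom, with a made-up header layout and zlib's crc32() standing in for the kernel's crc32c():

#include <endian.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <zlib.h>

/* hypothetical wire header; only the trailing crc placement matters */
struct wire_hdr {
	uint64_t seq;
	uint32_t front_len;
	uint32_t data_len;
	uint32_t crc;		/* CRC of everything before this field */
} __attribute__((packed));

static int hdr_crc_ok(const struct wire_hdr *hdr)
{
	uint32_t crc = crc32(0L, (const unsigned char *)hdr,
			     offsetof(struct wire_hdr, crc));

	/* compare in wire (little-endian) order, as the messenger does */
	return htole32(crc) == hdr->crc;
}

int main(void)
{
	struct wire_hdr hdr = { .seq = 1, .front_len = 128, .data_len = 4096 };

	hdr.crc = htole32(crc32(0L, (const unsigned char *)&hdr,
				offsetof(struct wire_hdr, crc)));
	printf("header crc ok: %d\n", hdr_crc_ok(&hdr));	/* prints 1 */
	return 0;
}

(Build with -lz.)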
@@ -1714,7 +1741,7 @@ static int read_partial_message(struct ceph_connection *con)
1714 while (con->in_msg_pos.data_pos < data_len) { 1741 while (con->in_msg_pos.data_pos < data_len) {
1715 if (m->pages) { 1742 if (m->pages) {
1716 ret = read_partial_message_pages(con, m->pages, 1743 ret = read_partial_message_pages(con, m->pages,
1717 data_len, datacrc); 1744 data_len, do_datacrc);
1718 if (ret <= 0) 1745 if (ret <= 0)
1719 return ret; 1746 return ret;
1720#ifdef CONFIG_BLOCK 1747#ifdef CONFIG_BLOCK
@@ -1722,7 +1749,7 @@ static int read_partial_message(struct ceph_connection *con)
1722 1749
1723 ret = read_partial_message_bio(con, 1750 ret = read_partial_message_bio(con,
1724 &m->bio_iter, &m->bio_seg, 1751 &m->bio_iter, &m->bio_seg,
1725 data_len, datacrc); 1752 data_len, do_datacrc);
1726 if (ret <= 0) 1753 if (ret <= 0)
1727 return ret; 1754 return ret;
1728#endif 1755#endif
@@ -1757,7 +1784,7 @@ static int read_partial_message(struct ceph_connection *con)
1757 m, con->in_middle_crc, m->footer.middle_crc); 1784 m, con->in_middle_crc, m->footer.middle_crc);
1758 return -EBADMSG; 1785 return -EBADMSG;
1759 } 1786 }
1760 if (datacrc && 1787 if (do_datacrc &&
1761 (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 && 1788 (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
1762 con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { 1789 con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
1763 pr_err("read_partial_message %p data crc %u != exp. %u\n", m, 1790 pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
@@ -1819,7 +1846,6 @@ more:
1819 1846
1820 /* open the socket first? */ 1847 /* open the socket first? */
1821 if (con->sock == NULL) { 1848 if (con->sock == NULL) {
1822 prepare_write_banner(msgr, con);
1823 prepare_write_connect(msgr, con, 1); 1849 prepare_write_connect(msgr, con, 1);
1824 prepare_read_banner(con); 1850 prepare_read_banner(con);
1825 set_bit(CONNECTING, &con->state); 1851 set_bit(CONNECTING, &con->state);
@@ -1829,11 +1855,9 @@ more:
1829 con->in_tag = CEPH_MSGR_TAG_READY; 1855 con->in_tag = CEPH_MSGR_TAG_READY;
1830 dout("try_write initiating connect on %p new state %lu\n", 1856 dout("try_write initiating connect on %p new state %lu\n",
1831 con, con->state); 1857 con, con->state);
1832 con->sock = ceph_tcp_connect(con); 1858 ret = ceph_tcp_connect(con);
1833 if (IS_ERR(con->sock)) { 1859 if (ret < 0) {
1834 con->sock = NULL;
1835 con->error_msg = "connect error"; 1860 con->error_msg = "connect error";
1836 ret = -1;
1837 goto out; 1861 goto out;
1838 } 1862 }
1839 } 1863 }
@@ -1953,8 +1977,9 @@ more:
1953 * 1977 *
1954 * FIXME: there must be a better way to do this! 1978 * FIXME: there must be a better way to do this!
1955 */ 1979 */
1956 static char buf[1024]; 1980 static char buf[SKIP_BUF_SIZE];
1957 int skip = min(1024, -con->in_base_pos); 1981 int skip = min((int) sizeof (buf), -con->in_base_pos);
1982
1958 dout("skipping %d / %d bytes\n", skip, -con->in_base_pos); 1983 dout("skipping %d / %d bytes\n", skip, -con->in_base_pos);
1959 ret = ceph_tcp_recvmsg(con->sock, buf, skip); 1984 ret = ceph_tcp_recvmsg(con->sock, buf, skip);
1960 if (ret <= 0) 1985 if (ret <= 0)
@@ -2216,15 +2241,6 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr,
2216 2241
2217 spin_lock_init(&msgr->global_seq_lock); 2242 spin_lock_init(&msgr->global_seq_lock);
2218 2243
2219 /* the zero page is needed if a request is "canceled" while the message
2220 * is being written over the socket */
2221 msgr->zero_page = __page_cache_alloc(GFP_KERNEL | __GFP_ZERO);
2222 if (!msgr->zero_page) {
2223 kfree(msgr);
2224 return ERR_PTR(-ENOMEM);
2225 }
2226 kmap(msgr->zero_page);
2227
2228 if (myaddr) 2244 if (myaddr)
2229 msgr->inst.addr = *myaddr; 2245 msgr->inst.addr = *myaddr;
2230 2246
@@ -2241,8 +2257,6 @@ EXPORT_SYMBOL(ceph_messenger_create);
2241void ceph_messenger_destroy(struct ceph_messenger *msgr) 2257void ceph_messenger_destroy(struct ceph_messenger *msgr)
2242{ 2258{
2243 dout("destroy %p\n", msgr); 2259 dout("destroy %p\n", msgr);
2244 kunmap(msgr->zero_page);
2245 __free_page(msgr->zero_page);
2246 kfree(msgr); 2260 kfree(msgr);
2247 dout("destroyed messenger %p\n", msgr); 2261 dout("destroyed messenger %p\n", msgr);
2248} 2262}
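The teardown above no longer has a zero page to unmap and free: instead of each messenger carrying its own zeroed page (allocated and kmapped at create time), the messenger code now relies on a single shared page, used whenever skipped or cancelled message bytes must still be padded out over the socket. Since that page is only ever read, one copy suffices and its lifetime can match the module's. A hedged sketch of that lifecycle as a stand-alone module; the names and the exact allocator are illustrative, not necessarily what the commit uses:

#include <linux/gfp.h>
#include <linux/highmem.h>
#include <linux/module.h>

static struct page *zero_page;	/* shared by every connection */

static int __init zero_page_demo_init(void)
{
	zero_page = alloc_page(GFP_KERNEL | __GFP_ZERO);
	if (!zero_page)
		return -ENOMEM;
	kmap(zero_page);	/* keep it mapped for the module's lifetime */
	return 0;
}

static void __exit zero_page_demo_exit(void)
{
	kunmap(zero_page);
	__free_page(zero_page);
	zero_page = NULL;
}

module_init(zero_page_demo_init);
module_exit(zero_page_demo_exit);
MODULE_LICENSE("GPL");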
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index fd863fe76934..29ad46ec9dcf 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -283,7 +283,8 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
283 ceph_decode_32_safe(p, end, yes, bad); 283 ceph_decode_32_safe(p, end, yes, bad);
284#if BITS_PER_LONG == 32 284#if BITS_PER_LONG == 32
285 err = -EINVAL; 285 err = -EINVAL;
286 if (yes > ULONG_MAX / sizeof(struct crush_rule_step)) 286 if (yes > (ULONG_MAX - sizeof(*r))
287 / sizeof(struct crush_rule_step))
287 goto bad; 288 goto bad;
288#endif 289#endif
289 r = c->rules[i] = kmalloc(sizeof(*r) + 290 r = c->rules[i] = kmalloc(sizeof(*r) +