author	Linus Torvalds <torvalds@linux-foundation.org>	2018-10-31 17:42:31 -0400
committer	Linus Torvalds <torvalds@linux-foundation.org>	2018-10-31 17:42:31 -0400
commit	31990f0f5366a8f66688edae8688723b22034108 (patch)
tree	07078a732a5f02d2330f3cb873286f9ac53ea969
parent	a9ac6cc47bbb0fdd042012044f737ba13da10cb4 (diff)
parent	ea4cdc548e5e74a529cdd1aea885d74b4aa8f1b3 (diff)
Merge tag 'ceph-for-4.20-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov:
 "The highlights are:

  - a series that fixes some old memory allocation issues in libceph
    (myself). We no longer allocate memory in places where allocation
    failures cannot be handled and BUG when the allocation fails.

  - support for the copy_file_range() syscall (Luis Henriques). If size
    and alignment conditions are met, it leverages the RADOS copy-from
    operation. Otherwise, a local copy is performed.

  - a patch that reduces the memory requirement of ceph_sync_read()
    from the size of the entire read to the size of one object
    (Zheng Yan).

  - the fallocate() syscall is now restricted to FALLOC_FL_PUNCH_HOLE
    (Luis Henriques)"

* tag 'ceph-for-4.20-rc1' of git://github.com/ceph/ceph-client: (25 commits)
  ceph: new mount option to disable usage of copy-from op
  ceph: support copy_file_range file operation
  libceph: support the RADOS copy-from operation
  ceph: add non-blocking parameter to ceph_try_get_caps()
  libceph: check reply num_data_items in setup_request_data()
  libceph: preallocate message data items
  libceph, rbd, ceph: move ceph_osdc_alloc_messages() calls
  libceph: introduce alloc_watch_request()
  libceph: assign cookies in linger_submit()
  libceph: enable fallback to ceph_msg_new() in ceph_msgpool_get()
  ceph: num_ops is off by one in ceph_aio_retry_work()
  libceph: no need to call osd_req_opcode_valid() in osd_req_encode_op()
  ceph: set timeout conditionally in __cap_delay_requeue
  libceph: don't consume a ref on pagelist in ceph_msg_data_add_pagelist()
  libceph: introduce ceph_pagelist_alloc()
  libceph: osd_req_op_cls_init() doesn't need to take opcode
  libceph: bump CEPH_MSG_MAX_DATA_LEN
  ceph: only allow punch hole mode in fallocate
  ceph: refactor ceph_sync_read()
  ceph: check if LOOKUPNAME request was aborted when filling trace
  ...
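As a rough userspace illustration of the new copy_file_range() support (not
part of this merge; paths and sizes are hypothetical), a program running on a
CephFS mount could exercise the new path like this:

/* Minimal sketch: exercising copy_file_range() on a CephFS mount.
 * If size/alignment conditions are met, the kernel client issues RADOS
 * copy-from ops; otherwise it falls back to a local (VFS) copy. */
#define _GNU_SOURCE
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
	int src = open("/mnt/cephfs/src.dat", O_RDONLY);
	int dst = open("/mnt/cephfs/dst.dat", O_WRONLY | O_CREAT, 0644);

	if (src < 0 || dst < 0) {
		perror("open");
		return 1;
	}
	/* NULL offsets: copy from/to the current file offsets */
	ssize_t n = copy_file_range(src, NULL, dst, NULL,
				    64 * 1024 * 1024, 0);
	if (n < 0) {
		perror("copy_file_range");
		return 1;
	}
	printf("copied %zd bytes\n", n);
	close(src);
	close(dst);
	return 0;
}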
-rw-r--r--	Documentation/filesystems/ceph.txt	  5
-rw-r--r--	drivers/block/rbd.c	 28
-rw-r--r--	fs/ceph/acl.c	 13
-rw-r--r--	fs/ceph/addr.c	  2
-rw-r--r--	fs/ceph/caps.c	 21
-rw-r--r--	fs/ceph/file.c	573
-rw-r--r--	fs/ceph/inode.c	 13
-rw-r--r--	fs/ceph/mds_client.c	  9
-rw-r--r--	fs/ceph/super.c	 13
-rw-r--r--	fs/ceph/super.h	  3
-rw-r--r--	fs/ceph/xattr.c	  3
-rw-r--r--	include/linux/ceph/libceph.h	  8
-rw-r--r--	include/linux/ceph/messenger.h	 24
-rw-r--r--	include/linux/ceph/msgpool.h	 11
-rw-r--r--	include/linux/ceph/osd_client.h	 22
-rw-r--r--	include/linux/ceph/pagelist.h	 11
-rw-r--r--	include/linux/ceph/rados.h	 28
-rw-r--r--	net/ceph/messenger.c	107
-rw-r--r--	net/ceph/msgpool.c	 27
-rw-r--r--	net/ceph/osd_client.c	363
-rw-r--r--	net/ceph/pagelist.c	 20
21 files changed, 900 insertions, 404 deletions
diff --git a/Documentation/filesystems/ceph.txt b/Documentation/filesystems/ceph.txt
index 8bf62240e10d..1177052701e1 100644
--- a/Documentation/filesystems/ceph.txt
+++ b/Documentation/filesystems/ceph.txt
@@ -151,6 +151,11 @@ Mount Options
 	Report overall filesystem usage in statfs instead of using the root
 	directory quota.
 
+  nocopyfrom
+	Don't use the RADOS 'copy-from' operation to perform remote object
+	copies. Currently, it's only used in copy_file_range, which will revert
+	to the default VFS implementation if this option is used.
+
 More Information
 ================
 
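For context (illustrative, not part of the patch): the new option is passed
like any other ceph mount option. A minimal sketch using mount(2), with a
hypothetical monitor address and mount point:

/* Hedged sketch: mounting CephFS with the new 'nocopyfrom' option.
 * Equivalent to: mount -t ceph mon1:6789:/ /mnt/cephfs -o name=admin,nocopyfrom */
#include <stdio.h>
#include <sys/mount.h>

int main(void)
{
	if (mount("mon1:6789:/", "/mnt/cephfs", "ceph", 0,
		  "name=admin,nocopyfrom") != 0) {
		perror("mount");
		return 1;
	}
	return 0;
}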
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 73ed5f3a862d..8e5140bbf241 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -1500,9 +1500,6 @@ rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
 			rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
 		goto err_req;
 
-	if (ceph_osdc_alloc_messages(req, GFP_NOIO))
-		goto err_req;
-
 	return req;
 
 err_req:
@@ -1945,6 +1942,10 @@ static int __rbd_img_fill_request(struct rbd_img_request *img_req)
 		}
 		if (ret)
 			return ret;
+
+		ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
+		if (ret)
+			return ret;
 	}
 
 	return 0;
@@ -2374,8 +2375,7 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
 	if (!obj_req->osd_req)
 		return -ENOMEM;
 
-	ret = osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd",
-				  "copyup");
+	ret = osd_req_op_cls_init(obj_req->osd_req, 0, "rbd", "copyup");
 	if (ret)
 		return ret;
 
@@ -2405,6 +2405,10 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
 		rbd_assert(0);
 	}
 
+	ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
+	if (ret)
+		return ret;
+
 	rbd_obj_request_submit(obj_req);
 	return 0;
 }
@@ -3784,10 +3788,6 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
 	ceph_oloc_copy(&req->r_base_oloc, oloc);
 	req->r_flags = CEPH_OSD_FLAG_READ;
 
-	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
-	if (ret)
-		goto out_req;
-
 	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
 	if (IS_ERR(pages)) {
 		ret = PTR_ERR(pages);
@@ -3798,6 +3798,10 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
 	osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
 					 true);
 
+	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
+	if (ret)
+		goto out_req;
+
 	ceph_osdc_start_request(osdc, req, false);
 	ret = ceph_osdc_wait_request(osdc, req);
 	if (ret >= 0)
@@ -6067,7 +6071,7 @@ static ssize_t rbd_remove_single_major(struct bus_type *bus,
  * create control files in sysfs
  * /sys/bus/rbd/...
  */
-static int rbd_sysfs_init(void)
+static int __init rbd_sysfs_init(void)
 {
 	int ret;
 
@@ -6082,13 +6086,13 @@ static int rbd_sysfs_init(void)
 	return ret;
 }
 
-static void rbd_sysfs_cleanup(void)
+static void __exit rbd_sysfs_cleanup(void)
 {
 	bus_unregister(&rbd_bus_type);
 	device_unregister(&rbd_root_dev);
 }
 
-static int rbd_slab_init(void)
+static int __init rbd_slab_init(void)
 {
 	rbd_assert(!rbd_img_request_cache);
 	rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index 027408d55aee..5f0103f40079 100644
--- a/fs/ceph/acl.c
+++ b/fs/ceph/acl.c
@@ -104,6 +104,11 @@ int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type)
 	struct timespec64 old_ctime = inode->i_ctime;
 	umode_t new_mode = inode->i_mode, old_mode = inode->i_mode;
 
+	if (ceph_snap(inode) != CEPH_NOSNAP) {
+		ret = -EROFS;
+		goto out;
+	}
+
 	switch (type) {
 	case ACL_TYPE_ACCESS:
 		name = XATTR_NAME_POSIX_ACL_ACCESS;
@@ -138,11 +143,6 @@ int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type)
 		goto out_free;
 	}
 
-	if (ceph_snap(inode) != CEPH_NOSNAP) {
-		ret = -EROFS;
-		goto out_free;
-	}
-
 	if (new_mode != old_mode) {
 		newattrs.ia_ctime = current_time(inode);
 		newattrs.ia_mode = new_mode;
@@ -206,10 +206,9 @@ int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
 	tmp_buf = kmalloc(max(val_size1, val_size2), GFP_KERNEL);
 	if (!tmp_buf)
 		goto out_err;
-	pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_KERNEL);
+	pagelist = ceph_pagelist_alloc(GFP_KERNEL);
 	if (!pagelist)
 		goto out_err;
-	ceph_pagelist_init(pagelist);
 
 	err = ceph_pagelist_reserve(pagelist, PAGE_SIZE);
 	if (err)
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 9c332a6f6667..8eade7a993c1 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -322,7 +322,7 @@ static int start_read(struct inode *inode, struct ceph_rw_context *rw_ctx,
 		/* caller of readpages does not hold buffer and read caps
 		 * (fadvise, madvise and readahead cases) */
 		int want = CEPH_CAP_FILE_CACHE;
-		ret = ceph_try_get_caps(ci, CEPH_CAP_FILE_RD, want, &got);
+		ret = ceph_try_get_caps(ci, CEPH_CAP_FILE_RD, want, true, &got);
 		if (ret < 0) {
 			dout("start_read %p, error getting cap\n", inode);
 		} else if (!(got & want)) {
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index dd7dfdd2ba13..f3496db4bb3e 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -519,9 +519,9 @@ static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
  * -> we take mdsc->cap_delay_lock
  */
 static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
-				struct ceph_inode_info *ci)
+				struct ceph_inode_info *ci,
+				bool set_timeout)
 {
-	__cap_set_timeouts(mdsc, ci);
 	dout("__cap_delay_requeue %p flags %d at %lu\n", &ci->vfs_inode,
 	     ci->i_ceph_flags, ci->i_hold_caps_max);
 	if (!mdsc->stopping) {
@@ -531,6 +531,8 @@ static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
 			goto no_change;
 		list_del_init(&ci->i_cap_delay_list);
 	}
+	if (set_timeout)
+		__cap_set_timeouts(mdsc, ci);
 	list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
 no_change:
 	spin_unlock(&mdsc->cap_delay_lock);
@@ -720,7 +722,7 @@ void ceph_add_cap(struct inode *inode,
 		dout(" issued %s, mds wanted %s, actual %s, queueing\n",
 		     ceph_cap_string(issued), ceph_cap_string(wanted),
 		     ceph_cap_string(actual_wanted));
-		__cap_delay_requeue(mdsc, ci);
+		__cap_delay_requeue(mdsc, ci, true);
 	}
 
 	if (flags & CEPH_CAP_FLAG_AUTH) {
@@ -1647,7 +1649,7 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
 	if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) &&
 	    (mask & CEPH_CAP_FILE_BUFFER))
 		dirty |= I_DIRTY_DATASYNC;
-	__cap_delay_requeue(mdsc, ci);
+	__cap_delay_requeue(mdsc, ci, true);
 	return dirty;
 }
 
@@ -2065,7 +2067,7 @@ ack:
 
 	/* Reschedule delayed caps release if we delayed anything */
 	if (delayed)
-		__cap_delay_requeue(mdsc, ci);
+		__cap_delay_requeue(mdsc, ci, false);
 
 	spin_unlock(&ci->i_ceph_lock);
 
@@ -2125,7 +2127,7 @@ retry:
 
 	if (delayed) {
 		spin_lock(&ci->i_ceph_lock);
-		__cap_delay_requeue(mdsc, ci);
+		__cap_delay_requeue(mdsc, ci, true);
 		spin_unlock(&ci->i_ceph_lock);
 	}
 } else {
@@ -2671,17 +2673,18 @@ static void check_max_size(struct inode *inode, loff_t endoff)
 		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
 }
 
-int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, int *got)
+int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want,
+		      bool nonblock, int *got)
 {
 	int ret, err = 0;
 
 	BUG_ON(need & ~CEPH_CAP_FILE_RD);
-	BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
+	BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO|CEPH_CAP_FILE_SHARED));
 	ret = ceph_pool_perm_check(ci, need);
 	if (ret < 0)
 		return ret;
 
-	ret = try_get_cap_refs(ci, need, want, 0, true, got, &err);
+	ret = try_get_cap_refs(ci, need, want, 0, nonblock, got, &err);
 	if (ret) {
 		if (err == -EAGAIN) {
 			ret = 0;
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 92ab20433682..f788496fafcc 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -1,5 +1,6 @@
 // SPDX-License-Identifier: GPL-2.0
 #include <linux/ceph/ceph_debug.h>
+#include <linux/ceph/striper.h>
 
 #include <linux/module.h>
 #include <linux/sched.h>
@@ -557,90 +558,26 @@ enum {
 };
 
 /*
- * Read a range of bytes striped over one or more objects. Iterate over
- * objects we stripe over. (That's not atomic, but good enough for now.)
- *
- * If we get a short result from the OSD, check against i_size; we need to
- * only return a short read to the caller if we hit EOF.
- */
-static int striped_read(struct inode *inode,
-			u64 pos, u64 len,
-			struct page **pages, int num_pages,
-			int page_align, int *checkeof)
-{
-	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	u64 this_len;
-	loff_t i_size;
-	int page_idx;
-	int ret, read = 0;
-	bool hit_stripe, was_short;
-
-	/*
-	 * we may need to do multiple reads. not atomic, unfortunately.
-	 */
-more:
-	this_len = len;
-	page_idx = (page_align + read) >> PAGE_SHIFT;
-	ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
-				  &ci->i_layout, pos, &this_len,
-				  ci->i_truncate_seq, ci->i_truncate_size,
-				  pages + page_idx, num_pages - page_idx,
-				  ((page_align + read) & ~PAGE_MASK));
-	if (ret == -ENOENT)
-		ret = 0;
-	hit_stripe = this_len < len;
-	was_short = ret >= 0 && ret < this_len;
-	dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, len, read,
-	     ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");
-
-	i_size = i_size_read(inode);
-	if (ret >= 0) {
-		if (was_short && (pos + ret < i_size)) {
-			int zlen = min(this_len - ret, i_size - pos - ret);
-			int zoff = page_align + read + ret;
-			dout(" zero gap %llu to %llu\n",
-			     pos + ret, pos + ret + zlen);
-			ceph_zero_page_vector_range(zoff, zlen, pages);
-			ret += zlen;
-		}
-
-		read += ret;
-		pos += ret;
-		len -= ret;
-
-		/* hit stripe and need continue*/
-		if (len && hit_stripe && pos < i_size)
-			goto more;
-	}
-
-	if (read > 0) {
-		ret = read;
-		/* did we bounce off eof? */
-		if (pos + len > i_size)
-			*checkeof = CHECK_EOF;
-	}
-
-	dout("striped_read returns %d\n", ret);
-	return ret;
-}
-
-/*
  * Completely synchronous read and write methods. Direct from __user
  * buffer to osd, or directly to user pages (if O_DIRECT).
  *
- * If the read spans object boundary, just do multiple reads.
+ * If the read spans object boundary, just do multiple reads. (That's not
+ * atomic, but good enough for now.)
+ *
+ * If we get a short result from the OSD, check against i_size; we need to
+ * only return a short read to the caller if we hit EOF.
  */
 static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
-			      int *checkeof)
+			      int *retry_op)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file_inode(file);
-	struct page **pages;
-	u64 off = iocb->ki_pos;
-	int num_pages;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	struct ceph_osd_client *osdc = &fsc->client->osdc;
 	ssize_t ret;
-	size_t len = iov_iter_count(to);
+	u64 off = iocb->ki_pos;
+	u64 len = iov_iter_count(to);
 
 	dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len,
 	     (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
@@ -653,61 +590,118 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
 	 * but it will at least behave sensibly when they are
 	 * in sequence.
 	 */
-	ret = filemap_write_and_wait_range(inode->i_mapping, off,
-					   off + len);
+	ret = filemap_write_and_wait_range(inode->i_mapping, off, off + len);
 	if (ret < 0)
 		return ret;
 
-	if (unlikely(to->type & ITER_PIPE)) {
+	ret = 0;
+	while ((len = iov_iter_count(to)) > 0) {
+		struct ceph_osd_request *req;
+		struct page **pages;
+		int num_pages;
 		size_t page_off;
-		ret = iov_iter_get_pages_alloc(to, &pages, len,
-					       &page_off);
-		if (ret <= 0)
-			return -ENOMEM;
-		num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE);
+		u64 i_size;
+		bool more;
+
+		req = ceph_osdc_new_request(osdc, &ci->i_layout,
+					ci->i_vino, off, &len, 0, 1,
+					CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
+					NULL, ci->i_truncate_seq,
+					ci->i_truncate_size, false);
+		if (IS_ERR(req)) {
+			ret = PTR_ERR(req);
+			break;
+		}
+
+		more = len < iov_iter_count(to);
 
-		ret = striped_read(inode, off, ret, pages, num_pages,
-				   page_off, checkeof);
-		if (ret > 0) {
-			iov_iter_advance(to, ret);
-			off += ret;
+		if (unlikely(to->type & ITER_PIPE)) {
+			ret = iov_iter_get_pages_alloc(to, &pages, len,
+						       &page_off);
+			if (ret <= 0) {
+				ceph_osdc_put_request(req);
+				ret = -ENOMEM;
+				break;
+			}
+			num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE);
+			if (ret < len) {
+				len = ret;
+				osd_req_op_extent_update(req, 0, len);
+				more = false;
+			}
 		} else {
-			iov_iter_advance(to, 0);
+			num_pages = calc_pages_for(off, len);
+			page_off = off & ~PAGE_MASK;
+			pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+			if (IS_ERR(pages)) {
+				ceph_osdc_put_request(req);
+				ret = PTR_ERR(pages);
+				break;
+			}
 		}
-		ceph_put_page_vector(pages, num_pages, false);
-	} else {
-		num_pages = calc_pages_for(off, len);
-		pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
-		if (IS_ERR(pages))
-			return PTR_ERR(pages);
-
-		ret = striped_read(inode, off, len, pages, num_pages,
-				   (off & ~PAGE_MASK), checkeof);
-		if (ret > 0) {
-			int l, k = 0;
-			size_t left = ret;
-
-			while (left) {
-				size_t page_off = off & ~PAGE_MASK;
-				size_t copy = min_t(size_t, left,
-						    PAGE_SIZE - page_off);
-				l = copy_page_to_iter(pages[k++], page_off,
-						      copy, to);
-				off += l;
-				left -= l;
-				if (l < copy)
+
+		osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off,
+						 false, false);
+		ret = ceph_osdc_start_request(osdc, req, false);
+		if (!ret)
+			ret = ceph_osdc_wait_request(osdc, req);
+		ceph_osdc_put_request(req);
+
+		i_size = i_size_read(inode);
+		dout("sync_read %llu~%llu got %zd i_size %llu%s\n",
+		     off, len, ret, i_size, (more ? " MORE" : ""));
+
+		if (ret == -ENOENT)
+			ret = 0;
+		if (ret >= 0 && ret < len && (off + ret < i_size)) {
+			int zlen = min(len - ret, i_size - off - ret);
+			int zoff = page_off + ret;
+			dout("sync_read zero gap %llu~%llu\n",
+			     off + ret, off + ret + zlen);
+			ceph_zero_page_vector_range(zoff, zlen, pages);
+			ret += zlen;
+		}
+
+		if (unlikely(to->type & ITER_PIPE)) {
+			if (ret > 0) {
+				iov_iter_advance(to, ret);
+				off += ret;
+			} else {
+				iov_iter_advance(to, 0);
+			}
+			ceph_put_page_vector(pages, num_pages, false);
+		} else {
+			int idx = 0;
+			size_t left = ret > 0 ? ret : 0;
+			while (left > 0) {
+				size_t len, copied;
+				page_off = off & ~PAGE_MASK;
+				len = min_t(size_t, left, PAGE_SIZE - page_off);
+				copied = copy_page_to_iter(pages[idx++],
+							   page_off, len, to);
+				off += copied;
+				left -= copied;
+				if (copied < len) {
+					ret = -EFAULT;
 					break;
+				}
 			}
+			ceph_release_page_vector(pages, num_pages);
 		}
-		ceph_release_page_vector(pages, num_pages);
+
+		if (ret <= 0 || off >= i_size || !more)
+			break;
 	}
 
 	if (off > iocb->ki_pos) {
+		if (ret >= 0 &&
+		    iov_iter_count(to) > 0 && off >= i_size_read(inode))
+			*retry_op = CHECK_EOF;
 		ret = off - iocb->ki_pos;
 		iocb->ki_pos = off;
 	}
 
-	dout("sync_read result %zd\n", ret);
+	dout("sync_read result %zd retry_op %d\n", ret, *retry_op);
 	return ret;
 }
 
@@ -865,7 +859,7 @@ static void ceph_aio_retry_work(struct work_struct *work)
 	}
 	spin_unlock(&ci->i_ceph_lock);
 
-	req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 2,
+	req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 1,
 				      false, GFP_NOFS);
 	if (!req) {
 		ret = -ENOMEM;
@@ -877,6 +871,11 @@ static void ceph_aio_retry_work(struct work_struct *work)
 	ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc);
 	ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid);
 
+	req->r_ops[0] = orig_req->r_ops[0];
+
+	req->r_mtime = aio_req->mtime;
+	req->r_data_offset = req->r_ops[0].extent.offset;
+
 	ret = ceph_osdc_alloc_messages(req, GFP_NOFS);
 	if (ret) {
 		ceph_osdc_put_request(req);
@@ -884,11 +883,6 @@ static void ceph_aio_retry_work(struct work_struct *work)
 		goto out;
 	}
 
-	req->r_ops[0] = orig_req->r_ops[0];
-
-	req->r_mtime = aio_req->mtime;
-	req->r_data_offset = req->r_ops[0].extent.offset;
-
 	ceph_osdc_put_request(orig_req);
 
 	req->r_callback = ceph_aio_complete_req;
@@ -1735,7 +1729,6 @@ static long ceph_fallocate(struct file *file, int mode,
 	struct ceph_file_info *fi = file->private_data;
 	struct inode *inode = file_inode(file);
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	struct ceph_cap_flush *prealloc_cf;
 	int want, got = 0;
 	int dirty;
@@ -1743,10 +1736,7 @@ static long ceph_fallocate(struct file *file, int mode,
 	loff_t endoff = 0;
 	loff_t size;
 
-	if ((offset + length) > max(i_size_read(inode), fsc->max_file_size))
-		return -EFBIG;
-
-	if (mode & ~(FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE))
+	if (mode != (FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE))
 		return -EOPNOTSUPP;
 
 	if (!S_ISREG(inode->i_mode))
@@ -1763,18 +1753,6 @@ static long ceph_fallocate(struct file *file, int mode,
 		goto unlock;
 	}
 
-	if (!(mode & (FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE)) &&
-	    ceph_quota_is_max_bytes_exceeded(inode, offset + length)) {
-		ret = -EDQUOT;
-		goto unlock;
-	}
-
-	if (ceph_osdmap_flag(&fsc->client->osdc, CEPH_OSDMAP_FULL) &&
-	    !(mode & FALLOC_FL_PUNCH_HOLE)) {
-		ret = -ENOSPC;
-		goto unlock;
-	}
-
 	if (ci->i_inline_version != CEPH_INLINE_NONE) {
 		ret = ceph_uninline_data(file, NULL);
 		if (ret < 0)
@@ -1782,12 +1760,12 @@ static long ceph_fallocate(struct file *file, int mode,
 	}
 
 	size = i_size_read(inode);
-	if (!(mode & FALLOC_FL_KEEP_SIZE)) {
-		endoff = offset + length;
-		ret = inode_newsize_ok(inode, endoff);
-		if (ret)
-			goto unlock;
-	}
+
+	/* Are we punching a hole beyond EOF? */
+	if (offset >= size)
+		goto unlock;
+	if ((offset + length) > size)
+		length = size - offset;
 
 	if (fi->fmode & CEPH_FILE_MODE_LAZY)
 		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
@@ -1798,16 +1776,8 @@ static long ceph_fallocate(struct file *file, int mode,
 	if (ret < 0)
 		goto unlock;
 
-	if (mode & FALLOC_FL_PUNCH_HOLE) {
-		if (offset < size)
-			ceph_zero_pagecache_range(inode, offset, length);
-		ret = ceph_zero_objects(inode, offset, length);
-	} else if (endoff > size) {
-		truncate_pagecache_range(inode, size, -1);
-		if (ceph_inode_set_size(inode, endoff))
-			ceph_check_caps(ceph_inode(inode),
-				CHECK_CAPS_AUTHONLY, NULL);
-	}
+	ceph_zero_pagecache_range(inode, offset, length);
+	ret = ceph_zero_objects(inode, offset, length);
 
 	if (!ret) {
 		spin_lock(&ci->i_ceph_lock);
@@ -1817,9 +1787,6 @@ static long ceph_fallocate(struct file *file, int mode,
 		spin_unlock(&ci->i_ceph_lock);
 		if (dirty)
 			__mark_inode_dirty(inode, dirty);
-		if ((endoff > size) &&
-		    ceph_quota_is_max_bytes_approaching(inode, endoff))
-			ceph_check_caps(ci, CHECK_CAPS_NODELAY, NULL);
 	}
 
 	ceph_put_cap_refs(ci, got);
@@ -1829,6 +1796,300 @@ unlock:
 	return ret;
 }
 
+/*
+ * This function tries to get FILE_WR capabilities for dst_ci and FILE_RD for
+ * src_ci. Two attempts are made to obtain both caps, and an error is returned
+ * if this fails; zero is returned on success.
+ */
+static int get_rd_wr_caps(struct ceph_inode_info *src_ci,
+			  loff_t src_endoff, int *src_got,
+			  struct ceph_inode_info *dst_ci,
+			  loff_t dst_endoff, int *dst_got)
+{
+	int ret = 0;
+	bool retrying = false;
+
+retry_caps:
+	ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
+			    dst_endoff, dst_got, NULL);
+	if (ret < 0)
+		return ret;
+
+	/*
+	 * Since we're already holding the FILE_WR capability for the dst file,
+	 * we would risk a deadlock by using ceph_get_caps. Thus, we'll do some
+	 * retry dance instead to try to get both capabilities.
+	 */
+	ret = ceph_try_get_caps(src_ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
+				false, src_got);
+	if (ret <= 0) {
+		/* Start by dropping dst_ci caps and getting src_ci caps */
+		ceph_put_cap_refs(dst_ci, *dst_got);
+		if (retrying) {
+			if (!ret)
+				/* ceph_try_get_caps masks EAGAIN */
+				ret = -EAGAIN;
+			return ret;
+		}
+		ret = ceph_get_caps(src_ci, CEPH_CAP_FILE_RD,
+				    CEPH_CAP_FILE_SHARED, src_endoff,
+				    src_got, NULL);
+		if (ret < 0)
+			return ret;
+		/*... drop src_ci caps too, and retry */
+		ceph_put_cap_refs(src_ci, *src_got);
+		retrying = true;
+		goto retry_caps;
+	}
+	return ret;
+}
+
+static void put_rd_wr_caps(struct ceph_inode_info *src_ci, int src_got,
+			   struct ceph_inode_info *dst_ci, int dst_got)
+{
+	ceph_put_cap_refs(src_ci, src_got);
+	ceph_put_cap_refs(dst_ci, dst_got);
+}
+
+/*
+ * This function does several size-related checks, returning an error if:
+ *  - source file is smaller than off+len
+ *  - destination file size is not OK (inode_newsize_ok())
+ *  - max bytes quota is exceeded
+ */
+static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode,
+			   loff_t src_off, loff_t dst_off, size_t len)
+{
+	loff_t size, endoff;
+
+	size = i_size_read(src_inode);
+	/*
+	 * Don't copy beyond source file EOF. Instead of simply setting length
+	 * to (size - src_off), just drop to VFS default implementation, as the
+	 * local i_size may be stale due to other clients writing to the source
+	 * inode.
+	 */
+	if (src_off + len > size) {
+		dout("Copy beyond EOF (%llu + %zu > %llu)\n",
+		     src_off, len, size);
+		return -EOPNOTSUPP;
+	}
+	size = i_size_read(dst_inode);
+
+	endoff = dst_off + len;
+	if (inode_newsize_ok(dst_inode, endoff))
+		return -EOPNOTSUPP;
+
+	if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff))
+		return -EDQUOT;
+
+	return 0;
+}
+
+static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
+				    struct file *dst_file, loff_t dst_off,
+				    size_t len, unsigned int flags)
+{
+	struct inode *src_inode = file_inode(src_file);
+	struct inode *dst_inode = file_inode(dst_file);
+	struct ceph_inode_info *src_ci = ceph_inode(src_inode);
+	struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
+	struct ceph_cap_flush *prealloc_cf;
+	struct ceph_object_locator src_oloc, dst_oloc;
+	struct ceph_object_id src_oid, dst_oid;
+	loff_t endoff = 0, size;
+	ssize_t ret = -EIO;
+	u64 src_objnum, dst_objnum, src_objoff, dst_objoff;
+	u32 src_objlen, dst_objlen, object_size;
+	int src_got = 0, dst_got = 0, err, dirty;
+	bool do_final_copy = false;
+
+	if (src_inode == dst_inode)
+		return -EINVAL;
+	if (ceph_snap(dst_inode) != CEPH_NOSNAP)
+		return -EROFS;
+
+	/*
+	 * Some of the checks below will return -EOPNOTSUPP, which will force a
+	 * fallback to the default VFS copy_file_range implementation. This is
+	 * desirable in several cases (for ex, the 'len' is smaller than the
+	 * size of the objects, or in cases where that would be more
+	 * efficient).
+	 */
+
+	if (ceph_test_mount_opt(ceph_inode_to_client(src_inode), NOCOPYFROM))
+		return -EOPNOTSUPP;
+
+	if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) ||
+	    (src_ci->i_layout.stripe_count != dst_ci->i_layout.stripe_count) ||
+	    (src_ci->i_layout.object_size != dst_ci->i_layout.object_size))
+		return -EOPNOTSUPP;
+
+	if (len < src_ci->i_layout.object_size)
+		return -EOPNOTSUPP; /* no remote copy will be done */
+
+	prealloc_cf = ceph_alloc_cap_flush();
+	if (!prealloc_cf)
+		return -ENOMEM;
+
+	/* Start by sync'ing the source file */
+	ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
+	if (ret < 0)
+		goto out;
+
+	/*
+	 * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other
+	 * clients may have dirty data in their caches. And OSDs know nothing
+	 * about caps, so they can't safely do the remote object copies.
+	 */
+	err = get_rd_wr_caps(src_ci, (src_off + len), &src_got,
+			     dst_ci, (dst_off + len), &dst_got);
+	if (err < 0) {
+		dout("get_rd_wr_caps returned %d\n", err);
+		ret = -EOPNOTSUPP;
+		goto out;
+	}
+
+	ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len);
+	if (ret < 0)
+		goto out_caps;
+
+	size = i_size_read(dst_inode);
+	endoff = dst_off + len;
+
+	/* Drop dst file cached pages */
+	ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
+					    dst_off >> PAGE_SHIFT,
+					    endoff >> PAGE_SHIFT);
+	if (ret < 0) {
+		dout("Failed to invalidate inode pages (%zd)\n", ret);
+		ret = 0; /* XXX */
+	}
+	src_oloc.pool = src_ci->i_layout.pool_id;
+	src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
+	dst_oloc.pool = dst_ci->i_layout.pool_id;
+	dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
+
+	ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
+				      src_ci->i_layout.object_size,
+				      &src_objnum, &src_objoff, &src_objlen);
+	ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
+				      dst_ci->i_layout.object_size,
+				      &dst_objnum, &dst_objoff, &dst_objlen);
+	/* object-level offsets need to be the same */
+	if (src_objoff != dst_objoff) {
+		ret = -EOPNOTSUPP;
+		goto out_caps;
+	}
+
+	/*
+	 * Do a manual copy if the object offset isn't object aligned.
+	 * 'src_objlen' contains the bytes left until the end of the object,
+	 * starting at the src_off
+	 */
+	if (src_objoff) {
+		/*
+		 * we need to temporarily drop all caps as we'll be calling
+		 * {read,write}_iter, which will get caps again.
+		 */
+		put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
+		ret = do_splice_direct(src_file, &src_off, dst_file,
+				       &dst_off, src_objlen, flags);
+		if (ret < 0) {
+			dout("do_splice_direct returned %d\n", err);
+			goto out;
+		}
+		len -= ret;
+		err = get_rd_wr_caps(src_ci, (src_off + len),
+				     &src_got, dst_ci,
+				     (dst_off + len), &dst_got);
+		if (err < 0)
+			goto out;
+		err = is_file_size_ok(src_inode, dst_inode,
+				      src_off, dst_off, len);
+		if (err < 0)
+			goto out_caps;
+	}
+	object_size = src_ci->i_layout.object_size;
+	while (len >= object_size) {
+		ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
+					      object_size, &src_objnum,
+					      &src_objoff, &src_objlen);
+		ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
+					      object_size, &dst_objnum,
+					      &dst_objoff, &dst_objlen);
+		ceph_oid_init(&src_oid);
+		ceph_oid_printf(&src_oid, "%llx.%08llx",
+				src_ci->i_vino.ino, src_objnum);
+		ceph_oid_init(&dst_oid);
+		ceph_oid_printf(&dst_oid, "%llx.%08llx",
+				dst_ci->i_vino.ino, dst_objnum);
+		/* Do an object remote copy */
+		err = ceph_osdc_copy_from(
+			&ceph_inode_to_client(src_inode)->client->osdc,
+			src_ci->i_vino.snap, 0,
+			&src_oid, &src_oloc,
+			CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+			CEPH_OSD_OP_FLAG_FADVISE_NOCACHE,
+			&dst_oid, &dst_oloc,
+			CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+			CEPH_OSD_OP_FLAG_FADVISE_DONTNEED, 0);
+		if (err) {
+			dout("ceph_osdc_copy_from returned %d\n", err);
+			if (!ret)
+				ret = err;
+			goto out_caps;
+		}
+		len -= object_size;
+		src_off += object_size;
+		dst_off += object_size;
+		ret += object_size;
+	}
+
+	if (len)
+		/* We still need one final local copy */
+		do_final_copy = true;
+
+	file_update_time(dst_file);
+	if (endoff > size) {
+		int caps_flags = 0;
+
+		/* Let the MDS know about dst file size change */
+		if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
+			caps_flags |= CHECK_CAPS_NODELAY;
+		if (ceph_inode_set_size(dst_inode, endoff))
+			caps_flags |= CHECK_CAPS_AUTHONLY;
+		if (caps_flags)
+			ceph_check_caps(dst_ci, caps_flags, NULL);
+	}
+	/* Mark Fw dirty */
+	spin_lock(&dst_ci->i_ceph_lock);
+	dst_ci->i_inline_version = CEPH_INLINE_NONE;
+	dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf);
+	spin_unlock(&dst_ci->i_ceph_lock);
+	if (dirty)
+		__mark_inode_dirty(dst_inode, dirty);
+
+out_caps:
+	put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
+
+	if (do_final_copy) {
+		err = do_splice_direct(src_file, &src_off, dst_file,
+				       &dst_off, len, flags);
+		if (err < 0) {
+			dout("do_splice_direct returned %d\n", err);
+			goto out;
+		}
+		len -= err;
+		ret += err;
+	}
+
+out:
+	ceph_free_cap_flush(prealloc_cf);
+
+	return ret;
+}
+
 const struct file_operations ceph_file_fops = {
 	.open = ceph_open,
 	.release = ceph_release,
@@ -1844,5 +2105,5 @@ const struct file_operations ceph_file_fops = {
 	.unlocked_ioctl = ceph_ioctl,
 	.compat_ioctl	= ceph_ioctl,
 	.fallocate	= ceph_fallocate,
+	.copy_file_range = ceph_copy_file_range,
 };
-
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index ebc7bdaed2d0..79dd5e6ed755 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -1132,8 +1132,12 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in)
 	if (IS_ERR(realdn)) {
 		pr_err("splice_dentry error %ld %p inode %p ino %llx.%llx\n",
 		       PTR_ERR(realdn), dn, in, ceph_vinop(in));
-		dput(dn);
-		dn = realdn; /* note realdn contains the error */
+		dn = realdn;
+		/*
+		 * Caller should release 'dn' in the case of error.
+		 * If 'req->r_dentry' is passed to this function,
+		 * caller should leave 'req->r_dentry' untouched.
+		 */
 		goto out;
 	} else if (realdn) {
 		dout("dn %p (%d) spliced with %p (%d) "
@@ -1196,7 +1200,9 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
 		WARN_ON_ONCE(1);
 	}
 
-	if (dir && req->r_op == CEPH_MDS_OP_LOOKUPNAME) {
+	if (dir && req->r_op == CEPH_MDS_OP_LOOKUPNAME &&
+	    test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags) &&
+	    !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
 		struct qstr dname;
 		struct dentry *dn, *parent;
 
@@ -1677,7 +1683,6 @@ retry_lookup:
 		if (IS_ERR(realdn)) {
 			err = PTR_ERR(realdn);
 			d_drop(dn);
-			dn = NULL;
 			goto next_item;
 		}
 		dn = realdn;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index bc43c822426a..67a9aeb2f4ec 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -2071,7 +2071,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
 	if (req->r_old_dentry_drop)
 		len += req->r_old_dentry->d_name.len;
 
-	msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
+	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
 	if (!msg) {
 		msg = ERR_PTR(-ENOMEM);
 		goto out_free2;
@@ -2136,7 +2136,6 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
 
 	if (req->r_pagelist) {
 		struct ceph_pagelist *pagelist = req->r_pagelist;
-		refcount_inc(&pagelist->refcnt);
 		ceph_msg_data_add_pagelist(msg, pagelist);
 		msg->hdr.data_len = cpu_to_le32(pagelist->length);
 	} else {
@@ -3126,12 +3125,11 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
 
 	pr_info("mds%d reconnect start\n", mds);
 
-	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
+	pagelist = ceph_pagelist_alloc(GFP_NOFS);
 	if (!pagelist)
 		goto fail_nopagelist;
-	ceph_pagelist_init(pagelist);
 
-	reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
+	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
 	if (!reply)
 		goto fail_nomsg;
 
@@ -3241,6 +3239,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
 	mutex_unlock(&mdsc->mutex);
 
 	up_read(&mdsc->snap_rwsem);
+	ceph_pagelist_release(pagelist);
 	return;
 
 fail:
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index eab1359d0553..b5ecd6f50360 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -165,6 +165,8 @@ enum {
 	Opt_noacl,
 	Opt_quotadf,
 	Opt_noquotadf,
+	Opt_copyfrom,
+	Opt_nocopyfrom,
 };
 
 static match_table_t fsopt_tokens = {
@@ -203,6 +205,8 @@ static match_table_t fsopt_tokens = {
 	{Opt_noacl, "noacl"},
 	{Opt_quotadf, "quotadf"},
 	{Opt_noquotadf, "noquotadf"},
+	{Opt_copyfrom, "copyfrom"},
+	{Opt_nocopyfrom, "nocopyfrom"},
 	{-1, NULL}
 };
 
@@ -355,6 +359,12 @@ static int parse_fsopt_token(char *c, void *private)
 	case Opt_noquotadf:
 		fsopt->flags |= CEPH_MOUNT_OPT_NOQUOTADF;
 		break;
+	case Opt_copyfrom:
+		fsopt->flags &= ~CEPH_MOUNT_OPT_NOCOPYFROM;
+		break;
+	case Opt_nocopyfrom:
+		fsopt->flags |= CEPH_MOUNT_OPT_NOCOPYFROM;
+		break;
 #ifdef CONFIG_CEPH_FS_POSIX_ACL
 	case Opt_acl:
 		fsopt->sb_flags |= SB_POSIXACL;
@@ -553,6 +563,9 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
 		seq_puts(m, ",noacl");
 #endif
 
+	if (fsopt->flags & CEPH_MOUNT_OPT_NOCOPYFROM)
+		seq_puts(m, ",nocopyfrom");
+
 	if (fsopt->mds_namespace)
 		seq_show_option(m, "mds_namespace", fsopt->mds_namespace);
 	if (fsopt->wsize != CEPH_MAX_WRITE_SIZE)
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 582e28fd1b7b..c005a5400f2e 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -40,6 +40,7 @@
 #define CEPH_MOUNT_OPT_NOPOOLPERM      (1<<11) /* no pool permission check */
 #define CEPH_MOUNT_OPT_MOUNTWAIT       (1<<12) /* mount waits if no mds is up */
 #define CEPH_MOUNT_OPT_NOQUOTADF       (1<<13) /* no root dir quota in statfs */
+#define CEPH_MOUNT_OPT_NOCOPYFROM      (1<<14) /* don't use RADOS 'copy-from' op */
 
 #define CEPH_MOUNT_OPT_DEFAULT    CEPH_MOUNT_OPT_DCACHE
 
@@ -1008,7 +1009,7 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn,
 extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
 			 loff_t endoff, int *got, struct page **pinned_page);
 extern int ceph_try_get_caps(struct ceph_inode_info *ci,
-			     int need, int want, int *got);
+			     int need, int want, bool nonblock, int *got);
 
 /* for counting open files by mode */
 extern void __ceph_get_fmode(struct ceph_inode_info *ci, int mode);
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 5cc8b94f8206..316f6ad10644 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -951,11 +951,10 @@ static int ceph_sync_setxattr(struct inode *inode, const char *name,
 
 	if (size > 0) {
 		/* copy value into pagelist */
-		pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
+		pagelist = ceph_pagelist_alloc(GFP_NOFS);
 		if (!pagelist)
 			return -ENOMEM;
 
-		ceph_pagelist_init(pagelist);
 		err = ceph_pagelist_append(pagelist, value, size);
 		if (err)
 			goto out;
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 49c93b9308d7..68bb09c29ce8 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -81,7 +81,13 @@ struct ceph_options {
 
 #define CEPH_MSG_MAX_FRONT_LEN	(16*1024*1024)
 #define CEPH_MSG_MAX_MIDDLE_LEN	(16*1024*1024)
-#define CEPH_MSG_MAX_DATA_LEN	(16*1024*1024)
+
+/*
+ * Handle the largest possible rbd object in one message.
+ * There is no limit on the size of cephfs objects, but it has to obey
+ * rsize and wsize mount options anyway.
+ */
+#define CEPH_MSG_MAX_DATA_LEN	(32*1024*1024)
 
 #define CEPH_AUTH_NAME_DEFAULT   "guest"
 
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index fc2b4491ee0a..800a2128d411 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -82,22 +82,6 @@ enum ceph_msg_data_type {
 	CEPH_MSG_DATA_BVECS,	/* data source/destination is a bio_vec array */
 };
 
-static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
-{
-	switch (type) {
-	case CEPH_MSG_DATA_NONE:
-	case CEPH_MSG_DATA_PAGES:
-	case CEPH_MSG_DATA_PAGELIST:
-#ifdef CONFIG_BLOCK
-	case CEPH_MSG_DATA_BIO:
-#endif /* CONFIG_BLOCK */
-	case CEPH_MSG_DATA_BVECS:
-		return true;
-	default:
-		return false;
-	}
-}
-
 #ifdef CONFIG_BLOCK
 
 struct ceph_bio_iter {
@@ -181,7 +165,6 @@ struct ceph_bvec_iter {
 } while (0)
 
 struct ceph_msg_data {
-	struct list_head	links;	/* ceph_msg->data */
 	enum ceph_msg_data_type	type;
 	union {
 #ifdef CONFIG_BLOCK
@@ -202,7 +185,6 @@ struct ceph_msg_data {
 
 struct ceph_msg_data_cursor {
 	size_t			total_resid;	/* across all data items */
-	struct list_head	*data_head;	/* = &ceph_msg->data */
 
 	struct ceph_msg_data	*data;		/* current data item */
 	size_t			resid;		/* bytes not yet consumed */
@@ -240,7 +222,9 @@ struct ceph_msg {
 	struct ceph_buffer *middle;
 
 	size_t			data_length;
-	struct list_head	data;
+	struct ceph_msg_data	*data;
+	int			num_data_items;
+	int			max_data_items;
 	struct ceph_msg_data_cursor cursor;
 
 	struct ceph_connection *con;
@@ -381,6 +365,8 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
 void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
 			     struct ceph_bvec_iter *bvec_pos);
 
+struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items,
+			       gfp_t flags, bool can_fail);
 extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 				     bool can_fail);
 
diff --git a/include/linux/ceph/msgpool.h b/include/linux/ceph/msgpool.h
index 76c98a512758..729cdf700eae 100644
--- a/include/linux/ceph/msgpool.h
+++ b/include/linux/ceph/msgpool.h
@@ -13,14 +13,15 @@ struct ceph_msgpool {
 	mempool_t *pool;
 	int type;		/* preallocated message type */
 	int front_len;		/* preallocated payload size */
+	int max_data_items;
 };
 
-extern int ceph_msgpool_init(struct ceph_msgpool *pool, int type,
-			     int front_len, int size, bool blocking,
-			     const char *name);
+int ceph_msgpool_init(struct ceph_msgpool *pool, int type,
+		      int front_len, int max_data_items, int size,
+		      const char *name);
 extern void ceph_msgpool_destroy(struct ceph_msgpool *pool);
-extern struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *,
-					 int front_len);
+struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, int front_len,
+				  int max_data_items);
 extern void ceph_msgpool_put(struct ceph_msgpool *, struct ceph_msg *);
 
 #endif
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 02096da01845..7a2af5034278 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -136,6 +136,13 @@ struct ceph_osd_req_op {
 			u64 expected_object_size;
 			u64 expected_write_size;
 		} alloc_hint;
+		struct {
+			u64 snapid;
+			u64 src_version;
+			u8 flags;
+			u32 src_fadvise_flags;
+			struct ceph_osd_data osd_data;
+		} copy_from;
 	};
 };
 
@@ -444,9 +451,8 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
 					struct page **pages, u64 length,
 					u32 alignment, bool pages_from_pool,
 					bool own_pages);
-extern int osd_req_op_cls_init(struct ceph_osd_request *osd_req,
-			       unsigned int which, u16 opcode,
-			       const char *class, const char *method);
+int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
+			const char *class, const char *method);
 extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
 				 u16 opcode, const char *name, const void *value,
 				 size_t size, u8 cmp_op, u8 cmp_mode);
@@ -511,6 +517,16 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
 				struct timespec64 *mtime,
 				struct page **pages, int nr_pages);
 
+int ceph_osdc_copy_from(struct ceph_osd_client *osdc,
+			u64 src_snapid, u64 src_version,
+			struct ceph_object_id *src_oid,
+			struct ceph_object_locator *src_oloc,
+			u32 src_fadvise_flags,
+			struct ceph_object_id *dst_oid,
+			struct ceph_object_locator *dst_oloc,
+			u32 dst_fadvise_flags,
+			u8 copy_from_flags);
+
 /* watch/notify */
 struct ceph_osd_linger_request *
 ceph_osdc_watch(struct ceph_osd_client *osdc,
diff --git a/include/linux/ceph/pagelist.h b/include/linux/ceph/pagelist.h
index d0223364349f..5dead8486fd8 100644
--- a/include/linux/ceph/pagelist.h
+++ b/include/linux/ceph/pagelist.h
@@ -23,16 +23,7 @@ struct ceph_pagelist_cursor {
 	size_t room;		/* room remaining to reset to */
 };
 
-static inline void ceph_pagelist_init(struct ceph_pagelist *pl)
-{
-	INIT_LIST_HEAD(&pl->head);
-	pl->mapped_tail = NULL;
-	pl->length = 0;
-	pl->room = 0;
-	INIT_LIST_HEAD(&pl->free_list);
-	pl->num_pages_free = 0;
-	refcount_set(&pl->refcnt, 1);
-}
+struct ceph_pagelist *ceph_pagelist_alloc(gfp_t gfp_flags);
 
 extern void ceph_pagelist_release(struct ceph_pagelist *pl);
 
diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index f1988387c5ad..3eb0e55665b4 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -410,6 +410,14 @@ enum {
410enum { 410enum {
411 CEPH_OSD_OP_FLAG_EXCL = 1, /* EXCL object create */ 411 CEPH_OSD_OP_FLAG_EXCL = 1, /* EXCL object create */
412 CEPH_OSD_OP_FLAG_FAILOK = 2, /* continue despite failure */ 412 CEPH_OSD_OP_FLAG_FAILOK = 2, /* continue despite failure */
413 CEPH_OSD_OP_FLAG_FADVISE_RANDOM = 0x4, /* the op is random */
414 CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL = 0x8, /* the op is sequential */
415 CEPH_OSD_OP_FLAG_FADVISE_WILLNEED = 0x10,/* data will be accessed in
416 the near future */
417 CEPH_OSD_OP_FLAG_FADVISE_DONTNEED = 0x20,/* data will not be accessed
418 in the near future */
419 CEPH_OSD_OP_FLAG_FADVISE_NOCACHE = 0x40,/* data will be accessed only
420 once by this client */
413}; 421};
414 422
415#define EOLDSNAPC ERESTART /* ORDERSNAP flag set; writer has old snapc*/ 423#define EOLDSNAPC ERESTART /* ORDERSNAP flag set; writer has old snapc*/
@@ -432,6 +440,15 @@ enum {
432}; 440};
433 441
434enum { 442enum {
443 CEPH_OSD_COPY_FROM_FLAG_FLUSH = 1, /* part of a flush operation */
444 CEPH_OSD_COPY_FROM_FLAG_IGNORE_OVERLAY = 2, /* ignore pool overlay */
445 CEPH_OSD_COPY_FROM_FLAG_IGNORE_CACHE = 4, /* ignore osd cache logic */
446 CEPH_OSD_COPY_FROM_FLAG_MAP_SNAP_CLONE = 8, /* map snap direct to
447 * cloneid */
448 CEPH_OSD_COPY_FROM_FLAG_RWORDERED = 16, /* order with write */
449};
450
451enum {
435 CEPH_OSD_WATCH_OP_UNWATCH = 0, 452 CEPH_OSD_WATCH_OP_UNWATCH = 0,
436 CEPH_OSD_WATCH_OP_LEGACY_WATCH = 1, 453 CEPH_OSD_WATCH_OP_LEGACY_WATCH = 1,
437 /* note: use only ODD ids to prevent pre-giant code from 454 /* note: use only ODD ids to prevent pre-giant code from
@@ -497,6 +514,17 @@ struct ceph_osd_op {
497 __le64 expected_object_size; 514 __le64 expected_object_size;
498 __le64 expected_write_size; 515 __le64 expected_write_size;
499 } __attribute__ ((packed)) alloc_hint; 516 } __attribute__ ((packed)) alloc_hint;
517 struct {
518 __le64 snapid;
519 __le64 src_version;
520 __u8 flags; /* CEPH_OSD_COPY_FROM_FLAG_* */
521 /*
522 * CEPH_OSD_OP_FLAG_FADVISE_*: fadvise flags
523 * for src object, flags for dest object are in
524 * ceph_osd_op::flags.
525 */
526 __le32 src_fadvise_flags;
527 } __attribute__ ((packed)) copy_from;
500 }; 528 };
501 __le32 payload_len; 529 __le32 payload_len;
502} __attribute__ ((packed)); 530} __attribute__ ((packed));
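On the wire, copy-from is a single ceph_osd_op: the source snapid, version, copy-from flags and source-side fadvise hints travel in the new union member, while destination-side fadvise hints reuse the generic per-op ceph_osd_op::flags field (per the comment above, and filled from the flags argument passed at op init). Both flag sets are plain bitmasks and combine as usual, e.g. (illustrative values):

	u8 cf_flags = CEPH_OSD_COPY_FROM_FLAG_FLUSH |
		      CEPH_OSD_COPY_FROM_FLAG_RWORDERED;
	u32 src_hints = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
			CEPH_OSD_OP_FLAG_FADVISE_NOCACHE;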
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 0a187196aeed..88e35830198c 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -156,7 +156,6 @@ static bool con_flag_test_and_set(struct ceph_connection *con,
156/* Slab caches for frequently-allocated structures */ 156/* Slab caches for frequently-allocated structures */
157 157
158static struct kmem_cache *ceph_msg_cache; 158static struct kmem_cache *ceph_msg_cache;
159static struct kmem_cache *ceph_msg_data_cache;
160 159
161/* static tag bytes (protocol control messages) */ 160/* static tag bytes (protocol control messages) */
162static char tag_msg = CEPH_MSGR_TAG_MSG; 161static char tag_msg = CEPH_MSGR_TAG_MSG;
@@ -235,23 +234,11 @@ static int ceph_msgr_slab_init(void)
235 if (!ceph_msg_cache) 234 if (!ceph_msg_cache)
236 return -ENOMEM; 235 return -ENOMEM;
237 236
238 BUG_ON(ceph_msg_data_cache); 237 return 0;
239 ceph_msg_data_cache = KMEM_CACHE(ceph_msg_data, 0);
240 if (ceph_msg_data_cache)
241 return 0;
242
243 kmem_cache_destroy(ceph_msg_cache);
244 ceph_msg_cache = NULL;
245
246 return -ENOMEM;
247} 238}
248 239
249static void ceph_msgr_slab_exit(void) 240static void ceph_msgr_slab_exit(void)
250{ 241{
251 BUG_ON(!ceph_msg_data_cache);
252 kmem_cache_destroy(ceph_msg_data_cache);
253 ceph_msg_data_cache = NULL;
254
255 BUG_ON(!ceph_msg_cache); 242 BUG_ON(!ceph_msg_cache);
256 kmem_cache_destroy(ceph_msg_cache); 243 kmem_cache_destroy(ceph_msg_cache);
257 ceph_msg_cache = NULL; 244 ceph_msg_cache = NULL;
@@ -1141,16 +1128,13 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
1141static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length) 1128static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
1142{ 1129{
1143 struct ceph_msg_data_cursor *cursor = &msg->cursor; 1130 struct ceph_msg_data_cursor *cursor = &msg->cursor;
1144 struct ceph_msg_data *data;
1145 1131
1146 BUG_ON(!length); 1132 BUG_ON(!length);
1147 BUG_ON(length > msg->data_length); 1133 BUG_ON(length > msg->data_length);
1148 BUG_ON(list_empty(&msg->data)); 1134 BUG_ON(!msg->num_data_items);
1149 1135
1150 cursor->data_head = &msg->data;
1151 cursor->total_resid = length; 1136 cursor->total_resid = length;
1152 data = list_first_entry(&msg->data, struct ceph_msg_data, links); 1137 cursor->data = msg->data;
1153 cursor->data = data;
1154 1138
1155 __ceph_msg_data_cursor_init(cursor); 1139 __ceph_msg_data_cursor_init(cursor);
1156} 1140}
@@ -1231,8 +1215,7 @@ static void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
1231 1215
1232 if (!cursor->resid && cursor->total_resid) { 1216 if (!cursor->resid && cursor->total_resid) {
1233 WARN_ON(!cursor->last_piece); 1217 WARN_ON(!cursor->last_piece);
1234 BUG_ON(list_is_last(&cursor->data->links, cursor->data_head)); 1218 cursor->data++;
1235 cursor->data = list_next_entry(cursor->data, links);
1236 __ceph_msg_data_cursor_init(cursor); 1219 __ceph_msg_data_cursor_init(cursor);
1237 new_piece = true; 1220 new_piece = true;
1238 } 1221 }
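With message data items kept in a flat array instead of a linked list, advancing the cursor is just pointer arithmetic; there is no list sentinel to check because num_data_items (together with total_resid) bounds the walk. The iteration pattern reduces to this condensed sketch (not the literal kernel loop; the handler is hypothetical):

	struct ceph_msg_data *d;

	for (d = msg->data; d < msg->data + msg->num_data_items; d++)
		handle_data_item(d);	/* hypothetical per-item handler */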
@@ -1248,9 +1231,6 @@ static size_t sizeof_footer(struct ceph_connection *con)
1248 1231
1249static void prepare_message_data(struct ceph_msg *msg, u32 data_len) 1232static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
1250{ 1233{
1251 BUG_ON(!msg);
1252 BUG_ON(!data_len);
1253
1254 /* Initialize data cursor */ 1234 /* Initialize data cursor */
1255 1235
1256 ceph_msg_data_cursor_init(msg, (size_t)data_len); 1236 ceph_msg_data_cursor_init(msg, (size_t)data_len);
@@ -1590,7 +1570,7 @@ static int write_partial_message_data(struct ceph_connection *con)
1590 1570
1591 dout("%s %p msg %p\n", __func__, con, msg); 1571 dout("%s %p msg %p\n", __func__, con, msg);
1592 1572
1593 if (list_empty(&msg->data)) 1573 if (!msg->num_data_items)
1594 return -EINVAL; 1574 return -EINVAL;
1595 1575
1596 /* 1576 /*
@@ -2347,8 +2327,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
2347 u32 crc = 0; 2327 u32 crc = 0;
2348 int ret; 2328 int ret;
2349 2329
2350 BUG_ON(!msg); 2330 if (!msg->num_data_items)
2351 if (list_empty(&msg->data))
2352 return -EIO; 2331 return -EIO;
2353 2332
2354 if (do_datacrc) 2333 if (do_datacrc)
@@ -3256,32 +3235,16 @@ bool ceph_con_keepalive_expired(struct ceph_connection *con,
3256 return false; 3235 return false;
3257} 3236}
3258 3237
3259static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type) 3238static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg)
3260{ 3239{
3261 struct ceph_msg_data *data; 3240 BUG_ON(msg->num_data_items >= msg->max_data_items);
3262 3241 return &msg->data[msg->num_data_items++];
3263 if (WARN_ON(!ceph_msg_data_type_valid(type)))
3264 return NULL;
3265
3266 data = kmem_cache_zalloc(ceph_msg_data_cache, GFP_NOFS);
3267 if (!data)
3268 return NULL;
3269
3270 data->type = type;
3271 INIT_LIST_HEAD(&data->links);
3272
3273 return data;
3274} 3242}
3275 3243
3276static void ceph_msg_data_destroy(struct ceph_msg_data *data) 3244static void ceph_msg_data_destroy(struct ceph_msg_data *data)
3277{ 3245{
3278 if (!data)
3279 return;
3280
3281 WARN_ON(!list_empty(&data->links));
3282 if (data->type == CEPH_MSG_DATA_PAGELIST) 3246 if (data->type == CEPH_MSG_DATA_PAGELIST)
3283 ceph_pagelist_release(data->pagelist); 3247 ceph_pagelist_release(data->pagelist);
3284 kmem_cache_free(ceph_msg_data_cache, data);
3285} 3248}
3286 3249
3287void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages, 3250void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
@@ -3292,13 +3255,12 @@ void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
3292 BUG_ON(!pages); 3255 BUG_ON(!pages);
3293 BUG_ON(!length); 3256 BUG_ON(!length);
3294 3257
3295 data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES); 3258 data = ceph_msg_data_add(msg);
3296 BUG_ON(!data); 3259 data->type = CEPH_MSG_DATA_PAGES;
3297 data->pages = pages; 3260 data->pages = pages;
3298 data->length = length; 3261 data->length = length;
3299 data->alignment = alignment & ~PAGE_MASK; 3262 data->alignment = alignment & ~PAGE_MASK;
3300 3263
3301 list_add_tail(&data->links, &msg->data);
3302 msg->data_length += length; 3264 msg->data_length += length;
3303} 3265}
3304EXPORT_SYMBOL(ceph_msg_data_add_pages); 3266EXPORT_SYMBOL(ceph_msg_data_add_pages);
@@ -3311,11 +3273,11 @@ void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
3311 BUG_ON(!pagelist); 3273 BUG_ON(!pagelist);
3312 BUG_ON(!pagelist->length); 3274 BUG_ON(!pagelist->length);
3313 3275
3314 data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST); 3276 data = ceph_msg_data_add(msg);
3315 BUG_ON(!data); 3277 data->type = CEPH_MSG_DATA_PAGELIST;
3278 refcount_inc(&pagelist->refcnt);
3316 data->pagelist = pagelist; 3279 data->pagelist = pagelist;
3317 3280
3318 list_add_tail(&data->links, &msg->data);
3319 msg->data_length += pagelist->length; 3281 msg->data_length += pagelist->length;
3320} 3282}
3321EXPORT_SYMBOL(ceph_msg_data_add_pagelist); 3283EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
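Note the refcounting change here: ceph_msg_data_add_pagelist() now takes its own reference with refcount_inc() instead of consuming the caller's. Callers keep their reference and drop it when they are done, which also makes it safe to attach one pagelist to more than one message. Roughly:

	pl = ceph_pagelist_alloc(GFP_NOFS);
	if (!pl)
		return -ENOMEM;
	/* ... fill pl ... */
	ceph_msg_data_add_pagelist(msg, pl);	/* msg holds its own ref */
	ceph_pagelist_release(pl);		/* drop the allocation ref */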
@@ -3326,12 +3288,11 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
3326{ 3288{
3327 struct ceph_msg_data *data; 3289 struct ceph_msg_data *data;
3328 3290
3329 data = ceph_msg_data_create(CEPH_MSG_DATA_BIO); 3291 data = ceph_msg_data_add(msg);
3330 BUG_ON(!data); 3292 data->type = CEPH_MSG_DATA_BIO;
3331 data->bio_pos = *bio_pos; 3293 data->bio_pos = *bio_pos;
3332 data->bio_length = length; 3294 data->bio_length = length;
3333 3295
3334 list_add_tail(&data->links, &msg->data);
3335 msg->data_length += length; 3296 msg->data_length += length;
3336} 3297}
3337EXPORT_SYMBOL(ceph_msg_data_add_bio); 3298EXPORT_SYMBOL(ceph_msg_data_add_bio);
@@ -3342,11 +3303,10 @@ void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
3342{ 3303{
3343 struct ceph_msg_data *data; 3304 struct ceph_msg_data *data;
3344 3305
3345 data = ceph_msg_data_create(CEPH_MSG_DATA_BVECS); 3306 data = ceph_msg_data_add(msg);
3346 BUG_ON(!data); 3307 data->type = CEPH_MSG_DATA_BVECS;
3347 data->bvec_pos = *bvec_pos; 3308 data->bvec_pos = *bvec_pos;
3348 3309
3349 list_add_tail(&data->links, &msg->data);
3350 msg->data_length += bvec_pos->iter.bi_size; 3310 msg->data_length += bvec_pos->iter.bi_size;
3351} 3311}
3352EXPORT_SYMBOL(ceph_msg_data_add_bvecs); 3312EXPORT_SYMBOL(ceph_msg_data_add_bvecs);
@@ -3355,8 +3315,8 @@ EXPORT_SYMBOL(ceph_msg_data_add_bvecs);
3355 * construct a new message with given type, size 3315 * construct a new message with given type, size
3356 * the new msg has a ref count of 1. 3316 * the new msg has a ref count of 1.
3357 */ 3317 */
3358struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, 3318struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items,
3359 bool can_fail) 3319 gfp_t flags, bool can_fail)
3360{ 3320{
3361 struct ceph_msg *m; 3321 struct ceph_msg *m;
3362 3322
@@ -3370,7 +3330,6 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
3370 3330
3371 INIT_LIST_HEAD(&m->list_head); 3331 INIT_LIST_HEAD(&m->list_head);
3372 kref_init(&m->kref); 3332 kref_init(&m->kref);
3373 INIT_LIST_HEAD(&m->data);
3374 3333
3375 /* front */ 3334 /* front */
3376 if (front_len) { 3335 if (front_len) {
@@ -3385,6 +3344,15 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
3385 } 3344 }
3386 m->front_alloc_len = m->front.iov_len = front_len; 3345 m->front_alloc_len = m->front.iov_len = front_len;
3387 3346
3347 if (max_data_items) {
3348 m->data = kmalloc_array(max_data_items, sizeof(*m->data),
3349 flags);
3350 if (!m->data)
3351 goto out2;
3352
3353 m->max_data_items = max_data_items;
3354 }
3355
3388 dout("ceph_msg_new %p front %d\n", m, front_len); 3356 dout("ceph_msg_new %p front %d\n", m, front_len);
3389 return m; 3357 return m;
3390 3358
@@ -3401,6 +3369,13 @@ out:
3401 } 3369 }
3402 return NULL; 3370 return NULL;
3403} 3371}
3372EXPORT_SYMBOL(ceph_msg_new2);
3373
3374struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
3375 bool can_fail)
3376{
3377 return ceph_msg_new2(type, front_len, 0, flags, can_fail);
3378}
3404EXPORT_SYMBOL(ceph_msg_new); 3379EXPORT_SYMBOL(ceph_msg_new);
3405 3380
3406/* 3381/*
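ceph_msg_new() survives as a thin wrapper for callers with no data payload; anything that will attach data items must size the array up front through ceph_msg_new2(). For example, a message meant to carry one pages item and one pagelist item (sketch):

	msg = ceph_msg_new2(CEPH_MSG_OSD_OP, front_len, 2, GFP_NOFS, false);
	if (!msg)
		return -ENOMEM;
	ceph_msg_data_add_pages(msg, pages, len, 0);
	ceph_msg_data_add_pagelist(msg, pl);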
@@ -3496,13 +3471,14 @@ static void ceph_msg_free(struct ceph_msg *m)
3496{ 3471{
3497 dout("%s %p\n", __func__, m); 3472 dout("%s %p\n", __func__, m);
3498 kvfree(m->front.iov_base); 3473 kvfree(m->front.iov_base);
3474 kfree(m->data);
3499 kmem_cache_free(ceph_msg_cache, m); 3475 kmem_cache_free(ceph_msg_cache, m);
3500} 3476}
3501 3477
3502static void ceph_msg_release(struct kref *kref) 3478static void ceph_msg_release(struct kref *kref)
3503{ 3479{
3504 struct ceph_msg *m = container_of(kref, struct ceph_msg, kref); 3480 struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
3505 struct ceph_msg_data *data, *next; 3481 int i;
3506 3482
3507 dout("%s %p\n", __func__, m); 3483 dout("%s %p\n", __func__, m);
3508 WARN_ON(!list_empty(&m->list_head)); 3484 WARN_ON(!list_empty(&m->list_head));
@@ -3515,11 +3491,8 @@ static void ceph_msg_release(struct kref *kref)
3515 m->middle = NULL; 3491 m->middle = NULL;
3516 } 3492 }
3517 3493
3518 list_for_each_entry_safe(data, next, &m->data, links) { 3494 for (i = 0; i < m->num_data_items; i++)
3519 list_del_init(&data->links); 3495 ceph_msg_data_destroy(&m->data[i]);
3520 ceph_msg_data_destroy(data);
3521 }
3522 m->data_length = 0;
3523 3496
3524 if (m->pool) 3497 if (m->pool)
3525 ceph_msgpool_put(m->pool, m); 3498 ceph_msgpool_put(m->pool, m);
diff --git a/net/ceph/msgpool.c b/net/ceph/msgpool.c
index 72571535883f..e3ecb80cd182 100644
--- a/net/ceph/msgpool.c
+++ b/net/ceph/msgpool.c
@@ -14,7 +14,8 @@ static void *msgpool_alloc(gfp_t gfp_mask, void *arg)
14 struct ceph_msgpool *pool = arg; 14 struct ceph_msgpool *pool = arg;
15 struct ceph_msg *msg; 15 struct ceph_msg *msg;
16 16
17 msg = ceph_msg_new(pool->type, pool->front_len, gfp_mask, true); 17 msg = ceph_msg_new2(pool->type, pool->front_len, pool->max_data_items,
18 gfp_mask, true);
18 if (!msg) { 19 if (!msg) {
19 dout("msgpool_alloc %s failed\n", pool->name); 20 dout("msgpool_alloc %s failed\n", pool->name);
20 } else { 21 } else {
@@ -35,11 +36,13 @@ static void msgpool_free(void *element, void *arg)
35} 36}
36 37
37int ceph_msgpool_init(struct ceph_msgpool *pool, int type, 38int ceph_msgpool_init(struct ceph_msgpool *pool, int type,
38 int front_len, int size, bool blocking, const char *name) 39 int front_len, int max_data_items, int size,
40 const char *name)
39{ 41{
40 dout("msgpool %s init\n", name); 42 dout("msgpool %s init\n", name);
41 pool->type = type; 43 pool->type = type;
42 pool->front_len = front_len; 44 pool->front_len = front_len;
45 pool->max_data_items = max_data_items;
43 pool->pool = mempool_create(size, msgpool_alloc, msgpool_free, pool); 46 pool->pool = mempool_create(size, msgpool_alloc, msgpool_free, pool);
44 if (!pool->pool) 47 if (!pool->pool)
45 return -ENOMEM; 48 return -ENOMEM;
@@ -53,18 +56,21 @@ void ceph_msgpool_destroy(struct ceph_msgpool *pool)
53 mempool_destroy(pool->pool); 56 mempool_destroy(pool->pool);
54} 57}
55 58
56struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, 59struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, int front_len,
57 int front_len) 60 int max_data_items)
58{ 61{
59 struct ceph_msg *msg; 62 struct ceph_msg *msg;
60 63
61 if (front_len > pool->front_len) { 64 if (front_len > pool->front_len ||
62 dout("msgpool_get %s need front %d, pool size is %d\n", 65 max_data_items > pool->max_data_items) {
63 pool->name, front_len, pool->front_len); 66 pr_warn_ratelimited("%s need %d/%d, pool %s has %d/%d\n",
64 WARN_ON(1); 67 __func__, front_len, max_data_items, pool->name,
68 pool->front_len, pool->max_data_items);
69 WARN_ON_ONCE(1);
65 70
66 /* try to alloc a fresh message */ 71 /* try to alloc a fresh message */
67 return ceph_msg_new(pool->type, front_len, GFP_NOFS, false); 72 return ceph_msg_new2(pool->type, front_len, max_data_items,
73 GFP_NOFS, false);
68 } 74 }
69 75
70 msg = mempool_alloc(pool->pool, GFP_NOFS); 76 msg = mempool_alloc(pool->pool, GFP_NOFS);
@@ -80,6 +86,9 @@ void ceph_msgpool_put(struct ceph_msgpool *pool, struct ceph_msg *msg)
80 msg->front.iov_len = pool->front_len; 86 msg->front.iov_len = pool->front_len;
81 msg->hdr.front_len = cpu_to_le32(pool->front_len); 87 msg->hdr.front_len = cpu_to_le32(pool->front_len);
82 88
89 msg->data_length = 0;
90 msg->num_data_items = 0;
91
83 kref_init(&msg->kref); /* retake single ref */ 92 kref_init(&msg->kref); /* retake single ref */
84 mempool_free(msg, pool->pool); 93 mempool_free(msg, pool->pool);
85} 94}
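Pool-backed messages are now preallocated with room for data items as well: ceph_msgpool_init() gains a max_data_items parameter (the old blocking flag is gone), and ceph_msgpool_get() falls back to a fresh ceph_msg_new2() allocation, with a ratelimited warning, whenever a request needs more front space or more data items than the pool was sized for. The OSD client sizes its pools accordingly, as set up later in this series:

	err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
				PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, "osd_op");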
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 60934bd8796c..d23a9f81f3d7 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -126,6 +126,9 @@ static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
126 osd_data->type = CEPH_OSD_DATA_TYPE_NONE; 126 osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
127} 127}
128 128
129/*
130 * Consumes @pages if @own_pages is true.
131 */
129static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data, 132static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
130 struct page **pages, u64 length, u32 alignment, 133 struct page **pages, u64 length, u32 alignment,
131 bool pages_from_pool, bool own_pages) 134 bool pages_from_pool, bool own_pages)
@@ -138,6 +141,9 @@ static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
138 osd_data->own_pages = own_pages; 141 osd_data->own_pages = own_pages;
139} 142}
140 143
144/*
145 * Consumes a ref on @pagelist.
146 */
141static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data, 147static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
142 struct ceph_pagelist *pagelist) 148 struct ceph_pagelist *pagelist)
143{ 149{
@@ -362,6 +368,8 @@ static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
362 num_pages = calc_pages_for((u64)osd_data->alignment, 368 num_pages = calc_pages_for((u64)osd_data->alignment,
363 (u64)osd_data->length); 369 (u64)osd_data->length);
364 ceph_release_page_vector(osd_data->pages, num_pages); 370 ceph_release_page_vector(osd_data->pages, num_pages);
371 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
372 ceph_pagelist_release(osd_data->pagelist);
365 } 373 }
366 ceph_osd_data_init(osd_data); 374 ceph_osd_data_init(osd_data);
367} 375}
@@ -402,6 +410,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
402 case CEPH_OSD_OP_LIST_WATCHERS: 410 case CEPH_OSD_OP_LIST_WATCHERS:
403 ceph_osd_data_release(&op->list_watchers.response_data); 411 ceph_osd_data_release(&op->list_watchers.response_data);
404 break; 412 break;
413 case CEPH_OSD_OP_COPY_FROM:
414 ceph_osd_data_release(&op->copy_from.osd_data);
415 break;
405 default: 416 default:
406 break; 417 break;
407 } 418 }
@@ -606,12 +617,15 @@ static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc)
606 return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0); 617 return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
607} 618}
608 619
609int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) 620static int __ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp,
621 int num_request_data_items,
622 int num_reply_data_items)
610{ 623{
611 struct ceph_osd_client *osdc = req->r_osdc; 624 struct ceph_osd_client *osdc = req->r_osdc;
612 struct ceph_msg *msg; 625 struct ceph_msg *msg;
613 int msg_size; 626 int msg_size;
614 627
628 WARN_ON(req->r_request || req->r_reply);
615 WARN_ON(ceph_oid_empty(&req->r_base_oid)); 629 WARN_ON(ceph_oid_empty(&req->r_base_oid));
616 WARN_ON(ceph_oloc_empty(&req->r_base_oloc)); 630 WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
617 631
@@ -633,9 +647,11 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
633 msg_size += 4 + 8; /* retry_attempt, features */ 647 msg_size += 4 + 8; /* retry_attempt, features */
634 648
635 if (req->r_mempool) 649 if (req->r_mempool)
636 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 650 msg = ceph_msgpool_get(&osdc->msgpool_op, msg_size,
651 num_request_data_items);
637 else 652 else
638 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true); 653 msg = ceph_msg_new2(CEPH_MSG_OSD_OP, msg_size,
654 num_request_data_items, gfp, true);
639 if (!msg) 655 if (!msg)
640 return -ENOMEM; 656 return -ENOMEM;
641 657
@@ -648,9 +664,11 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
648 msg_size += req->r_num_ops * sizeof(struct ceph_osd_op); 664 msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);
649 665
650 if (req->r_mempool) 666 if (req->r_mempool)
651 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 667 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, msg_size,
668 num_reply_data_items);
652 else 669 else
653 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true); 670 msg = ceph_msg_new2(CEPH_MSG_OSD_OPREPLY, msg_size,
671 num_reply_data_items, gfp, true);
654 if (!msg) 672 if (!msg)
655 return -ENOMEM; 673 return -ENOMEM;
656 674
@@ -658,7 +676,6 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
658 676
659 return 0; 677 return 0;
660} 678}
661EXPORT_SYMBOL(ceph_osdc_alloc_messages);
662 679
663static bool osd_req_opcode_valid(u16 opcode) 680static bool osd_req_opcode_valid(u16 opcode)
664{ 681{
@@ -671,6 +688,65 @@ __CEPH_FORALL_OSD_OPS(GENERATE_CASE)
671 } 688 }
672} 689}
673 690
691static void get_num_data_items(struct ceph_osd_request *req,
692 int *num_request_data_items,
693 int *num_reply_data_items)
694{
695 struct ceph_osd_req_op *op;
696
697 *num_request_data_items = 0;
698 *num_reply_data_items = 0;
699
700 for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
701 switch (op->op) {
702 /* request */
703 case CEPH_OSD_OP_WRITE:
704 case CEPH_OSD_OP_WRITEFULL:
705 case CEPH_OSD_OP_SETXATTR:
706 case CEPH_OSD_OP_CMPXATTR:
707 case CEPH_OSD_OP_NOTIFY_ACK:
708 case CEPH_OSD_OP_COPY_FROM:
709 *num_request_data_items += 1;
710 break;
711
712 /* reply */
713 case CEPH_OSD_OP_STAT:
714 case CEPH_OSD_OP_READ:
715 case CEPH_OSD_OP_LIST_WATCHERS:
716 *num_reply_data_items += 1;
717 break;
718
719 /* both */
720 case CEPH_OSD_OP_NOTIFY:
721 *num_request_data_items += 1;
722 *num_reply_data_items += 1;
723 break;
724 case CEPH_OSD_OP_CALL:
725 *num_request_data_items += 2;
726 *num_reply_data_items += 1;
727 break;
728
729 default:
730 WARN_ON(!osd_req_opcode_valid(op->op));
731 break;
732 }
733 }
734}
735
736/*
737 * oid, oloc and OSD op opcode(s) must be filled in before this function
738 * is called.
739 */
740int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
741{
742 int num_request_data_items, num_reply_data_items;
743
744 get_num_data_items(req, &num_request_data_items, &num_reply_data_items);
745 return __ceph_osdc_alloc_messages(req, gfp, num_request_data_items,
746 num_reply_data_items);
747}
748EXPORT_SYMBOL(ceph_osdc_alloc_messages);
749
674/* 750/*
675 * This is an osd op init function for opcodes that have no data or 751 * This is an osd op init function for opcodes that have no data or
676 * other information associated with them. It also serves as a 752 * other information associated with them. It also serves as a
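Because get_num_data_items() walks r_ops to size both the request and reply messages, the OSD op(s) now have to be fully set up before ceph_osdc_alloc_messages() is called; the later hunks in this file reorder every caller to match. The resulting pattern is:

	ceph_oid_copy(&req->r_base_oid, oid);
	ceph_oloc_copy(&req->r_base_oloc, oloc);
	osd_req_op_init(req, 0, CEPH_OSD_OP_STAT, 0);	/* ops first */

	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);	/* then messages */
	if (ret)
		goto out_put_req;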
@@ -767,22 +843,19 @@ void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
767EXPORT_SYMBOL(osd_req_op_extent_dup_last); 843EXPORT_SYMBOL(osd_req_op_extent_dup_last);
768 844
769int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, 845int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
770 u16 opcode, const char *class, const char *method) 846 const char *class, const char *method)
771{ 847{
772 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 848 struct ceph_osd_req_op *op;
773 opcode, 0);
774 struct ceph_pagelist *pagelist; 849 struct ceph_pagelist *pagelist;
775 size_t payload_len = 0; 850 size_t payload_len = 0;
776 size_t size; 851 size_t size;
777 852
778 BUG_ON(opcode != CEPH_OSD_OP_CALL); 853 op = _osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);
779 854
780 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); 855 pagelist = ceph_pagelist_alloc(GFP_NOFS);
781 if (!pagelist) 856 if (!pagelist)
782 return -ENOMEM; 857 return -ENOMEM;
783 858
784 ceph_pagelist_init(pagelist);
785
786 op->cls.class_name = class; 859 op->cls.class_name = class;
787 size = strlen(class); 860 size = strlen(class);
788 BUG_ON(size > (size_t) U8_MAX); 861 BUG_ON(size > (size_t) U8_MAX);
@@ -815,12 +888,10 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
815 888
816 BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR); 889 BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);
817 890
818 pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); 891 pagelist = ceph_pagelist_alloc(GFP_NOFS);
819 if (!pagelist) 892 if (!pagelist)
820 return -ENOMEM; 893 return -ENOMEM;
821 894
822 ceph_pagelist_init(pagelist);
823
824 payload_len = strlen(name); 895 payload_len = strlen(name);
825 op->xattr.name_len = payload_len; 896 op->xattr.name_len = payload_len;
826 ceph_pagelist_append(pagelist, name, payload_len); 897 ceph_pagelist_append(pagelist, name, payload_len);
@@ -900,12 +971,6 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
900static u32 osd_req_encode_op(struct ceph_osd_op *dst, 971static u32 osd_req_encode_op(struct ceph_osd_op *dst,
901 const struct ceph_osd_req_op *src) 972 const struct ceph_osd_req_op *src)
902{ 973{
903 if (WARN_ON(!osd_req_opcode_valid(src->op))) {
904 pr_err("unrecognized osd opcode %d\n", src->op);
905
906 return 0;
907 }
908
909 switch (src->op) { 974 switch (src->op) {
910 case CEPH_OSD_OP_STAT: 975 case CEPH_OSD_OP_STAT:
911 break; 976 break;
@@ -955,6 +1020,14 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
955 case CEPH_OSD_OP_CREATE: 1020 case CEPH_OSD_OP_CREATE:
956 case CEPH_OSD_OP_DELETE: 1021 case CEPH_OSD_OP_DELETE:
957 break; 1022 break;
1023 case CEPH_OSD_OP_COPY_FROM:
1024 dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid);
1025 dst->copy_from.src_version =
1026 cpu_to_le64(src->copy_from.src_version);
1027 dst->copy_from.flags = src->copy_from.flags;
1028 dst->copy_from.src_fadvise_flags =
1029 cpu_to_le32(src->copy_from.src_fadvise_flags);
1030 break;
958 default: 1031 default:
959 pr_err("unsupported osd opcode %s\n", 1032 pr_err("unsupported osd opcode %s\n",
960 ceph_osd_op_name(src->op)); 1033 ceph_osd_op_name(src->op));
@@ -1038,7 +1111,15 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
1038 if (flags & CEPH_OSD_FLAG_WRITE) 1111 if (flags & CEPH_OSD_FLAG_WRITE)
1039 req->r_data_offset = off; 1112 req->r_data_offset = off;
1040 1113
1041 r = ceph_osdc_alloc_messages(req, GFP_NOFS); 1114 if (num_ops > 1)
1115 /*
1116 * This is a special case for ceph_writepages_start(), but it
1117 * also covers ceph_uninline_data(). If more multi-op request
1118 * use cases emerge, we will need a separate helper.
1119 */
1120 r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_ops, 0);
1121 else
1122 r = ceph_osdc_alloc_messages(req, GFP_NOFS);
1042 if (r) 1123 if (r)
1043 goto fail; 1124 goto fail;
1044 1125
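ceph_writepages_start() builds multi-op requests whose ops are initialized only after this point, so get_num_data_items() cannot count them yet; the num_ops > 1 branch conservatively reserves one request data item per op instead. A hedged sketch of the kind of call that takes this path, modeled on the writepages caller:

	req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len,
				    0, num_ops, CEPH_OSD_OP_WRITE,
				    CEPH_OSD_FLAG_WRITE, snapc,
				    ci->i_truncate_seq, ci->i_truncate_size,
				    false);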
@@ -1845,48 +1926,55 @@ static bool should_plug_request(struct ceph_osd_request *req)
1845 return true; 1926 return true;
1846} 1927}
1847 1928
1848static void setup_request_data(struct ceph_osd_request *req, 1929/*
1849 struct ceph_msg *msg) 1930 * Keep get_num_data_items() in sync with this function.
1931 */
1932static void setup_request_data(struct ceph_osd_request *req)
1850{ 1933{
1851 u32 data_len = 0; 1934 struct ceph_msg *request_msg = req->r_request;
1852 int i; 1935 struct ceph_msg *reply_msg = req->r_reply;
1936 struct ceph_osd_req_op *op;
1853 1937
1854 if (!list_empty(&msg->data)) 1938 if (req->r_request->num_data_items || req->r_reply->num_data_items)
1855 return; 1939 return;
1856 1940
1857 WARN_ON(msg->data_length); 1941 WARN_ON(request_msg->data_length || reply_msg->data_length);
1858 for (i = 0; i < req->r_num_ops; i++) { 1942 for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
1859 struct ceph_osd_req_op *op = &req->r_ops[i];
1860
1861 switch (op->op) { 1943 switch (op->op) {
1862 /* request */ 1944 /* request */
1863 case CEPH_OSD_OP_WRITE: 1945 case CEPH_OSD_OP_WRITE:
1864 case CEPH_OSD_OP_WRITEFULL: 1946 case CEPH_OSD_OP_WRITEFULL:
1865 WARN_ON(op->indata_len != op->extent.length); 1947 WARN_ON(op->indata_len != op->extent.length);
1866 ceph_osdc_msg_data_add(msg, &op->extent.osd_data); 1948 ceph_osdc_msg_data_add(request_msg,
1949 &op->extent.osd_data);
1867 break; 1950 break;
1868 case CEPH_OSD_OP_SETXATTR: 1951 case CEPH_OSD_OP_SETXATTR:
1869 case CEPH_OSD_OP_CMPXATTR: 1952 case CEPH_OSD_OP_CMPXATTR:
1870 WARN_ON(op->indata_len != op->xattr.name_len + 1953 WARN_ON(op->indata_len != op->xattr.name_len +
1871 op->xattr.value_len); 1954 op->xattr.value_len);
1872 ceph_osdc_msg_data_add(msg, &op->xattr.osd_data); 1955 ceph_osdc_msg_data_add(request_msg,
1956 &op->xattr.osd_data);
1873 break; 1957 break;
1874 case CEPH_OSD_OP_NOTIFY_ACK: 1958 case CEPH_OSD_OP_NOTIFY_ACK:
1875 ceph_osdc_msg_data_add(msg, 1959 ceph_osdc_msg_data_add(request_msg,
1876 &op->notify_ack.request_data); 1960 &op->notify_ack.request_data);
1877 break; 1961 break;
1962 case CEPH_OSD_OP_COPY_FROM:
1963 ceph_osdc_msg_data_add(request_msg,
1964 &op->copy_from.osd_data);
1965 break;
1878 1966
1879 /* reply */ 1967 /* reply */
1880 case CEPH_OSD_OP_STAT: 1968 case CEPH_OSD_OP_STAT:
1881 ceph_osdc_msg_data_add(req->r_reply, 1969 ceph_osdc_msg_data_add(reply_msg,
1882 &op->raw_data_in); 1970 &op->raw_data_in);
1883 break; 1971 break;
1884 case CEPH_OSD_OP_READ: 1972 case CEPH_OSD_OP_READ:
1885 ceph_osdc_msg_data_add(req->r_reply, 1973 ceph_osdc_msg_data_add(reply_msg,
1886 &op->extent.osd_data); 1974 &op->extent.osd_data);
1887 break; 1975 break;
1888 case CEPH_OSD_OP_LIST_WATCHERS: 1976 case CEPH_OSD_OP_LIST_WATCHERS:
1889 ceph_osdc_msg_data_add(req->r_reply, 1977 ceph_osdc_msg_data_add(reply_msg,
1890 &op->list_watchers.response_data); 1978 &op->list_watchers.response_data);
1891 break; 1979 break;
1892 1980
@@ -1895,25 +1983,23 @@ static void setup_request_data(struct ceph_osd_request *req,
1895 WARN_ON(op->indata_len != op->cls.class_len + 1983 WARN_ON(op->indata_len != op->cls.class_len +
1896 op->cls.method_len + 1984 op->cls.method_len +
1897 op->cls.indata_len); 1985 op->cls.indata_len);
1898 ceph_osdc_msg_data_add(msg, &op->cls.request_info); 1986 ceph_osdc_msg_data_add(request_msg,
1987 &op->cls.request_info);
1899 /* optional, can be NONE */ 1988 /* optional, can be NONE */
1900 ceph_osdc_msg_data_add(msg, &op->cls.request_data); 1989 ceph_osdc_msg_data_add(request_msg,
1990 &op->cls.request_data);
1901 /* optional, can be NONE */ 1991 /* optional, can be NONE */
1902 ceph_osdc_msg_data_add(req->r_reply, 1992 ceph_osdc_msg_data_add(reply_msg,
1903 &op->cls.response_data); 1993 &op->cls.response_data);
1904 break; 1994 break;
1905 case CEPH_OSD_OP_NOTIFY: 1995 case CEPH_OSD_OP_NOTIFY:
1906 ceph_osdc_msg_data_add(msg, 1996 ceph_osdc_msg_data_add(request_msg,
1907 &op->notify.request_data); 1997 &op->notify.request_data);
1908 ceph_osdc_msg_data_add(req->r_reply, 1998 ceph_osdc_msg_data_add(reply_msg,
1909 &op->notify.response_data); 1999 &op->notify.response_data);
1910 break; 2000 break;
1911 } 2001 }
1912
1913 data_len += op->indata_len;
1914 } 2002 }
1915
1916 WARN_ON(data_len != msg->data_length);
1917} 2003}
1918 2004
1919static void encode_pgid(void **p, const struct ceph_pg *pgid) 2005static void encode_pgid(void **p, const struct ceph_pg *pgid)
@@ -1961,7 +2047,7 @@ static void encode_request_partial(struct ceph_osd_request *req,
1961 req->r_data_offset || req->r_snapc); 2047 req->r_data_offset || req->r_snapc);
1962 } 2048 }
1963 2049
1964 setup_request_data(req, msg); 2050 setup_request_data(req);
1965 2051
1966 encode_spgid(&p, &req->r_t.spgid); /* actual spg */ 2052 encode_spgid(&p, &req->r_t.spgid); /* actual spg */
1967 ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */ 2053 ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
@@ -3001,11 +3087,21 @@ static void linger_submit(struct ceph_osd_linger_request *lreq)
3001 struct ceph_osd_client *osdc = lreq->osdc; 3087 struct ceph_osd_client *osdc = lreq->osdc;
3002 struct ceph_osd *osd; 3088 struct ceph_osd *osd;
3003 3089
3090 down_write(&osdc->lock);
3091 linger_register(lreq);
3092 if (lreq->is_watch) {
3093 lreq->reg_req->r_ops[0].watch.cookie = lreq->linger_id;
3094 lreq->ping_req->r_ops[0].watch.cookie = lreq->linger_id;
3095 } else {
3096 lreq->reg_req->r_ops[0].notify.cookie = lreq->linger_id;
3097 }
3098
3004 calc_target(osdc, &lreq->t, NULL, false); 3099 calc_target(osdc, &lreq->t, NULL, false);
3005 osd = lookup_create_osd(osdc, lreq->t.osd, true); 3100 osd = lookup_create_osd(osdc, lreq->t.osd, true);
3006 link_linger(osd, lreq); 3101 link_linger(osd, lreq);
3007 3102
3008 send_linger(lreq); 3103 send_linger(lreq);
3104 up_write(&osdc->lock);
3009} 3105}
3010 3106
3011static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq) 3107static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
@@ -4318,9 +4414,7 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
4318 lreq->notify_id, notify_id); 4414 lreq->notify_id, notify_id);
4319 } else if (!completion_done(&lreq->notify_finish_wait)) { 4415 } else if (!completion_done(&lreq->notify_finish_wait)) {
4320 struct ceph_msg_data *data = 4416 struct ceph_msg_data *data =
4321 list_first_entry_or_null(&msg->data, 4417 msg->num_data_items ? &msg->data[0] : NULL;
4322 struct ceph_msg_data,
4323 links);
4324 4418
4325 if (data) { 4419 if (data) {
4326 if (lreq->preply_pages) { 4420 if (lreq->preply_pages) {
@@ -4476,6 +4570,23 @@ alloc_linger_request(struct ceph_osd_linger_request *lreq)
4476 4570
4477 ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); 4571 ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
4478 ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); 4572 ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
4573 return req;
4574}
4575
4576static struct ceph_osd_request *
4577alloc_watch_request(struct ceph_osd_linger_request *lreq, u8 watch_opcode)
4578{
4579 struct ceph_osd_request *req;
4580
4581 req = alloc_linger_request(lreq);
4582 if (!req)
4583 return NULL;
4584
4585 /*
4586 * Pass 0 for cookie because we don't know it yet, it will be
4587 * filled in by linger_submit().
4588 */
4589 osd_req_op_watch_init(req, 0, 0, watch_opcode);
4479 4590
4480 if (ceph_osdc_alloc_messages(req, GFP_NOIO)) { 4591 if (ceph_osdc_alloc_messages(req, GFP_NOIO)) {
4481 ceph_osdc_put_request(req); 4592 ceph_osdc_put_request(req);
@@ -4514,27 +4625,19 @@ ceph_osdc_watch(struct ceph_osd_client *osdc,
4514 lreq->t.flags = CEPH_OSD_FLAG_WRITE; 4625 lreq->t.flags = CEPH_OSD_FLAG_WRITE;
4515 ktime_get_real_ts64(&lreq->mtime); 4626 ktime_get_real_ts64(&lreq->mtime);
4516 4627
4517 lreq->reg_req = alloc_linger_request(lreq); 4628 lreq->reg_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_WATCH);
4518 if (!lreq->reg_req) { 4629 if (!lreq->reg_req) {
4519 ret = -ENOMEM; 4630 ret = -ENOMEM;
4520 goto err_put_lreq; 4631 goto err_put_lreq;
4521 } 4632 }
4522 4633
4523 lreq->ping_req = alloc_linger_request(lreq); 4634 lreq->ping_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_PING);
4524 if (!lreq->ping_req) { 4635 if (!lreq->ping_req) {
4525 ret = -ENOMEM; 4636 ret = -ENOMEM;
4526 goto err_put_lreq; 4637 goto err_put_lreq;
4527 } 4638 }
4528 4639
4529 down_write(&osdc->lock);
4530 linger_register(lreq); /* before osd_req_op_* */
4531 osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id,
4532 CEPH_OSD_WATCH_OP_WATCH);
4533 osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id,
4534 CEPH_OSD_WATCH_OP_PING);
4535 linger_submit(lreq); 4640 linger_submit(lreq);
4536 up_write(&osdc->lock);
4537
4538 ret = linger_reg_commit_wait(lreq); 4641 ret = linger_reg_commit_wait(lreq);
4539 if (ret) { 4642 if (ret) {
4540 linger_cancel(lreq); 4643 linger_cancel(lreq);
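ceph_osdc_watch() gets noticeably simpler: the reg and ping requests are built by alloc_watch_request() with a placeholder cookie of 0, and linger_submit() now takes osdc->lock itself, registers the linger id and patches the cookies in before sending. The setup path collapses to (condensed; the real code checks each allocation separately):

	lreq->reg_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_WATCH);
	lreq->ping_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_PING);
	if (!lreq->reg_req || !lreq->ping_req)
		goto err_put_lreq;

	linger_submit(lreq);		/* assigns linger_id and cookies */
	ret = linger_reg_commit_wait(lreq);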
@@ -4599,11 +4702,10 @@ static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
4599 4702
4600 op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0); 4703 op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
4601 4704
4602 pl = kmalloc(sizeof(*pl), GFP_NOIO); 4705 pl = ceph_pagelist_alloc(GFP_NOIO);
4603 if (!pl) 4706 if (!pl)
4604 return -ENOMEM; 4707 return -ENOMEM;
4605 4708
4606 ceph_pagelist_init(pl);
4607 ret = ceph_pagelist_encode_64(pl, notify_id); 4709 ret = ceph_pagelist_encode_64(pl, notify_id);
4608 ret |= ceph_pagelist_encode_64(pl, cookie); 4710 ret |= ceph_pagelist_encode_64(pl, cookie);
4609 if (payload) { 4711 if (payload) {
@@ -4641,12 +4743,12 @@ int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
4641 ceph_oloc_copy(&req->r_base_oloc, oloc); 4743 ceph_oloc_copy(&req->r_base_oloc, oloc);
4642 req->r_flags = CEPH_OSD_FLAG_READ; 4744 req->r_flags = CEPH_OSD_FLAG_READ;
4643 4745
4644 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 4746 ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
4747 payload_len);
4645 if (ret) 4748 if (ret)
4646 goto out_put_req; 4749 goto out_put_req;
4647 4750
4648 ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload, 4751 ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4649 payload_len);
4650 if (ret) 4752 if (ret)
4651 goto out_put_req; 4753 goto out_put_req;
4652 4754
@@ -4670,11 +4772,10 @@ static int osd_req_op_notify_init(struct ceph_osd_request *req, int which,
4670 op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0); 4772 op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
4671 op->notify.cookie = cookie; 4773 op->notify.cookie = cookie;
4672 4774
4673 pl = kmalloc(sizeof(*pl), GFP_NOIO); 4775 pl = ceph_pagelist_alloc(GFP_NOIO);
4674 if (!pl) 4776 if (!pl)
4675 return -ENOMEM; 4777 return -ENOMEM;
4676 4778
4677 ceph_pagelist_init(pl);
4678 ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */ 4779 ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */
4679 ret |= ceph_pagelist_encode_32(pl, timeout); 4780 ret |= ceph_pagelist_encode_32(pl, timeout);
4680 ret |= ceph_pagelist_encode_32(pl, payload_len); 4781 ret |= ceph_pagelist_encode_32(pl, payload_len);
@@ -4733,29 +4834,30 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
4733 goto out_put_lreq; 4834 goto out_put_lreq;
4734 } 4835 }
4735 4836
4837 /*
4838 * Pass 0 for cookie because we don't know it yet, it will be
4839 * filled in by linger_submit().
4840 */
4841 ret = osd_req_op_notify_init(lreq->reg_req, 0, 0, 1, timeout,
4842 payload, payload_len);
4843 if (ret)
4844 goto out_put_lreq;
4845
4736 /* for notify_id */ 4846 /* for notify_id */
4737 pages = ceph_alloc_page_vector(1, GFP_NOIO); 4847 pages = ceph_alloc_page_vector(1, GFP_NOIO);
4738 if (IS_ERR(pages)) { 4848 if (IS_ERR(pages)) {
4739 ret = PTR_ERR(pages); 4849 ret = PTR_ERR(pages);
4740 goto out_put_lreq; 4850 goto out_put_lreq;
4741 } 4851 }
4742
4743 down_write(&osdc->lock);
4744 linger_register(lreq); /* before osd_req_op_* */
4745 ret = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1,
4746 timeout, payload, payload_len);
4747 if (ret) {
4748 linger_unregister(lreq);
4749 up_write(&osdc->lock);
4750 ceph_release_page_vector(pages, 1);
4751 goto out_put_lreq;
4752 }
4753 ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify, 4852 ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify,
4754 response_data), 4853 response_data),
4755 pages, PAGE_SIZE, 0, false, true); 4854 pages, PAGE_SIZE, 0, false, true);
4756 linger_submit(lreq);
4757 up_write(&osdc->lock);
4758 4855
4856 ret = ceph_osdc_alloc_messages(lreq->reg_req, GFP_NOIO);
4857 if (ret)
4858 goto out_put_lreq;
4859
4860 linger_submit(lreq);
4759 ret = linger_reg_commit_wait(lreq); 4861 ret = linger_reg_commit_wait(lreq);
4760 if (!ret) 4862 if (!ret)
4761 ret = linger_notify_finish_wait(lreq); 4863 ret = linger_notify_finish_wait(lreq);
@@ -4881,10 +4983,6 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
4881 ceph_oloc_copy(&req->r_base_oloc, oloc); 4983 ceph_oloc_copy(&req->r_base_oloc, oloc);
4882 req->r_flags = CEPH_OSD_FLAG_READ; 4984 req->r_flags = CEPH_OSD_FLAG_READ;
4883 4985
4884 ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4885 if (ret)
4886 goto out_put_req;
4887
4888 pages = ceph_alloc_page_vector(1, GFP_NOIO); 4986 pages = ceph_alloc_page_vector(1, GFP_NOIO);
4889 if (IS_ERR(pages)) { 4987 if (IS_ERR(pages)) {
4890 ret = PTR_ERR(pages); 4988 ret = PTR_ERR(pages);
@@ -4896,6 +4994,10 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
4896 response_data), 4994 response_data),
4897 pages, PAGE_SIZE, 0, false, true); 4995 pages, PAGE_SIZE, 0, false, true);
4898 4996
4997 ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4998 if (ret)
4999 goto out_put_req;
5000
4899 ceph_osdc_start_request(osdc, req, false); 5001 ceph_osdc_start_request(osdc, req, false);
4900 ret = ceph_osdc_wait_request(osdc, req); 5002 ret = ceph_osdc_wait_request(osdc, req);
4901 if (ret >= 0) { 5003 if (ret >= 0) {
@@ -4958,11 +5060,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
4958 ceph_oloc_copy(&req->r_base_oloc, oloc); 5060 ceph_oloc_copy(&req->r_base_oloc, oloc);
4959 req->r_flags = flags; 5061 req->r_flags = flags;
4960 5062
4961 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 5063 ret = osd_req_op_cls_init(req, 0, class, method);
4962 if (ret)
4963 goto out_put_req;
4964
4965 ret = osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
4966 if (ret) 5064 if (ret)
4967 goto out_put_req; 5065 goto out_put_req;
4968 5066
@@ -4973,6 +5071,10 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
4973 osd_req_op_cls_response_data_pages(req, 0, &resp_page, 5071 osd_req_op_cls_response_data_pages(req, 0, &resp_page,
4974 *resp_len, 0, false, false); 5072 *resp_len, 0, false, false);
4975 5073
5074 ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
5075 if (ret)
5076 goto out_put_req;
5077
4976 ceph_osdc_start_request(osdc, req, false); 5078 ceph_osdc_start_request(osdc, req, false);
4977 ret = ceph_osdc_wait_request(osdc, req); 5079 ret = ceph_osdc_wait_request(osdc, req);
4978 if (ret >= 0) { 5080 if (ret >= 0) {
@@ -5021,11 +5123,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
5021 goto out_map; 5123 goto out_map;
5022 5124
5023 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP, 5125 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
5024 PAGE_SIZE, 10, true, "osd_op"); 5126 PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, "osd_op");
5025 if (err < 0) 5127 if (err < 0)
5026 goto out_mempool; 5128 goto out_mempool;
5027 err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY, 5129 err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
5028 PAGE_SIZE, 10, true, "osd_op_reply"); 5130 PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10,
5131 "osd_op_reply");
5029 if (err < 0) 5132 if (err < 0)
5030 goto out_msgpool; 5133 goto out_msgpool;
5031 5134
@@ -5168,6 +5271,80 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
5168} 5271}
5169EXPORT_SYMBOL(ceph_osdc_writepages); 5272EXPORT_SYMBOL(ceph_osdc_writepages);
5170 5273
5274static int osd_req_op_copy_from_init(struct ceph_osd_request *req,
5275 u64 src_snapid, u64 src_version,
5276 struct ceph_object_id *src_oid,
5277 struct ceph_object_locator *src_oloc,
5278 u32 src_fadvise_flags,
5279 u32 dst_fadvise_flags,
5280 u8 copy_from_flags)
5281{
5282 struct ceph_osd_req_op *op;
5283 struct page **pages;
5284 void *p, *end;
5285
5286 pages = ceph_alloc_page_vector(1, GFP_KERNEL);
5287 if (IS_ERR(pages))
5288 return PTR_ERR(pages);
5289
5290 op = _osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM, dst_fadvise_flags);
5291 op->copy_from.snapid = src_snapid;
5292 op->copy_from.src_version = src_version;
5293 op->copy_from.flags = copy_from_flags;
5294 op->copy_from.src_fadvise_flags = src_fadvise_flags;
5295
5296 p = page_address(pages[0]);
5297 end = p + PAGE_SIZE;
5298 ceph_encode_string(&p, end, src_oid->name, src_oid->name_len);
5299 encode_oloc(&p, end, src_oloc);
5300 op->indata_len = PAGE_SIZE - (end - p);
5301
5302 ceph_osd_data_pages_init(&op->copy_from.osd_data, pages,
5303 op->indata_len, 0, false, true);
5304 return 0;
5305}
5306
5307int ceph_osdc_copy_from(struct ceph_osd_client *osdc,
5308 u64 src_snapid, u64 src_version,
5309 struct ceph_object_id *src_oid,
5310 struct ceph_object_locator *src_oloc,
5311 u32 src_fadvise_flags,
5312 struct ceph_object_id *dst_oid,
5313 struct ceph_object_locator *dst_oloc,
5314 u32 dst_fadvise_flags,
5315 u8 copy_from_flags)
5316{
5317 struct ceph_osd_request *req;
5318 int ret;
5319
5320 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
5321 if (!req)
5322 return -ENOMEM;
5323
5324 req->r_flags = CEPH_OSD_FLAG_WRITE;
5325
5326 ceph_oloc_copy(&req->r_t.base_oloc, dst_oloc);
5327 ceph_oid_copy(&req->r_t.base_oid, dst_oid);
5328
5329 ret = osd_req_op_copy_from_init(req, src_snapid, src_version, src_oid,
5330 src_oloc, src_fadvise_flags,
5331 dst_fadvise_flags, copy_from_flags);
5332 if (ret)
5333 goto out;
5334
5335 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
5336 if (ret)
5337 goto out;
5338
5339 ceph_osdc_start_request(osdc, req, false);
5340 ret = ceph_osdc_wait_request(osdc, req);
5341
5342out:
5343 ceph_osdc_put_request(req);
5344 return ret;
5345}
5346EXPORT_SYMBOL(ceph_osdc_copy_from);
5347
5171int __init ceph_osdc_setup(void) 5348int __init ceph_osdc_setup(void)
5172{ 5349{
5173 size_t size = sizeof(struct ceph_osd_request) + 5350 size_t size = sizeof(struct ceph_osd_request) +
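The copy itself is synchronous: ceph_osdc_copy_from() allocates a single-op request, encodes the source oid and oloc into one page that rides along as the op's outbound data (own_pages is true, so the page is freed with the request via the CEPH_OSD_OP_COPY_FROM case added to osd_req_op_data_release()), then does start + wait. Callers wanting overlapping copies have to issue requests concurrently themselves; nothing here queues asynchronously.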
@@ -5295,7 +5472,7 @@ static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
5295 u32 front_len = le32_to_cpu(hdr->front_len); 5472 u32 front_len = le32_to_cpu(hdr->front_len);
5296 u32 data_len = le32_to_cpu(hdr->data_len); 5473 u32 data_len = le32_to_cpu(hdr->data_len);
5297 5474
5298 m = ceph_msg_new(type, front_len, GFP_NOIO, false); 5475 m = ceph_msg_new2(type, front_len, 1, GFP_NOIO, false);
5299 if (!m) 5476 if (!m)
5300 return NULL; 5477 return NULL;
5301 5478
diff --git a/net/ceph/pagelist.c b/net/ceph/pagelist.c
index 2ea0564771d2..65e34f78b05d 100644
--- a/net/ceph/pagelist.c
+++ b/net/ceph/pagelist.c
@@ -6,6 +6,26 @@
6#include <linux/highmem.h> 6#include <linux/highmem.h>
7#include <linux/ceph/pagelist.h> 7#include <linux/ceph/pagelist.h>
8 8
9struct ceph_pagelist *ceph_pagelist_alloc(gfp_t gfp_flags)
10{
11 struct ceph_pagelist *pl;
12
13 pl = kmalloc(sizeof(*pl), gfp_flags);
14 if (!pl)
15 return NULL;
16
17 INIT_LIST_HEAD(&pl->head);
18 pl->mapped_tail = NULL;
19 pl->length = 0;
20 pl->room = 0;
21 INIT_LIST_HEAD(&pl->free_list);
22 pl->num_pages_free = 0;
23 refcount_set(&pl->refcnt, 1);
24
25 return pl;
26}
27EXPORT_SYMBOL(ceph_pagelist_alloc);
28
9static void ceph_pagelist_unmap_tail(struct ceph_pagelist *pl) 29static void ceph_pagelist_unmap_tail(struct ceph_pagelist *pl)
10{ 30{
11 if (pl->mapped_tail) { 31 if (pl->mapped_tail) {