aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2013-05-06 16:11:19 -0400
committerLinus Torvalds <torvalds@linux-foundation.org>2013-05-06 16:11:19 -0400
commit91f8575685e35f3bd021286bc82d26397458f5a9 (patch)
tree09de8d889758a12071adb9427ed741e27c907aa6 /net
parent2e378f3eebd28feefbb1f9953834a5a19482f053 (diff)
parentb5b09be30cf99f9c699e825629f02e3bce555d44 (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph changes from Alex Elder: "This is a big pull. Most of it is culmination of Alex's work to implement RBD image layering, which is now complete (yay!). There is also some work from Yan to fix i_mutex behavior surrounding writes in cephfs, a sync write fix, a fix for RBD images that get resized while they are mapped, and a few patches from me that resolve annoying auth warnings and fix several bugs in the ceph auth code." * 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (254 commits) rbd: fix image request leak on parent read libceph: use slab cache for osd client requests libceph: allocate ceph message data with a slab allocator libceph: allocate ceph messages with a slab allocator rbd: allocate image object names with a slab allocator rbd: allocate object requests with a slab allocator rbd: allocate name separate from obj_request rbd: allocate image requests with a slab allocator rbd: use binary search for snapshot lookup rbd: clear EXISTS flag if mapped snapshot disappears rbd: kill off the snapshot list rbd: define rbd_snap_size() and rbd_snap_features() rbd: use snap_id not index to look up snap info rbd: look up snapshot name in names buffer rbd: drop obj_request->version rbd: drop rbd_obj_method_sync() version parameter rbd: more version parameter removal rbd: get rid of some version parameters rbd: stop tracking header object version rbd: snap names are pointer to constant data ...
Diffstat (limited to 'net')
-rw-r--r--net/ceph/Makefile2
-rw-r--r--net/ceph/auth.c117
-rw-r--r--net/ceph/auth_x.c24
-rw-r--r--net/ceph/auth_x.h1
-rw-r--r--net/ceph/ceph_common.c7
-rw-r--r--net/ceph/debugfs.c4
-rw-r--r--net/ceph/messenger.c1019
-rw-r--r--net/ceph/mon_client.c7
-rw-r--r--net/ceph/osd_client.c1087
-rw-r--r--net/ceph/osdmap.c45
-rw-r--r--net/ceph/snapshot.c78
11 files changed, 1665 insertions, 726 deletions
diff --git a/net/ceph/Makefile b/net/ceph/Makefile
index e87ef435e11b..958d9856912c 100644
--- a/net/ceph/Makefile
+++ b/net/ceph/Makefile
@@ -11,5 +11,5 @@ libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
11 crypto.o armor.o \ 11 crypto.o armor.o \
12 auth_x.o \ 12 auth_x.o \
13 ceph_fs.o ceph_strings.o ceph_hash.o \ 13 ceph_fs.o ceph_strings.o ceph_hash.o \
14 pagevec.o 14 pagevec.o snapshot.o
15 15
diff --git a/net/ceph/auth.c b/net/ceph/auth.c
index b4bf4ac090f1..6b923bcaa2a4 100644
--- a/net/ceph/auth.c
+++ b/net/ceph/auth.c
@@ -47,6 +47,7 @@ struct ceph_auth_client *ceph_auth_init(const char *name, const struct ceph_cryp
47 if (!ac) 47 if (!ac)
48 goto out; 48 goto out;
49 49
50 mutex_init(&ac->mutex);
50 ac->negotiating = true; 51 ac->negotiating = true;
51 if (name) 52 if (name)
52 ac->name = name; 53 ac->name = name;
@@ -73,10 +74,12 @@ void ceph_auth_destroy(struct ceph_auth_client *ac)
73 */ 74 */
74void ceph_auth_reset(struct ceph_auth_client *ac) 75void ceph_auth_reset(struct ceph_auth_client *ac)
75{ 76{
77 mutex_lock(&ac->mutex);
76 dout("auth_reset %p\n", ac); 78 dout("auth_reset %p\n", ac);
77 if (ac->ops && !ac->negotiating) 79 if (ac->ops && !ac->negotiating)
78 ac->ops->reset(ac); 80 ac->ops->reset(ac);
79 ac->negotiating = true; 81 ac->negotiating = true;
82 mutex_unlock(&ac->mutex);
80} 83}
81 84
82int ceph_entity_name_encode(const char *name, void **p, void *end) 85int ceph_entity_name_encode(const char *name, void **p, void *end)
@@ -102,6 +105,7 @@ int ceph_auth_build_hello(struct ceph_auth_client *ac, void *buf, size_t len)
102 int i, num; 105 int i, num;
103 int ret; 106 int ret;
104 107
108 mutex_lock(&ac->mutex);
105 dout("auth_build_hello\n"); 109 dout("auth_build_hello\n");
106 monhdr->have_version = 0; 110 monhdr->have_version = 0;
107 monhdr->session_mon = cpu_to_le16(-1); 111 monhdr->session_mon = cpu_to_le16(-1);
@@ -122,15 +126,19 @@ int ceph_auth_build_hello(struct ceph_auth_client *ac, void *buf, size_t len)
122 126
123 ret = ceph_entity_name_encode(ac->name, &p, end); 127 ret = ceph_entity_name_encode(ac->name, &p, end);
124 if (ret < 0) 128 if (ret < 0)
125 return ret; 129 goto out;
126 ceph_decode_need(&p, end, sizeof(u64), bad); 130 ceph_decode_need(&p, end, sizeof(u64), bad);
127 ceph_encode_64(&p, ac->global_id); 131 ceph_encode_64(&p, ac->global_id);
128 132
129 ceph_encode_32(&lenp, p - lenp - sizeof(u32)); 133 ceph_encode_32(&lenp, p - lenp - sizeof(u32));
130 return p - buf; 134 ret = p - buf;
135out:
136 mutex_unlock(&ac->mutex);
137 return ret;
131 138
132bad: 139bad:
133 return -ERANGE; 140 ret = -ERANGE;
141 goto out;
134} 142}
135 143
136static int ceph_build_auth_request(struct ceph_auth_client *ac, 144static int ceph_build_auth_request(struct ceph_auth_client *ac,
@@ -151,11 +159,13 @@ static int ceph_build_auth_request(struct ceph_auth_client *ac,
151 if (ret < 0) { 159 if (ret < 0) {
152 pr_err("error %d building auth method %s request\n", ret, 160 pr_err("error %d building auth method %s request\n", ret,
153 ac->ops->name); 161 ac->ops->name);
154 return ret; 162 goto out;
155 } 163 }
156 dout(" built request %d bytes\n", ret); 164 dout(" built request %d bytes\n", ret);
157 ceph_encode_32(&p, ret); 165 ceph_encode_32(&p, ret);
158 return p + ret - msg_buf; 166 ret = p + ret - msg_buf;
167out:
168 return ret;
159} 169}
160 170
161/* 171/*
@@ -176,6 +186,7 @@ int ceph_handle_auth_reply(struct ceph_auth_client *ac,
176 int result_msg_len; 186 int result_msg_len;
177 int ret = -EINVAL; 187 int ret = -EINVAL;
178 188
189 mutex_lock(&ac->mutex);
179 dout("handle_auth_reply %p %p\n", p, end); 190 dout("handle_auth_reply %p %p\n", p, end);
180 ceph_decode_need(&p, end, sizeof(u32) * 3 + sizeof(u64), bad); 191 ceph_decode_need(&p, end, sizeof(u32) * 3 + sizeof(u64), bad);
181 protocol = ceph_decode_32(&p); 192 protocol = ceph_decode_32(&p);
@@ -227,33 +238,103 @@ int ceph_handle_auth_reply(struct ceph_auth_client *ac,
227 238
228 ret = ac->ops->handle_reply(ac, result, payload, payload_end); 239 ret = ac->ops->handle_reply(ac, result, payload, payload_end);
229 if (ret == -EAGAIN) { 240 if (ret == -EAGAIN) {
230 return ceph_build_auth_request(ac, reply_buf, reply_len); 241 ret = ceph_build_auth_request(ac, reply_buf, reply_len);
231 } else if (ret) { 242 } else if (ret) {
232 pr_err("auth method '%s' error %d\n", ac->ops->name, ret); 243 pr_err("auth method '%s' error %d\n", ac->ops->name, ret);
233 return ret;
234 } 244 }
235 return 0;
236 245
237bad:
238 pr_err("failed to decode auth msg\n");
239out: 246out:
247 mutex_unlock(&ac->mutex);
240 return ret; 248 return ret;
249
250bad:
251 pr_err("failed to decode auth msg\n");
252 ret = -EINVAL;
253 goto out;
241} 254}
242 255
243int ceph_build_auth(struct ceph_auth_client *ac, 256int ceph_build_auth(struct ceph_auth_client *ac,
244 void *msg_buf, size_t msg_len) 257 void *msg_buf, size_t msg_len)
245{ 258{
259 int ret = 0;
260
261 mutex_lock(&ac->mutex);
246 if (!ac->protocol) 262 if (!ac->protocol)
247 return ceph_auth_build_hello(ac, msg_buf, msg_len); 263 ret = ceph_auth_build_hello(ac, msg_buf, msg_len);
248 BUG_ON(!ac->ops); 264 else if (ac->ops->should_authenticate(ac))
249 if (ac->ops->should_authenticate(ac)) 265 ret = ceph_build_auth_request(ac, msg_buf, msg_len);
250 return ceph_build_auth_request(ac, msg_buf, msg_len); 266 mutex_unlock(&ac->mutex);
251 return 0; 267 return ret;
252} 268}
253 269
254int ceph_auth_is_authenticated(struct ceph_auth_client *ac) 270int ceph_auth_is_authenticated(struct ceph_auth_client *ac)
255{ 271{
256 if (!ac->ops) 272 int ret = 0;
257 return 0; 273
258 return ac->ops->is_authenticated(ac); 274 mutex_lock(&ac->mutex);
275 if (ac->ops)
276 ret = ac->ops->is_authenticated(ac);
277 mutex_unlock(&ac->mutex);
278 return ret;
279}
280EXPORT_SYMBOL(ceph_auth_is_authenticated);
281
282int ceph_auth_create_authorizer(struct ceph_auth_client *ac,
283 int peer_type,
284 struct ceph_auth_handshake *auth)
285{
286 int ret = 0;
287
288 mutex_lock(&ac->mutex);
289 if (ac->ops && ac->ops->create_authorizer)
290 ret = ac->ops->create_authorizer(ac, peer_type, auth);
291 mutex_unlock(&ac->mutex);
292 return ret;
293}
294EXPORT_SYMBOL(ceph_auth_create_authorizer);
295
296void ceph_auth_destroy_authorizer(struct ceph_auth_client *ac,
297 struct ceph_authorizer *a)
298{
299 mutex_lock(&ac->mutex);
300 if (ac->ops && ac->ops->destroy_authorizer)
301 ac->ops->destroy_authorizer(ac, a);
302 mutex_unlock(&ac->mutex);
303}
304EXPORT_SYMBOL(ceph_auth_destroy_authorizer);
305
306int ceph_auth_update_authorizer(struct ceph_auth_client *ac,
307 int peer_type,
308 struct ceph_auth_handshake *a)
309{
310 int ret = 0;
311
312 mutex_lock(&ac->mutex);
313 if (ac->ops && ac->ops->update_authorizer)
314 ret = ac->ops->update_authorizer(ac, peer_type, a);
315 mutex_unlock(&ac->mutex);
316 return ret;
317}
318EXPORT_SYMBOL(ceph_auth_update_authorizer);
319
320int ceph_auth_verify_authorizer_reply(struct ceph_auth_client *ac,
321 struct ceph_authorizer *a, size_t len)
322{
323 int ret = 0;
324
325 mutex_lock(&ac->mutex);
326 if (ac->ops && ac->ops->verify_authorizer_reply)
327 ret = ac->ops->verify_authorizer_reply(ac, a, len);
328 mutex_unlock(&ac->mutex);
329 return ret;
330}
331EXPORT_SYMBOL(ceph_auth_verify_authorizer_reply);
332
333void ceph_auth_invalidate_authorizer(struct ceph_auth_client *ac, int peer_type)
334{
335 mutex_lock(&ac->mutex);
336 if (ac->ops && ac->ops->invalidate_authorizer)
337 ac->ops->invalidate_authorizer(ac, peer_type);
338 mutex_unlock(&ac->mutex);
259} 339}
340EXPORT_SYMBOL(ceph_auth_invalidate_authorizer);
diff --git a/net/ceph/auth_x.c b/net/ceph/auth_x.c
index a16bf14eb027..96238ba95f2b 100644
--- a/net/ceph/auth_x.c
+++ b/net/ceph/auth_x.c
@@ -298,6 +298,7 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
298 return -ENOMEM; 298 return -ENOMEM;
299 } 299 }
300 au->service = th->service; 300 au->service = th->service;
301 au->secret_id = th->secret_id;
301 302
302 msg_a = au->buf->vec.iov_base; 303 msg_a = au->buf->vec.iov_base;
303 msg_a->struct_v = 1; 304 msg_a->struct_v = 1;
@@ -555,6 +556,26 @@ static int ceph_x_create_authorizer(
555 return 0; 556 return 0;
556} 557}
557 558
559static int ceph_x_update_authorizer(
560 struct ceph_auth_client *ac, int peer_type,
561 struct ceph_auth_handshake *auth)
562{
563 struct ceph_x_authorizer *au;
564 struct ceph_x_ticket_handler *th;
565
566 th = get_ticket_handler(ac, peer_type);
567 if (IS_ERR(th))
568 return PTR_ERR(th);
569
570 au = (struct ceph_x_authorizer *)auth->authorizer;
571 if (au->secret_id < th->secret_id) {
572 dout("ceph_x_update_authorizer service %u secret %llu < %llu\n",
573 au->service, au->secret_id, th->secret_id);
574 return ceph_x_build_authorizer(ac, th, au);
575 }
576 return 0;
577}
578
558static int ceph_x_verify_authorizer_reply(struct ceph_auth_client *ac, 579static int ceph_x_verify_authorizer_reply(struct ceph_auth_client *ac,
559 struct ceph_authorizer *a, size_t len) 580 struct ceph_authorizer *a, size_t len)
560{ 581{
@@ -630,7 +651,7 @@ static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac,
630 651
631 th = get_ticket_handler(ac, peer_type); 652 th = get_ticket_handler(ac, peer_type);
632 if (!IS_ERR(th)) 653 if (!IS_ERR(th))
633 remove_ticket_handler(ac, th); 654 memset(&th->validity, 0, sizeof(th->validity));
634} 655}
635 656
636 657
@@ -641,6 +662,7 @@ static const struct ceph_auth_client_ops ceph_x_ops = {
641 .build_request = ceph_x_build_request, 662 .build_request = ceph_x_build_request,
642 .handle_reply = ceph_x_handle_reply, 663 .handle_reply = ceph_x_handle_reply,
643 .create_authorizer = ceph_x_create_authorizer, 664 .create_authorizer = ceph_x_create_authorizer,
665 .update_authorizer = ceph_x_update_authorizer,
644 .verify_authorizer_reply = ceph_x_verify_authorizer_reply, 666 .verify_authorizer_reply = ceph_x_verify_authorizer_reply,
645 .destroy_authorizer = ceph_x_destroy_authorizer, 667 .destroy_authorizer = ceph_x_destroy_authorizer,
646 .invalidate_authorizer = ceph_x_invalidate_authorizer, 668 .invalidate_authorizer = ceph_x_invalidate_authorizer,
diff --git a/net/ceph/auth_x.h b/net/ceph/auth_x.h
index f459e93b774f..c5a058da7ac8 100644
--- a/net/ceph/auth_x.h
+++ b/net/ceph/auth_x.h
@@ -29,6 +29,7 @@ struct ceph_x_authorizer {
29 struct ceph_buffer *buf; 29 struct ceph_buffer *buf;
30 unsigned int service; 30 unsigned int service;
31 u64 nonce; 31 u64 nonce;
32 u64 secret_id;
32 char reply_buf[128]; /* big enough for encrypted blob */ 33 char reply_buf[128]; /* big enough for encrypted blob */
33}; 34};
34 35
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index e65e6e4be38b..34b11ee8124e 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -606,11 +606,17 @@ static int __init init_ceph_lib(void)
606 if (ret < 0) 606 if (ret < 0)
607 goto out_crypto; 607 goto out_crypto;
608 608
609 ret = ceph_osdc_setup();
610 if (ret < 0)
611 goto out_msgr;
612
609 pr_info("loaded (mon/osd proto %d/%d)\n", 613 pr_info("loaded (mon/osd proto %d/%d)\n",
610 CEPH_MONC_PROTOCOL, CEPH_OSDC_PROTOCOL); 614 CEPH_MONC_PROTOCOL, CEPH_OSDC_PROTOCOL);
611 615
612 return 0; 616 return 0;
613 617
618out_msgr:
619 ceph_msgr_exit();
614out_crypto: 620out_crypto:
615 ceph_crypto_shutdown(); 621 ceph_crypto_shutdown();
616out_debugfs: 622out_debugfs:
@@ -622,6 +628,7 @@ out:
622static void __exit exit_ceph_lib(void) 628static void __exit exit_ceph_lib(void)
623{ 629{
624 dout("exit_ceph_lib\n"); 630 dout("exit_ceph_lib\n");
631 ceph_osdc_cleanup();
625 ceph_msgr_exit(); 632 ceph_msgr_exit();
626 ceph_crypto_shutdown(); 633 ceph_crypto_shutdown();
627 ceph_debugfs_cleanup(); 634 ceph_debugfs_cleanup();
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 00d051f4894e..83661cdc0766 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -123,8 +123,8 @@ static int osdc_show(struct seq_file *s, void *pp)
123 mutex_lock(&osdc->request_mutex); 123 mutex_lock(&osdc->request_mutex);
124 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { 124 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
125 struct ceph_osd_request *req; 125 struct ceph_osd_request *req;
126 unsigned int i;
126 int opcode; 127 int opcode;
127 int i;
128 128
129 req = rb_entry(p, struct ceph_osd_request, r_node); 129 req = rb_entry(p, struct ceph_osd_request, r_node);
130 130
@@ -142,7 +142,7 @@ static int osdc_show(struct seq_file *s, void *pp)
142 seq_printf(s, "\t"); 142 seq_printf(s, "\t");
143 143
144 for (i = 0; i < req->r_num_ops; i++) { 144 for (i = 0; i < req->r_num_ops; i++) {
145 opcode = le16_to_cpu(req->r_request_ops[i].op); 145 opcode = req->r_ops[i].op;
146 seq_printf(s, "\t%s", ceph_osd_op_name(opcode)); 146 seq_printf(s, "\t%s", ceph_osd_op_name(opcode));
147 } 147 }
148 148
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 2c0669fb54e3..eb0a46a49bd4 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -21,6 +21,9 @@
21#include <linux/ceph/pagelist.h> 21#include <linux/ceph/pagelist.h>
22#include <linux/export.h> 22#include <linux/export.h>
23 23
24#define list_entry_next(pos, member) \
25 list_entry(pos->member.next, typeof(*pos), member)
26
24/* 27/*
25 * Ceph uses the messenger to exchange ceph_msg messages with other 28 * Ceph uses the messenger to exchange ceph_msg messages with other
26 * hosts in the system. The messenger provides ordered and reliable 29 * hosts in the system. The messenger provides ordered and reliable
@@ -149,6 +152,11 @@ static bool con_flag_test_and_set(struct ceph_connection *con,
149 return test_and_set_bit(con_flag, &con->flags); 152 return test_and_set_bit(con_flag, &con->flags);
150} 153}
151 154
155/* Slab caches for frequently-allocated structures */
156
157static struct kmem_cache *ceph_msg_cache;
158static struct kmem_cache *ceph_msg_data_cache;
159
152/* static tag bytes (protocol control messages) */ 160/* static tag bytes (protocol control messages) */
153static char tag_msg = CEPH_MSGR_TAG_MSG; 161static char tag_msg = CEPH_MSGR_TAG_MSG;
154static char tag_ack = CEPH_MSGR_TAG_ACK; 162static char tag_ack = CEPH_MSGR_TAG_ACK;
@@ -223,6 +231,41 @@ static void encode_my_addr(struct ceph_messenger *msgr)
223 */ 231 */
224static struct workqueue_struct *ceph_msgr_wq; 232static struct workqueue_struct *ceph_msgr_wq;
225 233
234static int ceph_msgr_slab_init(void)
235{
236 BUG_ON(ceph_msg_cache);
237 ceph_msg_cache = kmem_cache_create("ceph_msg",
238 sizeof (struct ceph_msg),
239 __alignof__(struct ceph_msg), 0, NULL);
240
241 if (!ceph_msg_cache)
242 return -ENOMEM;
243
244 BUG_ON(ceph_msg_data_cache);
245 ceph_msg_data_cache = kmem_cache_create("ceph_msg_data",
246 sizeof (struct ceph_msg_data),
247 __alignof__(struct ceph_msg_data),
248 0, NULL);
249 if (ceph_msg_data_cache)
250 return 0;
251
252 kmem_cache_destroy(ceph_msg_cache);
253 ceph_msg_cache = NULL;
254
255 return -ENOMEM;
256}
257
258static void ceph_msgr_slab_exit(void)
259{
260 BUG_ON(!ceph_msg_data_cache);
261 kmem_cache_destroy(ceph_msg_data_cache);
262 ceph_msg_data_cache = NULL;
263
264 BUG_ON(!ceph_msg_cache);
265 kmem_cache_destroy(ceph_msg_cache);
266 ceph_msg_cache = NULL;
267}
268
226static void _ceph_msgr_exit(void) 269static void _ceph_msgr_exit(void)
227{ 270{
228 if (ceph_msgr_wq) { 271 if (ceph_msgr_wq) {
@@ -230,6 +273,8 @@ static void _ceph_msgr_exit(void)
230 ceph_msgr_wq = NULL; 273 ceph_msgr_wq = NULL;
231 } 274 }
232 275
276 ceph_msgr_slab_exit();
277
233 BUG_ON(zero_page == NULL); 278 BUG_ON(zero_page == NULL);
234 kunmap(zero_page); 279 kunmap(zero_page);
235 page_cache_release(zero_page); 280 page_cache_release(zero_page);
@@ -242,6 +287,9 @@ int ceph_msgr_init(void)
242 zero_page = ZERO_PAGE(0); 287 zero_page = ZERO_PAGE(0);
243 page_cache_get(zero_page); 288 page_cache_get(zero_page);
244 289
290 if (ceph_msgr_slab_init())
291 return -ENOMEM;
292
245 ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0); 293 ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
246 if (ceph_msgr_wq) 294 if (ceph_msgr_wq)
247 return 0; 295 return 0;
@@ -471,6 +519,22 @@ static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
471 return r; 519 return r;
472} 520}
473 521
522static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
523 int page_offset, size_t length)
524{
525 void *kaddr;
526 int ret;
527
528 BUG_ON(page_offset + length > PAGE_SIZE);
529
530 kaddr = kmap(page);
531 BUG_ON(!kaddr);
532 ret = ceph_tcp_recvmsg(sock, kaddr + page_offset, length);
533 kunmap(page);
534
535 return ret;
536}
537
474/* 538/*
475 * write something. @more is true if caller will be sending more data 539 * write something. @more is true if caller will be sending more data
476 * shortly. 540 * shortly.
@@ -493,7 +557,7 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
493} 557}
494 558
495static int ceph_tcp_sendpage(struct socket *sock, struct page *page, 559static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
496 int offset, size_t size, int more) 560 int offset, size_t size, bool more)
497{ 561{
498 int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR); 562 int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR);
499 int ret; 563 int ret;
@@ -697,50 +761,397 @@ static void con_out_kvec_add(struct ceph_connection *con,
697} 761}
698 762
699#ifdef CONFIG_BLOCK 763#ifdef CONFIG_BLOCK
700static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg) 764
765/*
766 * For a bio data item, a piece is whatever remains of the next
767 * entry in the current bio iovec, or the first entry in the next
768 * bio in the list.
769 */
770static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
771 size_t length)
701{ 772{
702 if (!bio) { 773 struct ceph_msg_data *data = cursor->data;
703 *iter = NULL; 774 struct bio *bio;
704 *seg = 0; 775
705 return; 776 BUG_ON(data->type != CEPH_MSG_DATA_BIO);
777
778 bio = data->bio;
779 BUG_ON(!bio);
780 BUG_ON(!bio->bi_vcnt);
781
782 cursor->resid = min(length, data->bio_length);
783 cursor->bio = bio;
784 cursor->vector_index = 0;
785 cursor->vector_offset = 0;
786 cursor->last_piece = length <= bio->bi_io_vec[0].bv_len;
787}
788
789static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
790 size_t *page_offset,
791 size_t *length)
792{
793 struct ceph_msg_data *data = cursor->data;
794 struct bio *bio;
795 struct bio_vec *bio_vec;
796 unsigned int index;
797
798 BUG_ON(data->type != CEPH_MSG_DATA_BIO);
799
800 bio = cursor->bio;
801 BUG_ON(!bio);
802
803 index = cursor->vector_index;
804 BUG_ON(index >= (unsigned int) bio->bi_vcnt);
805
806 bio_vec = &bio->bi_io_vec[index];
807 BUG_ON(cursor->vector_offset >= bio_vec->bv_len);
808 *page_offset = (size_t) (bio_vec->bv_offset + cursor->vector_offset);
809 BUG_ON(*page_offset >= PAGE_SIZE);
810 if (cursor->last_piece) /* pagelist offset is always 0 */
811 *length = cursor->resid;
812 else
813 *length = (size_t) (bio_vec->bv_len - cursor->vector_offset);
814 BUG_ON(*length > cursor->resid);
815 BUG_ON(*page_offset + *length > PAGE_SIZE);
816
817 return bio_vec->bv_page;
818}
819
820static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
821 size_t bytes)
822{
823 struct bio *bio;
824 struct bio_vec *bio_vec;
825 unsigned int index;
826
827 BUG_ON(cursor->data->type != CEPH_MSG_DATA_BIO);
828
829 bio = cursor->bio;
830 BUG_ON(!bio);
831
832 index = cursor->vector_index;
833 BUG_ON(index >= (unsigned int) bio->bi_vcnt);
834 bio_vec = &bio->bi_io_vec[index];
835
836 /* Advance the cursor offset */
837
838 BUG_ON(cursor->resid < bytes);
839 cursor->resid -= bytes;
840 cursor->vector_offset += bytes;
841 if (cursor->vector_offset < bio_vec->bv_len)
842 return false; /* more bytes to process in this segment */
843 BUG_ON(cursor->vector_offset != bio_vec->bv_len);
844
845 /* Move on to the next segment, and possibly the next bio */
846
847 if (++index == (unsigned int) bio->bi_vcnt) {
848 bio = bio->bi_next;
849 index = 0;
706 } 850 }
707 *iter = bio; 851 cursor->bio = bio;
708 *seg = bio->bi_idx; 852 cursor->vector_index = index;
853 cursor->vector_offset = 0;
854
855 if (!cursor->last_piece) {
856 BUG_ON(!cursor->resid);
857 BUG_ON(!bio);
858 /* A short read is OK, so use <= rather than == */
859 if (cursor->resid <= bio->bi_io_vec[index].bv_len)
860 cursor->last_piece = true;
861 }
862
863 return true;
709} 864}
865#endif /* CONFIG_BLOCK */
710 866
711static void iter_bio_next(struct bio **bio_iter, int *seg) 867/*
868 * For a page array, a piece comes from the first page in the array
869 * that has not already been fully consumed.
870 */
871static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
872 size_t length)
712{ 873{
713 if (*bio_iter == NULL) 874 struct ceph_msg_data *data = cursor->data;
714 return; 875 int page_count;
876
877 BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
715 878
716 BUG_ON(*seg >= (*bio_iter)->bi_vcnt); 879 BUG_ON(!data->pages);
880 BUG_ON(!data->length);
717 881
718 (*seg)++; 882 cursor->resid = min(length, data->length);
719 if (*seg == (*bio_iter)->bi_vcnt) 883 page_count = calc_pages_for(data->alignment, (u64)data->length);
720 init_bio_iter((*bio_iter)->bi_next, bio_iter, seg); 884 cursor->page_offset = data->alignment & ~PAGE_MASK;
885 cursor->page_index = 0;
886 BUG_ON(page_count > (int)USHRT_MAX);
887 cursor->page_count = (unsigned short)page_count;
888 BUG_ON(length > SIZE_MAX - cursor->page_offset);
889 cursor->last_piece = (size_t)cursor->page_offset + length <= PAGE_SIZE;
721} 890}
722#endif
723 891
724static void prepare_write_message_data(struct ceph_connection *con) 892static struct page *
893ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
894 size_t *page_offset, size_t *length)
725{ 895{
726 struct ceph_msg *msg = con->out_msg; 896 struct ceph_msg_data *data = cursor->data;
727 897
728 BUG_ON(!msg); 898 BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
729 BUG_ON(!msg->hdr.data_len); 899
900 BUG_ON(cursor->page_index >= cursor->page_count);
901 BUG_ON(cursor->page_offset >= PAGE_SIZE);
902
903 *page_offset = cursor->page_offset;
904 if (cursor->last_piece)
905 *length = cursor->resid;
906 else
907 *length = PAGE_SIZE - *page_offset;
908
909 return data->pages[cursor->page_index];
910}
911
912static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
913 size_t bytes)
914{
915 BUG_ON(cursor->data->type != CEPH_MSG_DATA_PAGES);
916
917 BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
918
919 /* Advance the cursor page offset */
920
921 cursor->resid -= bytes;
922 cursor->page_offset = (cursor->page_offset + bytes) & ~PAGE_MASK;
923 if (!bytes || cursor->page_offset)
924 return false; /* more bytes to process in the current page */
925
926 /* Move on to the next page; offset is already at 0 */
927
928 BUG_ON(cursor->page_index >= cursor->page_count);
929 cursor->page_index++;
930 cursor->last_piece = cursor->resid <= PAGE_SIZE;
931
932 return true;
933}
934
935/*
936 * For a pagelist, a piece is whatever remains to be consumed in the
937 * first page in the list, or the front of the next page.
938 */
939static void
940ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
941 size_t length)
942{
943 struct ceph_msg_data *data = cursor->data;
944 struct ceph_pagelist *pagelist;
945 struct page *page;
946
947 BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
948
949 pagelist = data->pagelist;
950 BUG_ON(!pagelist);
951
952 if (!length)
953 return; /* pagelist can be assigned but empty */
954
955 BUG_ON(list_empty(&pagelist->head));
956 page = list_first_entry(&pagelist->head, struct page, lru);
957
958 cursor->resid = min(length, pagelist->length);
959 cursor->page = page;
960 cursor->offset = 0;
961 cursor->last_piece = cursor->resid <= PAGE_SIZE;
962}
963
964static struct page *
965ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
966 size_t *page_offset, size_t *length)
967{
968 struct ceph_msg_data *data = cursor->data;
969 struct ceph_pagelist *pagelist;
970
971 BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
730 972
731 /* initialize page iterator */ 973 pagelist = data->pagelist;
732 con->out_msg_pos.page = 0; 974 BUG_ON(!pagelist);
733 if (msg->pages) 975
734 con->out_msg_pos.page_pos = msg->page_alignment; 976 BUG_ON(!cursor->page);
977 BUG_ON(cursor->offset + cursor->resid != pagelist->length);
978
979 /* offset of first page in pagelist is always 0 */
980 *page_offset = cursor->offset & ~PAGE_MASK;
981 if (cursor->last_piece)
982 *length = cursor->resid;
735 else 983 else
736 con->out_msg_pos.page_pos = 0; 984 *length = PAGE_SIZE - *page_offset;
985
986 return cursor->page;
987}
988
989static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
990 size_t bytes)
991{
992 struct ceph_msg_data *data = cursor->data;
993 struct ceph_pagelist *pagelist;
994
995 BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
996
997 pagelist = data->pagelist;
998 BUG_ON(!pagelist);
999
1000 BUG_ON(cursor->offset + cursor->resid != pagelist->length);
1001 BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);
1002
1003 /* Advance the cursor offset */
1004
1005 cursor->resid -= bytes;
1006 cursor->offset += bytes;
1007 /* offset of first page in pagelist is always 0 */
1008 if (!bytes || cursor->offset & ~PAGE_MASK)
1009 return false; /* more bytes to process in the current page */
1010
1011 /* Move on to the next page */
1012
1013 BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
1014 cursor->page = list_entry_next(cursor->page, lru);
1015 cursor->last_piece = cursor->resid <= PAGE_SIZE;
1016
1017 return true;
1018}
1019
1020/*
1021 * Message data is handled (sent or received) in pieces, where each
1022 * piece resides on a single page. The network layer might not
1023 * consume an entire piece at once. A data item's cursor keeps
1024 * track of which piece is next to process and how much remains to
1025 * be processed in that piece. It also tracks whether the current
1026 * piece is the last one in the data item.
1027 */
1028static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
1029{
1030 size_t length = cursor->total_resid;
1031
1032 switch (cursor->data->type) {
1033 case CEPH_MSG_DATA_PAGELIST:
1034 ceph_msg_data_pagelist_cursor_init(cursor, length);
1035 break;
1036 case CEPH_MSG_DATA_PAGES:
1037 ceph_msg_data_pages_cursor_init(cursor, length);
1038 break;
737#ifdef CONFIG_BLOCK 1039#ifdef CONFIG_BLOCK
738 if (msg->bio) 1040 case CEPH_MSG_DATA_BIO:
739 init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg); 1041 ceph_msg_data_bio_cursor_init(cursor, length);
740#endif 1042 break;
741 con->out_msg_pos.data_pos = 0; 1043#endif /* CONFIG_BLOCK */
742 con->out_msg_pos.did_page_crc = false; 1044 case CEPH_MSG_DATA_NONE:
743 con->out_more = 1; /* data + footer will follow */ 1045 default:
1046 /* BUG(); */
1047 break;
1048 }
1049 cursor->need_crc = true;
1050}
1051
1052static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
1053{
1054 struct ceph_msg_data_cursor *cursor = &msg->cursor;
1055 struct ceph_msg_data *data;
1056
1057 BUG_ON(!length);
1058 BUG_ON(length > msg->data_length);
1059 BUG_ON(list_empty(&msg->data));
1060
1061 cursor->data_head = &msg->data;
1062 cursor->total_resid = length;
1063 data = list_first_entry(&msg->data, struct ceph_msg_data, links);
1064 cursor->data = data;
1065
1066 __ceph_msg_data_cursor_init(cursor);
1067}
1068
1069/*
1070 * Return the page containing the next piece to process for a given
1071 * data item, and supply the page offset and length of that piece.
1072 * Indicate whether this is the last piece in this data item.
1073 */
1074static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
1075 size_t *page_offset, size_t *length,
1076 bool *last_piece)
1077{
1078 struct page *page;
1079
1080 switch (cursor->data->type) {
1081 case CEPH_MSG_DATA_PAGELIST:
1082 page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
1083 break;
1084 case CEPH_MSG_DATA_PAGES:
1085 page = ceph_msg_data_pages_next(cursor, page_offset, length);
1086 break;
1087#ifdef CONFIG_BLOCK
1088 case CEPH_MSG_DATA_BIO:
1089 page = ceph_msg_data_bio_next(cursor, page_offset, length);
1090 break;
1091#endif /* CONFIG_BLOCK */
1092 case CEPH_MSG_DATA_NONE:
1093 default:
1094 page = NULL;
1095 break;
1096 }
1097 BUG_ON(!page);
1098 BUG_ON(*page_offset + *length > PAGE_SIZE);
1099 BUG_ON(!*length);
1100 if (last_piece)
1101 *last_piece = cursor->last_piece;
1102
1103 return page;
1104}
1105
1106/*
1107 * Returns true if the result moves the cursor on to the next piece
1108 * of the data item.
1109 */
1110static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
1111 size_t bytes)
1112{
1113 bool new_piece;
1114
1115 BUG_ON(bytes > cursor->resid);
1116 switch (cursor->data->type) {
1117 case CEPH_MSG_DATA_PAGELIST:
1118 new_piece = ceph_msg_data_pagelist_advance(cursor, bytes);
1119 break;
1120 case CEPH_MSG_DATA_PAGES:
1121 new_piece = ceph_msg_data_pages_advance(cursor, bytes);
1122 break;
1123#ifdef CONFIG_BLOCK
1124 case CEPH_MSG_DATA_BIO:
1125 new_piece = ceph_msg_data_bio_advance(cursor, bytes);
1126 break;
1127#endif /* CONFIG_BLOCK */
1128 case CEPH_MSG_DATA_NONE:
1129 default:
1130 BUG();
1131 break;
1132 }
1133 cursor->total_resid -= bytes;
1134
1135 if (!cursor->resid && cursor->total_resid) {
1136 WARN_ON(!cursor->last_piece);
1137 BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
1138 cursor->data = list_entry_next(cursor->data, links);
1139 __ceph_msg_data_cursor_init(cursor);
1140 new_piece = true;
1141 }
1142 cursor->need_crc = new_piece;
1143
1144 return new_piece;
1145}
1146
1147static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
1148{
1149 BUG_ON(!msg);
1150 BUG_ON(!data_len);
1151
1152 /* Initialize data cursor */
1153
1154 ceph_msg_data_cursor_init(msg, (size_t)data_len);
744} 1155}
745 1156
746/* 1157/*
@@ -803,16 +1214,12 @@ static void prepare_write_message(struct ceph_connection *con)
803 m->hdr.seq = cpu_to_le64(++con->out_seq); 1214 m->hdr.seq = cpu_to_le64(++con->out_seq);
804 m->needs_out_seq = false; 1215 m->needs_out_seq = false;
805 } 1216 }
806#ifdef CONFIG_BLOCK 1217 WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
807 else
808 m->bio_iter = NULL;
809#endif
810 1218
811 dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n", 1219 dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
812 m, con->out_seq, le16_to_cpu(m->hdr.type), 1220 m, con->out_seq, le16_to_cpu(m->hdr.type),
813 le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len), 1221 le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
814 le32_to_cpu(m->hdr.data_len), 1222 m->data_length);
815 m->nr_pages);
816 BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len); 1223 BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
817 1224
818 /* tag + hdr + front + middle */ 1225 /* tag + hdr + front + middle */
@@ -843,11 +1250,13 @@ static void prepare_write_message(struct ceph_connection *con)
843 1250
844 /* is there a data payload? */ 1251 /* is there a data payload? */
845 con->out_msg->footer.data_crc = 0; 1252 con->out_msg->footer.data_crc = 0;
846 if (m->hdr.data_len) 1253 if (m->data_length) {
847 prepare_write_message_data(con); 1254 prepare_message_data(con->out_msg, m->data_length);
848 else 1255 con->out_more = 1; /* data + footer will follow */
1256 } else {
849 /* no, queue up footer too and be done */ 1257 /* no, queue up footer too and be done */
850 prepare_write_message_footer(con); 1258 prepare_write_message_footer(con);
1259 }
851 1260
852 con_flag_set(con, CON_FLAG_WRITE_PENDING); 1261 con_flag_set(con, CON_FLAG_WRITE_PENDING);
853} 1262}
@@ -874,6 +1283,24 @@ static void prepare_write_ack(struct ceph_connection *con)
874} 1283}
875 1284
876/* 1285/*
1286 * Prepare to share the seq during handshake
1287 */
1288static void prepare_write_seq(struct ceph_connection *con)
1289{
1290 dout("prepare_write_seq %p %llu -> %llu\n", con,
1291 con->in_seq_acked, con->in_seq);
1292 con->in_seq_acked = con->in_seq;
1293
1294 con_out_kvec_reset(con);
1295
1296 con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
1297 con_out_kvec_add(con, sizeof (con->out_temp_ack),
1298 &con->out_temp_ack);
1299
1300 con_flag_set(con, CON_FLAG_WRITE_PENDING);
1301}
1302
1303/*
877 * Prepare to write keepalive byte. 1304 * Prepare to write keepalive byte.
878 */ 1305 */
879static void prepare_write_keepalive(struct ceph_connection *con) 1306static void prepare_write_keepalive(struct ceph_connection *con)
@@ -1022,35 +1449,19 @@ out:
1022 return ret; /* done! */ 1449 return ret; /* done! */
1023} 1450}
1024 1451
1025static void out_msg_pos_next(struct ceph_connection *con, struct page *page, 1452static u32 ceph_crc32c_page(u32 crc, struct page *page,
1026 size_t len, size_t sent, bool in_trail) 1453 unsigned int page_offset,
1454 unsigned int length)
1027{ 1455{
1028 struct ceph_msg *msg = con->out_msg; 1456 char *kaddr;
1029 1457
1030 BUG_ON(!msg); 1458 kaddr = kmap(page);
1031 BUG_ON(!sent); 1459 BUG_ON(kaddr == NULL);
1032 1460 crc = crc32c(crc, kaddr + page_offset, length);
1033 con->out_msg_pos.data_pos += sent; 1461 kunmap(page);
1034 con->out_msg_pos.page_pos += sent;
1035 if (sent < len)
1036 return;
1037 1462
1038 BUG_ON(sent != len); 1463 return crc;
1039 con->out_msg_pos.page_pos = 0;
1040 con->out_msg_pos.page++;
1041 con->out_msg_pos.did_page_crc = false;
1042 if (in_trail)
1043 list_move_tail(&page->lru,
1044 &msg->trail->head);
1045 else if (msg->pagelist)
1046 list_move_tail(&page->lru,
1047 &msg->pagelist->head);
1048#ifdef CONFIG_BLOCK
1049 else if (msg->bio)
1050 iter_bio_next(&msg->bio_iter, &msg->bio_seg);
1051#endif
1052} 1464}
1053
1054/* 1465/*
1055 * Write as much message data payload as we can. If we finish, queue 1466 * Write as much message data payload as we can. If we finish, queue
1056 * up the footer. 1467 * up the footer.
@@ -1058,21 +1469,17 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
1058 * 0 -> socket full, but more to do 1469 * 0 -> socket full, but more to do
1059 * <0 -> error 1470 * <0 -> error
1060 */ 1471 */
1061static int write_partial_msg_pages(struct ceph_connection *con) 1472static int write_partial_message_data(struct ceph_connection *con)
1062{ 1473{
1063 struct ceph_msg *msg = con->out_msg; 1474 struct ceph_msg *msg = con->out_msg;
1064 unsigned int data_len = le32_to_cpu(msg->hdr.data_len); 1475 struct ceph_msg_data_cursor *cursor = &msg->cursor;
1065 size_t len;
1066 bool do_datacrc = !con->msgr->nocrc; 1476 bool do_datacrc = !con->msgr->nocrc;
1067 int ret; 1477 u32 crc;
1068 int total_max_write;
1069 bool in_trail = false;
1070 const size_t trail_len = (msg->trail ? msg->trail->length : 0);
1071 const size_t trail_off = data_len - trail_len;
1072 1478
1073 dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n", 1479 dout("%s %p msg %p\n", __func__, con, msg);
1074 con, msg, con->out_msg_pos.page, msg->nr_pages, 1480
1075 con->out_msg_pos.page_pos); 1481 if (list_empty(&msg->data))
1482 return -EINVAL;
1076 1483
1077 /* 1484 /*
1078 * Iterate through each page that contains data to be 1485 * Iterate through each page that contains data to be
@@ -1082,72 +1489,41 @@ static int write_partial_msg_pages(struct ceph_connection *con)
1082 * need to map the page. If we have no pages, they have 1489 * need to map the page. If we have no pages, they have
1083 * been revoked, so use the zero page. 1490 * been revoked, so use the zero page.
1084 */ 1491 */
1085 while (data_len > con->out_msg_pos.data_pos) { 1492 crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
1086 struct page *page = NULL; 1493 while (cursor->resid) {
1087 int max_write = PAGE_SIZE; 1494 struct page *page;
1088 int bio_offset = 0; 1495 size_t page_offset;
1089 1496 size_t length;
1090 in_trail = in_trail || con->out_msg_pos.data_pos >= trail_off; 1497 bool last_piece;
1091 if (!in_trail) 1498 bool need_crc;
1092 total_max_write = trail_off - con->out_msg_pos.data_pos; 1499 int ret;
1093
1094 if (in_trail) {
1095 total_max_write = data_len - con->out_msg_pos.data_pos;
1096
1097 page = list_first_entry(&msg->trail->head,
1098 struct page, lru);
1099 } else if (msg->pages) {
1100 page = msg->pages[con->out_msg_pos.page];
1101 } else if (msg->pagelist) {
1102 page = list_first_entry(&msg->pagelist->head,
1103 struct page, lru);
1104#ifdef CONFIG_BLOCK
1105 } else if (msg->bio) {
1106 struct bio_vec *bv;
1107 1500
1108 bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg); 1501 page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
1109 page = bv->bv_page; 1502 &last_piece);
1110 bio_offset = bv->bv_offset; 1503 ret = ceph_tcp_sendpage(con->sock, page, page_offset,
1111 max_write = bv->bv_len; 1504 length, last_piece);
1112#endif 1505 if (ret <= 0) {
1113 } else { 1506 if (do_datacrc)
1114 page = zero_page; 1507 msg->footer.data_crc = cpu_to_le32(crc);
1115 }
1116 len = min_t(int, max_write - con->out_msg_pos.page_pos,
1117 total_max_write);
1118
1119 if (do_datacrc && !con->out_msg_pos.did_page_crc) {
1120 void *base;
1121 u32 crc = le32_to_cpu(msg->footer.data_crc);
1122 char *kaddr;
1123
1124 kaddr = kmap(page);
1125 BUG_ON(kaddr == NULL);
1126 base = kaddr + con->out_msg_pos.page_pos + bio_offset;
1127 crc = crc32c(crc, base, len);
1128 kunmap(page);
1129 msg->footer.data_crc = cpu_to_le32(crc);
1130 con->out_msg_pos.did_page_crc = true;
1131 }
1132 ret = ceph_tcp_sendpage(con->sock, page,
1133 con->out_msg_pos.page_pos + bio_offset,
1134 len, 1);
1135 if (ret <= 0)
1136 goto out;
1137 1508
1138 out_msg_pos_next(con, page, len, (size_t) ret, in_trail); 1509 return ret;
1510 }
1511 if (do_datacrc && cursor->need_crc)
1512 crc = ceph_crc32c_page(crc, page, page_offset, length);
1513 need_crc = ceph_msg_data_advance(&msg->cursor, (size_t)ret);
1139 } 1514 }
1140 1515
1141 dout("write_partial_msg_pages %p msg %p done\n", con, msg); 1516 dout("%s %p msg %p done\n", __func__, con, msg);
1142 1517
1143 /* prepare and queue up footer, too */ 1518 /* prepare and queue up footer, too */
1144 if (!do_datacrc) 1519 if (do_datacrc)
1520 msg->footer.data_crc = cpu_to_le32(crc);
1521 else
1145 msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; 1522 msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
1146 con_out_kvec_reset(con); 1523 con_out_kvec_reset(con);
1147 prepare_write_message_footer(con); 1524 prepare_write_message_footer(con);
1148 ret = 1; 1525
1149out: 1526 return 1; /* must return > 0 to indicate success */
1150 return ret;
1151} 1527}
1152 1528
1153/* 1529/*
@@ -1160,7 +1536,7 @@ static int write_partial_skip(struct ceph_connection *con)
1160 while (con->out_skip > 0) { 1536 while (con->out_skip > 0) {
1161 size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE); 1537 size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE);
1162 1538
1163 ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, 1); 1539 ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, true);
1164 if (ret <= 0) 1540 if (ret <= 0)
1165 goto out; 1541 goto out;
1166 con->out_skip -= ret; 1542 con->out_skip -= ret;
@@ -1191,6 +1567,13 @@ static void prepare_read_ack(struct ceph_connection *con)
1191 con->in_base_pos = 0; 1567 con->in_base_pos = 0;
1192} 1568}
1193 1569
1570static void prepare_read_seq(struct ceph_connection *con)
1571{
1572 dout("prepare_read_seq %p\n", con);
1573 con->in_base_pos = 0;
1574 con->in_tag = CEPH_MSGR_TAG_SEQ;
1575}
1576
1194static void prepare_read_tag(struct ceph_connection *con) 1577static void prepare_read_tag(struct ceph_connection *con)
1195{ 1578{
1196 dout("prepare_read_tag %p\n", con); 1579 dout("prepare_read_tag %p\n", con);
@@ -1597,7 +1980,6 @@ static int process_connect(struct ceph_connection *con)
1597 con->error_msg = "connect authorization failure"; 1980 con->error_msg = "connect authorization failure";
1598 return -1; 1981 return -1;
1599 } 1982 }
1600 con->auth_retry = 1;
1601 con_out_kvec_reset(con); 1983 con_out_kvec_reset(con);
1602 ret = prepare_write_connect(con); 1984 ret = prepare_write_connect(con);
1603 if (ret < 0) 1985 if (ret < 0)
@@ -1668,6 +2050,7 @@ static int process_connect(struct ceph_connection *con)
1668 prepare_read_connect(con); 2050 prepare_read_connect(con);
1669 break; 2051 break;
1670 2052
2053 case CEPH_MSGR_TAG_SEQ:
1671 case CEPH_MSGR_TAG_READY: 2054 case CEPH_MSGR_TAG_READY:
1672 if (req_feat & ~server_feat) { 2055 if (req_feat & ~server_feat) {
1673 pr_err("%s%lld %s protocol feature mismatch," 2056 pr_err("%s%lld %s protocol feature mismatch,"
@@ -1682,7 +2065,7 @@ static int process_connect(struct ceph_connection *con)
1682 2065
1683 WARN_ON(con->state != CON_STATE_NEGOTIATING); 2066 WARN_ON(con->state != CON_STATE_NEGOTIATING);
1684 con->state = CON_STATE_OPEN; 2067 con->state = CON_STATE_OPEN;
1685 2068 con->auth_retry = 0; /* we authenticated; clear flag */
1686 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); 2069 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
1687 con->connect_seq++; 2070 con->connect_seq++;
1688 con->peer_features = server_feat; 2071 con->peer_features = server_feat;
@@ -1698,7 +2081,12 @@ static int process_connect(struct ceph_connection *con)
1698 2081
1699 con->delay = 0; /* reset backoff memory */ 2082 con->delay = 0; /* reset backoff memory */
1700 2083
1701 prepare_read_tag(con); 2084 if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) {
2085 prepare_write_seq(con);
2086 prepare_read_seq(con);
2087 } else {
2088 prepare_read_tag(con);
2089 }
1702 break; 2090 break;
1703 2091
1704 case CEPH_MSGR_TAG_WAIT: 2092 case CEPH_MSGR_TAG_WAIT:
@@ -1732,7 +2120,6 @@ static int read_partial_ack(struct ceph_connection *con)
1732 return read_partial(con, end, size, &con->in_temp_ack); 2120 return read_partial(con, end, size, &con->in_temp_ack);
1733} 2121}
1734 2122
1735
1736/* 2123/*
1737 * We can finally discard anything that's been acked. 2124 * We can finally discard anything that's been acked.
1738 */ 2125 */
@@ -1757,8 +2144,6 @@ static void process_ack(struct ceph_connection *con)
1757} 2144}
1758 2145
1759 2146
1760
1761
1762static int read_partial_message_section(struct ceph_connection *con, 2147static int read_partial_message_section(struct ceph_connection *con,
1763 struct kvec *section, 2148 struct kvec *section,
1764 unsigned int sec_len, u32 *crc) 2149 unsigned int sec_len, u32 *crc)
@@ -1782,77 +2167,49 @@ static int read_partial_message_section(struct ceph_connection *con,
1782 return 1; 2167 return 1;
1783} 2168}
1784 2169
1785static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip); 2170static int read_partial_msg_data(struct ceph_connection *con)
1786
1787static int read_partial_message_pages(struct ceph_connection *con,
1788 struct page **pages,
1789 unsigned int data_len, bool do_datacrc)
1790{ 2171{
1791 void *p; 2172 struct ceph_msg *msg = con->in_msg;
2173 struct ceph_msg_data_cursor *cursor = &msg->cursor;
2174 const bool do_datacrc = !con->msgr->nocrc;
2175 struct page *page;
2176 size_t page_offset;
2177 size_t length;
2178 u32 crc = 0;
1792 int ret; 2179 int ret;
1793 int left;
1794 2180
1795 left = min((int)(data_len - con->in_msg_pos.data_pos), 2181 BUG_ON(!msg);
1796 (int)(PAGE_SIZE - con->in_msg_pos.page_pos)); 2182 if (list_empty(&msg->data))
1797 /* (page) data */ 2183 return -EIO;
1798 BUG_ON(pages == NULL);
1799 p = kmap(pages[con->in_msg_pos.page]);
1800 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
1801 left);
1802 if (ret > 0 && do_datacrc)
1803 con->in_data_crc =
1804 crc32c(con->in_data_crc,
1805 p + con->in_msg_pos.page_pos, ret);
1806 kunmap(pages[con->in_msg_pos.page]);
1807 if (ret <= 0)
1808 return ret;
1809 con->in_msg_pos.data_pos += ret;
1810 con->in_msg_pos.page_pos += ret;
1811 if (con->in_msg_pos.page_pos == PAGE_SIZE) {
1812 con->in_msg_pos.page_pos = 0;
1813 con->in_msg_pos.page++;
1814 }
1815
1816 return ret;
1817}
1818
1819#ifdef CONFIG_BLOCK
1820static int read_partial_message_bio(struct ceph_connection *con,
1821 struct bio **bio_iter, int *bio_seg,
1822 unsigned int data_len, bool do_datacrc)
1823{
1824 struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
1825 void *p;
1826 int ret, left;
1827 2184
1828 left = min((int)(data_len - con->in_msg_pos.data_pos), 2185 if (do_datacrc)
1829 (int)(bv->bv_len - con->in_msg_pos.page_pos)); 2186 crc = con->in_data_crc;
2187 while (cursor->resid) {
2188 page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
2189 NULL);
2190 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
2191 if (ret <= 0) {
2192 if (do_datacrc)
2193 con->in_data_crc = crc;
1830 2194
1831 p = kmap(bv->bv_page) + bv->bv_offset; 2195 return ret;
2196 }
1832 2197
1833 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, 2198 if (do_datacrc)
1834 left); 2199 crc = ceph_crc32c_page(crc, page, page_offset, ret);
1835 if (ret > 0 && do_datacrc) 2200 (void) ceph_msg_data_advance(&msg->cursor, (size_t)ret);
1836 con->in_data_crc =
1837 crc32c(con->in_data_crc,
1838 p + con->in_msg_pos.page_pos, ret);
1839 kunmap(bv->bv_page);
1840 if (ret <= 0)
1841 return ret;
1842 con->in_msg_pos.data_pos += ret;
1843 con->in_msg_pos.page_pos += ret;
1844 if (con->in_msg_pos.page_pos == bv->bv_len) {
1845 con->in_msg_pos.page_pos = 0;
1846 iter_bio_next(bio_iter, bio_seg);
1847 } 2201 }
2202 if (do_datacrc)
2203 con->in_data_crc = crc;
1848 2204
1849 return ret; 2205 return 1; /* must return > 0 to indicate success */
1850} 2206}
1851#endif
1852 2207
1853/* 2208/*
1854 * read (part of) a message. 2209 * read (part of) a message.
1855 */ 2210 */
2211static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
2212
1856static int read_partial_message(struct ceph_connection *con) 2213static int read_partial_message(struct ceph_connection *con)
1857{ 2214{
1858 struct ceph_msg *m = con->in_msg; 2215 struct ceph_msg *m = con->in_msg;
@@ -1885,7 +2242,7 @@ static int read_partial_message(struct ceph_connection *con)
1885 if (front_len > CEPH_MSG_MAX_FRONT_LEN) 2242 if (front_len > CEPH_MSG_MAX_FRONT_LEN)
1886 return -EIO; 2243 return -EIO;
1887 middle_len = le32_to_cpu(con->in_hdr.middle_len); 2244 middle_len = le32_to_cpu(con->in_hdr.middle_len);
1888 if (middle_len > CEPH_MSG_MAX_DATA_LEN) 2245 if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN)
1889 return -EIO; 2246 return -EIO;
1890 data_len = le32_to_cpu(con->in_hdr.data_len); 2247 data_len = le32_to_cpu(con->in_hdr.data_len);
1891 if (data_len > CEPH_MSG_MAX_DATA_LEN) 2248 if (data_len > CEPH_MSG_MAX_DATA_LEN)
@@ -1914,14 +2271,22 @@ static int read_partial_message(struct ceph_connection *con)
1914 int skip = 0; 2271 int skip = 0;
1915 2272
1916 dout("got hdr type %d front %d data %d\n", con->in_hdr.type, 2273 dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
1917 con->in_hdr.front_len, con->in_hdr.data_len); 2274 front_len, data_len);
1918 ret = ceph_con_in_msg_alloc(con, &skip); 2275 ret = ceph_con_in_msg_alloc(con, &skip);
1919 if (ret < 0) 2276 if (ret < 0)
1920 return ret; 2277 return ret;
2278
2279 BUG_ON(!con->in_msg ^ skip);
2280 if (con->in_msg && data_len > con->in_msg->data_length) {
2281 pr_warning("%s skipping long message (%u > %zd)\n",
2282 __func__, data_len, con->in_msg->data_length);
2283 ceph_msg_put(con->in_msg);
2284 con->in_msg = NULL;
2285 skip = 1;
2286 }
1921 if (skip) { 2287 if (skip) {
1922 /* skip this message */ 2288 /* skip this message */
1923 dout("alloc_msg said skip message\n"); 2289 dout("alloc_msg said skip message\n");
1924 BUG_ON(con->in_msg);
1925 con->in_base_pos = -front_len - middle_len - data_len - 2290 con->in_base_pos = -front_len - middle_len - data_len -
1926 sizeof(m->footer); 2291 sizeof(m->footer);
1927 con->in_tag = CEPH_MSGR_TAG_READY; 2292 con->in_tag = CEPH_MSGR_TAG_READY;
@@ -1936,17 +2301,10 @@ static int read_partial_message(struct ceph_connection *con)
1936 if (m->middle) 2301 if (m->middle)
1937 m->middle->vec.iov_len = 0; 2302 m->middle->vec.iov_len = 0;
1938 2303
1939 con->in_msg_pos.page = 0; 2304 /* prepare for data payload, if any */
1940 if (m->pages)
1941 con->in_msg_pos.page_pos = m->page_alignment;
1942 else
1943 con->in_msg_pos.page_pos = 0;
1944 con->in_msg_pos.data_pos = 0;
1945 2305
1946#ifdef CONFIG_BLOCK 2306 if (data_len)
1947 if (m->bio) 2307 prepare_message_data(con->in_msg, data_len);
1948 init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
1949#endif
1950 } 2308 }
1951 2309
1952 /* front */ 2310 /* front */
@@ -1965,24 +2323,10 @@ static int read_partial_message(struct ceph_connection *con)
1965 } 2323 }
1966 2324
1967 /* (page) data */ 2325 /* (page) data */
1968 while (con->in_msg_pos.data_pos < data_len) { 2326 if (data_len) {
1969 if (m->pages) { 2327 ret = read_partial_msg_data(con);
1970 ret = read_partial_message_pages(con, m->pages, 2328 if (ret <= 0)
1971 data_len, do_datacrc); 2329 return ret;
1972 if (ret <= 0)
1973 return ret;
1974#ifdef CONFIG_BLOCK
1975 } else if (m->bio) {
1976 BUG_ON(!m->bio_iter);
1977 ret = read_partial_message_bio(con,
1978 &m->bio_iter, &m->bio_seg,
1979 data_len, do_datacrc);
1980 if (ret <= 0)
1981 return ret;
1982#endif
1983 } else {
1984 BUG_ON(1);
1985 }
1986 } 2330 }
1987 2331
1988 /* footer */ 2332 /* footer */
@@ -2108,13 +2452,13 @@ more_kvec:
2108 goto do_next; 2452 goto do_next;
2109 } 2453 }
2110 2454
2111 ret = write_partial_msg_pages(con); 2455 ret = write_partial_message_data(con);
2112 if (ret == 1) 2456 if (ret == 1)
2113 goto more_kvec; /* we need to send the footer, too! */ 2457 goto more_kvec; /* we need to send the footer, too! */
2114 if (ret == 0) 2458 if (ret == 0)
2115 goto out; 2459 goto out;
2116 if (ret < 0) { 2460 if (ret < 0) {
2117 dout("try_write write_partial_msg_pages err %d\n", 2461 dout("try_write write_partial_message_data err %d\n",
2118 ret); 2462 ret);
2119 goto out; 2463 goto out;
2120 } 2464 }
@@ -2266,7 +2610,12 @@ more:
2266 prepare_read_tag(con); 2610 prepare_read_tag(con);
2267 goto more; 2611 goto more;
2268 } 2612 }
2269 if (con->in_tag == CEPH_MSGR_TAG_ACK) { 2613 if (con->in_tag == CEPH_MSGR_TAG_ACK ||
2614 con->in_tag == CEPH_MSGR_TAG_SEQ) {
2615 /*
2616 * the final handshake seq exchange is semantically
2617 * equivalent to an ACK
2618 */
2270 ret = read_partial_ack(con); 2619 ret = read_partial_ack(con);
2271 if (ret <= 0) 2620 if (ret <= 0)
2272 goto out; 2621 goto out;
@@ -2672,6 +3021,88 @@ void ceph_con_keepalive(struct ceph_connection *con)
2672} 3021}
2673EXPORT_SYMBOL(ceph_con_keepalive); 3022EXPORT_SYMBOL(ceph_con_keepalive);
2674 3023
3024static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
3025{
3026 struct ceph_msg_data *data;
3027
3028 if (WARN_ON(!ceph_msg_data_type_valid(type)))
3029 return NULL;
3030
3031 data = kmem_cache_zalloc(ceph_msg_data_cache, GFP_NOFS);
3032 if (data)
3033 data->type = type;
3034 INIT_LIST_HEAD(&data->links);
3035
3036 return data;
3037}
3038
3039static void ceph_msg_data_destroy(struct ceph_msg_data *data)
3040{
3041 if (!data)
3042 return;
3043
3044 WARN_ON(!list_empty(&data->links));
3045 if (data->type == CEPH_MSG_DATA_PAGELIST) {
3046 ceph_pagelist_release(data->pagelist);
3047 kfree(data->pagelist);
3048 }
3049 kmem_cache_free(ceph_msg_data_cache, data);
3050}
3051
3052void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
3053 size_t length, size_t alignment)
3054{
3055 struct ceph_msg_data *data;
3056
3057 BUG_ON(!pages);
3058 BUG_ON(!length);
3059
3060 data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
3061 BUG_ON(!data);
3062 data->pages = pages;
3063 data->length = length;
3064 data->alignment = alignment & ~PAGE_MASK;
3065
3066 list_add_tail(&data->links, &msg->data);
3067 msg->data_length += length;
3068}
3069EXPORT_SYMBOL(ceph_msg_data_add_pages);
3070
3071void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
3072 struct ceph_pagelist *pagelist)
3073{
3074 struct ceph_msg_data *data;
3075
3076 BUG_ON(!pagelist);
3077 BUG_ON(!pagelist->length);
3078
3079 data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
3080 BUG_ON(!data);
3081 data->pagelist = pagelist;
3082
3083 list_add_tail(&data->links, &msg->data);
3084 msg->data_length += pagelist->length;
3085}
3086EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
3087
3088#ifdef CONFIG_BLOCK
3089void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
3090 size_t length)
3091{
3092 struct ceph_msg_data *data;
3093
3094 BUG_ON(!bio);
3095
3096 data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
3097 BUG_ON(!data);
3098 data->bio = bio;
3099 data->bio_length = length;
3100
3101 list_add_tail(&data->links, &msg->data);
3102 msg->data_length += length;
3103}
3104EXPORT_SYMBOL(ceph_msg_data_add_bio);
3105#endif /* CONFIG_BLOCK */
2675 3106
2676/* 3107/*
2677 * construct a new message with given type, size 3108 * construct a new message with given type, size
@@ -2682,49 +3113,20 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
2682{ 3113{
2683 struct ceph_msg *m; 3114 struct ceph_msg *m;
2684 3115
2685 m = kmalloc(sizeof(*m), flags); 3116 m = kmem_cache_zalloc(ceph_msg_cache, flags);
2686 if (m == NULL) 3117 if (m == NULL)
2687 goto out; 3118 goto out;
2688 kref_init(&m->kref);
2689 3119
2690 m->con = NULL;
2691 INIT_LIST_HEAD(&m->list_head);
2692
2693 m->hdr.tid = 0;
2694 m->hdr.type = cpu_to_le16(type); 3120 m->hdr.type = cpu_to_le16(type);
2695 m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT); 3121 m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
2696 m->hdr.version = 0;
2697 m->hdr.front_len = cpu_to_le32(front_len); 3122 m->hdr.front_len = cpu_to_le32(front_len);
2698 m->hdr.middle_len = 0;
2699 m->hdr.data_len = 0;
2700 m->hdr.data_off = 0;
2701 m->hdr.reserved = 0;
2702 m->footer.front_crc = 0;
2703 m->footer.middle_crc = 0;
2704 m->footer.data_crc = 0;
2705 m->footer.flags = 0;
2706 m->front_max = front_len;
2707 m->front_is_vmalloc = false;
2708 m->more_to_follow = false;
2709 m->ack_stamp = 0;
2710 m->pool = NULL;
2711
2712 /* middle */
2713 m->middle = NULL;
2714 3123
2715 /* data */ 3124 INIT_LIST_HEAD(&m->list_head);
2716 m->nr_pages = 0; 3125 kref_init(&m->kref);
2717 m->page_alignment = 0; 3126 INIT_LIST_HEAD(&m->data);
2718 m->pages = NULL;
2719 m->pagelist = NULL;
2720#ifdef CONFIG_BLOCK
2721 m->bio = NULL;
2722 m->bio_iter = NULL;
2723 m->bio_seg = 0;
2724#endif /* CONFIG_BLOCK */
2725 m->trail = NULL;
2726 3127
2727 /* front */ 3128 /* front */
3129 m->front_max = front_len;
2728 if (front_len) { 3130 if (front_len) {
2729 if (front_len > PAGE_CACHE_SIZE) { 3131 if (front_len > PAGE_CACHE_SIZE) {
2730 m->front.iov_base = __vmalloc(front_len, flags, 3132 m->front.iov_base = __vmalloc(front_len, flags,
@@ -2802,49 +3204,37 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
2802static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip) 3204static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
2803{ 3205{
2804 struct ceph_msg_header *hdr = &con->in_hdr; 3206 struct ceph_msg_header *hdr = &con->in_hdr;
2805 int type = le16_to_cpu(hdr->type);
2806 int front_len = le32_to_cpu(hdr->front_len);
2807 int middle_len = le32_to_cpu(hdr->middle_len); 3207 int middle_len = le32_to_cpu(hdr->middle_len);
3208 struct ceph_msg *msg;
2808 int ret = 0; 3209 int ret = 0;
2809 3210
2810 BUG_ON(con->in_msg != NULL); 3211 BUG_ON(con->in_msg != NULL);
3212 BUG_ON(!con->ops->alloc_msg);
2811 3213
2812 if (con->ops->alloc_msg) { 3214 mutex_unlock(&con->mutex);
2813 struct ceph_msg *msg; 3215 msg = con->ops->alloc_msg(con, hdr, skip);
2814 3216 mutex_lock(&con->mutex);
2815 mutex_unlock(&con->mutex); 3217 if (con->state != CON_STATE_OPEN) {
2816 msg = con->ops->alloc_msg(con, hdr, skip); 3218 if (msg)
2817 mutex_lock(&con->mutex); 3219 ceph_msg_put(msg);
2818 if (con->state != CON_STATE_OPEN) { 3220 return -EAGAIN;
2819 if (msg)
2820 ceph_msg_put(msg);
2821 return -EAGAIN;
2822 }
2823 con->in_msg = msg;
2824 if (con->in_msg) {
2825 con->in_msg->con = con->ops->get(con);
2826 BUG_ON(con->in_msg->con == NULL);
2827 }
2828 if (*skip) {
2829 con->in_msg = NULL;
2830 return 0;
2831 }
2832 if (!con->in_msg) {
2833 con->error_msg =
2834 "error allocating memory for incoming message";
2835 return -ENOMEM;
2836 }
2837 } 3221 }
2838 if (!con->in_msg) { 3222 if (msg) {
2839 con->in_msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 3223 BUG_ON(*skip);
2840 if (!con->in_msg) { 3224 con->in_msg = msg;
2841 pr_err("unable to allocate msg type %d len %d\n",
2842 type, front_len);
2843 return -ENOMEM;
2844 }
2845 con->in_msg->con = con->ops->get(con); 3225 con->in_msg->con = con->ops->get(con);
2846 BUG_ON(con->in_msg->con == NULL); 3226 BUG_ON(con->in_msg->con == NULL);
2847 con->in_msg->page_alignment = le16_to_cpu(hdr->data_off); 3227 } else {
3228 /*
3229 * Null message pointer means either we should skip
3230 * this message or we couldn't allocate memory. The
3231 * former is not an error.
3232 */
3233 if (*skip)
3234 return 0;
3235 con->error_msg = "error allocating memory for incoming message";
3236
3237 return -ENOMEM;
2848 } 3238 }
2849 memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); 3239 memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
2850 3240
@@ -2870,7 +3260,7 @@ void ceph_msg_kfree(struct ceph_msg *m)
2870 vfree(m->front.iov_base); 3260 vfree(m->front.iov_base);
2871 else 3261 else
2872 kfree(m->front.iov_base); 3262 kfree(m->front.iov_base);
2873 kfree(m); 3263 kmem_cache_free(ceph_msg_cache, m);
2874} 3264}
2875 3265
2876/* 3266/*
@@ -2879,6 +3269,9 @@ void ceph_msg_kfree(struct ceph_msg *m)
2879void ceph_msg_last_put(struct kref *kref) 3269void ceph_msg_last_put(struct kref *kref)
2880{ 3270{
2881 struct ceph_msg *m = container_of(kref, struct ceph_msg, kref); 3271 struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
3272 LIST_HEAD(data);
3273 struct list_head *links;
3274 struct list_head *next;
2882 3275
2883 dout("ceph_msg_put last one on %p\n", m); 3276 dout("ceph_msg_put last one on %p\n", m);
2884 WARN_ON(!list_empty(&m->list_head)); 3277 WARN_ON(!list_empty(&m->list_head));
@@ -2888,16 +3281,16 @@ void ceph_msg_last_put(struct kref *kref)
2888 ceph_buffer_put(m->middle); 3281 ceph_buffer_put(m->middle);
2889 m->middle = NULL; 3282 m->middle = NULL;
2890 } 3283 }
2891 m->nr_pages = 0;
2892 m->pages = NULL;
2893 3284
2894 if (m->pagelist) { 3285 list_splice_init(&m->data, &data);
2895 ceph_pagelist_release(m->pagelist); 3286 list_for_each_safe(links, next, &data) {
2896 kfree(m->pagelist); 3287 struct ceph_msg_data *data;
2897 m->pagelist = NULL;
2898 }
2899 3288
2900 m->trail = NULL; 3289 data = list_entry(links, struct ceph_msg_data, links);
3290 list_del_init(links);
3291 ceph_msg_data_destroy(data);
3292 }
3293 m->data_length = 0;
2901 3294
2902 if (m->pool) 3295 if (m->pool)
2903 ceph_msgpool_put(m->pool, m); 3296 ceph_msgpool_put(m->pool, m);
@@ -2908,8 +3301,8 @@ EXPORT_SYMBOL(ceph_msg_last_put);
2908 3301
2909void ceph_msg_dump(struct ceph_msg *msg) 3302void ceph_msg_dump(struct ceph_msg *msg)
2910{ 3303{
2911 pr_debug("msg_dump %p (front_max %d nr_pages %d)\n", msg, 3304 pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
2912 msg->front_max, msg->nr_pages); 3305 msg->front_max, msg->data_length);
2913 print_hex_dump(KERN_DEBUG, "header: ", 3306 print_hex_dump(KERN_DEBUG, "header: ",
2914 DUMP_PREFIX_OFFSET, 16, 1, 3307 DUMP_PREFIX_OFFSET, 16, 1,
2915 &msg->hdr, sizeof(msg->hdr), true); 3308 &msg->hdr, sizeof(msg->hdr), true);
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index aef5b1062bee..1fe25cd29d0e 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -737,7 +737,7 @@ static void delayed_work(struct work_struct *work)
737 737
738 __validate_auth(monc); 738 __validate_auth(monc);
739 739
740 if (monc->auth->ops->is_authenticated(monc->auth)) 740 if (ceph_auth_is_authenticated(monc->auth))
741 __send_subscribe(monc); 741 __send_subscribe(monc);
742 } 742 }
743 __schedule_delayed(monc); 743 __schedule_delayed(monc);
@@ -892,8 +892,7 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
892 892
893 mutex_lock(&monc->mutex); 893 mutex_lock(&monc->mutex);
894 had_debugfs_info = have_debugfs_info(monc); 894 had_debugfs_info = have_debugfs_info(monc);
895 if (monc->auth->ops) 895 was_auth = ceph_auth_is_authenticated(monc->auth);
896 was_auth = monc->auth->ops->is_authenticated(monc->auth);
897 monc->pending_auth = 0; 896 monc->pending_auth = 0;
898 ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base, 897 ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
899 msg->front.iov_len, 898 msg->front.iov_len,
@@ -904,7 +903,7 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
904 wake_up_all(&monc->client->auth_wq); 903 wake_up_all(&monc->client->auth_wq);
905 } else if (ret > 0) { 904 } else if (ret > 0) {
906 __send_prepared_auth_request(monc, ret); 905 __send_prepared_auth_request(monc, ret);
907 } else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) { 906 } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
908 dout("authenticated, starting session\n"); 907 dout("authenticated, starting session\n");
909 908
910 monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT; 909 monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index d730dd4d8eb2..a3395fdfbd4f 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -1,3 +1,4 @@
1
1#include <linux/ceph/ceph_debug.h> 2#include <linux/ceph/ceph_debug.h>
2 3
3#include <linux/module.h> 4#include <linux/module.h>
@@ -21,6 +22,8 @@
21#define OSD_OP_FRONT_LEN 4096 22#define OSD_OP_FRONT_LEN 4096
22#define OSD_OPREPLY_FRONT_LEN 512 23#define OSD_OPREPLY_FRONT_LEN 512
23 24
25static struct kmem_cache *ceph_osd_request_cache;
26
24static const struct ceph_connection_operations osd_con_ops; 27static const struct ceph_connection_operations osd_con_ops;
25 28
26static void __send_queued(struct ceph_osd_client *osdc); 29static void __send_queued(struct ceph_osd_client *osdc);
@@ -32,12 +35,6 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
32static void __send_request(struct ceph_osd_client *osdc, 35static void __send_request(struct ceph_osd_client *osdc,
33 struct ceph_osd_request *req); 36 struct ceph_osd_request *req);
34 37
35static int op_has_extent(int op)
36{
37 return (op == CEPH_OSD_OP_READ ||
38 op == CEPH_OSD_OP_WRITE);
39}
40
41/* 38/*
42 * Implement client access to distributed object storage cluster. 39 * Implement client access to distributed object storage cluster.
43 * 40 *
@@ -63,53 +60,238 @@ static int op_has_extent(int op)
63 * 60 *
64 * fill osd op in request message. 61 * fill osd op in request message.
65 */ 62 */
66static int calc_layout(struct ceph_vino vino, 63static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
67 struct ceph_file_layout *layout, 64 u64 *objnum, u64 *objoff, u64 *objlen)
68 u64 off, u64 *plen,
69 struct ceph_osd_request *req,
70 struct ceph_osd_req_op *op)
71{ 65{
72 u64 orig_len = *plen; 66 u64 orig_len = *plen;
73 u64 bno = 0;
74 u64 objoff = 0;
75 u64 objlen = 0;
76 int r; 67 int r;
77 68
78 /* object extent? */ 69 /* object extent? */
79 r = ceph_calc_file_object_mapping(layout, off, orig_len, &bno, 70 r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
80 &objoff, &objlen); 71 objoff, objlen);
81 if (r < 0) 72 if (r < 0)
82 return r; 73 return r;
83 if (objlen < orig_len) { 74 if (*objlen < orig_len) {
84 *plen = objlen; 75 *plen = *objlen;
85 dout(" skipping last %llu, final file extent %llu~%llu\n", 76 dout(" skipping last %llu, final file extent %llu~%llu\n",
86 orig_len - *plen, off, *plen); 77 orig_len - *plen, off, *plen);
87 } 78 }
88 79
89 if (op_has_extent(op->op)) { 80 dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);
90 u32 osize = le32_to_cpu(layout->fl_object_size); 81
91 op->extent.offset = objoff; 82 return 0;
92 op->extent.length = objlen; 83}
93 if (op->extent.truncate_size <= off - objoff) { 84
94 op->extent.truncate_size = 0; 85static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
95 } else { 86{
96 op->extent.truncate_size -= off - objoff; 87 memset(osd_data, 0, sizeof (*osd_data));
97 if (op->extent.truncate_size > osize) 88 osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
98 op->extent.truncate_size = osize; 89}
99 } 90
91static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
92 struct page **pages, u64 length, u32 alignment,
93 bool pages_from_pool, bool own_pages)
94{
95 osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
96 osd_data->pages = pages;
97 osd_data->length = length;
98 osd_data->alignment = alignment;
99 osd_data->pages_from_pool = pages_from_pool;
100 osd_data->own_pages = own_pages;
101}
102
103static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
104 struct ceph_pagelist *pagelist)
105{
106 osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
107 osd_data->pagelist = pagelist;
108}
109
110#ifdef CONFIG_BLOCK
111static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
112 struct bio *bio, size_t bio_length)
113{
114 osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
115 osd_data->bio = bio;
116 osd_data->bio_length = bio_length;
117}
118#endif /* CONFIG_BLOCK */
119
120#define osd_req_op_data(oreq, whch, typ, fld) \
121 ({ \
122 BUG_ON(whch >= (oreq)->r_num_ops); \
123 &(oreq)->r_ops[whch].typ.fld; \
124 })
125
126static struct ceph_osd_data *
127osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
128{
129 BUG_ON(which >= osd_req->r_num_ops);
130
131 return &osd_req->r_ops[which].raw_data_in;
132}
133
134struct ceph_osd_data *
135osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
136 unsigned int which)
137{
138 return osd_req_op_data(osd_req, which, extent, osd_data);
139}
140EXPORT_SYMBOL(osd_req_op_extent_osd_data);
141
142struct ceph_osd_data *
143osd_req_op_cls_response_data(struct ceph_osd_request *osd_req,
144 unsigned int which)
145{
146 return osd_req_op_data(osd_req, which, cls, response_data);
147}
148EXPORT_SYMBOL(osd_req_op_cls_response_data); /* ??? */
149
150void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
151 unsigned int which, struct page **pages,
152 u64 length, u32 alignment,
153 bool pages_from_pool, bool own_pages)
154{
155 struct ceph_osd_data *osd_data;
156
157 osd_data = osd_req_op_raw_data_in(osd_req, which);
158 ceph_osd_data_pages_init(osd_data, pages, length, alignment,
159 pages_from_pool, own_pages);
160}
161EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);
162
163void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
164 unsigned int which, struct page **pages,
165 u64 length, u32 alignment,
166 bool pages_from_pool, bool own_pages)
167{
168 struct ceph_osd_data *osd_data;
169
170 osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
171 ceph_osd_data_pages_init(osd_data, pages, length, alignment,
172 pages_from_pool, own_pages);
173}
174EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);
175
176void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
177 unsigned int which, struct ceph_pagelist *pagelist)
178{
179 struct ceph_osd_data *osd_data;
180
181 osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
182 ceph_osd_data_pagelist_init(osd_data, pagelist);
183}
184EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
185
186#ifdef CONFIG_BLOCK
187void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
188 unsigned int which, struct bio *bio, size_t bio_length)
189{
190 struct ceph_osd_data *osd_data;
191
192 osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
193 ceph_osd_data_bio_init(osd_data, bio, bio_length);
194}
195EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
196#endif /* CONFIG_BLOCK */
197
198static void osd_req_op_cls_request_info_pagelist(
199 struct ceph_osd_request *osd_req,
200 unsigned int which, struct ceph_pagelist *pagelist)
201{
202 struct ceph_osd_data *osd_data;
203
204 osd_data = osd_req_op_data(osd_req, which, cls, request_info);
205 ceph_osd_data_pagelist_init(osd_data, pagelist);
206}
207
208void osd_req_op_cls_request_data_pagelist(
209 struct ceph_osd_request *osd_req,
210 unsigned int which, struct ceph_pagelist *pagelist)
211{
212 struct ceph_osd_data *osd_data;
213
214 osd_data = osd_req_op_data(osd_req, which, cls, request_data);
215 ceph_osd_data_pagelist_init(osd_data, pagelist);
216}
217EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);
218
219void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
220 unsigned int which, struct page **pages, u64 length,
221 u32 alignment, bool pages_from_pool, bool own_pages)
222{
223 struct ceph_osd_data *osd_data;
224
225 osd_data = osd_req_op_data(osd_req, which, cls, request_data);
226 ceph_osd_data_pages_init(osd_data, pages, length, alignment,
227 pages_from_pool, own_pages);
228}
229EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
230
231void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
232 unsigned int which, struct page **pages, u64 length,
233 u32 alignment, bool pages_from_pool, bool own_pages)
234{
235 struct ceph_osd_data *osd_data;
236
237 osd_data = osd_req_op_data(osd_req, which, cls, response_data);
238 ceph_osd_data_pages_init(osd_data, pages, length, alignment,
239 pages_from_pool, own_pages);
240}
241EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
242
243static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
244{
245 switch (osd_data->type) {
246 case CEPH_OSD_DATA_TYPE_NONE:
247 return 0;
248 case CEPH_OSD_DATA_TYPE_PAGES:
249 return osd_data->length;
250 case CEPH_OSD_DATA_TYPE_PAGELIST:
251 return (u64)osd_data->pagelist->length;
252#ifdef CONFIG_BLOCK
253 case CEPH_OSD_DATA_TYPE_BIO:
254 return (u64)osd_data->bio_length;
255#endif /* CONFIG_BLOCK */
256 default:
257 WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
258 return 0;
100 } 259 }
101 req->r_num_pages = calc_pages_for(off, *plen); 260}
102 req->r_page_alignment = off & ~PAGE_MASK;
103 if (op->op == CEPH_OSD_OP_WRITE)
104 op->payload_len = *plen;
105 261
106 dout("calc_layout bno=%llx %llu~%llu (%d pages)\n", 262static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
107 bno, objoff, objlen, req->r_num_pages); 263{
264 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
265 int num_pages;
108 266
109 snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno); 267 num_pages = calc_pages_for((u64)osd_data->alignment,
110 req->r_oid_len = strlen(req->r_oid); 268 (u64)osd_data->length);
269 ceph_release_page_vector(osd_data->pages, num_pages);
270 }
271 ceph_osd_data_init(osd_data);
272}
273
274static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
275 unsigned int which)
276{
277 struct ceph_osd_req_op *op;
278
279 BUG_ON(which >= osd_req->r_num_ops);
280 op = &osd_req->r_ops[which];
111 281
112 return r; 282 switch (op->op) {
283 case CEPH_OSD_OP_READ:
284 case CEPH_OSD_OP_WRITE:
285 ceph_osd_data_release(&op->extent.osd_data);
286 break;
287 case CEPH_OSD_OP_CALL:
288 ceph_osd_data_release(&op->cls.request_info);
289 ceph_osd_data_release(&op->cls.request_data);
290 ceph_osd_data_release(&op->cls.response_data);
291 break;
292 default:
293 break;
294 }
113} 295}
114 296
115/* 297/*
@@ -117,30 +299,26 @@ static int calc_layout(struct ceph_vino vino,
117 */ 299 */
118void ceph_osdc_release_request(struct kref *kref) 300void ceph_osdc_release_request(struct kref *kref)
119{ 301{
120 struct ceph_osd_request *req = container_of(kref, 302 struct ceph_osd_request *req;
121 struct ceph_osd_request, 303 unsigned int which;
122 r_kref);
123 304
305 req = container_of(kref, struct ceph_osd_request, r_kref);
124 if (req->r_request) 306 if (req->r_request)
125 ceph_msg_put(req->r_request); 307 ceph_msg_put(req->r_request);
126 if (req->r_con_filling_msg) { 308 if (req->r_reply) {
127 dout("%s revoking msg %p from con %p\n", __func__,
128 req->r_reply, req->r_con_filling_msg);
129 ceph_msg_revoke_incoming(req->r_reply); 309 ceph_msg_revoke_incoming(req->r_reply);
130 req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
131 req->r_con_filling_msg = NULL;
132 }
133 if (req->r_reply)
134 ceph_msg_put(req->r_reply); 310 ceph_msg_put(req->r_reply);
135 if (req->r_own_pages) 311 }
136 ceph_release_page_vector(req->r_pages, 312
137 req->r_num_pages); 313 for (which = 0; which < req->r_num_ops; which++)
314 osd_req_op_data_release(req, which);
315
138 ceph_put_snap_context(req->r_snapc); 316 ceph_put_snap_context(req->r_snapc);
139 ceph_pagelist_release(&req->r_trail);
140 if (req->r_mempool) 317 if (req->r_mempool)
141 mempool_free(req, req->r_osdc->req_mempool); 318 mempool_free(req, req->r_osdc->req_mempool);
142 else 319 else
143 kfree(req); 320 kmem_cache_free(ceph_osd_request_cache, req);
321
144} 322}
145EXPORT_SYMBOL(ceph_osdc_release_request); 323EXPORT_SYMBOL(ceph_osdc_release_request);
146 324
@@ -154,6 +332,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
154 struct ceph_msg *msg; 332 struct ceph_msg *msg;
155 size_t msg_size; 333 size_t msg_size;
156 334
335 BUILD_BUG_ON(CEPH_OSD_MAX_OP > U16_MAX);
336 BUG_ON(num_ops > CEPH_OSD_MAX_OP);
337
157 msg_size = 4 + 4 + 8 + 8 + 4+8; 338 msg_size = 4 + 4 + 8 + 8 + 4+8;
158 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ 339 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
159 msg_size += 1 + 8 + 4 + 4; /* pg_t */ 340 msg_size += 1 + 8 + 4 + 4; /* pg_t */
@@ -168,13 +349,14 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
168 req = mempool_alloc(osdc->req_mempool, gfp_flags); 349 req = mempool_alloc(osdc->req_mempool, gfp_flags);
169 memset(req, 0, sizeof(*req)); 350 memset(req, 0, sizeof(*req));
170 } else { 351 } else {
171 req = kzalloc(sizeof(*req), gfp_flags); 352 req = kmem_cache_zalloc(ceph_osd_request_cache, gfp_flags);
172 } 353 }
173 if (req == NULL) 354 if (req == NULL)
174 return NULL; 355 return NULL;
175 356
176 req->r_osdc = osdc; 357 req->r_osdc = osdc;
177 req->r_mempool = use_mempool; 358 req->r_mempool = use_mempool;
359 req->r_num_ops = num_ops;
178 360
179 kref_init(&req->r_kref); 361 kref_init(&req->r_kref);
180 init_completion(&req->r_completion); 362 init_completion(&req->r_completion);
@@ -198,8 +380,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
198 } 380 }
199 req->r_reply = msg; 381 req->r_reply = msg;
200 382
201 ceph_pagelist_init(&req->r_trail);
202
203 /* create request message; allow space for oid */ 383 /* create request message; allow space for oid */
204 if (use_mempool) 384 if (use_mempool)
205 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 385 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
@@ -218,60 +398,24 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
218} 398}
219EXPORT_SYMBOL(ceph_osdc_alloc_request); 399EXPORT_SYMBOL(ceph_osdc_alloc_request);
220 400
221static void osd_req_encode_op(struct ceph_osd_request *req, 401static bool osd_req_opcode_valid(u16 opcode)
222 struct ceph_osd_op *dst,
223 struct ceph_osd_req_op *src)
224{ 402{
225 dst->op = cpu_to_le16(src->op); 403 switch (opcode) {
226
227 switch (src->op) {
228 case CEPH_OSD_OP_STAT:
229 break;
230 case CEPH_OSD_OP_READ: 404 case CEPH_OSD_OP_READ:
231 case CEPH_OSD_OP_WRITE: 405 case CEPH_OSD_OP_STAT:
232 dst->extent.offset =
233 cpu_to_le64(src->extent.offset);
234 dst->extent.length =
235 cpu_to_le64(src->extent.length);
236 dst->extent.truncate_size =
237 cpu_to_le64(src->extent.truncate_size);
238 dst->extent.truncate_seq =
239 cpu_to_le32(src->extent.truncate_seq);
240 break;
241 case CEPH_OSD_OP_CALL:
242 dst->cls.class_len = src->cls.class_len;
243 dst->cls.method_len = src->cls.method_len;
244 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
245
246 ceph_pagelist_append(&req->r_trail, src->cls.class_name,
247 src->cls.class_len);
248 ceph_pagelist_append(&req->r_trail, src->cls.method_name,
249 src->cls.method_len);
250 ceph_pagelist_append(&req->r_trail, src->cls.indata,
251 src->cls.indata_len);
252 break;
253 case CEPH_OSD_OP_STARTSYNC:
254 break;
255 case CEPH_OSD_OP_NOTIFY_ACK:
256 case CEPH_OSD_OP_WATCH:
257 dst->watch.cookie = cpu_to_le64(src->watch.cookie);
258 dst->watch.ver = cpu_to_le64(src->watch.ver);
259 dst->watch.flag = src->watch.flag;
260 break;
261 default:
262 pr_err("unrecognized osd opcode %d\n", dst->op);
263 WARN_ON(1);
264 break;
265 case CEPH_OSD_OP_MAPEXT: 406 case CEPH_OSD_OP_MAPEXT:
266 case CEPH_OSD_OP_MASKTRUNC: 407 case CEPH_OSD_OP_MASKTRUNC:
267 case CEPH_OSD_OP_SPARSE_READ: 408 case CEPH_OSD_OP_SPARSE_READ:
268 case CEPH_OSD_OP_NOTIFY: 409 case CEPH_OSD_OP_NOTIFY:
410 case CEPH_OSD_OP_NOTIFY_ACK:
269 case CEPH_OSD_OP_ASSERT_VER: 411 case CEPH_OSD_OP_ASSERT_VER:
412 case CEPH_OSD_OP_WRITE:
270 case CEPH_OSD_OP_WRITEFULL: 413 case CEPH_OSD_OP_WRITEFULL:
271 case CEPH_OSD_OP_TRUNCATE: 414 case CEPH_OSD_OP_TRUNCATE:
272 case CEPH_OSD_OP_ZERO: 415 case CEPH_OSD_OP_ZERO:
273 case CEPH_OSD_OP_DELETE: 416 case CEPH_OSD_OP_DELETE:
274 case CEPH_OSD_OP_APPEND: 417 case CEPH_OSD_OP_APPEND:
418 case CEPH_OSD_OP_STARTSYNC:
275 case CEPH_OSD_OP_SETTRUNC: 419 case CEPH_OSD_OP_SETTRUNC:
276 case CEPH_OSD_OP_TRIMTRUNC: 420 case CEPH_OSD_OP_TRIMTRUNC:
277 case CEPH_OSD_OP_TMAPUP: 421 case CEPH_OSD_OP_TMAPUP:
@@ -279,11 +423,11 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
279 case CEPH_OSD_OP_TMAPGET: 423 case CEPH_OSD_OP_TMAPGET:
280 case CEPH_OSD_OP_CREATE: 424 case CEPH_OSD_OP_CREATE:
281 case CEPH_OSD_OP_ROLLBACK: 425 case CEPH_OSD_OP_ROLLBACK:
426 case CEPH_OSD_OP_WATCH:
282 case CEPH_OSD_OP_OMAPGETKEYS: 427 case CEPH_OSD_OP_OMAPGETKEYS:
283 case CEPH_OSD_OP_OMAPGETVALS: 428 case CEPH_OSD_OP_OMAPGETVALS:
284 case CEPH_OSD_OP_OMAPGETHEADER: 429 case CEPH_OSD_OP_OMAPGETHEADER:
285 case CEPH_OSD_OP_OMAPGETVALSBYKEYS: 430 case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
286 case CEPH_OSD_OP_MODE_RD:
287 case CEPH_OSD_OP_OMAPSETVALS: 431 case CEPH_OSD_OP_OMAPSETVALS:
288 case CEPH_OSD_OP_OMAPSETHEADER: 432 case CEPH_OSD_OP_OMAPSETHEADER:
289 case CEPH_OSD_OP_OMAPCLEAR: 433 case CEPH_OSD_OP_OMAPCLEAR:
@@ -314,113 +458,233 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
314 case CEPH_OSD_OP_RDUNLOCK: 458 case CEPH_OSD_OP_RDUNLOCK:
315 case CEPH_OSD_OP_UPLOCK: 459 case CEPH_OSD_OP_UPLOCK:
316 case CEPH_OSD_OP_DNLOCK: 460 case CEPH_OSD_OP_DNLOCK:
461 case CEPH_OSD_OP_CALL:
317 case CEPH_OSD_OP_PGLS: 462 case CEPH_OSD_OP_PGLS:
318 case CEPH_OSD_OP_PGLS_FILTER: 463 case CEPH_OSD_OP_PGLS_FILTER:
319 pr_err("unsupported osd opcode %s\n", 464 return true;
320 ceph_osd_op_name(dst->op)); 465 default:
321 WARN_ON(1); 466 return false;
322 break;
323 } 467 }
324 dst->payload_len = cpu_to_le32(src->payload_len);
325} 468}
326 469
327/* 470/*
328 * build new request AND message 471 * This is an osd op init function for opcodes that have no data or
329 * 472 * other information associated with them. It also serves as a
473 * common init routine for all the other init functions, below.
330 */ 474 */
331void ceph_osdc_build_request(struct ceph_osd_request *req, 475static struct ceph_osd_req_op *
332 u64 off, u64 len, unsigned int num_ops, 476_osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
333 struct ceph_osd_req_op *src_ops, 477 u16 opcode)
334 struct ceph_snap_context *snapc, u64 snap_id,
335 struct timespec *mtime)
336{ 478{
337 struct ceph_msg *msg = req->r_request; 479 struct ceph_osd_req_op *op;
338 struct ceph_osd_req_op *src_op;
339 void *p;
340 size_t msg_size;
341 int flags = req->r_flags;
342 u64 data_len;
343 int i;
344 480
345 req->r_num_ops = num_ops; 481 BUG_ON(which >= osd_req->r_num_ops);
346 req->r_snapid = snap_id; 482 BUG_ON(!osd_req_opcode_valid(opcode));
347 req->r_snapc = ceph_get_snap_context(snapc);
348 483
349 /* encode request */ 484 op = &osd_req->r_ops[which];
350 msg->hdr.version = cpu_to_le16(4); 485 memset(op, 0, sizeof (*op));
486 op->op = opcode;
351 487
352 p = msg->front.iov_base; 488 return op;
353 ceph_encode_32(&p, 1); /* client_inc is always 1 */ 489}
354 req->r_request_osdmap_epoch = p;
355 p += 4;
356 req->r_request_flags = p;
357 p += 4;
358 if (req->r_flags & CEPH_OSD_FLAG_WRITE)
359 ceph_encode_timespec(p, mtime);
360 p += sizeof(struct ceph_timespec);
361 req->r_request_reassert_version = p;
362 p += sizeof(struct ceph_eversion); /* will get filled in */
363 490
364 /* oloc */ 491void osd_req_op_init(struct ceph_osd_request *osd_req,
365 ceph_encode_8(&p, 4); 492 unsigned int which, u16 opcode)
366 ceph_encode_8(&p, 4); 493{
367 ceph_encode_32(&p, 8 + 4 + 4); 494 (void)_osd_req_op_init(osd_req, which, opcode);
368 req->r_request_pool = p; 495}
369 p += 8; 496EXPORT_SYMBOL(osd_req_op_init);
370 ceph_encode_32(&p, -1); /* preferred */
371 ceph_encode_32(&p, 0); /* key len */
372 497
373 ceph_encode_8(&p, 1); 498void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
374 req->r_request_pgid = p; 499 unsigned int which, u16 opcode,
375 p += 8 + 4; 500 u64 offset, u64 length,
376 ceph_encode_32(&p, -1); /* preferred */ 501 u64 truncate_size, u32 truncate_seq)
502{
503 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
504 size_t payload_len = 0;
377 505
378 /* oid */ 506 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
379 ceph_encode_32(&p, req->r_oid_len);
380 memcpy(p, req->r_oid, req->r_oid_len);
381 dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
382 p += req->r_oid_len;
383 507
384 /* ops */ 508 op->extent.offset = offset;
385 ceph_encode_16(&p, num_ops); 509 op->extent.length = length;
386 src_op = src_ops; 510 op->extent.truncate_size = truncate_size;
387 req->r_request_ops = p; 511 op->extent.truncate_seq = truncate_seq;
388 for (i = 0; i < num_ops; i++, src_op++) { 512 if (opcode == CEPH_OSD_OP_WRITE)
389 osd_req_encode_op(req, p, src_op); 513 payload_len += length;
390 p += sizeof(struct ceph_osd_op);
391 }
392 514
393 /* snaps */ 515 op->payload_len = payload_len;
394 ceph_encode_64(&p, req->r_snapid); 516}
395 ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0); 517EXPORT_SYMBOL(osd_req_op_extent_init);
396 ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0); 518
397 if (req->r_snapc) { 519void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
398 for (i = 0; i < snapc->num_snaps; i++) { 520 unsigned int which, u64 length)
399 ceph_encode_64(&p, req->r_snapc->snaps[i]); 521{
400 } 522 struct ceph_osd_req_op *op;
523 u64 previous;
524
525 BUG_ON(which >= osd_req->r_num_ops);
526 op = &osd_req->r_ops[which];
527 previous = op->extent.length;
528
529 if (length == previous)
530 return; /* Nothing to do */
531 BUG_ON(length > previous);
532
533 op->extent.length = length;
534 op->payload_len -= previous - length;
535}
536EXPORT_SYMBOL(osd_req_op_extent_update);
537
538void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
539 u16 opcode, const char *class, const char *method)
540{
541 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
542 struct ceph_pagelist *pagelist;
543 size_t payload_len = 0;
544 size_t size;
545
546 BUG_ON(opcode != CEPH_OSD_OP_CALL);
547
548 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
549 BUG_ON(!pagelist);
550 ceph_pagelist_init(pagelist);
551
552 op->cls.class_name = class;
553 size = strlen(class);
554 BUG_ON(size > (size_t) U8_MAX);
555 op->cls.class_len = size;
556 ceph_pagelist_append(pagelist, class, size);
557 payload_len += size;
558
559 op->cls.method_name = method;
560 size = strlen(method);
561 BUG_ON(size > (size_t) U8_MAX);
562 op->cls.method_len = size;
563 ceph_pagelist_append(pagelist, method, size);
564 payload_len += size;
565
566 osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
567
568 op->cls.argc = 0; /* currently unused */
569
570 op->payload_len = payload_len;
571}
572EXPORT_SYMBOL(osd_req_op_cls_init);
573
574void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
575 unsigned int which, u16 opcode,
576 u64 cookie, u64 version, int flag)
577{
578 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
579
580 BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
581
582 op->watch.cookie = cookie;
583 op->watch.ver = version;
584 if (opcode == CEPH_OSD_OP_WATCH && flag)
585 op->watch.flag = (u8)1;
586}
587EXPORT_SYMBOL(osd_req_op_watch_init);
588
589static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
590 struct ceph_osd_data *osd_data)
591{
592 u64 length = ceph_osd_data_length(osd_data);
593
594 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
595 BUG_ON(length > (u64) SIZE_MAX);
596 if (length)
597 ceph_msg_data_add_pages(msg, osd_data->pages,
598 length, osd_data->alignment);
599 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
600 BUG_ON(!length);
601 ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
602#ifdef CONFIG_BLOCK
603 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
604 ceph_msg_data_add_bio(msg, osd_data->bio, length);
605#endif
606 } else {
607 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
401 } 608 }
609}
402 610
403 req->r_request_attempts = p; 611static u64 osd_req_encode_op(struct ceph_osd_request *req,
404 p += 4; 612 struct ceph_osd_op *dst, unsigned int which)
613{
614 struct ceph_osd_req_op *src;
615 struct ceph_osd_data *osd_data;
616 u64 request_data_len = 0;
617 u64 data_length;
405 618
406 data_len = req->r_trail.length; 619 BUG_ON(which >= req->r_num_ops);
407 if (flags & CEPH_OSD_FLAG_WRITE) { 620 src = &req->r_ops[which];
408 req->r_request->hdr.data_off = cpu_to_le16(off); 621 if (WARN_ON(!osd_req_opcode_valid(src->op))) {
409 data_len += len; 622 pr_err("unrecognized osd opcode %d\n", src->op);
623
624 return 0;
410 } 625 }
411 req->r_request->hdr.data_len = cpu_to_le32(data_len);
412 req->r_request->page_alignment = req->r_page_alignment;
413 626
414 BUG_ON(p > msg->front.iov_base + msg->front.iov_len); 627 switch (src->op) {
415 msg_size = p - msg->front.iov_base; 628 case CEPH_OSD_OP_STAT:
416 msg->front.iov_len = msg_size; 629 osd_data = &src->raw_data_in;
417 msg->hdr.front_len = cpu_to_le32(msg_size); 630 ceph_osdc_msg_data_add(req->r_reply, osd_data);
631 break;
632 case CEPH_OSD_OP_READ:
633 case CEPH_OSD_OP_WRITE:
634 if (src->op == CEPH_OSD_OP_WRITE)
635 request_data_len = src->extent.length;
636 dst->extent.offset = cpu_to_le64(src->extent.offset);
637 dst->extent.length = cpu_to_le64(src->extent.length);
638 dst->extent.truncate_size =
639 cpu_to_le64(src->extent.truncate_size);
640 dst->extent.truncate_seq =
641 cpu_to_le32(src->extent.truncate_seq);
642 osd_data = &src->extent.osd_data;
643 if (src->op == CEPH_OSD_OP_WRITE)
644 ceph_osdc_msg_data_add(req->r_request, osd_data);
645 else
646 ceph_osdc_msg_data_add(req->r_reply, osd_data);
647 break;
648 case CEPH_OSD_OP_CALL:
649 dst->cls.class_len = src->cls.class_len;
650 dst->cls.method_len = src->cls.method_len;
651 osd_data = &src->cls.request_info;
652 ceph_osdc_msg_data_add(req->r_request, osd_data);
653 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST);
654 request_data_len = osd_data->pagelist->length;
655
656 osd_data = &src->cls.request_data;
657 data_length = ceph_osd_data_length(osd_data);
658 if (data_length) {
659 BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
660 dst->cls.indata_len = cpu_to_le32(data_length);
661 ceph_osdc_msg_data_add(req->r_request, osd_data);
662 src->payload_len += data_length;
663 request_data_len += data_length;
664 }
665 osd_data = &src->cls.response_data;
666 ceph_osdc_msg_data_add(req->r_reply, osd_data);
667 break;
668 case CEPH_OSD_OP_STARTSYNC:
669 break;
670 case CEPH_OSD_OP_NOTIFY_ACK:
671 case CEPH_OSD_OP_WATCH:
672 dst->watch.cookie = cpu_to_le64(src->watch.cookie);
673 dst->watch.ver = cpu_to_le64(src->watch.ver);
674 dst->watch.flag = src->watch.flag;
675 break;
676 default:
677 pr_err("unsupported osd opcode %s\n",
678 ceph_osd_op_name(src->op));
679 WARN_ON(1);
418 680
419 dout("build_request msg_size was %d num_ops %d\n", (int)msg_size, 681 return 0;
420 num_ops); 682 }
421 return; 683 dst->op = cpu_to_le16(src->op);
684 dst->payload_len = cpu_to_le32(src->payload_len);
685
686 return request_data_len;
422} 687}
423EXPORT_SYMBOL(ceph_osdc_build_request);
424 688
425/* 689/*
426 * build new request AND message, calculate layout, and adjust file 690 * build new request AND message, calculate layout, and adjust file
@@ -436,51 +700,63 @@ EXPORT_SYMBOL(ceph_osdc_build_request);
436struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, 700struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
437 struct ceph_file_layout *layout, 701 struct ceph_file_layout *layout,
438 struct ceph_vino vino, 702 struct ceph_vino vino,
439 u64 off, u64 *plen, 703 u64 off, u64 *plen, int num_ops,
440 int opcode, int flags, 704 int opcode, int flags,
441 struct ceph_snap_context *snapc, 705 struct ceph_snap_context *snapc,
442 int do_sync,
443 u32 truncate_seq, 706 u32 truncate_seq,
444 u64 truncate_size, 707 u64 truncate_size,
445 struct timespec *mtime, 708 bool use_mempool)
446 bool use_mempool,
447 int page_align)
448{ 709{
449 struct ceph_osd_req_op ops[2];
450 struct ceph_osd_request *req; 710 struct ceph_osd_request *req;
451 unsigned int num_op = 1; 711 u64 objnum = 0;
712 u64 objoff = 0;
713 u64 objlen = 0;
714 u32 object_size;
715 u64 object_base;
452 int r; 716 int r;
453 717
454 memset(&ops, 0, sizeof ops); 718 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
455
456 ops[0].op = opcode;
457 ops[0].extent.truncate_seq = truncate_seq;
458 ops[0].extent.truncate_size = truncate_size;
459
460 if (do_sync) {
461 ops[1].op = CEPH_OSD_OP_STARTSYNC;
462 num_op++;
463 }
464 719
465 req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool, 720 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
466 GFP_NOFS); 721 GFP_NOFS);
467 if (!req) 722 if (!req)
468 return ERR_PTR(-ENOMEM); 723 return ERR_PTR(-ENOMEM);
724
469 req->r_flags = flags; 725 req->r_flags = flags;
470 726
471 /* calculate max write size */ 727 /* calculate max write size */
472 r = calc_layout(vino, layout, off, plen, req, ops); 728 r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
473 if (r < 0) 729 if (r < 0) {
730 ceph_osdc_put_request(req);
474 return ERR_PTR(r); 731 return ERR_PTR(r);
475 req->r_file_layout = *layout; /* keep a copy */ 732 }
476 733
477 /* in case it differs from natural (file) alignment that 734 object_size = le32_to_cpu(layout->fl_object_size);
478 calc_layout filled in for us */ 735 object_base = off - objoff;
479 req->r_num_pages = calc_pages_for(page_align, *plen); 736 if (truncate_size <= object_base) {
480 req->r_page_alignment = page_align; 737 truncate_size = 0;
738 } else {
739 truncate_size -= object_base;
740 if (truncate_size > object_size)
741 truncate_size = object_size;
742 }
743
744 osd_req_op_extent_init(req, 0, opcode, objoff, objlen,
745 truncate_size, truncate_seq);
746
747 /*
748 * A second op in the ops array means the caller wants to
749 * also issue a include a 'startsync' command so that the
750 * osd will flush data quickly.
751 */
752 if (num_ops > 1)
753 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
754
755 req->r_file_layout = *layout; /* keep a copy */
481 756
482 ceph_osdc_build_request(req, off, *plen, num_op, ops, 757 snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx",
483 snapc, vino.snap, mtime); 758 vino.ino, objnum);
759 req->r_oid_len = strlen(req->r_oid);
484 760
485 return req; 761 return req;
486} 762}
@@ -558,21 +834,46 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
558 struct ceph_osd *osd) 834 struct ceph_osd *osd)
559{ 835{
560 struct ceph_osd_request *req, *nreq; 836 struct ceph_osd_request *req, *nreq;
837 LIST_HEAD(resend);
561 int err; 838 int err;
562 839
563 dout("__kick_osd_requests osd%d\n", osd->o_osd); 840 dout("__kick_osd_requests osd%d\n", osd->o_osd);
564 err = __reset_osd(osdc, osd); 841 err = __reset_osd(osdc, osd);
565 if (err) 842 if (err)
566 return; 843 return;
567 844 /*
845 * Build up a list of requests to resend by traversing the
846 * osd's list of requests. Requests for a given object are
847 * sent in tid order, and that is also the order they're
848 * kept on this list. Therefore all requests that are in
849 * flight will be found first, followed by all requests that
850 * have not yet been sent. And to resend requests while
851 * preserving this order we will want to put any sent
852 * requests back on the front of the osd client's unsent
853 * list.
854 *
855 * So we build a separate ordered list of already-sent
856 * requests for the affected osd and splice it onto the
857 * front of the osd client's unsent list. Once we've seen a
858 * request that has not yet been sent we're done. Those
859 * requests are already sitting right where they belong.
860 */
568 list_for_each_entry(req, &osd->o_requests, r_osd_item) { 861 list_for_each_entry(req, &osd->o_requests, r_osd_item) {
569 list_move(&req->r_req_lru_item, &osdc->req_unsent); 862 if (!req->r_sent)
570 dout("requeued %p tid %llu osd%d\n", req, req->r_tid, 863 break;
864 list_move_tail(&req->r_req_lru_item, &resend);
865 dout("requeueing %p tid %llu osd%d\n", req, req->r_tid,
571 osd->o_osd); 866 osd->o_osd);
572 if (!req->r_linger) 867 if (!req->r_linger)
573 req->r_flags |= CEPH_OSD_FLAG_RETRY; 868 req->r_flags |= CEPH_OSD_FLAG_RETRY;
574 } 869 }
870 list_splice(&resend, &osdc->req_unsent);
575 871
872 /*
873 * Linger requests are re-registered before sending, which
874 * sets up a new tid for each. We add them to the unsent
875 * list at the end to keep things in tid order.
876 */
576 list_for_each_entry_safe(req, nreq, &osd->o_linger_requests, 877 list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
577 r_linger_osd) { 878 r_linger_osd) {
578 /* 879 /*
@@ -581,8 +882,8 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
581 */ 882 */
582 BUG_ON(!list_empty(&req->r_req_lru_item)); 883 BUG_ON(!list_empty(&req->r_req_lru_item));
583 __register_request(osdc, req); 884 __register_request(osdc, req);
584 list_add(&req->r_req_lru_item, &osdc->req_unsent); 885 list_add_tail(&req->r_req_lru_item, &osdc->req_unsent);
585 list_add(&req->r_osd_item, &req->r_osd->o_requests); 886 list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
586 __unregister_linger_request(osdc, req); 887 __unregister_linger_request(osdc, req);
587 dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid, 888 dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
588 osd->o_osd); 889 osd->o_osd);
@@ -654,8 +955,7 @@ static void put_osd(struct ceph_osd *osd)
654 if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) { 955 if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) {
655 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth; 956 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
656 957
657 if (ac->ops && ac->ops->destroy_authorizer) 958 ceph_auth_destroy_authorizer(ac, osd->o_auth.authorizer);
658 ac->ops->destroy_authorizer(ac, osd->o_auth.authorizer);
659 kfree(osd); 959 kfree(osd);
660 } 960 }
661} 961}
@@ -820,14 +1120,6 @@ static void __register_request(struct ceph_osd_client *osdc,
820 } 1120 }
821} 1121}
822 1122
823static void register_request(struct ceph_osd_client *osdc,
824 struct ceph_osd_request *req)
825{
826 mutex_lock(&osdc->request_mutex);
827 __register_request(osdc, req);
828 mutex_unlock(&osdc->request_mutex);
829}
830
831/* 1123/*
832 * called under osdc->request_mutex 1124 * called under osdc->request_mutex
833 */ 1125 */
@@ -952,8 +1244,8 @@ static int __map_request(struct ceph_osd_client *osdc,
952 int err; 1244 int err;
953 1245
954 dout("map_request %p tid %lld\n", req, req->r_tid); 1246 dout("map_request %p tid %lld\n", req, req->r_tid);
955 err = ceph_calc_object_layout(&pgid, req->r_oid, 1247 err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap,
956 &req->r_file_layout, osdc->osdmap); 1248 ceph_file_layout_pg_pool(req->r_file_layout));
957 if (err) { 1249 if (err) {
958 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1250 list_move(&req->r_req_lru_item, &osdc->req_notarget);
959 return err; 1251 return err;
@@ -1007,10 +1299,10 @@ static int __map_request(struct ceph_osd_client *osdc,
1007 1299
1008 if (req->r_osd) { 1300 if (req->r_osd) {
1009 __remove_osd_from_lru(req->r_osd); 1301 __remove_osd_from_lru(req->r_osd);
1010 list_add(&req->r_osd_item, &req->r_osd->o_requests); 1302 list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
1011 list_move(&req->r_req_lru_item, &osdc->req_unsent); 1303 list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
1012 } else { 1304 } else {
1013 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1305 list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
1014 } 1306 }
1015 err = 1; /* osd or pg changed */ 1307 err = 1; /* osd or pg changed */
1016 1308
@@ -1045,8 +1337,14 @@ static void __send_request(struct ceph_osd_client *osdc,
1045 list_move_tail(&req->r_req_lru_item, &osdc->req_lru); 1337 list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
1046 1338
1047 ceph_msg_get(req->r_request); /* send consumes a ref */ 1339 ceph_msg_get(req->r_request); /* send consumes a ref */
1048 ceph_con_send(&req->r_osd->o_con, req->r_request); 1340
1341 /* Mark the request unsafe if this is the first timet's being sent. */
1342
1343 if (!req->r_sent && req->r_unsafe_callback)
1344 req->r_unsafe_callback(req, true);
1049 req->r_sent = req->r_osd->o_incarnation; 1345 req->r_sent = req->r_osd->o_incarnation;
1346
1347 ceph_con_send(&req->r_osd->o_con, req->r_request);
1050} 1348}
1051 1349
1052/* 1350/*
@@ -1134,31 +1432,11 @@ static void handle_osds_timeout(struct work_struct *work)
1134 1432
1135static void complete_request(struct ceph_osd_request *req) 1433static void complete_request(struct ceph_osd_request *req)
1136{ 1434{
1137 if (req->r_safe_callback) 1435 if (req->r_unsafe_callback)
1138 req->r_safe_callback(req, NULL); 1436 req->r_unsafe_callback(req, false);
1139 complete_all(&req->r_safe_completion); /* fsync waiter */ 1437 complete_all(&req->r_safe_completion); /* fsync waiter */
1140} 1438}
1141 1439
1142static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid)
1143{
1144 __u8 v;
1145
1146 ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad);
1147 v = ceph_decode_8(p);
1148 if (v > 1) {
1149 pr_warning("do not understand pg encoding %d > 1", v);
1150 return -EINVAL;
1151 }
1152 pgid->pool = ceph_decode_64(p);
1153 pgid->seed = ceph_decode_32(p);
1154 *p += 4;
1155 return 0;
1156
1157bad:
1158 pr_warning("incomplete pg encoding");
1159 return -EINVAL;
1160}
1161
1162/* 1440/*
1163 * handle osd op reply. either call the callback if it is specified, 1441 * handle osd op reply. either call the callback if it is specified,
1164 * or do the completion to wake up the waiting thread. 1442 * or do the completion to wake up the waiting thread.
@@ -1170,7 +1448,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1170 struct ceph_osd_request *req; 1448 struct ceph_osd_request *req;
1171 u64 tid; 1449 u64 tid;
1172 int object_len; 1450 int object_len;
1173 int numops, payload_len, flags; 1451 unsigned int numops;
1452 int payload_len, flags;
1174 s32 result; 1453 s32 result;
1175 s32 retry_attempt; 1454 s32 retry_attempt;
1176 struct ceph_pg pg; 1455 struct ceph_pg pg;
@@ -1178,7 +1457,9 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1178 u32 reassert_epoch; 1457 u32 reassert_epoch;
1179 u64 reassert_version; 1458 u64 reassert_version;
1180 u32 osdmap_epoch; 1459 u32 osdmap_epoch;
1181 int i; 1460 int already_completed;
1461 u32 bytes;
1462 unsigned int i;
1182 1463
1183 tid = le64_to_cpu(msg->hdr.tid); 1464 tid = le64_to_cpu(msg->hdr.tid);
1184 dout("handle_reply %p tid %llu\n", msg, tid); 1465 dout("handle_reply %p tid %llu\n", msg, tid);
@@ -1191,7 +1472,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1191 ceph_decode_need(&p, end, object_len, bad); 1472 ceph_decode_need(&p, end, object_len, bad);
1192 p += object_len; 1473 p += object_len;
1193 1474
1194 err = __decode_pgid(&p, end, &pg); 1475 err = ceph_decode_pgid(&p, end, &pg);
1195 if (err) 1476 if (err)
1196 goto bad; 1477 goto bad;
1197 1478
@@ -1207,8 +1488,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1207 req = __lookup_request(osdc, tid); 1488 req = __lookup_request(osdc, tid);
1208 if (req == NULL) { 1489 if (req == NULL) {
1209 dout("handle_reply tid %llu dne\n", tid); 1490 dout("handle_reply tid %llu dne\n", tid);
1210 mutex_unlock(&osdc->request_mutex); 1491 goto bad_mutex;
1211 return;
1212 } 1492 }
1213 ceph_osdc_get_request(req); 1493 ceph_osdc_get_request(req);
1214 1494
@@ -1233,9 +1513,10 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1233 payload_len += len; 1513 payload_len += len;
1234 p += sizeof(*op); 1514 p += sizeof(*op);
1235 } 1515 }
1236 if (payload_len != le32_to_cpu(msg->hdr.data_len)) { 1516 bytes = le32_to_cpu(msg->hdr.data_len);
1517 if (payload_len != bytes) {
1237 pr_warning("sum of op payload lens %d != data_len %d", 1518 pr_warning("sum of op payload lens %d != data_len %d",
1238 payload_len, le32_to_cpu(msg->hdr.data_len)); 1519 payload_len, bytes);
1239 goto bad_put; 1520 goto bad_put;
1240 } 1521 }
1241 1522
@@ -1244,21 +1525,9 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1244 for (i = 0; i < numops; i++) 1525 for (i = 0; i < numops; i++)
1245 req->r_reply_op_result[i] = ceph_decode_32(&p); 1526 req->r_reply_op_result[i] = ceph_decode_32(&p);
1246 1527
1247 /*
1248 * if this connection filled our message, drop our reference now, to
1249 * avoid a (safe but slower) revoke later.
1250 */
1251 if (req->r_con_filling_msg == con && req->r_reply == msg) {
1252 dout(" dropping con_filling_msg ref %p\n", con);
1253 req->r_con_filling_msg = NULL;
1254 con->ops->put(con);
1255 }
1256
1257 if (!req->r_got_reply) { 1528 if (!req->r_got_reply) {
1258 unsigned int bytes;
1259 1529
1260 req->r_result = result; 1530 req->r_result = result;
1261 bytes = le32_to_cpu(msg->hdr.data_len);
1262 dout("handle_reply result %d bytes %d\n", req->r_result, 1531 dout("handle_reply result %d bytes %d\n", req->r_result,
1263 bytes); 1532 bytes);
1264 if (req->r_result == 0) 1533 if (req->r_result == 0)
@@ -1286,7 +1555,11 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1286 ((flags & CEPH_OSD_FLAG_WRITE) == 0)) 1555 ((flags & CEPH_OSD_FLAG_WRITE) == 0))
1287 __unregister_request(osdc, req); 1556 __unregister_request(osdc, req);
1288 1557
1558 already_completed = req->r_completed;
1559 req->r_completed = 1;
1289 mutex_unlock(&osdc->request_mutex); 1560 mutex_unlock(&osdc->request_mutex);
1561 if (already_completed)
1562 goto done;
1290 1563
1291 if (req->r_callback) 1564 if (req->r_callback)
1292 req->r_callback(req, msg); 1565 req->r_callback(req, msg);
@@ -1303,6 +1576,8 @@ done:
1303 1576
1304bad_put: 1577bad_put:
1305 ceph_osdc_put_request(req); 1578 ceph_osdc_put_request(req);
1579bad_mutex:
1580 mutex_unlock(&osdc->request_mutex);
1306bad: 1581bad:
1307 pr_err("corrupt osd_op_reply got %d %d\n", 1582 pr_err("corrupt osd_op_reply got %d %d\n",
1308 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len)); 1583 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
@@ -1736,6 +2011,104 @@ bad:
1736} 2011}
1737 2012
1738/* 2013/*
2014 * build new request AND message
2015 *
2016 */
2017void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
2018 struct ceph_snap_context *snapc, u64 snap_id,
2019 struct timespec *mtime)
2020{
2021 struct ceph_msg *msg = req->r_request;
2022 void *p;
2023 size_t msg_size;
2024 int flags = req->r_flags;
2025 u64 data_len;
2026 unsigned int i;
2027
2028 req->r_snapid = snap_id;
2029 req->r_snapc = ceph_get_snap_context(snapc);
2030
2031 /* encode request */
2032 msg->hdr.version = cpu_to_le16(4);
2033
2034 p = msg->front.iov_base;
2035 ceph_encode_32(&p, 1); /* client_inc is always 1 */
2036 req->r_request_osdmap_epoch = p;
2037 p += 4;
2038 req->r_request_flags = p;
2039 p += 4;
2040 if (req->r_flags & CEPH_OSD_FLAG_WRITE)
2041 ceph_encode_timespec(p, mtime);
2042 p += sizeof(struct ceph_timespec);
2043 req->r_request_reassert_version = p;
2044 p += sizeof(struct ceph_eversion); /* will get filled in */
2045
2046 /* oloc */
2047 ceph_encode_8(&p, 4);
2048 ceph_encode_8(&p, 4);
2049 ceph_encode_32(&p, 8 + 4 + 4);
2050 req->r_request_pool = p;
2051 p += 8;
2052 ceph_encode_32(&p, -1); /* preferred */
2053 ceph_encode_32(&p, 0); /* key len */
2054
2055 ceph_encode_8(&p, 1);
2056 req->r_request_pgid = p;
2057 p += 8 + 4;
2058 ceph_encode_32(&p, -1); /* preferred */
2059
2060 /* oid */
2061 ceph_encode_32(&p, req->r_oid_len);
2062 memcpy(p, req->r_oid, req->r_oid_len);
2063 dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
2064 p += req->r_oid_len;
2065
2066 /* ops--can imply data */
2067 ceph_encode_16(&p, (u16)req->r_num_ops);
2068 data_len = 0;
2069 for (i = 0; i < req->r_num_ops; i++) {
2070 data_len += osd_req_encode_op(req, p, i);
2071 p += sizeof(struct ceph_osd_op);
2072 }
2073
2074 /* snaps */
2075 ceph_encode_64(&p, req->r_snapid);
2076 ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
2077 ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
2078 if (req->r_snapc) {
2079 for (i = 0; i < snapc->num_snaps; i++) {
2080 ceph_encode_64(&p, req->r_snapc->snaps[i]);
2081 }
2082 }
2083
2084 req->r_request_attempts = p;
2085 p += 4;
2086
2087 /* data */
2088 if (flags & CEPH_OSD_FLAG_WRITE) {
2089 u16 data_off;
2090
2091 /*
2092 * The header "data_off" is a hint to the receiver
2093 * allowing it to align received data into its
2094 * buffers such that there's no need to re-copy
2095 * it before writing it to disk (direct I/O).
2096 */
2097 data_off = (u16) (off & 0xffff);
2098 req->r_request->hdr.data_off = cpu_to_le16(data_off);
2099 }
2100 req->r_request->hdr.data_len = cpu_to_le32(data_len);
2101
2102 BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
2103 msg_size = p - msg->front.iov_base;
2104 msg->front.iov_len = msg_size;
2105 msg->hdr.front_len = cpu_to_le32(msg_size);
2106
2107 dout("build_request msg_size was %d\n", (int)msg_size);
2108}
2109EXPORT_SYMBOL(ceph_osdc_build_request);
2110
2111/*
1739 * Register request, send initial attempt. 2112 * Register request, send initial attempt.
1740 */ 2113 */
1741int ceph_osdc_start_request(struct ceph_osd_client *osdc, 2114int ceph_osdc_start_request(struct ceph_osd_client *osdc,
@@ -1744,41 +2117,26 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1744{ 2117{
1745 int rc = 0; 2118 int rc = 0;
1746 2119
1747 req->r_request->pages = req->r_pages;
1748 req->r_request->nr_pages = req->r_num_pages;
1749#ifdef CONFIG_BLOCK
1750 req->r_request->bio = req->r_bio;
1751#endif
1752 req->r_request->trail = &req->r_trail;
1753
1754 register_request(osdc, req);
1755
1756 down_read(&osdc->map_sem); 2120 down_read(&osdc->map_sem);
1757 mutex_lock(&osdc->request_mutex); 2121 mutex_lock(&osdc->request_mutex);
1758 /* 2122 __register_request(osdc, req);
1759 * a racing kick_requests() may have sent the message for us 2123 WARN_ON(req->r_sent);
1760 * while we dropped request_mutex above, so only send now if 2124 rc = __map_request(osdc, req, 0);
1761 * the request still han't been touched yet. 2125 if (rc < 0) {
1762 */ 2126 if (nofail) {
1763 if (req->r_sent == 0) { 2127 dout("osdc_start_request failed map, "
1764 rc = __map_request(osdc, req, 0); 2128 " will retry %lld\n", req->r_tid);
1765 if (rc < 0) { 2129 rc = 0;
1766 if (nofail) {
1767 dout("osdc_start_request failed map, "
1768 " will retry %lld\n", req->r_tid);
1769 rc = 0;
1770 }
1771 goto out_unlock;
1772 }
1773 if (req->r_osd == NULL) {
1774 dout("send_request %p no up osds in pg\n", req);
1775 ceph_monc_request_next_osdmap(&osdc->client->monc);
1776 } else {
1777 __send_request(osdc, req);
1778 } 2130 }
1779 rc = 0; 2131 goto out_unlock;
1780 } 2132 }
1781 2133 if (req->r_osd == NULL) {
2134 dout("send_request %p no up osds in pg\n", req);
2135 ceph_monc_request_next_osdmap(&osdc->client->monc);
2136 } else {
2137 __send_queued(osdc);
2138 }
2139 rc = 0;
1782out_unlock: 2140out_unlock:
1783 mutex_unlock(&osdc->request_mutex); 2141 mutex_unlock(&osdc->request_mutex);
1784 up_read(&osdc->map_sem); 2142 up_read(&osdc->map_sem);
@@ -1940,18 +2298,22 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1940 2298
1941 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, 2299 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
1942 vino.snap, off, *plen); 2300 vino.snap, off, *plen);
1943 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 2301 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1,
1944 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 2302 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1945 NULL, 0, truncate_seq, truncate_size, NULL, 2303 NULL, truncate_seq, truncate_size,
1946 false, page_align); 2304 false);
1947 if (IS_ERR(req)) 2305 if (IS_ERR(req))
1948 return PTR_ERR(req); 2306 return PTR_ERR(req);
1949 2307
1950 /* it may be a short read due to an object boundary */ 2308 /* it may be a short read due to an object boundary */
1951 req->r_pages = pages;
1952 2309
1953 dout("readpages final extent is %llu~%llu (%d pages align %d)\n", 2310 osd_req_op_extent_osd_data_pages(req, 0,
1954 off, *plen, req->r_num_pages, page_align); 2311 pages, *plen, page_align, false, false);
2312
2313 dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n",
2314 off, *plen, *plen, page_align);
2315
2316 ceph_osdc_build_request(req, off, NULL, vino.snap, NULL);
1955 2317
1956 rc = ceph_osdc_start_request(osdc, req, false); 2318 rc = ceph_osdc_start_request(osdc, req, false);
1957 if (!rc) 2319 if (!rc)
@@ -1978,20 +2340,21 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1978 int rc = 0; 2340 int rc = 0;
1979 int page_align = off & ~PAGE_MASK; 2341 int page_align = off & ~PAGE_MASK;
1980 2342
1981 BUG_ON(vino.snap != CEPH_NOSNAP); 2343 BUG_ON(vino.snap != CEPH_NOSNAP); /* snapshots aren't writeable */
1982 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 2344 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1,
1983 CEPH_OSD_OP_WRITE, 2345 CEPH_OSD_OP_WRITE,
1984 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, 2346 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
1985 snapc, 0, 2347 snapc, truncate_seq, truncate_size,
1986 truncate_seq, truncate_size, mtime, 2348 true);
1987 true, page_align);
1988 if (IS_ERR(req)) 2349 if (IS_ERR(req))
1989 return PTR_ERR(req); 2350 return PTR_ERR(req);
1990 2351
1991 /* it may be a short write due to an object boundary */ 2352 /* it may be a short write due to an object boundary */
1992 req->r_pages = pages; 2353 osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
1993 dout("writepages %llu~%llu (%d pages)\n", off, len, 2354 false, false);
1994 req->r_num_pages); 2355 dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
2356
2357 ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime);
1995 2358
1996 rc = ceph_osdc_start_request(osdc, req, true); 2359 rc = ceph_osdc_start_request(osdc, req, true);
1997 if (!rc) 2360 if (!rc)
@@ -2005,6 +2368,26 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
2005} 2368}
2006EXPORT_SYMBOL(ceph_osdc_writepages); 2369EXPORT_SYMBOL(ceph_osdc_writepages);
2007 2370
2371int ceph_osdc_setup(void)
2372{
2373 BUG_ON(ceph_osd_request_cache);
2374 ceph_osd_request_cache = kmem_cache_create("ceph_osd_request",
2375 sizeof (struct ceph_osd_request),
2376 __alignof__(struct ceph_osd_request),
2377 0, NULL);
2378
2379 return ceph_osd_request_cache ? 0 : -ENOMEM;
2380}
2381EXPORT_SYMBOL(ceph_osdc_setup);
2382
2383void ceph_osdc_cleanup(void)
2384{
2385 BUG_ON(!ceph_osd_request_cache);
2386 kmem_cache_destroy(ceph_osd_request_cache);
2387 ceph_osd_request_cache = NULL;
2388}
2389EXPORT_SYMBOL(ceph_osdc_cleanup);
2390
2008/* 2391/*
2009 * handle incoming message 2392 * handle incoming message
2010 */ 2393 */
@@ -2064,13 +2447,10 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
2064 goto out; 2447 goto out;
2065 } 2448 }
2066 2449
2067 if (req->r_con_filling_msg) { 2450 if (req->r_reply->con)
2068 dout("%s revoking msg %p from old con %p\n", __func__, 2451 dout("%s revoking msg %p from old con %p\n", __func__,
2069 req->r_reply, req->r_con_filling_msg); 2452 req->r_reply, req->r_reply->con);
2070 ceph_msg_revoke_incoming(req->r_reply); 2453 ceph_msg_revoke_incoming(req->r_reply);
2071 req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
2072 req->r_con_filling_msg = NULL;
2073 }
2074 2454
2075 if (front > req->r_reply->front.iov_len) { 2455 if (front > req->r_reply->front.iov_len) {
2076 pr_warning("get_reply front %d > preallocated %d\n", 2456 pr_warning("get_reply front %d > preallocated %d\n",
@@ -2084,26 +2464,29 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
2084 m = ceph_msg_get(req->r_reply); 2464 m = ceph_msg_get(req->r_reply);
2085 2465
2086 if (data_len > 0) { 2466 if (data_len > 0) {
2087 int want = calc_pages_for(req->r_page_alignment, data_len); 2467 struct ceph_osd_data *osd_data;
2088 2468
2089 if (req->r_pages && unlikely(req->r_num_pages < want)) { 2469 /*
2090 pr_warning("tid %lld reply has %d bytes %d pages, we" 2470 * XXX This is assuming there is only one op containing
2091 " had only %d pages ready\n", tid, data_len, 2471 * XXX page data. Probably OK for reads, but this
2092 want, req->r_num_pages); 2472 * XXX ought to be done more generally.
2093 *skip = 1; 2473 */
2094 ceph_msg_put(m); 2474 osd_data = osd_req_op_extent_osd_data(req, 0);
2095 m = NULL; 2475 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
2096 goto out; 2476 if (osd_data->pages &&
2477 unlikely(osd_data->length < data_len)) {
2478
2479 pr_warning("tid %lld reply has %d bytes "
2480 "we had only %llu bytes ready\n",
2481 tid, data_len, osd_data->length);
2482 *skip = 1;
2483 ceph_msg_put(m);
2484 m = NULL;
2485 goto out;
2486 }
2097 } 2487 }
2098 m->pages = req->r_pages;
2099 m->nr_pages = req->r_num_pages;
2100 m->page_alignment = req->r_page_alignment;
2101#ifdef CONFIG_BLOCK
2102 m->bio = req->r_bio;
2103#endif
2104 } 2488 }
2105 *skip = 0; 2489 *skip = 0;
2106 req->r_con_filling_msg = con->ops->get(con);
2107 dout("get_reply tid %lld %p\n", tid, m); 2490 dout("get_reply tid %lld %p\n", tid, m);
2108 2491
2109out: 2492out:
@@ -2168,13 +2551,17 @@ static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
2168 struct ceph_auth_handshake *auth = &o->o_auth; 2551 struct ceph_auth_handshake *auth = &o->o_auth;
2169 2552
2170 if (force_new && auth->authorizer) { 2553 if (force_new && auth->authorizer) {
2171 if (ac->ops && ac->ops->destroy_authorizer) 2554 ceph_auth_destroy_authorizer(ac, auth->authorizer);
2172 ac->ops->destroy_authorizer(ac, auth->authorizer);
2173 auth->authorizer = NULL; 2555 auth->authorizer = NULL;
2174 } 2556 }
2175 if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) { 2557 if (!auth->authorizer) {
2176 int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 2558 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
2177 auth); 2559 auth);
2560 if (ret)
2561 return ERR_PTR(ret);
2562 } else {
2563 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
2564 auth);
2178 if (ret) 2565 if (ret)
2179 return ERR_PTR(ret); 2566 return ERR_PTR(ret);
2180 } 2567 }
@@ -2190,11 +2577,7 @@ static int verify_authorizer_reply(struct ceph_connection *con, int len)
2190 struct ceph_osd_client *osdc = o->o_osdc; 2577 struct ceph_osd_client *osdc = o->o_osdc;
2191 struct ceph_auth_client *ac = osdc->client->monc.auth; 2578 struct ceph_auth_client *ac = osdc->client->monc.auth;
2192 2579
2193 /* 2580 return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len);
2194 * XXX If ac->ops or ac->ops->verify_authorizer_reply is null,
2195 * XXX which do we do: succeed or fail?
2196 */
2197 return ac->ops->verify_authorizer_reply(ac, o->o_auth.authorizer, len);
2198} 2581}
2199 2582
2200static int invalidate_authorizer(struct ceph_connection *con) 2583static int invalidate_authorizer(struct ceph_connection *con)
@@ -2203,9 +2586,7 @@ static int invalidate_authorizer(struct ceph_connection *con)
2203 struct ceph_osd_client *osdc = o->o_osdc; 2586 struct ceph_osd_client *osdc = o->o_osdc;
2204 struct ceph_auth_client *ac = osdc->client->monc.auth; 2587 struct ceph_auth_client *ac = osdc->client->monc.auth;
2205 2588
2206 if (ac->ops && ac->ops->invalidate_authorizer) 2589 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
2207 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
2208
2209 return ceph_monc_validate_auth(&osdc->client->monc); 2590 return ceph_monc_validate_auth(&osdc->client->monc);
2210} 2591}
2211 2592
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 4543b9aba40c..603ddd92db19 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -654,24 +654,6 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
654 return 0; 654 return 0;
655} 655}
656 656
657static int __decode_pgid(void **p, void *end, struct ceph_pg *pg)
658{
659 u8 v;
660
661 ceph_decode_need(p, end, 1+8+4+4, bad);
662 v = ceph_decode_8(p);
663 if (v != 1)
664 goto bad;
665 pg->pool = ceph_decode_64(p);
666 pg->seed = ceph_decode_32(p);
667 *p += 4; /* skip preferred */
668 return 0;
669
670bad:
671 dout("error decoding pgid\n");
672 return -EINVAL;
673}
674
675/* 657/*
676 * decode a full map. 658 * decode a full map.
677 */ 659 */
@@ -765,7 +747,7 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
765 struct ceph_pg pgid; 747 struct ceph_pg pgid;
766 struct ceph_pg_mapping *pg; 748 struct ceph_pg_mapping *pg;
767 749
768 err = __decode_pgid(p, end, &pgid); 750 err = ceph_decode_pgid(p, end, &pgid);
769 if (err) 751 if (err)
770 goto bad; 752 goto bad;
771 ceph_decode_need(p, end, sizeof(u32), bad); 753 ceph_decode_need(p, end, sizeof(u32), bad);
@@ -983,7 +965,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
983 struct ceph_pg pgid; 965 struct ceph_pg pgid;
984 u32 pglen; 966 u32 pglen;
985 967
986 err = __decode_pgid(p, end, &pgid); 968 err = ceph_decode_pgid(p, end, &pgid);
987 if (err) 969 if (err)
988 goto bad; 970 goto bad;
989 ceph_decode_need(p, end, sizeof(u32), bad); 971 ceph_decode_need(p, end, sizeof(u32), bad);
@@ -1111,27 +1093,22 @@ EXPORT_SYMBOL(ceph_calc_file_object_mapping);
1111 * calculate an object layout (i.e. pgid) from an oid, 1093 * calculate an object layout (i.e. pgid) from an oid,
1112 * file_layout, and osdmap 1094 * file_layout, and osdmap
1113 */ 1095 */
1114int ceph_calc_object_layout(struct ceph_pg *pg, 1096int ceph_calc_ceph_pg(struct ceph_pg *pg, const char *oid,
1115 const char *oid, 1097 struct ceph_osdmap *osdmap, uint64_t pool)
1116 struct ceph_file_layout *fl,
1117 struct ceph_osdmap *osdmap)
1118{ 1098{
1119 unsigned int num, num_mask; 1099 struct ceph_pg_pool_info *pool_info;
1120 struct ceph_pg_pool_info *pool;
1121 1100
1122 BUG_ON(!osdmap); 1101 BUG_ON(!osdmap);
1123 pg->pool = le32_to_cpu(fl->fl_pg_pool); 1102 pool_info = __lookup_pg_pool(&osdmap->pg_pools, pool);
1124 pool = __lookup_pg_pool(&osdmap->pg_pools, pg->pool); 1103 if (!pool_info)
1125 if (!pool)
1126 return -EIO; 1104 return -EIO;
1127 pg->seed = ceph_str_hash(pool->object_hash, oid, strlen(oid)); 1105 pg->pool = pool;
1128 num = pool->pg_num; 1106 pg->seed = ceph_str_hash(pool_info->object_hash, oid, strlen(oid));
1129 num_mask = pool->pg_num_mask;
1130 1107
1131 dout("calc_object_layout '%s' pgid %lld.%x\n", oid, pg->pool, pg->seed); 1108 dout("%s '%s' pgid %lld.%x\n", __func__, oid, pg->pool, pg->seed);
1132 return 0; 1109 return 0;
1133} 1110}
1134EXPORT_SYMBOL(ceph_calc_object_layout); 1111EXPORT_SYMBOL(ceph_calc_ceph_pg);
1135 1112
1136/* 1113/*
1137 * Calculate raw osd vector for the given pgid. Return pointer to osd 1114 * Calculate raw osd vector for the given pgid. Return pointer to osd
diff --git a/net/ceph/snapshot.c b/net/ceph/snapshot.c
new file mode 100644
index 000000000000..154683f5f14c
--- /dev/null
+++ b/net/ceph/snapshot.c
@@ -0,0 +1,78 @@
1/*
2 * snapshot.c Ceph snapshot context utility routines (part of libceph)
3 *
4 * Copyright (C) 2013 Inktank Storage, Inc.
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * version 2 as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 * 02110-1301, USA.
19 */
20
21#include <stddef.h>
22
23#include <linux/types.h>
24#include <linux/export.h>
25#include <linux/ceph/libceph.h>
26
27/*
28 * Ceph snapshot contexts are reference counted objects, and the
29 * returned structure holds a single reference. Acquire additional
30 * references with ceph_get_snap_context(), and release them with
31 * ceph_put_snap_context(). When the reference count reaches zero
32 * the entire structure is freed.
33 */
34
35/*
36 * Create a new ceph snapshot context large enough to hold the
37 * indicated number of snapshot ids (which can be 0). Caller has
38 * to fill in snapc->seq and snapc->snaps[0..snap_count-1].
39 *
40 * Returns a null pointer if an error occurs.
41 */
42struct ceph_snap_context *ceph_create_snap_context(u32 snap_count,
43 gfp_t gfp_flags)
44{
45 struct ceph_snap_context *snapc;
46 size_t size;
47
48 size = sizeof (struct ceph_snap_context);
49 size += snap_count * sizeof (snapc->snaps[0]);
50 snapc = kzalloc(size, gfp_flags);
51 if (!snapc)
52 return NULL;
53
54 atomic_set(&snapc->nref, 1);
55 snapc->num_snaps = snap_count;
56
57 return snapc;
58}
59EXPORT_SYMBOL(ceph_create_snap_context);
60
61struct ceph_snap_context *ceph_get_snap_context(struct ceph_snap_context *sc)
62{
63 if (sc)
64 atomic_inc(&sc->nref);
65 return sc;
66}
67EXPORT_SYMBOL(ceph_get_snap_context);
68
69void ceph_put_snap_context(struct ceph_snap_context *sc)
70{
71 if (!sc)
72 return;
73 if (atomic_dec_and_test(&sc->nref)) {
74 /*printk(" deleting snap_context %p\n", sc);*/
75 kfree(sc);
76 }
77}
78EXPORT_SYMBOL(ceph_put_snap_context);