about | summary | refs | log | tree | commit | diff | stats
path: root/net/ceph
diff options
context:
space:
mode:
author Linus Torvalds <torvalds@linux-foundation.org> 2013-02-28 20:43:09 -0500
committer Linus Torvalds <torvalds@linux-foundation.org> 2013-02-28 20:43:09 -0500
commit 1cf0209c431fa7790253c532039d53b0773193aa (patch)
tree 24310eaaf4c9583988d9098f6c85a4a34970b5b9 /net/ceph
parent de1a2262b006220dae2561a299a6ea128c46f4fe (diff)
parent 83ca14fdd35821554058e5fd4fa7b118ee504a33 (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil:

"A few groups of patches here. Alex has been hard at work improving the RBD code, laying groundwork for understanding the new formats and doing layering. Most of the infrastructure is now in place for the final bits that will come with the next window.

There are a few changes to the data layout. Jim Schutt's patch fixes some non-ideal CRUSH behavior, and a set of patches from me updates the client to speak a newer version of the protocol and implements an improved hashing strategy across storage nodes (when the server side supports it too).

A pair of patches from Sam Lang fix the atomicity of open+create operations. Several patches from Yan, Zheng fix various mds/client issues that turned up during multi-mds torture tests.

A final set of patches expose file layouts via virtual xattrs, and allow the policies to be set on directories via xattrs as well (avoiding the awkward ioctl interface and providing a consistent interface for both kernel mount and ceph-fuse users)."

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (143 commits)
  libceph: add support for HASHPSPOOL pool flag
  libceph: update osd request/reply encoding
  libceph: calculate placement based on the internal data types
  ceph: update support for PGID64, PGPOOL3, OSDENC protocol features
  ceph: update "ceph_features.h"
  libceph: decode into cpu-native ceph_pg type
  libceph: rename ceph_pg -> ceph_pg_v1
  rbd: pass length, not op for osd completions
  rbd: move rbd_osd_trivial_callback()
  libceph: use a do..while loop in con_work()
  libceph: use a flag to indicate a fault has occurred
  libceph: separate non-locked fault handling
  libceph: encapsulate connection backoff
  libceph: eliminate sparse warnings
  ceph: eliminate sparse warnings in fs code
  rbd: eliminate sparse warnings
  libceph: define connection flag helpers
  rbd: normalize dout() calls
  rbd: barriers are hard
  rbd: ignore zero-length requests
  ...
Diffstat (limited to 'net/ceph')
-rw-r--r-- net/ceph/ceph_common.c 22
-rw-r--r-- net/ceph/ceph_strings.c 39
-rw-r--r-- net/ceph/crush/mapper.c 15
-rw-r--r-- net/ceph/crypto.c 7
-rw-r--r-- net/ceph/debugfs.c 29
-rw-r--r-- net/ceph/messenger.c 260
-rw-r--r-- net/ceph/mon_client.c 2
-rw-r--r-- net/ceph/osd_client.c 635
-rw-r--r-- net/ceph/osdmap.c 290
-rw-r--r-- net/ceph/pagevec.c 24
10 files changed, 749 insertions, 574 deletions
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 1deb29af82fd..e65e6e4be38b 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -28,6 +28,22 @@
28#include "crypto.h" 28#include "crypto.h"
29 29
30 30
31/*
32 * Module compatibility interface. For now it doesn't do anything,
33 * but its existence signals a certain level of functionality.
34 *
35 * The data buffer is used to pass information both to and from
36 * libceph. The return value indicates whether libceph determines
37 * it is compatible with the caller (from another kernel module),
38 * given the provided data.
39 *
40 * The data pointer can be null.
41 */
42bool libceph_compatible(void *data)
43{
44 return true;
45}
46EXPORT_SYMBOL(libceph_compatible);
31 47
32/* 48/*
33 * find filename portion of a path (/foo/bar/baz -> baz) 49 * find filename portion of a path (/foo/bar/baz -> baz)
@@ -590,10 +606,8 @@ static int __init init_ceph_lib(void)
590 if (ret < 0) 606 if (ret < 0)
591 goto out_crypto; 607 goto out_crypto;
592 608
593 pr_info("loaded (mon/osd proto %d/%d, osdmap %d/%d %d/%d)\n", 609 pr_info("loaded (mon/osd proto %d/%d)\n",
594 CEPH_MONC_PROTOCOL, CEPH_OSDC_PROTOCOL, 610 CEPH_MONC_PROTOCOL, CEPH_OSDC_PROTOCOL);
595 CEPH_OSDMAP_VERSION, CEPH_OSDMAP_VERSION_EXT,
596 CEPH_OSDMAP_INC_VERSION, CEPH_OSDMAP_INC_VERSION_EXT);
597 611
598 return 0; 612 return 0;
599 613
diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c
index 3fbda04de29c..1348df96fe15 100644
--- a/net/ceph/ceph_strings.c
+++ b/net/ceph/ceph_strings.c
@@ -21,9 +21,15 @@ const char *ceph_osd_op_name(int op)
21 switch (op) { 21 switch (op) {
22 case CEPH_OSD_OP_READ: return "read"; 22 case CEPH_OSD_OP_READ: return "read";
23 case CEPH_OSD_OP_STAT: return "stat"; 23 case CEPH_OSD_OP_STAT: return "stat";
24 case CEPH_OSD_OP_MAPEXT: return "mapext";
25 case CEPH_OSD_OP_SPARSE_READ: return "sparse-read";
26 case CEPH_OSD_OP_NOTIFY: return "notify";
27 case CEPH_OSD_OP_NOTIFY_ACK: return "notify-ack";
28 case CEPH_OSD_OP_ASSERT_VER: return "assert-version";
24 29
25 case CEPH_OSD_OP_MASKTRUNC: return "masktrunc"; 30 case CEPH_OSD_OP_MASKTRUNC: return "masktrunc";
26 31
32 case CEPH_OSD_OP_CREATE: return "create";
27 case CEPH_OSD_OP_WRITE: return "write"; 33 case CEPH_OSD_OP_WRITE: return "write";
28 case CEPH_OSD_OP_DELETE: return "delete"; 34 case CEPH_OSD_OP_DELETE: return "delete";
29 case CEPH_OSD_OP_TRUNCATE: return "truncate"; 35 case CEPH_OSD_OP_TRUNCATE: return "truncate";
@@ -39,6 +45,11 @@ const char *ceph_osd_op_name(int op)
39 case CEPH_OSD_OP_TMAPUP: return "tmapup"; 45 case CEPH_OSD_OP_TMAPUP: return "tmapup";
40 case CEPH_OSD_OP_TMAPGET: return "tmapget"; 46 case CEPH_OSD_OP_TMAPGET: return "tmapget";
41 case CEPH_OSD_OP_TMAPPUT: return "tmapput"; 47 case CEPH_OSD_OP_TMAPPUT: return "tmapput";
48 case CEPH_OSD_OP_WATCH: return "watch";
49
50 case CEPH_OSD_OP_CLONERANGE: return "clonerange";
51 case CEPH_OSD_OP_ASSERT_SRC_VERSION: return "assert-src-version";
52 case CEPH_OSD_OP_SRC_CMPXATTR: return "src-cmpxattr";
42 53
43 case CEPH_OSD_OP_GETXATTR: return "getxattr"; 54 case CEPH_OSD_OP_GETXATTR: return "getxattr";
44 case CEPH_OSD_OP_GETXATTRS: return "getxattrs"; 55 case CEPH_OSD_OP_GETXATTRS: return "getxattrs";
@@ -53,6 +64,10 @@ const char *ceph_osd_op_name(int op)
53 case CEPH_OSD_OP_BALANCEREADS: return "balance-reads"; 64 case CEPH_OSD_OP_BALANCEREADS: return "balance-reads";
54 case CEPH_OSD_OP_UNBALANCEREADS: return "unbalance-reads"; 65 case CEPH_OSD_OP_UNBALANCEREADS: return "unbalance-reads";
55 case CEPH_OSD_OP_SCRUB: return "scrub"; 66 case CEPH_OSD_OP_SCRUB: return "scrub";
67 case CEPH_OSD_OP_SCRUB_RESERVE: return "scrub-reserve";
68 case CEPH_OSD_OP_SCRUB_UNRESERVE: return "scrub-unreserve";
69 case CEPH_OSD_OP_SCRUB_STOP: return "scrub-stop";
70 case CEPH_OSD_OP_SCRUB_MAP: return "scrub-map";
56 71
57 case CEPH_OSD_OP_WRLOCK: return "wrlock"; 72 case CEPH_OSD_OP_WRLOCK: return "wrlock";
58 case CEPH_OSD_OP_WRUNLOCK: return "wrunlock"; 73 case CEPH_OSD_OP_WRUNLOCK: return "wrunlock";
@@ -64,10 +79,34 @@ const char *ceph_osd_op_name(int op)
64 case CEPH_OSD_OP_CALL: return "call"; 79 case CEPH_OSD_OP_CALL: return "call";
65 80
66 case CEPH_OSD_OP_PGLS: return "pgls"; 81 case CEPH_OSD_OP_PGLS: return "pgls";
82 case CEPH_OSD_OP_PGLS_FILTER: return "pgls-filter";
83 case CEPH_OSD_OP_OMAPGETKEYS: return "omap-get-keys";
84 case CEPH_OSD_OP_OMAPGETVALS: return "omap-get-vals";
85 case CEPH_OSD_OP_OMAPGETHEADER: return "omap-get-header";
86 case CEPH_OSD_OP_OMAPGETVALSBYKEYS: return "omap-get-vals-by-keys";
87 case CEPH_OSD_OP_OMAPSETVALS: return "omap-set-vals";
88 case CEPH_OSD_OP_OMAPSETHEADER: return "omap-set-header";
89 case CEPH_OSD_OP_OMAPCLEAR: return "omap-clear";
90 case CEPH_OSD_OP_OMAPRMKEYS: return "omap-rm-keys";
67 } 91 }
68 return "???"; 92 return "???";
69} 93}
70 94
95const char *ceph_osd_state_name(int s)
96{
97 switch (s) {
98 case CEPH_OSD_EXISTS:
99 return "exists";
100 case CEPH_OSD_UP:
101 return "up";
102 case CEPH_OSD_AUTOOUT:
103 return "autoout";
104 case CEPH_OSD_NEW:
105 return "new";
106 default:
107 return "???";
108 }
109}
71 110
72const char *ceph_pool_op_name(int op) 111const char *ceph_pool_op_name(int op)
73{ 112{
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index 35fce755ce10..cbd06a91941c 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -287,6 +287,7 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in
287 * @outpos: our position in that vector 287 * @outpos: our position in that vector
288 * @firstn: true if choosing "first n" items, false if choosing "indep" 288 * @firstn: true if choosing "first n" items, false if choosing "indep"
289 * @recurse_to_leaf: true if we want one device under each item of given type 289 * @recurse_to_leaf: true if we want one device under each item of given type
290 * @descend_once: true if we should only try one descent before giving up
290 * @out2: second output vector for leaf items (if @recurse_to_leaf) 291 * @out2: second output vector for leaf items (if @recurse_to_leaf)
291 */ 292 */
292static int crush_choose(const struct crush_map *map, 293static int crush_choose(const struct crush_map *map,
@@ -295,7 +296,7 @@ static int crush_choose(const struct crush_map *map,
295 int x, int numrep, int type, 296 int x, int numrep, int type,
296 int *out, int outpos, 297 int *out, int outpos,
297 int firstn, int recurse_to_leaf, 298 int firstn, int recurse_to_leaf,
298 int *out2) 299 int descend_once, int *out2)
299{ 300{
300 int rep; 301 int rep;
301 unsigned int ftotal, flocal; 302 unsigned int ftotal, flocal;
@@ -391,7 +392,7 @@ static int crush_choose(const struct crush_map *map,
391 } 392 }
392 393
393 reject = 0; 394 reject = 0;
394 if (recurse_to_leaf) { 395 if (!collide && recurse_to_leaf) {
395 if (item < 0) { 396 if (item < 0) {
396 if (crush_choose(map, 397 if (crush_choose(map,
397 map->buckets[-1-item], 398 map->buckets[-1-item],
@@ -399,6 +400,7 @@ static int crush_choose(const struct crush_map *map,
399 x, outpos+1, 0, 400 x, outpos+1, 0,
400 out2, outpos, 401 out2, outpos,
401 firstn, 0, 402 firstn, 0,
403 map->chooseleaf_descend_once,
402 NULL) <= outpos) 404 NULL) <= outpos)
403 /* didn't get leaf */ 405 /* didn't get leaf */
404 reject = 1; 406 reject = 1;
@@ -422,7 +424,10 @@ reject:
422 ftotal++; 424 ftotal++;
423 flocal++; 425 flocal++;
424 426
425 if (collide && flocal <= map->choose_local_tries) 427 if (reject && descend_once)
428 /* let outer call try again */
429 skip_rep = 1;
430 else if (collide && flocal <= map->choose_local_tries)
426 /* retry locally a few times */ 431 /* retry locally a few times */
427 retry_bucket = 1; 432 retry_bucket = 1;
428 else if (map->choose_local_fallback_tries > 0 && 433 else if (map->choose_local_fallback_tries > 0 &&
@@ -485,6 +490,7 @@ int crush_do_rule(const struct crush_map *map,
485 int i, j; 490 int i, j;
486 int numrep; 491 int numrep;
487 int firstn; 492 int firstn;
493 const int descend_once = 0;
488 494
489 if ((__u32)ruleno >= map->max_rules) { 495 if ((__u32)ruleno >= map->max_rules) {
490 dprintk(" bad ruleno %d\n", ruleno); 496 dprintk(" bad ruleno %d\n", ruleno);
@@ -544,7 +550,8 @@ int crush_do_rule(const struct crush_map *map,
544 curstep->arg2, 550 curstep->arg2,
545 o+osize, j, 551 o+osize, j,
546 firstn, 552 firstn,
547 recurse_to_leaf, c+osize); 553 recurse_to_leaf,
554 descend_once, c+osize);
548 } 555 }
549 556
550 if (recurse_to_leaf) 557 if (recurse_to_leaf)
diff --git a/net/ceph/crypto.c b/net/ceph/crypto.c
index af14cb425164..6e7a236525b6 100644
--- a/net/ceph/crypto.c
+++ b/net/ceph/crypto.c
@@ -423,7 +423,8 @@ int ceph_encrypt2(struct ceph_crypto_key *secret, void *dst, size_t *dst_len,
423 } 423 }
424} 424}
425 425
426int ceph_key_instantiate(struct key *key, struct key_preparsed_payload *prep) 426static int ceph_key_instantiate(struct key *key,
427 struct key_preparsed_payload *prep)
427{ 428{
428 struct ceph_crypto_key *ckey; 429 struct ceph_crypto_key *ckey;
429 size_t datalen = prep->datalen; 430 size_t datalen = prep->datalen;
@@ -458,12 +459,12 @@ err:
458 return ret; 459 return ret;
459} 460}
460 461
461int ceph_key_match(const struct key *key, const void *description) 462static int ceph_key_match(const struct key *key, const void *description)
462{ 463{
463 return strcmp(key->description, description) == 0; 464 return strcmp(key->description, description) == 0;
464} 465}
465 466
466void ceph_key_destroy(struct key *key) { 467static void ceph_key_destroy(struct key *key) {
467 struct ceph_crypto_key *ckey = key->payload.data; 468 struct ceph_crypto_key *ckey = key->payload.data;
468 469
469 ceph_crypto_key_destroy(ckey); 470 ceph_crypto_key_destroy(ckey);
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 38b5dc1823d4..00d051f4894e 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -66,9 +66,9 @@ static int osdmap_show(struct seq_file *s, void *p)
66 for (n = rb_first(&client->osdc.osdmap->pg_pools); n; n = rb_next(n)) { 66 for (n = rb_first(&client->osdc.osdmap->pg_pools); n; n = rb_next(n)) {
67 struct ceph_pg_pool_info *pool = 67 struct ceph_pg_pool_info *pool =
68 rb_entry(n, struct ceph_pg_pool_info, node); 68 rb_entry(n, struct ceph_pg_pool_info, node);
69 seq_printf(s, "pg_pool %d pg_num %d / %d, lpg_num %d / %d\n", 69 seq_printf(s, "pg_pool %llu pg_num %d / %d\n",
70 pool->id, pool->v.pg_num, pool->pg_num_mask, 70 (unsigned long long)pool->id, pool->pg_num,
71 pool->v.lpg_num, pool->lpg_num_mask); 71 pool->pg_num_mask);
72 } 72 }
73 for (i = 0; i < client->osdc.osdmap->max_osd; i++) { 73 for (i = 0; i < client->osdc.osdmap->max_osd; i++) {
74 struct ceph_entity_addr *addr = 74 struct ceph_entity_addr *addr =
@@ -123,26 +123,16 @@ static int osdc_show(struct seq_file *s, void *pp)
123 mutex_lock(&osdc->request_mutex); 123 mutex_lock(&osdc->request_mutex);
124 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { 124 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
125 struct ceph_osd_request *req; 125 struct ceph_osd_request *req;
126 struct ceph_osd_request_head *head; 126 int opcode;
127 struct ceph_osd_op *op;
128 int num_ops;
129 int opcode, olen;
130 int i; 127 int i;
131 128
132 req = rb_entry(p, struct ceph_osd_request, r_node); 129 req = rb_entry(p, struct ceph_osd_request, r_node);
133 130
134 seq_printf(s, "%lld\tosd%d\t%d.%x\t", req->r_tid, 131 seq_printf(s, "%lld\tosd%d\t%lld.%x\t", req->r_tid,
135 req->r_osd ? req->r_osd->o_osd : -1, 132 req->r_osd ? req->r_osd->o_osd : -1,
136 le32_to_cpu(req->r_pgid.pool), 133 req->r_pgid.pool, req->r_pgid.seed);
137 le16_to_cpu(req->r_pgid.ps));
138 134
139 head = req->r_request->front.iov_base; 135 seq_printf(s, "%.*s", req->r_oid_len, req->r_oid);
140 op = (void *)(head + 1);
141
142 num_ops = le16_to_cpu(head->num_ops);
143 olen = le32_to_cpu(head->object_len);
144 seq_printf(s, "%.*s", olen,
145 (const char *)(head->ops + num_ops));
146 136
147 if (req->r_reassert_version.epoch) 137 if (req->r_reassert_version.epoch)
148 seq_printf(s, "\t%u'%llu", 138 seq_printf(s, "\t%u'%llu",
@@ -151,10 +141,9 @@ static int osdc_show(struct seq_file *s, void *pp)
151 else 141 else
152 seq_printf(s, "\t"); 142 seq_printf(s, "\t");
153 143
154 for (i = 0; i < num_ops; i++) { 144 for (i = 0; i < req->r_num_ops; i++) {
155 opcode = le16_to_cpu(op->op); 145 opcode = le16_to_cpu(req->r_request_ops[i].op);
156 seq_printf(s, "\t%s", ceph_osd_op_name(opcode)); 146 seq_printf(s, "\t%s", ceph_osd_op_name(opcode));
157 op++;
158 } 147 }
159 148
160 seq_printf(s, "\n"); 149 seq_printf(s, "\n");
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 5ccf87ed8d68..2c0669fb54e3 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -9,8 +9,9 @@
9#include <linux/slab.h> 9#include <linux/slab.h>
10#include <linux/socket.h> 10#include <linux/socket.h>
11#include <linux/string.h> 11#include <linux/string.h>
12#ifdef CONFIG_BLOCK
12#include <linux/bio.h> 13#include <linux/bio.h>
13#include <linux/blkdev.h> 14#endif /* CONFIG_BLOCK */
14#include <linux/dns_resolver.h> 15#include <linux/dns_resolver.h>
15#include <net/tcp.h> 16#include <net/tcp.h>
16 17
@@ -97,6 +98,57 @@
97#define CON_FLAG_SOCK_CLOSED 3 /* socket state changed to closed */ 98#define CON_FLAG_SOCK_CLOSED 3 /* socket state changed to closed */
98#define CON_FLAG_BACKOFF 4 /* need to retry queuing delayed work */ 99#define CON_FLAG_BACKOFF 4 /* need to retry queuing delayed work */
99 100
101static bool con_flag_valid(unsigned long con_flag)
102{
103 switch (con_flag) {
104 case CON_FLAG_LOSSYTX:
105 case CON_FLAG_KEEPALIVE_PENDING:
106 case CON_FLAG_WRITE_PENDING:
107 case CON_FLAG_SOCK_CLOSED:
108 case CON_FLAG_BACKOFF:
109 return true;
110 default:
111 return false;
112 }
113}
114
115static void con_flag_clear(struct ceph_connection *con, unsigned long con_flag)
116{
117 BUG_ON(!con_flag_valid(con_flag));
118
119 clear_bit(con_flag, &con->flags);
120}
121
122static void con_flag_set(struct ceph_connection *con, unsigned long con_flag)
123{
124 BUG_ON(!con_flag_valid(con_flag));
125
126 set_bit(con_flag, &con->flags);
127}
128
129static bool con_flag_test(struct ceph_connection *con, unsigned long con_flag)
130{
131 BUG_ON(!con_flag_valid(con_flag));
132
133 return test_bit(con_flag, &con->flags);
134}
135
136static bool con_flag_test_and_clear(struct ceph_connection *con,
137 unsigned long con_flag)
138{
139 BUG_ON(!con_flag_valid(con_flag));
140
141 return test_and_clear_bit(con_flag, &con->flags);
142}
143
144static bool con_flag_test_and_set(struct ceph_connection *con,
145 unsigned long con_flag)
146{
147 BUG_ON(!con_flag_valid(con_flag));
148
149 return test_and_set_bit(con_flag, &con->flags);
150}
151
100/* static tag bytes (protocol control messages) */ 152/* static tag bytes (protocol control messages) */
101static char tag_msg = CEPH_MSGR_TAG_MSG; 153static char tag_msg = CEPH_MSGR_TAG_MSG;
102static char tag_ack = CEPH_MSGR_TAG_ACK; 154static char tag_ack = CEPH_MSGR_TAG_ACK;
@@ -114,7 +166,7 @@ static struct lock_class_key socket_class;
114 166
115static void queue_con(struct ceph_connection *con); 167static void queue_con(struct ceph_connection *con);
116static void con_work(struct work_struct *); 168static void con_work(struct work_struct *);
117static void ceph_fault(struct ceph_connection *con); 169static void con_fault(struct ceph_connection *con);
118 170
119/* 171/*
120 * Nicely render a sockaddr as a string. An array of formatted 172 * Nicely render a sockaddr as a string. An array of formatted
@@ -171,7 +223,7 @@ static void encode_my_addr(struct ceph_messenger *msgr)
171 */ 223 */
172static struct workqueue_struct *ceph_msgr_wq; 224static struct workqueue_struct *ceph_msgr_wq;
173 225
174void _ceph_msgr_exit(void) 226static void _ceph_msgr_exit(void)
175{ 227{
176 if (ceph_msgr_wq) { 228 if (ceph_msgr_wq) {
177 destroy_workqueue(ceph_msgr_wq); 229 destroy_workqueue(ceph_msgr_wq);
@@ -308,7 +360,7 @@ static void ceph_sock_write_space(struct sock *sk)
308 * buffer. See net/ipv4/tcp_input.c:tcp_check_space() 360 * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
309 * and net/core/stream.c:sk_stream_write_space(). 361 * and net/core/stream.c:sk_stream_write_space().
310 */ 362 */
311 if (test_bit(CON_FLAG_WRITE_PENDING, &con->flags)) { 363 if (con_flag_test(con, CON_FLAG_WRITE_PENDING)) {
312 if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { 364 if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
313 dout("%s %p queueing write work\n", __func__, con); 365 dout("%s %p queueing write work\n", __func__, con);
314 clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); 366 clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
@@ -333,7 +385,7 @@ static void ceph_sock_state_change(struct sock *sk)
333 case TCP_CLOSE_WAIT: 385 case TCP_CLOSE_WAIT:
334 dout("%s TCP_CLOSE_WAIT\n", __func__); 386 dout("%s TCP_CLOSE_WAIT\n", __func__);
335 con_sock_state_closing(con); 387 con_sock_state_closing(con);
336 set_bit(CON_FLAG_SOCK_CLOSED, &con->flags); 388 con_flag_set(con, CON_FLAG_SOCK_CLOSED);
337 queue_con(con); 389 queue_con(con);
338 break; 390 break;
339 case TCP_ESTABLISHED: 391 case TCP_ESTABLISHED:
@@ -474,7 +526,7 @@ static int con_close_socket(struct ceph_connection *con)
474 * received a socket close event before we had the chance to 526 * received a socket close event before we had the chance to
475 * shut the socket down. 527 * shut the socket down.
476 */ 528 */
477 clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags); 529 con_flag_clear(con, CON_FLAG_SOCK_CLOSED);
478 530
479 con_sock_state_closed(con); 531 con_sock_state_closed(con);
480 return rc; 532 return rc;
@@ -538,11 +590,10 @@ void ceph_con_close(struct ceph_connection *con)
538 ceph_pr_addr(&con->peer_addr.in_addr)); 590 ceph_pr_addr(&con->peer_addr.in_addr));
539 con->state = CON_STATE_CLOSED; 591 con->state = CON_STATE_CLOSED;
540 592
541 clear_bit(CON_FLAG_LOSSYTX, &con->flags); /* so we retry next connect */ 593 con_flag_clear(con, CON_FLAG_LOSSYTX); /* so we retry next connect */
542 clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags); 594 con_flag_clear(con, CON_FLAG_KEEPALIVE_PENDING);
543 clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); 595 con_flag_clear(con, CON_FLAG_WRITE_PENDING);
544 clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags); 596 con_flag_clear(con, CON_FLAG_BACKOFF);
545 clear_bit(CON_FLAG_BACKOFF, &con->flags);
546 597
547 reset_connection(con); 598 reset_connection(con);
548 con->peer_global_seq = 0; 599 con->peer_global_seq = 0;
@@ -798,7 +849,7 @@ static void prepare_write_message(struct ceph_connection *con)
798 /* no, queue up footer too and be done */ 849 /* no, queue up footer too and be done */
799 prepare_write_message_footer(con); 850 prepare_write_message_footer(con);
800 851
801 set_bit(CON_FLAG_WRITE_PENDING, &con->flags); 852 con_flag_set(con, CON_FLAG_WRITE_PENDING);
802} 853}
803 854
804/* 855/*
@@ -819,7 +870,7 @@ static void prepare_write_ack(struct ceph_connection *con)
819 &con->out_temp_ack); 870 &con->out_temp_ack);
820 871
821 con->out_more = 1; /* more will follow.. eventually.. */ 872 con->out_more = 1; /* more will follow.. eventually.. */
822 set_bit(CON_FLAG_WRITE_PENDING, &con->flags); 873 con_flag_set(con, CON_FLAG_WRITE_PENDING);
823} 874}
824 875
825/* 876/*
@@ -830,7 +881,7 @@ static void prepare_write_keepalive(struct ceph_connection *con)
830 dout("prepare_write_keepalive %p\n", con); 881 dout("prepare_write_keepalive %p\n", con);
831 con_out_kvec_reset(con); 882 con_out_kvec_reset(con);
832 con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); 883 con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
833 set_bit(CON_FLAG_WRITE_PENDING, &con->flags); 884 con_flag_set(con, CON_FLAG_WRITE_PENDING);
834} 885}
835 886
836/* 887/*
@@ -873,7 +924,7 @@ static void prepare_write_banner(struct ceph_connection *con)
873 &con->msgr->my_enc_addr); 924 &con->msgr->my_enc_addr);
874 925
875 con->out_more = 0; 926 con->out_more = 0;
876 set_bit(CON_FLAG_WRITE_PENDING, &con->flags); 927 con_flag_set(con, CON_FLAG_WRITE_PENDING);
877} 928}
878 929
879static int prepare_write_connect(struct ceph_connection *con) 930static int prepare_write_connect(struct ceph_connection *con)
@@ -923,7 +974,7 @@ static int prepare_write_connect(struct ceph_connection *con)
923 auth->authorizer_buf); 974 auth->authorizer_buf);
924 975
925 con->out_more = 0; 976 con->out_more = 0;
926 set_bit(CON_FLAG_WRITE_PENDING, &con->flags); 977 con_flag_set(con, CON_FLAG_WRITE_PENDING);
927 978
928 return 0; 979 return 0;
929} 980}
@@ -1643,7 +1694,7 @@ static int process_connect(struct ceph_connection *con)
1643 le32_to_cpu(con->in_reply.connect_seq)); 1694 le32_to_cpu(con->in_reply.connect_seq));
1644 1695
1645 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) 1696 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
1646 set_bit(CON_FLAG_LOSSYTX, &con->flags); 1697 con_flag_set(con, CON_FLAG_LOSSYTX);
1647 1698
1648 con->delay = 0; /* reset backoff memory */ 1699 con->delay = 0; /* reset backoff memory */
1649 1700
@@ -2080,15 +2131,14 @@ do_next:
2080 prepare_write_ack(con); 2131 prepare_write_ack(con);
2081 goto more; 2132 goto more;
2082 } 2133 }
2083 if (test_and_clear_bit(CON_FLAG_KEEPALIVE_PENDING, 2134 if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) {
2084 &con->flags)) {
2085 prepare_write_keepalive(con); 2135 prepare_write_keepalive(con);
2086 goto more; 2136 goto more;
2087 } 2137 }
2088 } 2138 }
2089 2139
2090 /* Nothing to do! */ 2140 /* Nothing to do! */
2091 clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); 2141 con_flag_clear(con, CON_FLAG_WRITE_PENDING);
2092 dout("try_write nothing else to write.\n"); 2142 dout("try_write nothing else to write.\n");
2093 ret = 0; 2143 ret = 0;
2094out: 2144out:
@@ -2268,7 +2318,7 @@ static void queue_con(struct ceph_connection *con)
2268 2318
2269static bool con_sock_closed(struct ceph_connection *con) 2319static bool con_sock_closed(struct ceph_connection *con)
2270{ 2320{
2271 if (!test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) 2321 if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED))
2272 return false; 2322 return false;
2273 2323
2274#define CASE(x) \ 2324#define CASE(x) \
@@ -2295,6 +2345,41 @@ static bool con_sock_closed(struct ceph_connection *con)
2295 return true; 2345 return true;
2296} 2346}
2297 2347
2348static bool con_backoff(struct ceph_connection *con)
2349{
2350 int ret;
2351
2352 if (!con_flag_test_and_clear(con, CON_FLAG_BACKOFF))
2353 return false;
2354
2355 ret = queue_con_delay(con, round_jiffies_relative(con->delay));
2356 if (ret) {
2357 dout("%s: con %p FAILED to back off %lu\n", __func__,
2358 con, con->delay);
2359 BUG_ON(ret == -ENOENT);
2360 con_flag_set(con, CON_FLAG_BACKOFF);
2361 }
2362
2363 return true;
2364}
2365
2366/* Finish fault handling; con->mutex must *not* be held here */
2367
2368static void con_fault_finish(struct ceph_connection *con)
2369{
2370 /*
2371 * in case we faulted due to authentication, invalidate our
2372 * current tickets so that we can get new ones.
2373 */
2374 if (con->auth_retry && con->ops->invalidate_authorizer) {
2375 dout("calling invalidate_authorizer()\n");
2376 con->ops->invalidate_authorizer(con);
2377 }
2378
2379 if (con->ops->fault)
2380 con->ops->fault(con);
2381}
2382
2298/* 2383/*
2299 * Do some work on a connection. Drop a connection ref when we're done. 2384 * Do some work on a connection. Drop a connection ref when we're done.
2300 */ 2385 */
@@ -2302,73 +2387,68 @@ static void con_work(struct work_struct *work)
2302{ 2387{
2303 struct ceph_connection *con = container_of(work, struct ceph_connection, 2388 struct ceph_connection *con = container_of(work, struct ceph_connection,
2304 work.work); 2389 work.work);
2305 int ret; 2390 bool fault;
2306 2391
2307 mutex_lock(&con->mutex); 2392 mutex_lock(&con->mutex);
2308restart: 2393 while (true) {
2309 if (con_sock_closed(con)) 2394 int ret;
2310 goto fault;
2311 2395
2312 if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) { 2396 if ((fault = con_sock_closed(con))) {
2313 dout("con_work %p backing off\n", con); 2397 dout("%s: con %p SOCK_CLOSED\n", __func__, con);
2314 ret = queue_con_delay(con, round_jiffies_relative(con->delay)); 2398 break;
2315 if (ret) { 2399 }
2316 dout("con_work %p FAILED to back off %lu\n", con, 2400 if (con_backoff(con)) {
2317 con->delay); 2401 dout("%s: con %p BACKOFF\n", __func__, con);
2318 BUG_ON(ret == -ENOENT); 2402 break;
2319 set_bit(CON_FLAG_BACKOFF, &con->flags); 2403 }
2404 if (con->state == CON_STATE_STANDBY) {
2405 dout("%s: con %p STANDBY\n", __func__, con);
2406 break;
2407 }
2408 if (con->state == CON_STATE_CLOSED) {
2409 dout("%s: con %p CLOSED\n", __func__, con);
2410 BUG_ON(con->sock);
2411 break;
2412 }
2413 if (con->state == CON_STATE_PREOPEN) {
2414 dout("%s: con %p PREOPEN\n", __func__, con);
2415 BUG_ON(con->sock);
2320 } 2416 }
2321 goto done;
2322 }
2323 2417
2324 if (con->state == CON_STATE_STANDBY) { 2418 ret = try_read(con);
2325 dout("con_work %p STANDBY\n", con); 2419 if (ret < 0) {
2326 goto done; 2420 if (ret == -EAGAIN)
2327 } 2421 continue;
2328 if (con->state == CON_STATE_CLOSED) { 2422 con->error_msg = "socket error on read";
2329 dout("con_work %p CLOSED\n", con); 2423 fault = true;
2330 BUG_ON(con->sock); 2424 break;
2331 goto done; 2425 }
2332 }
2333 if (con->state == CON_STATE_PREOPEN) {
2334 dout("con_work OPENING\n");
2335 BUG_ON(con->sock);
2336 }
2337 2426
2338 ret = try_read(con); 2427 ret = try_write(con);
2339 if (ret == -EAGAIN) 2428 if (ret < 0) {
2340 goto restart; 2429 if (ret == -EAGAIN)
2341 if (ret < 0) { 2430 continue;
2342 con->error_msg = "socket error on read"; 2431 con->error_msg = "socket error on write";
2343 goto fault; 2432 fault = true;
2344 } 2433 }
2345 2434
2346 ret = try_write(con); 2435 break; /* If we make it to here, we're done */
2347 if (ret == -EAGAIN)
2348 goto restart;
2349 if (ret < 0) {
2350 con->error_msg = "socket error on write";
2351 goto fault;
2352 } 2436 }
2353 2437 if (fault)
2354done: 2438 con_fault(con);
2355 mutex_unlock(&con->mutex); 2439 mutex_unlock(&con->mutex);
2356done_unlocked:
2357 con->ops->put(con);
2358 return;
2359 2440
2360fault: 2441 if (fault)
2361 ceph_fault(con); /* error/fault path */ 2442 con_fault_finish(con);
2362 goto done_unlocked;
2363}
2364 2443
2444 con->ops->put(con);
2445}
2365 2446
2366/* 2447/*
2367 * Generic error/fault handler. A retry mechanism is used with 2448 * Generic error/fault handler. A retry mechanism is used with
2368 * exponential backoff 2449 * exponential backoff
2369 */ 2450 */
2370static void ceph_fault(struct ceph_connection *con) 2451static void con_fault(struct ceph_connection *con)
2371 __releases(con->mutex)
2372{ 2452{
2373 pr_warning("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), 2453 pr_warning("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
2374 ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); 2454 ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
@@ -2381,10 +2461,10 @@ static void ceph_fault(struct ceph_connection *con)
2381 2461
2382 con_close_socket(con); 2462 con_close_socket(con);
2383 2463
2384 if (test_bit(CON_FLAG_LOSSYTX, &con->flags)) { 2464 if (con_flag_test(con, CON_FLAG_LOSSYTX)) {
2385 dout("fault on LOSSYTX channel, marking CLOSED\n"); 2465 dout("fault on LOSSYTX channel, marking CLOSED\n");
2386 con->state = CON_STATE_CLOSED; 2466 con->state = CON_STATE_CLOSED;
2387 goto out_unlock; 2467 return;
2388 } 2468 }
2389 2469
2390 if (con->in_msg) { 2470 if (con->in_msg) {
@@ -2401,9 +2481,9 @@ static void ceph_fault(struct ceph_connection *con)
2401 /* If there are no messages queued or keepalive pending, place 2481 /* If there are no messages queued or keepalive pending, place
2402 * the connection in a STANDBY state */ 2482 * the connection in a STANDBY state */
2403 if (list_empty(&con->out_queue) && 2483 if (list_empty(&con->out_queue) &&
2404 !test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)) { 2484 !con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)) {
2405 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); 2485 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
2406 clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); 2486 con_flag_clear(con, CON_FLAG_WRITE_PENDING);
2407 con->state = CON_STATE_STANDBY; 2487 con->state = CON_STATE_STANDBY;
2408 } else { 2488 } else {
2409 /* retry after a delay. */ 2489 /* retry after a delay. */
@@ -2412,23 +2492,9 @@ static void ceph_fault(struct ceph_connection *con)
2412 con->delay = BASE_DELAY_INTERVAL; 2492 con->delay = BASE_DELAY_INTERVAL;
2413 else if (con->delay < MAX_DELAY_INTERVAL) 2493 else if (con->delay < MAX_DELAY_INTERVAL)
2414 con->delay *= 2; 2494 con->delay *= 2;
2415 set_bit(CON_FLAG_BACKOFF, &con->flags); 2495 con_flag_set(con, CON_FLAG_BACKOFF);
2416 queue_con(con); 2496 queue_con(con);
2417 } 2497 }
2418
2419out_unlock:
2420 mutex_unlock(&con->mutex);
2421 /*
2422 * in case we faulted due to authentication, invalidate our
2423 * current tickets so that we can get new ones.
2424 */
2425 if (con->auth_retry && con->ops->invalidate_authorizer) {
2426 dout("calling invalidate_authorizer()\n");
2427 con->ops->invalidate_authorizer(con);
2428 }
2429
2430 if (con->ops->fault)
2431 con->ops->fault(con);
2432} 2498}
2433 2499
2434 2500
@@ -2469,8 +2535,8 @@ static void clear_standby(struct ceph_connection *con)
2469 dout("clear_standby %p and ++connect_seq\n", con); 2535 dout("clear_standby %p and ++connect_seq\n", con);
2470 con->state = CON_STATE_PREOPEN; 2536 con->state = CON_STATE_PREOPEN;
2471 con->connect_seq++; 2537 con->connect_seq++;
2472 WARN_ON(test_bit(CON_FLAG_WRITE_PENDING, &con->flags)); 2538 WARN_ON(con_flag_test(con, CON_FLAG_WRITE_PENDING));
2473 WARN_ON(test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)); 2539 WARN_ON(con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING));
2474 } 2540 }
2475} 2541}
2476 2542
@@ -2511,7 +2577,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
2511 2577
2512 /* if there wasn't anything waiting to send before, queue 2578 /* if there wasn't anything waiting to send before, queue
2513 * new work */ 2579 * new work */
2514 if (test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0) 2580 if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0)
2515 queue_con(con); 2581 queue_con(con);
2516} 2582}
2517EXPORT_SYMBOL(ceph_con_send); 2583EXPORT_SYMBOL(ceph_con_send);
@@ -2600,8 +2666,8 @@ void ceph_con_keepalive(struct ceph_connection *con)
2600 mutex_lock(&con->mutex); 2666 mutex_lock(&con->mutex);
2601 clear_standby(con); 2667 clear_standby(con);
2602 mutex_unlock(&con->mutex); 2668 mutex_unlock(&con->mutex);
2603 if (test_and_set_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags) == 0 && 2669 if (con_flag_test_and_set(con, CON_FLAG_KEEPALIVE_PENDING) == 0 &&
2604 test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0) 2670 con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0)
2605 queue_con(con); 2671 queue_con(con);
2606} 2672}
2607EXPORT_SYMBOL(ceph_con_keepalive); 2673EXPORT_SYMBOL(ceph_con_keepalive);
@@ -2651,9 +2717,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
2651 m->page_alignment = 0; 2717 m->page_alignment = 0;
2652 m->pages = NULL; 2718 m->pages = NULL;
2653 m->pagelist = NULL; 2719 m->pagelist = NULL;
2720#ifdef CONFIG_BLOCK
2654 m->bio = NULL; 2721 m->bio = NULL;
2655 m->bio_iter = NULL; 2722 m->bio_iter = NULL;
2656 m->bio_seg = 0; 2723 m->bio_seg = 0;
2724#endif /* CONFIG_BLOCK */
2657 m->trail = NULL; 2725 m->trail = NULL;
2658 2726
2659 /* front */ 2727 /* front */
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 812eb3b46c1f..aef5b1062bee 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -697,7 +697,7 @@ int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
697 u32 pool, u64 snapid) 697 u32 pool, u64 snapid)
698{ 698{
699 return do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP, 699 return do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
700 pool, snapid, 0, 0); 700 pool, snapid, NULL, 0);
701 701
702} 702}
703 703
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index eb9a44478764..d730dd4d8eb2 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -23,7 +23,7 @@
23 23
24static const struct ceph_connection_operations osd_con_ops; 24static const struct ceph_connection_operations osd_con_ops;
25 25
26static void send_queued(struct ceph_osd_client *osdc); 26static void __send_queued(struct ceph_osd_client *osdc);
27static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); 27static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
28static void __register_request(struct ceph_osd_client *osdc, 28static void __register_request(struct ceph_osd_client *osdc,
29 struct ceph_osd_request *req); 29 struct ceph_osd_request *req);
@@ -32,64 +32,12 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
32static void __send_request(struct ceph_osd_client *osdc, 32static void __send_request(struct ceph_osd_client *osdc,
33 struct ceph_osd_request *req); 33 struct ceph_osd_request *req);
34 34
35static int op_needs_trail(int op)
36{
37 switch (op) {
38 case CEPH_OSD_OP_GETXATTR:
39 case CEPH_OSD_OP_SETXATTR:
40 case CEPH_OSD_OP_CMPXATTR:
41 case CEPH_OSD_OP_CALL:
42 case CEPH_OSD_OP_NOTIFY:
43 return 1;
44 default:
45 return 0;
46 }
47}
48
49static int op_has_extent(int op) 35static int op_has_extent(int op)
50{ 36{
51 return (op == CEPH_OSD_OP_READ || 37 return (op == CEPH_OSD_OP_READ ||
52 op == CEPH_OSD_OP_WRITE); 38 op == CEPH_OSD_OP_WRITE);
53} 39}
54 40
55int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
56 struct ceph_file_layout *layout,
57 u64 snapid,
58 u64 off, u64 *plen, u64 *bno,
59 struct ceph_osd_request *req,
60 struct ceph_osd_req_op *op)
61{
62 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
63 u64 orig_len = *plen;
64 u64 objoff, objlen; /* extent in object */
65 int r;
66
67 reqhead->snapid = cpu_to_le64(snapid);
68
69 /* object extent? */
70 r = ceph_calc_file_object_mapping(layout, off, plen, bno,
71 &objoff, &objlen);
72 if (r < 0)
73 return r;
74 if (*plen < orig_len)
75 dout(" skipping last %llu, final file extent %llu~%llu\n",
76 orig_len - *plen, off, *plen);
77
78 if (op_has_extent(op->op)) {
79 op->extent.offset = objoff;
80 op->extent.length = objlen;
81 }
82 req->r_num_pages = calc_pages_for(off, *plen);
83 req->r_page_alignment = off & ~PAGE_MASK;
84 if (op->op == CEPH_OSD_OP_WRITE)
85 op->payload_len = *plen;
86
87 dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
88 *bno, objoff, objlen, req->r_num_pages);
89 return 0;
90}
91EXPORT_SYMBOL(ceph_calc_raw_layout);
92
93/* 41/*
94 * Implement client access to distributed object storage cluster. 42 * Implement client access to distributed object storage cluster.
95 * 43 *
@@ -115,20 +63,48 @@ EXPORT_SYMBOL(ceph_calc_raw_layout);
115 * 63 *
116 * fill osd op in request message. 64 * fill osd op in request message.
117 */ 65 */
118static int calc_layout(struct ceph_osd_client *osdc, 66static int calc_layout(struct ceph_vino vino,
119 struct ceph_vino vino,
120 struct ceph_file_layout *layout, 67 struct ceph_file_layout *layout,
121 u64 off, u64 *plen, 68 u64 off, u64 *plen,
122 struct ceph_osd_request *req, 69 struct ceph_osd_request *req,
123 struct ceph_osd_req_op *op) 70 struct ceph_osd_req_op *op)
124{ 71{
125 u64 bno; 72 u64 orig_len = *plen;
73 u64 bno = 0;
74 u64 objoff = 0;
75 u64 objlen = 0;
126 int r; 76 int r;
127 77
128 r = ceph_calc_raw_layout(osdc, layout, vino.snap, off, 78 /* object extent? */
129 plen, &bno, req, op); 79 r = ceph_calc_file_object_mapping(layout, off, orig_len, &bno,
80 &objoff, &objlen);
130 if (r < 0) 81 if (r < 0)
131 return r; 82 return r;
83 if (objlen < orig_len) {
84 *plen = objlen;
85 dout(" skipping last %llu, final file extent %llu~%llu\n",
86 orig_len - *plen, off, *plen);
87 }
88
89 if (op_has_extent(op->op)) {
90 u32 osize = le32_to_cpu(layout->fl_object_size);
91 op->extent.offset = objoff;
92 op->extent.length = objlen;
93 if (op->extent.truncate_size <= off - objoff) {
94 op->extent.truncate_size = 0;
95 } else {
96 op->extent.truncate_size -= off - objoff;
97 if (op->extent.truncate_size > osize)
98 op->extent.truncate_size = osize;
99 }
100 }
101 req->r_num_pages = calc_pages_for(off, *plen);
102 req->r_page_alignment = off & ~PAGE_MASK;
103 if (op->op == CEPH_OSD_OP_WRITE)
104 op->payload_len = *plen;
105
106 dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
107 bno, objoff, objlen, req->r_num_pages);
132 108
133 snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno); 109 snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
134 req->r_oid_len = strlen(req->r_oid); 110 req->r_oid_len = strlen(req->r_oid);
@@ -148,25 +124,19 @@ void ceph_osdc_release_request(struct kref *kref)
148 if (req->r_request) 124 if (req->r_request)
149 ceph_msg_put(req->r_request); 125 ceph_msg_put(req->r_request);
150 if (req->r_con_filling_msg) { 126 if (req->r_con_filling_msg) {
151 dout("%s revoking pages %p from con %p\n", __func__, 127 dout("%s revoking msg %p from con %p\n", __func__,
152 req->r_pages, req->r_con_filling_msg); 128 req->r_reply, req->r_con_filling_msg);
153 ceph_msg_revoke_incoming(req->r_reply); 129 ceph_msg_revoke_incoming(req->r_reply);
154 req->r_con_filling_msg->ops->put(req->r_con_filling_msg); 130 req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
131 req->r_con_filling_msg = NULL;
155 } 132 }
156 if (req->r_reply) 133 if (req->r_reply)
157 ceph_msg_put(req->r_reply); 134 ceph_msg_put(req->r_reply);
158 if (req->r_own_pages) 135 if (req->r_own_pages)
159 ceph_release_page_vector(req->r_pages, 136 ceph_release_page_vector(req->r_pages,
160 req->r_num_pages); 137 req->r_num_pages);
161#ifdef CONFIG_BLOCK
162 if (req->r_bio)
163 bio_put(req->r_bio);
164#endif
165 ceph_put_snap_context(req->r_snapc); 138 ceph_put_snap_context(req->r_snapc);
166 if (req->r_trail) { 139 ceph_pagelist_release(&req->r_trail);
167 ceph_pagelist_release(req->r_trail);
168 kfree(req->r_trail);
169 }
170 if (req->r_mempool) 140 if (req->r_mempool)
171 mempool_free(req, req->r_osdc->req_mempool); 141 mempool_free(req, req->r_osdc->req_mempool);
172 else 142 else
@@ -174,37 +144,25 @@ void ceph_osdc_release_request(struct kref *kref)
174} 144}
175EXPORT_SYMBOL(ceph_osdc_release_request); 145EXPORT_SYMBOL(ceph_osdc_release_request);
176 146
177static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
178{
179 int i = 0;
180
181 if (needs_trail)
182 *needs_trail = 0;
183 while (ops[i].op) {
184 if (needs_trail && op_needs_trail(ops[i].op))
185 *needs_trail = 1;
186 i++;
187 }
188
189 return i;
190}
191
192struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 147struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
193 int flags,
194 struct ceph_snap_context *snapc, 148 struct ceph_snap_context *snapc,
195 struct ceph_osd_req_op *ops, 149 unsigned int num_ops,
196 bool use_mempool, 150 bool use_mempool,
197 gfp_t gfp_flags, 151 gfp_t gfp_flags)
198 struct page **pages,
199 struct bio *bio)
200{ 152{
201 struct ceph_osd_request *req; 153 struct ceph_osd_request *req;
202 struct ceph_msg *msg; 154 struct ceph_msg *msg;
203 int needs_trail; 155 size_t msg_size;
204 int num_op = get_num_ops(ops, &needs_trail); 156
205 size_t msg_size = sizeof(struct ceph_osd_request_head); 157 msg_size = 4 + 4 + 8 + 8 + 4+8;
206 158 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
207 msg_size += num_op*sizeof(struct ceph_osd_op); 159 msg_size += 1 + 8 + 4 + 4; /* pg_t */
160 msg_size += 4 + MAX_OBJ_NAME_SIZE;
161 msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
162 msg_size += 8; /* snapid */
163 msg_size += 8; /* snap_seq */
164 msg_size += 8 * (snapc ? snapc->num_snaps : 0); /* snaps */
165 msg_size += 4;
208 166
209 if (use_mempool) { 167 if (use_mempool) {
210 req = mempool_alloc(osdc->req_mempool, gfp_flags); 168 req = mempool_alloc(osdc->req_mempool, gfp_flags);
@@ -228,10 +186,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
228 INIT_LIST_HEAD(&req->r_req_lru_item); 186 INIT_LIST_HEAD(&req->r_req_lru_item);
229 INIT_LIST_HEAD(&req->r_osd_item); 187 INIT_LIST_HEAD(&req->r_osd_item);
230 188
231 req->r_flags = flags;
232
233 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
234
235 /* create reply message */ 189 /* create reply message */
236 if (use_mempool) 190 if (use_mempool)
237 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 191 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
@@ -244,20 +198,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
244 } 198 }
245 req->r_reply = msg; 199 req->r_reply = msg;
246 200
247 /* allocate space for the trailing data */ 201 ceph_pagelist_init(&req->r_trail);
248 if (needs_trail) {
249 req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
250 if (!req->r_trail) {
251 ceph_osdc_put_request(req);
252 return NULL;
253 }
254 ceph_pagelist_init(req->r_trail);
255 }
256 202
257 /* create request message; allow space for oid */ 203 /* create request message; allow space for oid */
258 msg_size += MAX_OBJ_NAME_SIZE;
259 if (snapc)
260 msg_size += sizeof(u64) * snapc->num_snaps;
261 if (use_mempool) 204 if (use_mempool)
262 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 205 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
263 else 206 else
@@ -270,13 +213,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
270 memset(msg->front.iov_base, 0, msg->front.iov_len); 213 memset(msg->front.iov_base, 0, msg->front.iov_len);
271 214
272 req->r_request = msg; 215 req->r_request = msg;
273 req->r_pages = pages;
274#ifdef CONFIG_BLOCK
275 if (bio) {
276 req->r_bio = bio;
277 bio_get(req->r_bio);
278 }
279#endif
280 216
281 return req; 217 return req;
282} 218}
@@ -289,6 +225,8 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
289 dst->op = cpu_to_le16(src->op); 225 dst->op = cpu_to_le16(src->op);
290 226
291 switch (src->op) { 227 switch (src->op) {
228 case CEPH_OSD_OP_STAT:
229 break;
292 case CEPH_OSD_OP_READ: 230 case CEPH_OSD_OP_READ:
293 case CEPH_OSD_OP_WRITE: 231 case CEPH_OSD_OP_WRITE:
294 dst->extent.offset = 232 dst->extent.offset =
@@ -300,52 +238,20 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
300 dst->extent.truncate_seq = 238 dst->extent.truncate_seq =
301 cpu_to_le32(src->extent.truncate_seq); 239 cpu_to_le32(src->extent.truncate_seq);
302 break; 240 break;
303
304 case CEPH_OSD_OP_GETXATTR:
305 case CEPH_OSD_OP_SETXATTR:
306 case CEPH_OSD_OP_CMPXATTR:
307 BUG_ON(!req->r_trail);
308
309 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
310 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
311 dst->xattr.cmp_op = src->xattr.cmp_op;
312 dst->xattr.cmp_mode = src->xattr.cmp_mode;
313 ceph_pagelist_append(req->r_trail, src->xattr.name,
314 src->xattr.name_len);
315 ceph_pagelist_append(req->r_trail, src->xattr.val,
316 src->xattr.value_len);
317 break;
318 case CEPH_OSD_OP_CALL: 241 case CEPH_OSD_OP_CALL:
319 BUG_ON(!req->r_trail);
320
321 dst->cls.class_len = src->cls.class_len; 242 dst->cls.class_len = src->cls.class_len;
322 dst->cls.method_len = src->cls.method_len; 243 dst->cls.method_len = src->cls.method_len;
323 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len); 244 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
324 245
325 ceph_pagelist_append(req->r_trail, src->cls.class_name, 246 ceph_pagelist_append(&req->r_trail, src->cls.class_name,
326 src->cls.class_len); 247 src->cls.class_len);
327 ceph_pagelist_append(req->r_trail, src->cls.method_name, 248 ceph_pagelist_append(&req->r_trail, src->cls.method_name,
328 src->cls.method_len); 249 src->cls.method_len);
329 ceph_pagelist_append(req->r_trail, src->cls.indata, 250 ceph_pagelist_append(&req->r_trail, src->cls.indata,
330 src->cls.indata_len); 251 src->cls.indata_len);
331 break; 252 break;
332 case CEPH_OSD_OP_ROLLBACK:
333 dst->snap.snapid = cpu_to_le64(src->snap.snapid);
334 break;
335 case CEPH_OSD_OP_STARTSYNC: 253 case CEPH_OSD_OP_STARTSYNC:
336 break; 254 break;
337 case CEPH_OSD_OP_NOTIFY:
338 {
339 __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
340 __le32 timeout = cpu_to_le32(src->watch.timeout);
341
342 BUG_ON(!req->r_trail);
343
344 ceph_pagelist_append(req->r_trail,
345 &prot_ver, sizeof(prot_ver));
346 ceph_pagelist_append(req->r_trail,
347 &timeout, sizeof(timeout));
348 }
349 case CEPH_OSD_OP_NOTIFY_ACK: 255 case CEPH_OSD_OP_NOTIFY_ACK:
350 case CEPH_OSD_OP_WATCH: 256 case CEPH_OSD_OP_WATCH:
351 dst->watch.cookie = cpu_to_le64(src->watch.cookie); 257 dst->watch.cookie = cpu_to_le64(src->watch.cookie);
@@ -356,6 +262,64 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
356 pr_err("unrecognized osd opcode %d\n", dst->op); 262 pr_err("unrecognized osd opcode %d\n", dst->op);
357 WARN_ON(1); 263 WARN_ON(1);
358 break; 264 break;
265 case CEPH_OSD_OP_MAPEXT:
266 case CEPH_OSD_OP_MASKTRUNC:
267 case CEPH_OSD_OP_SPARSE_READ:
268 case CEPH_OSD_OP_NOTIFY:
269 case CEPH_OSD_OP_ASSERT_VER:
270 case CEPH_OSD_OP_WRITEFULL:
271 case CEPH_OSD_OP_TRUNCATE:
272 case CEPH_OSD_OP_ZERO:
273 case CEPH_OSD_OP_DELETE:
274 case CEPH_OSD_OP_APPEND:
275 case CEPH_OSD_OP_SETTRUNC:
276 case CEPH_OSD_OP_TRIMTRUNC:
277 case CEPH_OSD_OP_TMAPUP:
278 case CEPH_OSD_OP_TMAPPUT:
279 case CEPH_OSD_OP_TMAPGET:
280 case CEPH_OSD_OP_CREATE:
281 case CEPH_OSD_OP_ROLLBACK:
282 case CEPH_OSD_OP_OMAPGETKEYS:
283 case CEPH_OSD_OP_OMAPGETVALS:
284 case CEPH_OSD_OP_OMAPGETHEADER:
285 case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
286 case CEPH_OSD_OP_MODE_RD:
287 case CEPH_OSD_OP_OMAPSETVALS:
288 case CEPH_OSD_OP_OMAPSETHEADER:
289 case CEPH_OSD_OP_OMAPCLEAR:
290 case CEPH_OSD_OP_OMAPRMKEYS:
291 case CEPH_OSD_OP_OMAP_CMP:
292 case CEPH_OSD_OP_CLONERANGE:
293 case CEPH_OSD_OP_ASSERT_SRC_VERSION:
294 case CEPH_OSD_OP_SRC_CMPXATTR:
295 case CEPH_OSD_OP_GETXATTR:
296 case CEPH_OSD_OP_GETXATTRS:
297 case CEPH_OSD_OP_CMPXATTR:
298 case CEPH_OSD_OP_SETXATTR:
299 case CEPH_OSD_OP_SETXATTRS:
300 case CEPH_OSD_OP_RESETXATTRS:
301 case CEPH_OSD_OP_RMXATTR:
302 case CEPH_OSD_OP_PULL:
303 case CEPH_OSD_OP_PUSH:
304 case CEPH_OSD_OP_BALANCEREADS:
305 case CEPH_OSD_OP_UNBALANCEREADS:
306 case CEPH_OSD_OP_SCRUB:
307 case CEPH_OSD_OP_SCRUB_RESERVE:
308 case CEPH_OSD_OP_SCRUB_UNRESERVE:
309 case CEPH_OSD_OP_SCRUB_STOP:
310 case CEPH_OSD_OP_SCRUB_MAP:
311 case CEPH_OSD_OP_WRLOCK:
312 case CEPH_OSD_OP_WRUNLOCK:
313 case CEPH_OSD_OP_RDLOCK:
314 case CEPH_OSD_OP_RDUNLOCK:
315 case CEPH_OSD_OP_UPLOCK:
316 case CEPH_OSD_OP_DNLOCK:
317 case CEPH_OSD_OP_PGLS:
318 case CEPH_OSD_OP_PGLS_FILTER:
319 pr_err("unsupported osd opcode %s\n",
320 ceph_osd_op_name(dst->op));
321 WARN_ON(1);
322 break;
359 } 323 }
360 dst->payload_len = cpu_to_le32(src->payload_len); 324 dst->payload_len = cpu_to_le32(src->payload_len);
361} 325}
@@ -365,75 +329,95 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
365 * 329 *
366 */ 330 */
367void ceph_osdc_build_request(struct ceph_osd_request *req, 331void ceph_osdc_build_request(struct ceph_osd_request *req,
368 u64 off, u64 *plen, 332 u64 off, u64 len, unsigned int num_ops,
369 struct ceph_osd_req_op *src_ops, 333 struct ceph_osd_req_op *src_ops,
370 struct ceph_snap_context *snapc, 334 struct ceph_snap_context *snapc, u64 snap_id,
371 struct timespec *mtime, 335 struct timespec *mtime)
372 const char *oid,
373 int oid_len)
374{ 336{
375 struct ceph_msg *msg = req->r_request; 337 struct ceph_msg *msg = req->r_request;
376 struct ceph_osd_request_head *head;
377 struct ceph_osd_req_op *src_op; 338 struct ceph_osd_req_op *src_op;
378 struct ceph_osd_op *op;
379 void *p; 339 void *p;
380 int num_op = get_num_ops(src_ops, NULL); 340 size_t msg_size;
381 size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
382 int flags = req->r_flags; 341 int flags = req->r_flags;
383 u64 data_len = 0; 342 u64 data_len;
384 int i; 343 int i;
385 344
386 head = msg->front.iov_base; 345 req->r_num_ops = num_ops;
387 op = (void *)(head + 1); 346 req->r_snapid = snap_id;
388 p = (void *)(op + num_op);
389
390 req->r_snapc = ceph_get_snap_context(snapc); 347 req->r_snapc = ceph_get_snap_context(snapc);
391 348
392 head->client_inc = cpu_to_le32(1); /* always, for now. */ 349 /* encode request */
393 head->flags = cpu_to_le32(flags); 350 msg->hdr.version = cpu_to_le16(4);
394 if (flags & CEPH_OSD_FLAG_WRITE)
395 ceph_encode_timespec(&head->mtime, mtime);
396 head->num_ops = cpu_to_le16(num_op);
397
398
399 /* fill in oid */
400 head->object_len = cpu_to_le32(oid_len);
401 memcpy(p, oid, oid_len);
402 p += oid_len;
403 351
352 p = msg->front.iov_base;
353 ceph_encode_32(&p, 1); /* client_inc is always 1 */
354 req->r_request_osdmap_epoch = p;
355 p += 4;
356 req->r_request_flags = p;
357 p += 4;
358 if (req->r_flags & CEPH_OSD_FLAG_WRITE)
359 ceph_encode_timespec(p, mtime);
360 p += sizeof(struct ceph_timespec);
361 req->r_request_reassert_version = p;
362 p += sizeof(struct ceph_eversion); /* will get filled in */
363
364 /* oloc */
365 ceph_encode_8(&p, 4);
366 ceph_encode_8(&p, 4);
367 ceph_encode_32(&p, 8 + 4 + 4);
368 req->r_request_pool = p;
369 p += 8;
370 ceph_encode_32(&p, -1); /* preferred */
371 ceph_encode_32(&p, 0); /* key len */
372
373 ceph_encode_8(&p, 1);
374 req->r_request_pgid = p;
375 p += 8 + 4;
376 ceph_encode_32(&p, -1); /* preferred */
377
378 /* oid */
379 ceph_encode_32(&p, req->r_oid_len);
380 memcpy(p, req->r_oid, req->r_oid_len);
381 dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
382 p += req->r_oid_len;
383
384 /* ops */
385 ceph_encode_16(&p, num_ops);
404 src_op = src_ops; 386 src_op = src_ops;
405 while (src_op->op) { 387 req->r_request_ops = p;
406 osd_req_encode_op(req, op, src_op); 388 for (i = 0; i < num_ops; i++, src_op++) {
407 src_op++; 389 osd_req_encode_op(req, p, src_op);
408 op++; 390 p += sizeof(struct ceph_osd_op);
409 } 391 }
410 392
411 if (req->r_trail) 393 /* snaps */
412 data_len += req->r_trail->length; 394 ceph_encode_64(&p, req->r_snapid);
413 395 ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
414 if (snapc) { 396 ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
415 head->snap_seq = cpu_to_le64(snapc->seq); 397 if (req->r_snapc) {
416 head->num_snaps = cpu_to_le32(snapc->num_snaps);
417 for (i = 0; i < snapc->num_snaps; i++) { 398 for (i = 0; i < snapc->num_snaps; i++) {
418 put_unaligned_le64(snapc->snaps[i], p); 399 ceph_encode_64(&p, req->r_snapc->snaps[i]);
419 p += sizeof(u64);
420 } 400 }
421 } 401 }
422 402
403 req->r_request_attempts = p;
404 p += 4;
405
406 data_len = req->r_trail.length;
423 if (flags & CEPH_OSD_FLAG_WRITE) { 407 if (flags & CEPH_OSD_FLAG_WRITE) {
424 req->r_request->hdr.data_off = cpu_to_le16(off); 408 req->r_request->hdr.data_off = cpu_to_le16(off);
425 req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len); 409 data_len += len;
426 } else if (data_len) {
427 req->r_request->hdr.data_off = 0;
428 req->r_request->hdr.data_len = cpu_to_le32(data_len);
429 } 410 }
430 411 req->r_request->hdr.data_len = cpu_to_le32(data_len);
431 req->r_request->page_alignment = req->r_page_alignment; 412 req->r_request->page_alignment = req->r_page_alignment;
432 413
433 BUG_ON(p > msg->front.iov_base + msg->front.iov_len); 414 BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
434 msg_size = p - msg->front.iov_base; 415 msg_size = p - msg->front.iov_base;
435 msg->front.iov_len = msg_size; 416 msg->front.iov_len = msg_size;
436 msg->hdr.front_len = cpu_to_le32(msg_size); 417 msg->hdr.front_len = cpu_to_le32(msg_size);
418
419 dout("build_request msg_size was %d num_ops %d\n", (int)msg_size,
420 num_ops);
437 return; 421 return;
438} 422}
439EXPORT_SYMBOL(ceph_osdc_build_request); 423EXPORT_SYMBOL(ceph_osdc_build_request);
@@ -459,34 +443,33 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
459 u32 truncate_seq, 443 u32 truncate_seq,
460 u64 truncate_size, 444 u64 truncate_size,
461 struct timespec *mtime, 445 struct timespec *mtime,
462 bool use_mempool, int num_reply, 446 bool use_mempool,
463 int page_align) 447 int page_align)
464{ 448{
465 struct ceph_osd_req_op ops[3]; 449 struct ceph_osd_req_op ops[2];
466 struct ceph_osd_request *req; 450 struct ceph_osd_request *req;
451 unsigned int num_op = 1;
467 int r; 452 int r;
468 453
454 memset(&ops, 0, sizeof ops);
455
469 ops[0].op = opcode; 456 ops[0].op = opcode;
470 ops[0].extent.truncate_seq = truncate_seq; 457 ops[0].extent.truncate_seq = truncate_seq;
471 ops[0].extent.truncate_size = truncate_size; 458 ops[0].extent.truncate_size = truncate_size;
472 ops[0].payload_len = 0;
473 459
474 if (do_sync) { 460 if (do_sync) {
475 ops[1].op = CEPH_OSD_OP_STARTSYNC; 461 ops[1].op = CEPH_OSD_OP_STARTSYNC;
476 ops[1].payload_len = 0; 462 num_op++;
477 ops[2].op = 0; 463 }
478 } else 464
479 ops[1].op = 0; 465 req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool,
480 466 GFP_NOFS);
481 req = ceph_osdc_alloc_request(osdc, flags,
482 snapc, ops,
483 use_mempool,
484 GFP_NOFS, NULL, NULL);
485 if (!req) 467 if (!req)
486 return ERR_PTR(-ENOMEM); 468 return ERR_PTR(-ENOMEM);
469 req->r_flags = flags;
487 470
488 /* calculate max write size */ 471 /* calculate max write size */
489 r = calc_layout(osdc, vino, layout, off, plen, req, ops); 472 r = calc_layout(vino, layout, off, plen, req, ops);
490 if (r < 0) 473 if (r < 0)
491 return ERR_PTR(r); 474 return ERR_PTR(r);
492 req->r_file_layout = *layout; /* keep a copy */ 475 req->r_file_layout = *layout; /* keep a copy */
@@ -496,10 +479,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
496 req->r_num_pages = calc_pages_for(page_align, *plen); 479 req->r_num_pages = calc_pages_for(page_align, *plen);
497 req->r_page_alignment = page_align; 480 req->r_page_alignment = page_align;
498 481
499 ceph_osdc_build_request(req, off, plen, ops, 482 ceph_osdc_build_request(req, off, *plen, num_op, ops,
500 snapc, 483 snapc, vino.snap, mtime);
501 mtime,
502 req->r_oid, req->r_oid_len);
503 484
504 return req; 485 return req;
505} 486}
@@ -623,8 +604,8 @@ static void osd_reset(struct ceph_connection *con)
623 down_read(&osdc->map_sem); 604 down_read(&osdc->map_sem);
624 mutex_lock(&osdc->request_mutex); 605 mutex_lock(&osdc->request_mutex);
625 __kick_osd_requests(osdc, osd); 606 __kick_osd_requests(osdc, osd);
607 __send_queued(osdc);
626 mutex_unlock(&osdc->request_mutex); 608 mutex_unlock(&osdc->request_mutex);
627 send_queued(osdc);
628 up_read(&osdc->map_sem); 609 up_read(&osdc->map_sem);
629} 610}
630 611
@@ -739,31 +720,35 @@ static void remove_old_osds(struct ceph_osd_client *osdc)
739 */ 720 */
740static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 721static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
741{ 722{
742 struct ceph_osd_request *req; 723 struct ceph_entity_addr *peer_addr;
743 int ret = 0;
744 724
745 dout("__reset_osd %p osd%d\n", osd, osd->o_osd); 725 dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
746 if (list_empty(&osd->o_requests) && 726 if (list_empty(&osd->o_requests) &&
747 list_empty(&osd->o_linger_requests)) { 727 list_empty(&osd->o_linger_requests)) {
748 __remove_osd(osdc, osd); 728 __remove_osd(osdc, osd);
749 ret = -ENODEV; 729
750 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], 730 return -ENODEV;
751 &osd->o_con.peer_addr, 731 }
752 sizeof(osd->o_con.peer_addr)) == 0 && 732
753 !ceph_con_opened(&osd->o_con)) { 733 peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
734 if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
735 !ceph_con_opened(&osd->o_con)) {
736 struct ceph_osd_request *req;
737
754 dout(" osd addr hasn't changed and connection never opened," 738 dout(" osd addr hasn't changed and connection never opened,"
755 " letting msgr retry"); 739 " letting msgr retry");
756 /* touch each r_stamp for handle_timeout()'s benfit */ 740 /* touch each r_stamp for handle_timeout()'s benfit */
757 list_for_each_entry(req, &osd->o_requests, r_osd_item) 741 list_for_each_entry(req, &osd->o_requests, r_osd_item)
758 req->r_stamp = jiffies; 742 req->r_stamp = jiffies;
759 ret = -EAGAIN; 743
760 } else { 744 return -EAGAIN;
761 ceph_con_close(&osd->o_con);
762 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
763 &osdc->osdmap->osd_addr[osd->o_osd]);
764 osd->o_incarnation++;
765 } 745 }
766 return ret; 746
747 ceph_con_close(&osd->o_con);
748 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
749 osd->o_incarnation++;
750
751 return 0;
767} 752}
768 753
769static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new) 754static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
@@ -961,20 +946,18 @@ EXPORT_SYMBOL(ceph_osdc_set_request_linger);
961static int __map_request(struct ceph_osd_client *osdc, 946static int __map_request(struct ceph_osd_client *osdc,
962 struct ceph_osd_request *req, int force_resend) 947 struct ceph_osd_request *req, int force_resend)
963{ 948{
964 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
965 struct ceph_pg pgid; 949 struct ceph_pg pgid;
966 int acting[CEPH_PG_MAX_SIZE]; 950 int acting[CEPH_PG_MAX_SIZE];
967 int o = -1, num = 0; 951 int o = -1, num = 0;
968 int err; 952 int err;
969 953
970 dout("map_request %p tid %lld\n", req, req->r_tid); 954 dout("map_request %p tid %lld\n", req, req->r_tid);
971 err = ceph_calc_object_layout(&reqhead->layout, req->r_oid, 955 err = ceph_calc_object_layout(&pgid, req->r_oid,
972 &req->r_file_layout, osdc->osdmap); 956 &req->r_file_layout, osdc->osdmap);
973 if (err) { 957 if (err) {
974 list_move(&req->r_req_lru_item, &osdc->req_notarget); 958 list_move(&req->r_req_lru_item, &osdc->req_notarget);
975 return err; 959 return err;
976 } 960 }
977 pgid = reqhead->layout.ol_pgid;
978 req->r_pgid = pgid; 961 req->r_pgid = pgid;
979 962
980 err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting); 963 err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
@@ -991,8 +974,8 @@ static int __map_request(struct ceph_osd_client *osdc,
991 (req->r_osd == NULL && o == -1)) 974 (req->r_osd == NULL && o == -1))
992 return 0; /* no change */ 975 return 0; /* no change */
993 976
994 dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n", 977 dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
995 req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o, 978 req->r_tid, pgid.pool, pgid.seed, o,
996 req->r_osd ? req->r_osd->o_osd : -1); 979 req->r_osd ? req->r_osd->o_osd : -1);
997 980
998 /* record full pg acting set */ 981 /* record full pg acting set */
@@ -1041,15 +1024,22 @@ out:
1041static void __send_request(struct ceph_osd_client *osdc, 1024static void __send_request(struct ceph_osd_client *osdc,
1042 struct ceph_osd_request *req) 1025 struct ceph_osd_request *req)
1043{ 1026{
1044 struct ceph_osd_request_head *reqhead; 1027 void *p;
1045
1046 dout("send_request %p tid %llu to osd%d flags %d\n",
1047 req, req->r_tid, req->r_osd->o_osd, req->r_flags);
1048 1028
1049 reqhead = req->r_request->front.iov_base; 1029 dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
1050 reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch); 1030 req, req->r_tid, req->r_osd->o_osd, req->r_flags,
1051 reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */ 1031 (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);
1052 reqhead->reassert_version = req->r_reassert_version; 1032
1033 /* fill in message content that changes each time we send it */
1034 put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
1035 put_unaligned_le32(req->r_flags, req->r_request_flags);
1036 put_unaligned_le64(req->r_pgid.pool, req->r_request_pool);
1037 p = req->r_request_pgid;
1038 ceph_encode_64(&p, req->r_pgid.pool);
1039 ceph_encode_32(&p, req->r_pgid.seed);
1040 put_unaligned_le64(1, req->r_request_attempts); /* FIXME */
1041 memcpy(req->r_request_reassert_version, &req->r_reassert_version,
1042 sizeof(req->r_reassert_version));
1053 1043
1054 req->r_stamp = jiffies; 1044 req->r_stamp = jiffies;
1055 list_move_tail(&req->r_req_lru_item, &osdc->req_lru); 1045 list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
@@ -1062,16 +1052,13 @@ static void __send_request(struct ceph_osd_client *osdc,
1062/* 1052/*
1063 * Send any requests in the queue (req_unsent). 1053 * Send any requests in the queue (req_unsent).
1064 */ 1054 */
1065static void send_queued(struct ceph_osd_client *osdc) 1055static void __send_queued(struct ceph_osd_client *osdc)
1066{ 1056{
1067 struct ceph_osd_request *req, *tmp; 1057 struct ceph_osd_request *req, *tmp;
1068 1058
1069 dout("send_queued\n"); 1059 dout("__send_queued\n");
1070 mutex_lock(&osdc->request_mutex); 1060 list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
1071 list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
1072 __send_request(osdc, req); 1061 __send_request(osdc, req);
1073 }
1074 mutex_unlock(&osdc->request_mutex);
1075} 1062}
1076 1063
1077/* 1064/*
@@ -1123,8 +1110,8 @@ static void handle_timeout(struct work_struct *work)
1123 } 1110 }
1124 1111
1125 __schedule_osd_timeout(osdc); 1112 __schedule_osd_timeout(osdc);
1113 __send_queued(osdc);
1126 mutex_unlock(&osdc->request_mutex); 1114 mutex_unlock(&osdc->request_mutex);
1127 send_queued(osdc);
1128 up_read(&osdc->map_sem); 1115 up_read(&osdc->map_sem);
1129} 1116}
1130 1117
@@ -1152,6 +1139,26 @@ static void complete_request(struct ceph_osd_request *req)
1152 complete_all(&req->r_safe_completion); /* fsync waiter */ 1139 complete_all(&req->r_safe_completion); /* fsync waiter */
1153} 1140}
1154 1141
1142static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid)
1143{
1144 __u8 v;
1145
1146 ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad);
1147 v = ceph_decode_8(p);
1148 if (v > 1) {
1149 pr_warning("do not understand pg encoding %d > 1", v);
1150 return -EINVAL;
1151 }
1152 pgid->pool = ceph_decode_64(p);
1153 pgid->seed = ceph_decode_32(p);
1154 *p += 4;
1155 return 0;
1156
1157bad:
1158 pr_warning("incomplete pg encoding");
1159 return -EINVAL;
1160}
1161
1155/* 1162/*
1156 * handle osd op reply. either call the callback if it is specified, 1163 * handle osd op reply. either call the callback if it is specified,
1157 * or do the completion to wake up the waiting thread. 1164 * or do the completion to wake up the waiting thread.
@@ -1159,22 +1166,42 @@ static void complete_request(struct ceph_osd_request *req)
1159static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, 1166static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1160 struct ceph_connection *con) 1167 struct ceph_connection *con)
1161{ 1168{
1162 struct ceph_osd_reply_head *rhead = msg->front.iov_base; 1169 void *p, *end;
1163 struct ceph_osd_request *req; 1170 struct ceph_osd_request *req;
1164 u64 tid; 1171 u64 tid;
1165 int numops, object_len, flags; 1172 int object_len;
1173 int numops, payload_len, flags;
1166 s32 result; 1174 s32 result;
1175 s32 retry_attempt;
1176 struct ceph_pg pg;
1177 int err;
1178 u32 reassert_epoch;
1179 u64 reassert_version;
1180 u32 osdmap_epoch;
1181 int i;
1167 1182
1168 tid = le64_to_cpu(msg->hdr.tid); 1183 tid = le64_to_cpu(msg->hdr.tid);
1169 if (msg->front.iov_len < sizeof(*rhead)) 1184 dout("handle_reply %p tid %llu\n", msg, tid);
1170 goto bad; 1185
1171 numops = le32_to_cpu(rhead->num_ops); 1186 p = msg->front.iov_base;
1172 object_len = le32_to_cpu(rhead->object_len); 1187 end = p + msg->front.iov_len;
1173 result = le32_to_cpu(rhead->result); 1188
1174 if (msg->front.iov_len != sizeof(*rhead) + object_len + 1189 ceph_decode_need(&p, end, 4, bad);
1175 numops * sizeof(struct ceph_osd_op)) 1190 object_len = ceph_decode_32(&p);
1191 ceph_decode_need(&p, end, object_len, bad);
1192 p += object_len;
1193
1194 err = __decode_pgid(&p, end, &pg);
1195 if (err)
1176 goto bad; 1196 goto bad;
1177 dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result); 1197
1198 ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
1199 flags = ceph_decode_64(&p);
1200 result = ceph_decode_32(&p);
1201 reassert_epoch = ceph_decode_32(&p);
1202 reassert_version = ceph_decode_64(&p);
1203 osdmap_epoch = ceph_decode_32(&p);
1204
1178 /* lookup */ 1205 /* lookup */
1179 mutex_lock(&osdc->request_mutex); 1206 mutex_lock(&osdc->request_mutex);
1180 req = __lookup_request(osdc, tid); 1207 req = __lookup_request(osdc, tid);
@@ -1184,7 +1211,38 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1184 return; 1211 return;
1185 } 1212 }
1186 ceph_osdc_get_request(req); 1213 ceph_osdc_get_request(req);
1187 flags = le32_to_cpu(rhead->flags); 1214
1215 dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
1216 req, result);
1217
1218 ceph_decode_need(&p, end, 4, bad);
1219 numops = ceph_decode_32(&p);
1220 if (numops > CEPH_OSD_MAX_OP)
1221 goto bad_put;
1222 if (numops != req->r_num_ops)
1223 goto bad_put;
1224 payload_len = 0;
1225 ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad);
1226 for (i = 0; i < numops; i++) {
1227 struct ceph_osd_op *op = p;
1228 int len;
1229
1230 len = le32_to_cpu(op->payload_len);
1231 req->r_reply_op_len[i] = len;
1232 dout(" op %d has %d bytes\n", i, len);
1233 payload_len += len;
1234 p += sizeof(*op);
1235 }
1236 if (payload_len != le32_to_cpu(msg->hdr.data_len)) {
1237 pr_warning("sum of op payload lens %d != data_len %d",
1238 payload_len, le32_to_cpu(msg->hdr.data_len));
1239 goto bad_put;
1240 }
1241
1242 ceph_decode_need(&p, end, 4 + numops * 4, bad);
1243 retry_attempt = ceph_decode_32(&p);
1244 for (i = 0; i < numops; i++)
1245 req->r_reply_op_result[i] = ceph_decode_32(&p);
1188 1246
1189 /* 1247 /*
1190 * if this connection filled our message, drop our reference now, to 1248 * if this connection filled our message, drop our reference now, to
@@ -1199,7 +1257,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1199 if (!req->r_got_reply) { 1257 if (!req->r_got_reply) {
1200 unsigned int bytes; 1258 unsigned int bytes;
1201 1259
1202 req->r_result = le32_to_cpu(rhead->result); 1260 req->r_result = result;
1203 bytes = le32_to_cpu(msg->hdr.data_len); 1261 bytes = le32_to_cpu(msg->hdr.data_len);
1204 dout("handle_reply result %d bytes %d\n", req->r_result, 1262 dout("handle_reply result %d bytes %d\n", req->r_result,
1205 bytes); 1263 bytes);
@@ -1207,7 +1265,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1207 req->r_result = bytes; 1265 req->r_result = bytes;
1208 1266
1209 /* in case this is a write and we need to replay, */ 1267 /* in case this is a write and we need to replay, */
1210 req->r_reassert_version = rhead->reassert_version; 1268 req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
1269 req->r_reassert_version.version = cpu_to_le64(reassert_version);
1211 1270
1212 req->r_got_reply = 1; 1271 req->r_got_reply = 1;
1213 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { 1272 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
@@ -1242,10 +1301,11 @@ done:
1242 ceph_osdc_put_request(req); 1301 ceph_osdc_put_request(req);
1243 return; 1302 return;
1244 1303
1304bad_put:
1305 ceph_osdc_put_request(req);
1245bad: 1306bad:
1246 pr_err("corrupt osd_op_reply got %d %d expected %d\n", 1307 pr_err("corrupt osd_op_reply got %d %d\n",
1247 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len), 1308 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
1248 (int)sizeof(*rhead));
1249 ceph_msg_dump(msg); 1309 ceph_msg_dump(msg);
1250} 1310}
1251 1311
@@ -1462,7 +1522,9 @@ done:
1462 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) 1522 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
1463 ceph_monc_request_next_osdmap(&osdc->client->monc); 1523 ceph_monc_request_next_osdmap(&osdc->client->monc);
1464 1524
1465 send_queued(osdc); 1525 mutex_lock(&osdc->request_mutex);
1526 __send_queued(osdc);
1527 mutex_unlock(&osdc->request_mutex);
1466 up_read(&osdc->map_sem); 1528 up_read(&osdc->map_sem);
1467 wake_up_all(&osdc->client->auth_wq); 1529 wake_up_all(&osdc->client->auth_wq);
1468 return; 1530 return;
@@ -1556,8 +1618,7 @@ static void __remove_event(struct ceph_osd_event *event)
1556 1618
1557int ceph_osdc_create_event(struct ceph_osd_client *osdc, 1619int ceph_osdc_create_event(struct ceph_osd_client *osdc,
1558 void (*event_cb)(u64, u64, u8, void *), 1620 void (*event_cb)(u64, u64, u8, void *),
1559 int one_shot, void *data, 1621 void *data, struct ceph_osd_event **pevent)
1560 struct ceph_osd_event **pevent)
1561{ 1622{
1562 struct ceph_osd_event *event; 1623 struct ceph_osd_event *event;
1563 1624
@@ -1567,14 +1628,13 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc,
1567 1628
1568 dout("create_event %p\n", event); 1629 dout("create_event %p\n", event);
1569 event->cb = event_cb; 1630 event->cb = event_cb;
1570 event->one_shot = one_shot; 1631 event->one_shot = 0;
1571 event->data = data; 1632 event->data = data;
1572 event->osdc = osdc; 1633 event->osdc = osdc;
1573 INIT_LIST_HEAD(&event->osd_node); 1634 INIT_LIST_HEAD(&event->osd_node);
1574 RB_CLEAR_NODE(&event->node); 1635 RB_CLEAR_NODE(&event->node);
1575 kref_init(&event->kref); /* one ref for us */ 1636 kref_init(&event->kref); /* one ref for us */
1576 kref_get(&event->kref); /* one ref for the caller */ 1637 kref_get(&event->kref); /* one ref for the caller */
1577 init_completion(&event->completion);
1578 1638
1579 spin_lock(&osdc->event_lock); 1639 spin_lock(&osdc->event_lock);
1580 event->cookie = ++osdc->event_count; 1640 event->cookie = ++osdc->event_count;
@@ -1610,7 +1670,6 @@ static void do_event_work(struct work_struct *work)
1610 1670
1611 dout("do_event_work completing %p\n", event); 1671 dout("do_event_work completing %p\n", event);
1612 event->cb(ver, notify_id, opcode, event->data); 1672 event->cb(ver, notify_id, opcode, event->data);
1613 complete(&event->completion);
1614 dout("do_event_work completed %p\n", event); 1673 dout("do_event_work completed %p\n", event);
1615 ceph_osdc_put_event(event); 1674 ceph_osdc_put_event(event);
1616 kfree(event_work); 1675 kfree(event_work);
@@ -1620,7 +1679,8 @@ static void do_event_work(struct work_struct *work)
1620/* 1679/*
1621 * Process osd watch notifications 1680 * Process osd watch notifications
1622 */ 1681 */
1623void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg) 1682static void handle_watch_notify(struct ceph_osd_client *osdc,
1683 struct ceph_msg *msg)
1624{ 1684{
1625 void *p, *end; 1685 void *p, *end;
1626 u8 proto_ver; 1686 u8 proto_ver;
@@ -1641,9 +1701,8 @@ void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1641 spin_lock(&osdc->event_lock); 1701 spin_lock(&osdc->event_lock);
1642 event = __find_event(osdc, cookie); 1702 event = __find_event(osdc, cookie);
1643 if (event) { 1703 if (event) {
1704 BUG_ON(event->one_shot);
1644 get_event(event); 1705 get_event(event);
1645 if (event->one_shot)
1646 __remove_event(event);
1647 } 1706 }
1648 spin_unlock(&osdc->event_lock); 1707 spin_unlock(&osdc->event_lock);
1649 dout("handle_watch_notify cookie %lld ver %lld event %p\n", 1708 dout("handle_watch_notify cookie %lld ver %lld event %p\n",
@@ -1668,7 +1727,6 @@ void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1668 return; 1727 return;
1669 1728
1670done_err: 1729done_err:
1671 complete(&event->completion);
1672 ceph_osdc_put_event(event); 1730 ceph_osdc_put_event(event);
1673 return; 1731 return;
1674 1732
@@ -1677,21 +1735,6 @@ bad:
1677 return; 1735 return;
1678} 1736}
1679 1737
1680int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
1681{
1682 int err;
1683
1684 dout("wait_event %p\n", event);
1685 err = wait_for_completion_interruptible_timeout(&event->completion,
1686 timeout * HZ);
1687 ceph_osdc_put_event(event);
1688 if (err > 0)
1689 err = 0;
1690 dout("wait_event %p returns %d\n", event, err);
1691 return err;
1692}
1693EXPORT_SYMBOL(ceph_osdc_wait_event);
1694
1695/* 1738/*
1696 * Register request, send initial attempt. 1739 * Register request, send initial attempt.
1697 */ 1740 */
@@ -1706,7 +1749,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1706#ifdef CONFIG_BLOCK 1749#ifdef CONFIG_BLOCK
1707 req->r_request->bio = req->r_bio; 1750 req->r_request->bio = req->r_bio;
1708#endif 1751#endif
1709 req->r_request->trail = req->r_trail; 1752 req->r_request->trail = &req->r_trail;
1710 1753
1711 register_request(osdc, req); 1754 register_request(osdc, req);
1712 1755
@@ -1865,7 +1908,6 @@ out_mempool:
1865out: 1908out:
1866 return err; 1909 return err;
1867} 1910}
1868EXPORT_SYMBOL(ceph_osdc_init);
1869 1911
1870void ceph_osdc_stop(struct ceph_osd_client *osdc) 1912void ceph_osdc_stop(struct ceph_osd_client *osdc)
1871{ 1913{
@@ -1882,7 +1924,6 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
1882 ceph_msgpool_destroy(&osdc->msgpool_op); 1924 ceph_msgpool_destroy(&osdc->msgpool_op);
1883 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 1925 ceph_msgpool_destroy(&osdc->msgpool_op_reply);
1884} 1926}
1885EXPORT_SYMBOL(ceph_osdc_stop);
1886 1927
1887/* 1928/*
1888 * Read some contiguous pages. If we cross a stripe boundary, shorten 1929 * Read some contiguous pages. If we cross a stripe boundary, shorten
@@ -1902,7 +1943,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1902 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1943 req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1903 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 1944 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1904 NULL, 0, truncate_seq, truncate_size, NULL, 1945 NULL, 0, truncate_seq, truncate_size, NULL,
1905 false, 1, page_align); 1946 false, page_align);
1906 if (IS_ERR(req)) 1947 if (IS_ERR(req))
1907 return PTR_ERR(req); 1948 return PTR_ERR(req);
1908 1949
@@ -1931,8 +1972,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1931 u64 off, u64 len, 1972 u64 off, u64 len,
1932 u32 truncate_seq, u64 truncate_size, 1973 u32 truncate_seq, u64 truncate_size,
1933 struct timespec *mtime, 1974 struct timespec *mtime,
1934 struct page **pages, int num_pages, 1975 struct page **pages, int num_pages)
1935 int flags, int do_sync, bool nofail)
1936{ 1976{
1937 struct ceph_osd_request *req; 1977 struct ceph_osd_request *req;
1938 int rc = 0; 1978 int rc = 0;
@@ -1941,11 +1981,10 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1941 BUG_ON(vino.snap != CEPH_NOSNAP); 1981 BUG_ON(vino.snap != CEPH_NOSNAP);
1942 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1982 req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
1943 CEPH_OSD_OP_WRITE, 1983 CEPH_OSD_OP_WRITE,
1944 flags | CEPH_OSD_FLAG_ONDISK | 1984 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
1945 CEPH_OSD_FLAG_WRITE, 1985 snapc, 0,
1946 snapc, do_sync,
1947 truncate_seq, truncate_size, mtime, 1986 truncate_seq, truncate_size, mtime,
1948 nofail, 1, page_align); 1987 true, page_align);
1949 if (IS_ERR(req)) 1988 if (IS_ERR(req))
1950 return PTR_ERR(req); 1989 return PTR_ERR(req);
1951 1990
@@ -1954,7 +1993,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1954 dout("writepages %llu~%llu (%d pages)\n", off, len, 1993 dout("writepages %llu~%llu (%d pages)\n", off, len,
1955 req->r_num_pages); 1994 req->r_num_pages);
1956 1995
1957 rc = ceph_osdc_start_request(osdc, req, nofail); 1996 rc = ceph_osdc_start_request(osdc, req, true);
1958 if (!rc) 1997 if (!rc)
1959 rc = ceph_osdc_wait_request(osdc, req); 1998 rc = ceph_osdc_wait_request(osdc, req);
1960 1999
@@ -2047,7 +2086,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
2047 if (data_len > 0) { 2086 if (data_len > 0) {
2048 int want = calc_pages_for(req->r_page_alignment, data_len); 2087 int want = calc_pages_for(req->r_page_alignment, data_len);
2049 2088
2050 if (unlikely(req->r_num_pages < want)) { 2089 if (req->r_pages && unlikely(req->r_num_pages < want)) {
2051 pr_warning("tid %lld reply has %d bytes %d pages, we" 2090 pr_warning("tid %lld reply has %d bytes %d pages, we"
2052 " had only %d pages ready\n", tid, data_len, 2091 " had only %d pages ready\n", tid, data_len,
2053 want, req->r_num_pages); 2092 want, req->r_num_pages);
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index de73214b5d26..69bc4bf89e3e 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -13,26 +13,18 @@
13 13
14char *ceph_osdmap_state_str(char *str, int len, int state) 14char *ceph_osdmap_state_str(char *str, int len, int state)
15{ 15{
16 int flag = 0;
17
18 if (!len) 16 if (!len)
19 goto done; 17 return str;
20 18
21 *str = '\0'; 19 if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP))
22 if (state) { 20 snprintf(str, len, "exists, up");
23 if (state & CEPH_OSD_EXISTS) { 21 else if (state & CEPH_OSD_EXISTS)
24 snprintf(str, len, "exists"); 22 snprintf(str, len, "exists");
25 flag = 1; 23 else if (state & CEPH_OSD_UP)
26 } 24 snprintf(str, len, "up");
27 if (state & CEPH_OSD_UP) { 25 else
28 snprintf(str, len, "%s%s%s", str, (flag ? ", " : ""),
29 "up");
30 flag = 1;
31 }
32 } else {
33 snprintf(str, len, "doesn't exist"); 26 snprintf(str, len, "doesn't exist");
34 } 27
35done:
36 return str; 28 return str;
37} 29}
38 30
@@ -53,13 +45,8 @@ static int calc_bits_of(unsigned int t)
53 */ 45 */
54static void calc_pg_masks(struct ceph_pg_pool_info *pi) 46static void calc_pg_masks(struct ceph_pg_pool_info *pi)
55{ 47{
56 pi->pg_num_mask = (1 << calc_bits_of(le32_to_cpu(pi->v.pg_num)-1)) - 1; 48 pi->pg_num_mask = (1 << calc_bits_of(pi->pg_num-1)) - 1;
57 pi->pgp_num_mask = 49 pi->pgp_num_mask = (1 << calc_bits_of(pi->pgp_num-1)) - 1;
58 (1 << calc_bits_of(le32_to_cpu(pi->v.pgp_num)-1)) - 1;
59 pi->lpg_num_mask =
60 (1 << calc_bits_of(le32_to_cpu(pi->v.lpg_num)-1)) - 1;
61 pi->lpgp_num_mask =
62 (1 << calc_bits_of(le32_to_cpu(pi->v.lpgp_num)-1)) - 1;
63} 50}
64 51
65/* 52/*
@@ -170,6 +157,7 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
170 c->choose_local_tries = 2; 157 c->choose_local_tries = 2;
171 c->choose_local_fallback_tries = 5; 158 c->choose_local_fallback_tries = 5;
172 c->choose_total_tries = 19; 159 c->choose_total_tries = 19;
160 c->chooseleaf_descend_once = 0;
173 161
174 ceph_decode_need(p, end, 4*sizeof(u32), bad); 162 ceph_decode_need(p, end, 4*sizeof(u32), bad);
175 magic = ceph_decode_32(p); 163 magic = ceph_decode_32(p);
@@ -336,6 +324,11 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
336 dout("crush decode tunable choose_total_tries = %d", 324 dout("crush decode tunable choose_total_tries = %d",
337 c->choose_total_tries); 325 c->choose_total_tries);
338 326
327 ceph_decode_need(p, end, sizeof(u32), done);
328 c->chooseleaf_descend_once = ceph_decode_32(p);
329 dout("crush decode tunable chooseleaf_descend_once = %d",
330 c->chooseleaf_descend_once);
331
339done: 332done:
340 dout("crush_decode success\n"); 333 dout("crush_decode success\n");
341 return c; 334 return c;
@@ -354,12 +347,13 @@ bad:
354 */ 347 */
355static int pgid_cmp(struct ceph_pg l, struct ceph_pg r) 348static int pgid_cmp(struct ceph_pg l, struct ceph_pg r)
356{ 349{
357 u64 a = *(u64 *)&l; 350 if (l.pool < r.pool)
358 u64 b = *(u64 *)&r; 351 return -1;
359 352 if (l.pool > r.pool)
360 if (a < b) 353 return 1;
354 if (l.seed < r.seed)
361 return -1; 355 return -1;
362 if (a > b) 356 if (l.seed > r.seed)
363 return 1; 357 return 1;
364 return 0; 358 return 0;
365} 359}
@@ -405,8 +399,8 @@ static struct ceph_pg_mapping *__lookup_pg_mapping(struct rb_root *root,
405 } else if (c > 0) { 399 } else if (c > 0) {
406 n = n->rb_right; 400 n = n->rb_right;
407 } else { 401 } else {
408 dout("__lookup_pg_mapping %llx got %p\n", 402 dout("__lookup_pg_mapping %lld.%x got %p\n",
409 *(u64 *)&pgid, pg); 403 pgid.pool, pgid.seed, pg);
410 return pg; 404 return pg;
411 } 405 }
412 } 406 }
@@ -418,12 +412,13 @@ static int __remove_pg_mapping(struct rb_root *root, struct ceph_pg pgid)
418 struct ceph_pg_mapping *pg = __lookup_pg_mapping(root, pgid); 412 struct ceph_pg_mapping *pg = __lookup_pg_mapping(root, pgid);
419 413
420 if (pg) { 414 if (pg) {
421 dout("__remove_pg_mapping %llx %p\n", *(u64 *)&pgid, pg); 415 dout("__remove_pg_mapping %lld.%x %p\n", pgid.pool, pgid.seed,
416 pg);
422 rb_erase(&pg->node, root); 417 rb_erase(&pg->node, root);
423 kfree(pg); 418 kfree(pg);
424 return 0; 419 return 0;
425 } 420 }
426 dout("__remove_pg_mapping %llx dne\n", *(u64 *)&pgid); 421 dout("__remove_pg_mapping %lld.%x dne\n", pgid.pool, pgid.seed);
427 return -ENOENT; 422 return -ENOENT;
428} 423}
429 424
@@ -452,7 +447,7 @@ static int __insert_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *new)
452 return 0; 447 return 0;
453} 448}
454 449
455static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, int id) 450static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, u64 id)
456{ 451{
457 struct ceph_pg_pool_info *pi; 452 struct ceph_pg_pool_info *pi;
458 struct rb_node *n = root->rb_node; 453 struct rb_node *n = root->rb_node;
@@ -508,24 +503,57 @@ static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
508 503
509static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi) 504static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
510{ 505{
511 unsigned int n, m; 506 u8 ev, cv;
507 unsigned len, num;
508 void *pool_end;
509
510 ceph_decode_need(p, end, 2 + 4, bad);
511 ev = ceph_decode_8(p); /* encoding version */
512 cv = ceph_decode_8(p); /* compat version */
513 if (ev < 5) {
514 pr_warning("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv);
515 return -EINVAL;
516 }
517 if (cv > 7) {
518 pr_warning("got v %d cv %d > 7 of ceph_pg_pool\n", ev, cv);
519 return -EINVAL;
520 }
521 len = ceph_decode_32(p);
522 ceph_decode_need(p, end, len, bad);
523 pool_end = *p + len;
512 524
513 ceph_decode_copy(p, &pi->v, sizeof(pi->v)); 525 pi->type = ceph_decode_8(p);
514 calc_pg_masks(pi); 526 pi->size = ceph_decode_8(p);
527 pi->crush_ruleset = ceph_decode_8(p);
528 pi->object_hash = ceph_decode_8(p);
529
530 pi->pg_num = ceph_decode_32(p);
531 pi->pgp_num = ceph_decode_32(p);
532
533 *p += 4 + 4; /* skip lpg* */
534 *p += 4; /* skip last_change */
535 *p += 8 + 4; /* skip snap_seq, snap_epoch */
515 536
516 /* num_snaps * snap_info_t */ 537 /* skip snaps */
517 n = le32_to_cpu(pi->v.num_snaps); 538 num = ceph_decode_32(p);
518 while (n--) { 539 while (num--) {
519 ceph_decode_need(p, end, sizeof(u64) + 1 + sizeof(u64) + 540 *p += 8; /* snapid key */
520 sizeof(struct ceph_timespec), bad); 541 *p += 1 + 1; /* versions */
521 *p += sizeof(u64) + /* key */ 542 len = ceph_decode_32(p);
522 1 + sizeof(u64) + /* u8, snapid */ 543 *p += len;
523 sizeof(struct ceph_timespec);
524 m = ceph_decode_32(p); /* snap name */
525 *p += m;
526 } 544 }
527 545
528 *p += le32_to_cpu(pi->v.num_removed_snap_intervals) * sizeof(u64) * 2; 546 /* skip removed snaps */
547 num = ceph_decode_32(p);
548 *p += num * (8 + 8);
549
550 *p += 8; /* skip auid */
551 pi->flags = ceph_decode_64(p);
552
553 /* ignore the rest */
554
555 *p = pool_end;
556 calc_pg_masks(pi);
529 return 0; 557 return 0;
530 558
531bad: 559bad:
@@ -535,14 +563,15 @@ bad:
535static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map) 563static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
536{ 564{
537 struct ceph_pg_pool_info *pi; 565 struct ceph_pg_pool_info *pi;
538 u32 num, len, pool; 566 u32 num, len;
567 u64 pool;
539 568
540 ceph_decode_32_safe(p, end, num, bad); 569 ceph_decode_32_safe(p, end, num, bad);
541 dout(" %d pool names\n", num); 570 dout(" %d pool names\n", num);
542 while (num--) { 571 while (num--) {
543 ceph_decode_32_safe(p, end, pool, bad); 572 ceph_decode_64_safe(p, end, pool, bad);
544 ceph_decode_32_safe(p, end, len, bad); 573 ceph_decode_32_safe(p, end, len, bad);
545 dout(" pool %d len %d\n", pool, len); 574 dout(" pool %llu len %d\n", pool, len);
546 ceph_decode_need(p, end, len, bad); 575 ceph_decode_need(p, end, len, bad);
547 pi = __lookup_pg_pool(&map->pg_pools, pool); 576 pi = __lookup_pg_pool(&map->pg_pools, pool);
548 if (pi) { 577 if (pi) {
@@ -633,7 +662,6 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
633 struct ceph_osdmap *map; 662 struct ceph_osdmap *map;
634 u16 version; 663 u16 version;
635 u32 len, max, i; 664 u32 len, max, i;
636 u8 ev;
637 int err = -EINVAL; 665 int err = -EINVAL;
638 void *start = *p; 666 void *start = *p;
639 struct ceph_pg_pool_info *pi; 667 struct ceph_pg_pool_info *pi;
@@ -646,9 +674,12 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
646 map->pg_temp = RB_ROOT; 674 map->pg_temp = RB_ROOT;
647 675
648 ceph_decode_16_safe(p, end, version, bad); 676 ceph_decode_16_safe(p, end, version, bad);
649 if (version > CEPH_OSDMAP_VERSION) { 677 if (version > 6) {
650 pr_warning("got unknown v %d > %d of osdmap\n", version, 678 pr_warning("got unknown v %d > 6 of osdmap\n", version);
651 CEPH_OSDMAP_VERSION); 679 goto bad;
680 }
681 if (version < 6) {
682 pr_warning("got old v %d < 6 of osdmap\n", version);
652 goto bad; 683 goto bad;
653 } 684 }
654 685
@@ -660,20 +691,12 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
660 691
661 ceph_decode_32_safe(p, end, max, bad); 692 ceph_decode_32_safe(p, end, max, bad);
662 while (max--) { 693 while (max--) {
663 ceph_decode_need(p, end, 4 + 1 + sizeof(pi->v), bad); 694 ceph_decode_need(p, end, 8 + 2, bad);
664 err = -ENOMEM; 695 err = -ENOMEM;
665 pi = kzalloc(sizeof(*pi), GFP_NOFS); 696 pi = kzalloc(sizeof(*pi), GFP_NOFS);
666 if (!pi) 697 if (!pi)
667 goto bad; 698 goto bad;
668 pi->id = ceph_decode_32(p); 699 pi->id = ceph_decode_64(p);
669 err = -EINVAL;
670 ev = ceph_decode_8(p); /* encoding version */
671 if (ev > CEPH_PG_POOL_VERSION) {
672 pr_warning("got unknown v %d > %d of ceph_pg_pool\n",
673 ev, CEPH_PG_POOL_VERSION);
674 kfree(pi);
675 goto bad;
676 }
677 err = __decode_pool(p, end, pi); 700 err = __decode_pool(p, end, pi);
678 if (err < 0) { 701 if (err < 0) {
679 kfree(pi); 702 kfree(pi);
@@ -682,12 +705,10 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
682 __insert_pg_pool(&map->pg_pools, pi); 705 __insert_pg_pool(&map->pg_pools, pi);
683 } 706 }
684 707
685 if (version >= 5) { 708 err = __decode_pool_names(p, end, map);
686 err = __decode_pool_names(p, end, map); 709 if (err < 0) {
687 if (err < 0) { 710 dout("fail to decode pool names");
688 dout("fail to decode pool names"); 711 goto bad;
689 goto bad;
690 }
691 } 712 }
692 713
693 ceph_decode_32_safe(p, end, map->pool_max, bad); 714 ceph_decode_32_safe(p, end, map->pool_max, bad);
@@ -724,10 +745,13 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
724 for (i = 0; i < len; i++) { 745 for (i = 0; i < len; i++) {
725 int n, j; 746 int n, j;
726 struct ceph_pg pgid; 747 struct ceph_pg pgid;
748 struct ceph_pg_v1 pgid_v1;
727 struct ceph_pg_mapping *pg; 749 struct ceph_pg_mapping *pg;
728 750
729 ceph_decode_need(p, end, sizeof(u32) + sizeof(u64), bad); 751 ceph_decode_need(p, end, sizeof(u32) + sizeof(u64), bad);
730 ceph_decode_copy(p, &pgid, sizeof(pgid)); 752 ceph_decode_copy(p, &pgid_v1, sizeof(pgid_v1));
753 pgid.pool = le32_to_cpu(pgid_v1.pool);
754 pgid.seed = le16_to_cpu(pgid_v1.ps);
731 n = ceph_decode_32(p); 755 n = ceph_decode_32(p);
732 err = -EINVAL; 756 err = -EINVAL;
733 if (n > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) 757 if (n > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
@@ -745,7 +769,8 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
745 err = __insert_pg_mapping(pg, &map->pg_temp); 769 err = __insert_pg_mapping(pg, &map->pg_temp);
746 if (err) 770 if (err)
747 goto bad; 771 goto bad;
748 dout(" added pg_temp %llx len %d\n", *(u64 *)&pgid, len); 772 dout(" added pg_temp %lld.%x len %d\n", pgid.pool, pgid.seed,
773 len);
749 } 774 }
750 775
751 /* crush */ 776 /* crush */
@@ -784,16 +809,17 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
784 struct ceph_fsid fsid; 809 struct ceph_fsid fsid;
785 u32 epoch = 0; 810 u32 epoch = 0;
786 struct ceph_timespec modified; 811 struct ceph_timespec modified;
787 u32 len, pool; 812 s32 len;
788 __s32 new_pool_max, new_flags, max; 813 u64 pool;
814 __s64 new_pool_max;
815 __s32 new_flags, max;
789 void *start = *p; 816 void *start = *p;
790 int err = -EINVAL; 817 int err = -EINVAL;
791 u16 version; 818 u16 version;
792 819
793 ceph_decode_16_safe(p, end, version, bad); 820 ceph_decode_16_safe(p, end, version, bad);
794 if (version > CEPH_OSDMAP_INC_VERSION) { 821 if (version > 6) {
795 pr_warning("got unknown v %d > %d of inc osdmap\n", version, 822 pr_warning("got unknown v %d > %d of inc osdmap\n", version, 6);
796 CEPH_OSDMAP_INC_VERSION);
797 goto bad; 823 goto bad;
798 } 824 }
799 825
@@ -803,7 +829,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
803 epoch = ceph_decode_32(p); 829 epoch = ceph_decode_32(p);
804 BUG_ON(epoch != map->epoch+1); 830 BUG_ON(epoch != map->epoch+1);
805 ceph_decode_copy(p, &modified, sizeof(modified)); 831 ceph_decode_copy(p, &modified, sizeof(modified));
806 new_pool_max = ceph_decode_32(p); 832 new_pool_max = ceph_decode_64(p);
807 new_flags = ceph_decode_32(p); 833 new_flags = ceph_decode_32(p);
808 834
809 /* full map? */ 835 /* full map? */
@@ -853,18 +879,9 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
853 /* new_pool */ 879 /* new_pool */
854 ceph_decode_32_safe(p, end, len, bad); 880 ceph_decode_32_safe(p, end, len, bad);
855 while (len--) { 881 while (len--) {
856 __u8 ev;
857 struct ceph_pg_pool_info *pi; 882 struct ceph_pg_pool_info *pi;
858 883
859 ceph_decode_32_safe(p, end, pool, bad); 884 ceph_decode_64_safe(p, end, pool, bad);
860 ceph_decode_need(p, end, 1 + sizeof(pi->v), bad);
861 ev = ceph_decode_8(p); /* encoding version */
862 if (ev > CEPH_PG_POOL_VERSION) {
863 pr_warning("got unknown v %d > %d of ceph_pg_pool\n",
864 ev, CEPH_PG_POOL_VERSION);
865 err = -EINVAL;
866 goto bad;
867 }
868 pi = __lookup_pg_pool(&map->pg_pools, pool); 885 pi = __lookup_pg_pool(&map->pg_pools, pool);
869 if (!pi) { 886 if (!pi) {
870 pi = kzalloc(sizeof(*pi), GFP_NOFS); 887 pi = kzalloc(sizeof(*pi), GFP_NOFS);
@@ -890,7 +907,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
890 while (len--) { 907 while (len--) {
891 struct ceph_pg_pool_info *pi; 908 struct ceph_pg_pool_info *pi;
892 909
893 ceph_decode_32_safe(p, end, pool, bad); 910 ceph_decode_64_safe(p, end, pool, bad);
894 pi = __lookup_pg_pool(&map->pg_pools, pool); 911 pi = __lookup_pg_pool(&map->pg_pools, pool);
895 if (pi) 912 if (pi)
896 __remove_pg_pool(&map->pg_pools, pi); 913 __remove_pg_pool(&map->pg_pools, pi);
@@ -946,10 +963,13 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
946 while (len--) { 963 while (len--) {
947 struct ceph_pg_mapping *pg; 964 struct ceph_pg_mapping *pg;
948 int j; 965 int j;
966 struct ceph_pg_v1 pgid_v1;
949 struct ceph_pg pgid; 967 struct ceph_pg pgid;
950 u32 pglen; 968 u32 pglen;
951 ceph_decode_need(p, end, sizeof(u64) + sizeof(u32), bad); 969 ceph_decode_need(p, end, sizeof(u64) + sizeof(u32), bad);
952 ceph_decode_copy(p, &pgid, sizeof(pgid)); 970 ceph_decode_copy(p, &pgid_v1, sizeof(pgid_v1));
971 pgid.pool = le32_to_cpu(pgid_v1.pool);
972 pgid.seed = le16_to_cpu(pgid_v1.ps);
953 pglen = ceph_decode_32(p); 973 pglen = ceph_decode_32(p);
954 974
955 if (pglen) { 975 if (pglen) {
@@ -975,8 +995,8 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
975 kfree(pg); 995 kfree(pg);
976 goto bad; 996 goto bad;
977 } 997 }
978 dout(" added pg_temp %llx len %d\n", *(u64 *)&pgid, 998 dout(" added pg_temp %lld.%x len %d\n", pgid.pool,
979 pglen); 999 pgid.seed, pglen);
980 } else { 1000 } else {
981 /* remove */ 1001 /* remove */
982 __remove_pg_mapping(&map->pg_temp, pgid); 1002 __remove_pg_mapping(&map->pg_temp, pgid);
@@ -1010,7 +1030,7 @@ bad:
1010 * pass a stride back to the caller. 1030 * pass a stride back to the caller.
1011 */ 1031 */
1012int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, 1032int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
1013 u64 off, u64 *plen, 1033 u64 off, u64 len,
1014 u64 *ono, 1034 u64 *ono,
1015 u64 *oxoff, u64 *oxlen) 1035 u64 *oxoff, u64 *oxlen)
1016{ 1036{
@@ -1021,7 +1041,7 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
1021 u32 su_per_object; 1041 u32 su_per_object;
1022 u64 t, su_offset; 1042 u64 t, su_offset;
1023 1043
1024 dout("mapping %llu~%llu osize %u fl_su %u\n", off, *plen, 1044 dout("mapping %llu~%llu osize %u fl_su %u\n", off, len,
1025 osize, su); 1045 osize, su);
1026 if (su == 0 || sc == 0) 1046 if (su == 0 || sc == 0)
1027 goto invalid; 1047 goto invalid;
@@ -1054,11 +1074,10 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
1054 1074
1055 /* 1075 /*
1056 * Calculate the length of the extent being written to the selected 1076 * Calculate the length of the extent being written to the selected
1057 * object. This is the minimum of the full length requested (plen) or 1077 * object. This is the minimum of the full length requested (len) or
1058 * the remainder of the current stripe being written to. 1078 * the remainder of the current stripe being written to.
1059 */ 1079 */
1060 *oxlen = min_t(u64, *plen, su - su_offset); 1080 *oxlen = min_t(u64, len, su - su_offset);
1061 *plen = *oxlen;
1062 1081
1063 dout(" obj extent %llu~%llu\n", *oxoff, *oxlen); 1082 dout(" obj extent %llu~%llu\n", *oxoff, *oxlen);
1064 return 0; 1083 return 0;
@@ -1076,33 +1095,24 @@ EXPORT_SYMBOL(ceph_calc_file_object_mapping);
1076 * calculate an object layout (i.e. pgid) from an oid, 1095 * calculate an object layout (i.e. pgid) from an oid,
1077 * file_layout, and osdmap 1096 * file_layout, and osdmap
1078 */ 1097 */
1079int ceph_calc_object_layout(struct ceph_object_layout *ol, 1098int ceph_calc_object_layout(struct ceph_pg *pg,
1080 const char *oid, 1099 const char *oid,
1081 struct ceph_file_layout *fl, 1100 struct ceph_file_layout *fl,
1082 struct ceph_osdmap *osdmap) 1101 struct ceph_osdmap *osdmap)
1083{ 1102{
1084 unsigned int num, num_mask; 1103 unsigned int num, num_mask;
1085 struct ceph_pg pgid;
1086 int poolid = le32_to_cpu(fl->fl_pg_pool);
1087 struct ceph_pg_pool_info *pool; 1104 struct ceph_pg_pool_info *pool;
1088 unsigned int ps;
1089 1105
1090 BUG_ON(!osdmap); 1106 BUG_ON(!osdmap);
1091 1107 pg->pool = le32_to_cpu(fl->fl_pg_pool);
1092 pool = __lookup_pg_pool(&osdmap->pg_pools, poolid); 1108 pool = __lookup_pg_pool(&osdmap->pg_pools, pg->pool);
1093 if (!pool) 1109 if (!pool)
1094 return -EIO; 1110 return -EIO;
1095 ps = ceph_str_hash(pool->v.object_hash, oid, strlen(oid)); 1111 pg->seed = ceph_str_hash(pool->object_hash, oid, strlen(oid));
1096 num = le32_to_cpu(pool->v.pg_num); 1112 num = pool->pg_num;
1097 num_mask = pool->pg_num_mask; 1113 num_mask = pool->pg_num_mask;
1098 1114
1099 pgid.ps = cpu_to_le16(ps); 1115 dout("calc_object_layout '%s' pgid %lld.%x\n", oid, pg->pool, pg->seed);
1100 pgid.preferred = cpu_to_le16(-1);
1101 pgid.pool = fl->fl_pg_pool;
1102 dout("calc_object_layout '%s' pgid %d.%x\n", oid, poolid, ps);
1103
1104 ol->ol_pgid = pgid;
1105 ol->ol_stripe_unit = fl->fl_object_stripe_unit;
1106 return 0; 1116 return 0;
1107} 1117}
1108EXPORT_SYMBOL(ceph_calc_object_layout); 1118EXPORT_SYMBOL(ceph_calc_object_layout);
@@ -1117,19 +1127,16 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
1117 struct ceph_pg_mapping *pg; 1127 struct ceph_pg_mapping *pg;
1118 struct ceph_pg_pool_info *pool; 1128 struct ceph_pg_pool_info *pool;
1119 int ruleno; 1129 int ruleno;
1120 unsigned int poolid, ps, pps, t, r; 1130 int r;
1121 1131 u32 pps;
1122 poolid = le32_to_cpu(pgid.pool);
1123 ps = le16_to_cpu(pgid.ps);
1124 1132
1125 pool = __lookup_pg_pool(&osdmap->pg_pools, poolid); 1133 pool = __lookup_pg_pool(&osdmap->pg_pools, pgid.pool);
1126 if (!pool) 1134 if (!pool)
1127 return NULL; 1135 return NULL;
1128 1136
1129 /* pg_temp? */ 1137 /* pg_temp? */
1130 t = ceph_stable_mod(ps, le32_to_cpu(pool->v.pg_num), 1138 pgid.seed = ceph_stable_mod(pgid.seed, pool->pg_num,
1131 pool->pgp_num_mask); 1139 pool->pgp_num_mask);
1132 pgid.ps = cpu_to_le16(t);
1133 pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid); 1140 pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
1134 if (pg) { 1141 if (pg) {
1135 *num = pg->len; 1142 *num = pg->len;
@@ -1137,26 +1144,39 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
1137 } 1144 }
1138 1145
1139 /* crush */ 1146 /* crush */
1140 ruleno = crush_find_rule(osdmap->crush, pool->v.crush_ruleset, 1147 ruleno = crush_find_rule(osdmap->crush, pool->crush_ruleset,
1141 pool->v.type, pool->v.size); 1148 pool->type, pool->size);
1142 if (ruleno < 0) { 1149 if (ruleno < 0) {
1143 pr_err("no crush rule pool %d ruleset %d type %d size %d\n", 1150 pr_err("no crush rule pool %lld ruleset %d type %d size %d\n",
1144 poolid, pool->v.crush_ruleset, pool->v.type, 1151 pgid.pool, pool->crush_ruleset, pool->type,
1145 pool->v.size); 1152 pool->size);
1146 return NULL; 1153 return NULL;
1147 } 1154 }
1148 1155
1149 pps = ceph_stable_mod(ps, 1156 if (pool->flags & CEPH_POOL_FLAG_HASHPSPOOL) {
1150 le32_to_cpu(pool->v.pgp_num), 1157 /* hash pool id and seed sothat pool PGs do not overlap */
1151 pool->pgp_num_mask); 1158 pps = crush_hash32_2(CRUSH_HASH_RJENKINS1,
1152 pps += poolid; 1159 ceph_stable_mod(pgid.seed, pool->pgp_num,
1160 pool->pgp_num_mask),
1161 pgid.pool);
1162 } else {
1163 /*
1164 * legacy behavior: add ps and pool together. this is
1165 * not a great approach because the PGs from each pool
1166 * will overlap on top of each other: 0.5 == 1.4 ==
1167 * 2.3 == ...
1168 */
1169 pps = ceph_stable_mod(pgid.seed, pool->pgp_num,
1170 pool->pgp_num_mask) +
1171 (unsigned)pgid.pool;
1172 }
1153 r = crush_do_rule(osdmap->crush, ruleno, pps, osds, 1173 r = crush_do_rule(osdmap->crush, ruleno, pps, osds,
1154 min_t(int, pool->v.size, *num), 1174 min_t(int, pool->size, *num),
1155 osdmap->osd_weight); 1175 osdmap->osd_weight);
1156 if (r < 0) { 1176 if (r < 0) {
1157 pr_err("error %d from crush rule: pool %d ruleset %d type %d" 1177 pr_err("error %d from crush rule: pool %lld ruleset %d type %d"
1158 " size %d\n", r, poolid, pool->v.crush_ruleset, 1178 " size %d\n", r, pgid.pool, pool->crush_ruleset,
1159 pool->v.type, pool->v.size); 1179 pool->type, pool->size);
1160 return NULL; 1180 return NULL;
1161 } 1181 }
1162 *num = r; 1182 *num = r;
diff --git a/net/ceph/pagevec.c b/net/ceph/pagevec.c
index cd9c21df87d1..815a2249cfa9 100644
--- a/net/ceph/pagevec.c
+++ b/net/ceph/pagevec.c
@@ -12,7 +12,7 @@
12/* 12/*
13 * build a vector of user pages 13 * build a vector of user pages
14 */ 14 */
15struct page **ceph_get_direct_page_vector(const char __user *data, 15struct page **ceph_get_direct_page_vector(const void __user *data,
16 int num_pages, bool write_page) 16 int num_pages, bool write_page)
17{ 17{
18 struct page **pages; 18 struct page **pages;
@@ -93,7 +93,7 @@ EXPORT_SYMBOL(ceph_alloc_page_vector);
93 * copy user data into a page vector 93 * copy user data into a page vector
94 */ 94 */
95int ceph_copy_user_to_page_vector(struct page **pages, 95int ceph_copy_user_to_page_vector(struct page **pages,
96 const char __user *data, 96 const void __user *data,
97 loff_t off, size_t len) 97 loff_t off, size_t len)
98{ 98{
99 int i = 0; 99 int i = 0;
@@ -118,17 +118,17 @@ int ceph_copy_user_to_page_vector(struct page **pages,
118} 118}
119EXPORT_SYMBOL(ceph_copy_user_to_page_vector); 119EXPORT_SYMBOL(ceph_copy_user_to_page_vector);
120 120
121int ceph_copy_to_page_vector(struct page **pages, 121void ceph_copy_to_page_vector(struct page **pages,
122 const char *data, 122 const void *data,
123 loff_t off, size_t len) 123 loff_t off, size_t len)
124{ 124{
125 int i = 0; 125 int i = 0;
126 size_t po = off & ~PAGE_CACHE_MASK; 126 size_t po = off & ~PAGE_CACHE_MASK;
127 size_t left = len; 127 size_t left = len;
128 size_t l;
129 128
130 while (left > 0) { 129 while (left > 0) {
131 l = min_t(size_t, PAGE_CACHE_SIZE-po, left); 130 size_t l = min_t(size_t, PAGE_CACHE_SIZE-po, left);
131
132 memcpy(page_address(pages[i]) + po, data, l); 132 memcpy(page_address(pages[i]) + po, data, l);
133 data += l; 133 data += l;
134 left -= l; 134 left -= l;
@@ -138,21 +138,20 @@ int ceph_copy_to_page_vector(struct page **pages,
138 i++; 138 i++;
139 } 139 }
140 } 140 }
141 return len;
142} 141}
143EXPORT_SYMBOL(ceph_copy_to_page_vector); 142EXPORT_SYMBOL(ceph_copy_to_page_vector);
144 143
145int ceph_copy_from_page_vector(struct page **pages, 144void ceph_copy_from_page_vector(struct page **pages,
146 char *data, 145 void *data,
147 loff_t off, size_t len) 146 loff_t off, size_t len)
148{ 147{
149 int i = 0; 148 int i = 0;
150 size_t po = off & ~PAGE_CACHE_MASK; 149 size_t po = off & ~PAGE_CACHE_MASK;
151 size_t left = len; 150 size_t left = len;
152 size_t l;
153 151
154 while (left > 0) { 152 while (left > 0) {
155 l = min_t(size_t, PAGE_CACHE_SIZE-po, left); 153 size_t l = min_t(size_t, PAGE_CACHE_SIZE-po, left);
154
156 memcpy(data, page_address(pages[i]) + po, l); 155 memcpy(data, page_address(pages[i]) + po, l);
157 data += l; 156 data += l;
158 left -= l; 157 left -= l;
@@ -162,7 +161,6 @@ int ceph_copy_from_page_vector(struct page **pages,
162 i++; 161 i++;
163 } 162 }
164 } 163 }
165 return len;
166} 164}
167EXPORT_SYMBOL(ceph_copy_from_page_vector); 165EXPORT_SYMBOL(ceph_copy_from_page_vector);
168 166
@@ -170,7 +168,7 @@ EXPORT_SYMBOL(ceph_copy_from_page_vector);
170 * copy user data from a page vector into a user pointer 168 * copy user data from a page vector into a user pointer
171 */ 169 */
172int ceph_copy_page_vector_to_user(struct page **pages, 170int ceph_copy_page_vector_to_user(struct page **pages,
173 char __user *data, 171 void __user *data,
174 loff_t off, size_t len) 172 loff_t off, size_t len)
175{ 173{
176 int i = 0; 174 int i = 0;