about | summary | refs | log | tree | commit | diff | stats
path: root/net
diff options
context:
space:
mode:
author: Alex Elder <elder@inktank.com> 2013-02-19 20:20:56 -0500
committer: Alex Elder <elder@inktank.com> 2013-02-19 20:21:08 -0500
commit: 4c7a08c83a7842e88838dde16684d6bafffdfaf0 (patch)
tree: c5fe0057b2ff9f98a64ceb6fa076e75da8225cdd /net
parent: 19f949f52599ba7c3f67a5897ac6be14bfcb1200 (diff)
parent: 903bb32e890237ca43ab847e561e5377cfe0fdb3 (diff)
Merge branch 'testing' of github.com:ceph/ceph-client into into linux-3.8-ceph
Diffstat (limited to 'net')
-rw-r--r-- net/ceph/ceph_common.c 16
-rw-r--r-- net/ceph/ceph_strings.c 39
-rw-r--r-- net/ceph/crush/mapper.c 15
-rw-r--r-- net/ceph/messenger.c 5
-rw-r--r-- net/ceph/osd_client.c 418
-rw-r--r-- net/ceph/osdmap.c 43
-rw-r--r-- net/ceph/pagevec.c 24
7 files changed, 274 insertions, 286 deletions
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index ee71ea26777a..c236c235c4a2 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -26,6 +26,22 @@
26#include "crypto.h" 26#include "crypto.h"
27 27
28 28
29/*
30 * Module compatibility interface. For now it doesn't do anything,
31 * but its existence signals a certain level of functionality.
32 *
33 * The data buffer is used to pass information both to and from
34 * libceph. The return value indicates whether libceph determines
35 * it is compatible with the caller (from another kernel module),
36 * given the provided data.
37 *
38 * The data pointer can be null.
39 */
40bool libceph_compatible(void *data)
41{
42 return true;
43}
44EXPORT_SYMBOL(libceph_compatible);
29 45
30/* 46/*
31 * find filename portion of a path (/foo/bar/baz -> baz) 47 * find filename portion of a path (/foo/bar/baz -> baz)
diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c
index 3fbda04de29c..1348df96fe15 100644
--- a/net/ceph/ceph_strings.c
+++ b/net/ceph/ceph_strings.c
@@ -21,9 +21,15 @@ const char *ceph_osd_op_name(int op)
21 switch (op) { 21 switch (op) {
22 case CEPH_OSD_OP_READ: return "read"; 22 case CEPH_OSD_OP_READ: return "read";
23 case CEPH_OSD_OP_STAT: return "stat"; 23 case CEPH_OSD_OP_STAT: return "stat";
24 case CEPH_OSD_OP_MAPEXT: return "mapext";
25 case CEPH_OSD_OP_SPARSE_READ: return "sparse-read";
26 case CEPH_OSD_OP_NOTIFY: return "notify";
27 case CEPH_OSD_OP_NOTIFY_ACK: return "notify-ack";
28 case CEPH_OSD_OP_ASSERT_VER: return "assert-version";
24 29
25 case CEPH_OSD_OP_MASKTRUNC: return "masktrunc"; 30 case CEPH_OSD_OP_MASKTRUNC: return "masktrunc";
26 31
32 case CEPH_OSD_OP_CREATE: return "create";
27 case CEPH_OSD_OP_WRITE: return "write"; 33 case CEPH_OSD_OP_WRITE: return "write";
28 case CEPH_OSD_OP_DELETE: return "delete"; 34 case CEPH_OSD_OP_DELETE: return "delete";
29 case CEPH_OSD_OP_TRUNCATE: return "truncate"; 35 case CEPH_OSD_OP_TRUNCATE: return "truncate";
@@ -39,6 +45,11 @@ const char *ceph_osd_op_name(int op)
39 case CEPH_OSD_OP_TMAPUP: return "tmapup"; 45 case CEPH_OSD_OP_TMAPUP: return "tmapup";
40 case CEPH_OSD_OP_TMAPGET: return "tmapget"; 46 case CEPH_OSD_OP_TMAPGET: return "tmapget";
41 case CEPH_OSD_OP_TMAPPUT: return "tmapput"; 47 case CEPH_OSD_OP_TMAPPUT: return "tmapput";
48 case CEPH_OSD_OP_WATCH: return "watch";
49
50 case CEPH_OSD_OP_CLONERANGE: return "clonerange";
51 case CEPH_OSD_OP_ASSERT_SRC_VERSION: return "assert-src-version";
52 case CEPH_OSD_OP_SRC_CMPXATTR: return "src-cmpxattr";
42 53
43 case CEPH_OSD_OP_GETXATTR: return "getxattr"; 54 case CEPH_OSD_OP_GETXATTR: return "getxattr";
44 case CEPH_OSD_OP_GETXATTRS: return "getxattrs"; 55 case CEPH_OSD_OP_GETXATTRS: return "getxattrs";
@@ -53,6 +64,10 @@ const char *ceph_osd_op_name(int op)
53 case CEPH_OSD_OP_BALANCEREADS: return "balance-reads"; 64 case CEPH_OSD_OP_BALANCEREADS: return "balance-reads";
54 case CEPH_OSD_OP_UNBALANCEREADS: return "unbalance-reads"; 65 case CEPH_OSD_OP_UNBALANCEREADS: return "unbalance-reads";
55 case CEPH_OSD_OP_SCRUB: return "scrub"; 66 case CEPH_OSD_OP_SCRUB: return "scrub";
67 case CEPH_OSD_OP_SCRUB_RESERVE: return "scrub-reserve";
68 case CEPH_OSD_OP_SCRUB_UNRESERVE: return "scrub-unreserve";
69 case CEPH_OSD_OP_SCRUB_STOP: return "scrub-stop";
70 case CEPH_OSD_OP_SCRUB_MAP: return "scrub-map";
56 71
57 case CEPH_OSD_OP_WRLOCK: return "wrlock"; 72 case CEPH_OSD_OP_WRLOCK: return "wrlock";
58 case CEPH_OSD_OP_WRUNLOCK: return "wrunlock"; 73 case CEPH_OSD_OP_WRUNLOCK: return "wrunlock";
@@ -64,10 +79,34 @@ const char *ceph_osd_op_name(int op)
64 case CEPH_OSD_OP_CALL: return "call"; 79 case CEPH_OSD_OP_CALL: return "call";
65 80
66 case CEPH_OSD_OP_PGLS: return "pgls"; 81 case CEPH_OSD_OP_PGLS: return "pgls";
82 case CEPH_OSD_OP_PGLS_FILTER: return "pgls-filter";
83 case CEPH_OSD_OP_OMAPGETKEYS: return "omap-get-keys";
84 case CEPH_OSD_OP_OMAPGETVALS: return "omap-get-vals";
85 case CEPH_OSD_OP_OMAPGETHEADER: return "omap-get-header";
86 case CEPH_OSD_OP_OMAPGETVALSBYKEYS: return "omap-get-vals-by-keys";
87 case CEPH_OSD_OP_OMAPSETVALS: return "omap-set-vals";
88 case CEPH_OSD_OP_OMAPSETHEADER: return "omap-set-header";
89 case CEPH_OSD_OP_OMAPCLEAR: return "omap-clear";
90 case CEPH_OSD_OP_OMAPRMKEYS: return "omap-rm-keys";
67 } 91 }
68 return "???"; 92 return "???";
69} 93}
70 94
95const char *ceph_osd_state_name(int s)
96{
97 switch (s) {
98 case CEPH_OSD_EXISTS:
99 return "exists";
100 case CEPH_OSD_UP:
101 return "up";
102 case CEPH_OSD_AUTOOUT:
103 return "autoout";
104 case CEPH_OSD_NEW:
105 return "new";
106 default:
107 return "???";
108 }
109}
71 110
72const char *ceph_pool_op_name(int op) 111const char *ceph_pool_op_name(int op)
73{ 112{
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index 35fce755ce10..cbd06a91941c 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -287,6 +287,7 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in
287 * @outpos: our position in that vector 287 * @outpos: our position in that vector
288 * @firstn: true if choosing "first n" items, false if choosing "indep" 288 * @firstn: true if choosing "first n" items, false if choosing "indep"
289 * @recurse_to_leaf: true if we want one device under each item of given type 289 * @recurse_to_leaf: true if we want one device under each item of given type
290 * @descend_once: true if we should only try one descent before giving up
290 * @out2: second output vector for leaf items (if @recurse_to_leaf) 291 * @out2: second output vector for leaf items (if @recurse_to_leaf)
291 */ 292 */
292static int crush_choose(const struct crush_map *map, 293static int crush_choose(const struct crush_map *map,
@@ -295,7 +296,7 @@ static int crush_choose(const struct crush_map *map,
295 int x, int numrep, int type, 296 int x, int numrep, int type,
296 int *out, int outpos, 297 int *out, int outpos,
297 int firstn, int recurse_to_leaf, 298 int firstn, int recurse_to_leaf,
298 int *out2) 299 int descend_once, int *out2)
299{ 300{
300 int rep; 301 int rep;
301 unsigned int ftotal, flocal; 302 unsigned int ftotal, flocal;
@@ -391,7 +392,7 @@ static int crush_choose(const struct crush_map *map,
391 } 392 }
392 393
393 reject = 0; 394 reject = 0;
394 if (recurse_to_leaf) { 395 if (!collide && recurse_to_leaf) {
395 if (item < 0) { 396 if (item < 0) {
396 if (crush_choose(map, 397 if (crush_choose(map,
397 map->buckets[-1-item], 398 map->buckets[-1-item],
@@ -399,6 +400,7 @@ static int crush_choose(const struct crush_map *map,
399 x, outpos+1, 0, 400 x, outpos+1, 0,
400 out2, outpos, 401 out2, outpos,
401 firstn, 0, 402 firstn, 0,
403 map->chooseleaf_descend_once,
402 NULL) <= outpos) 404 NULL) <= outpos)
403 /* didn't get leaf */ 405 /* didn't get leaf */
404 reject = 1; 406 reject = 1;
@@ -422,7 +424,10 @@ reject:
422 ftotal++; 424 ftotal++;
423 flocal++; 425 flocal++;
424 426
425 if (collide && flocal <= map->choose_local_tries) 427 if (reject && descend_once)
428 /* let outer call try again */
429 skip_rep = 1;
430 else if (collide && flocal <= map->choose_local_tries)
426 /* retry locally a few times */ 431 /* retry locally a few times */
427 retry_bucket = 1; 432 retry_bucket = 1;
428 else if (map->choose_local_fallback_tries > 0 && 433 else if (map->choose_local_fallback_tries > 0 &&
@@ -485,6 +490,7 @@ int crush_do_rule(const struct crush_map *map,
485 int i, j; 490 int i, j;
486 int numrep; 491 int numrep;
487 int firstn; 492 int firstn;
493 const int descend_once = 0;
488 494
489 if ((__u32)ruleno >= map->max_rules) { 495 if ((__u32)ruleno >= map->max_rules) {
490 dprintk(" bad ruleno %d\n", ruleno); 496 dprintk(" bad ruleno %d\n", ruleno);
@@ -544,7 +550,8 @@ int crush_do_rule(const struct crush_map *map,
544 curstep->arg2, 550 curstep->arg2,
545 o+osize, j, 551 o+osize, j,
546 firstn, 552 firstn,
547 recurse_to_leaf, c+osize); 553 recurse_to_leaf,
554 descend_once, c+osize);
548 } 555 }
549 556
550 if (recurse_to_leaf) 557 if (recurse_to_leaf)
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 5ccf87ed8d68..8a62a559a2aa 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -9,8 +9,9 @@
9#include <linux/slab.h> 9#include <linux/slab.h>
10#include <linux/socket.h> 10#include <linux/socket.h>
11#include <linux/string.h> 11#include <linux/string.h>
12#ifdef CONFIG_BLOCK
12#include <linux/bio.h> 13#include <linux/bio.h>
13#include <linux/blkdev.h> 14#endif /* CONFIG_BLOCK */
14#include <linux/dns_resolver.h> 15#include <linux/dns_resolver.h>
15#include <net/tcp.h> 16#include <net/tcp.h>
16 17
@@ -2651,9 +2652,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
2651 m->page_alignment = 0; 2652 m->page_alignment = 0;
2652 m->pages = NULL; 2653 m->pages = NULL;
2653 m->pagelist = NULL; 2654 m->pagelist = NULL;
2655#ifdef CONFIG_BLOCK
2654 m->bio = NULL; 2656 m->bio = NULL;
2655 m->bio_iter = NULL; 2657 m->bio_iter = NULL;
2656 m->bio_seg = 0; 2658 m->bio_seg = 0;
2659#endif /* CONFIG_BLOCK */
2657 m->trail = NULL; 2660 m->trail = NULL;
2658 2661
2659 /* front */ 2662 /* front */
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index eb9a44478764..39629b66f3b1 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -23,7 +23,7 @@
23 23
24static const struct ceph_connection_operations osd_con_ops; 24static const struct ceph_connection_operations osd_con_ops;
25 25
26static void send_queued(struct ceph_osd_client *osdc); 26static void __send_queued(struct ceph_osd_client *osdc);
27static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); 27static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
28static void __register_request(struct ceph_osd_client *osdc, 28static void __register_request(struct ceph_osd_client *osdc,
29 struct ceph_osd_request *req); 29 struct ceph_osd_request *req);
@@ -32,64 +32,12 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
32static void __send_request(struct ceph_osd_client *osdc, 32static void __send_request(struct ceph_osd_client *osdc,
33 struct ceph_osd_request *req); 33 struct ceph_osd_request *req);
34 34
35static int op_needs_trail(int op)
36{
37 switch (op) {
38 case CEPH_OSD_OP_GETXATTR:
39 case CEPH_OSD_OP_SETXATTR:
40 case CEPH_OSD_OP_CMPXATTR:
41 case CEPH_OSD_OP_CALL:
42 case CEPH_OSD_OP_NOTIFY:
43 return 1;
44 default:
45 return 0;
46 }
47}
48
49static int op_has_extent(int op) 35static int op_has_extent(int op)
50{ 36{
51 return (op == CEPH_OSD_OP_READ || 37 return (op == CEPH_OSD_OP_READ ||
52 op == CEPH_OSD_OP_WRITE); 38 op == CEPH_OSD_OP_WRITE);
53} 39}
54 40
55int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
56 struct ceph_file_layout *layout,
57 u64 snapid,
58 u64 off, u64 *plen, u64 *bno,
59 struct ceph_osd_request *req,
60 struct ceph_osd_req_op *op)
61{
62 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
63 u64 orig_len = *plen;
64 u64 objoff, objlen; /* extent in object */
65 int r;
66
67 reqhead->snapid = cpu_to_le64(snapid);
68
69 /* object extent? */
70 r = ceph_calc_file_object_mapping(layout, off, plen, bno,
71 &objoff, &objlen);
72 if (r < 0)
73 return r;
74 if (*plen < orig_len)
75 dout(" skipping last %llu, final file extent %llu~%llu\n",
76 orig_len - *plen, off, *plen);
77
78 if (op_has_extent(op->op)) {
79 op->extent.offset = objoff;
80 op->extent.length = objlen;
81 }
82 req->r_num_pages = calc_pages_for(off, *plen);
83 req->r_page_alignment = off & ~PAGE_MASK;
84 if (op->op == CEPH_OSD_OP_WRITE)
85 op->payload_len = *plen;
86
87 dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
88 *bno, objoff, objlen, req->r_num_pages);
89 return 0;
90}
91EXPORT_SYMBOL(ceph_calc_raw_layout);
92
93/* 41/*
94 * Implement client access to distributed object storage cluster. 42 * Implement client access to distributed object storage cluster.
95 * 43 *
@@ -115,20 +63,48 @@ EXPORT_SYMBOL(ceph_calc_raw_layout);
115 * 63 *
116 * fill osd op in request message. 64 * fill osd op in request message.
117 */ 65 */
118static int calc_layout(struct ceph_osd_client *osdc, 66static int calc_layout(struct ceph_vino vino,
119 struct ceph_vino vino,
120 struct ceph_file_layout *layout, 67 struct ceph_file_layout *layout,
121 u64 off, u64 *plen, 68 u64 off, u64 *plen,
122 struct ceph_osd_request *req, 69 struct ceph_osd_request *req,
123 struct ceph_osd_req_op *op) 70 struct ceph_osd_req_op *op)
124{ 71{
125 u64 bno; 72 u64 orig_len = *plen;
73 u64 bno = 0;
74 u64 objoff = 0;
75 u64 objlen = 0;
126 int r; 76 int r;
127 77
128 r = ceph_calc_raw_layout(osdc, layout, vino.snap, off, 78 /* object extent? */
129 plen, &bno, req, op); 79 r = ceph_calc_file_object_mapping(layout, off, orig_len, &bno,
80 &objoff, &objlen);
130 if (r < 0) 81 if (r < 0)
131 return r; 82 return r;
83 if (objlen < orig_len) {
84 *plen = objlen;
85 dout(" skipping last %llu, final file extent %llu~%llu\n",
86 orig_len - *plen, off, *plen);
87 }
88
89 if (op_has_extent(op->op)) {
90 u32 osize = le32_to_cpu(layout->fl_object_size);
91 op->extent.offset = objoff;
92 op->extent.length = objlen;
93 if (op->extent.truncate_size <= off - objoff) {
94 op->extent.truncate_size = 0;
95 } else {
96 op->extent.truncate_size -= off - objoff;
97 if (op->extent.truncate_size > osize)
98 op->extent.truncate_size = osize;
99 }
100 }
101 req->r_num_pages = calc_pages_for(off, *plen);
102 req->r_page_alignment = off & ~PAGE_MASK;
103 if (op->op == CEPH_OSD_OP_WRITE)
104 op->payload_len = *plen;
105
106 dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
107 bno, objoff, objlen, req->r_num_pages);
132 108
133 snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno); 109 snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
134 req->r_oid_len = strlen(req->r_oid); 110 req->r_oid_len = strlen(req->r_oid);
@@ -148,25 +124,19 @@ void ceph_osdc_release_request(struct kref *kref)
148 if (req->r_request) 124 if (req->r_request)
149 ceph_msg_put(req->r_request); 125 ceph_msg_put(req->r_request);
150 if (req->r_con_filling_msg) { 126 if (req->r_con_filling_msg) {
151 dout("%s revoking pages %p from con %p\n", __func__, 127 dout("%s revoking msg %p from con %p\n", __func__,
152 req->r_pages, req->r_con_filling_msg); 128 req->r_reply, req->r_con_filling_msg);
153 ceph_msg_revoke_incoming(req->r_reply); 129 ceph_msg_revoke_incoming(req->r_reply);
154 req->r_con_filling_msg->ops->put(req->r_con_filling_msg); 130 req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
131 req->r_con_filling_msg = NULL;
155 } 132 }
156 if (req->r_reply) 133 if (req->r_reply)
157 ceph_msg_put(req->r_reply); 134 ceph_msg_put(req->r_reply);
158 if (req->r_own_pages) 135 if (req->r_own_pages)
159 ceph_release_page_vector(req->r_pages, 136 ceph_release_page_vector(req->r_pages,
160 req->r_num_pages); 137 req->r_num_pages);
161#ifdef CONFIG_BLOCK
162 if (req->r_bio)
163 bio_put(req->r_bio);
164#endif
165 ceph_put_snap_context(req->r_snapc); 138 ceph_put_snap_context(req->r_snapc);
166 if (req->r_trail) { 139 ceph_pagelist_release(&req->r_trail);
167 ceph_pagelist_release(req->r_trail);
168 kfree(req->r_trail);
169 }
170 if (req->r_mempool) 140 if (req->r_mempool)
171 mempool_free(req, req->r_osdc->req_mempool); 141 mempool_free(req, req->r_osdc->req_mempool);
172 else 142 else
@@ -174,34 +144,14 @@ void ceph_osdc_release_request(struct kref *kref)
174} 144}
175EXPORT_SYMBOL(ceph_osdc_release_request); 145EXPORT_SYMBOL(ceph_osdc_release_request);
176 146
177static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
178{
179 int i = 0;
180
181 if (needs_trail)
182 *needs_trail = 0;
183 while (ops[i].op) {
184 if (needs_trail && op_needs_trail(ops[i].op))
185 *needs_trail = 1;
186 i++;
187 }
188
189 return i;
190}
191
192struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 147struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
193 int flags,
194 struct ceph_snap_context *snapc, 148 struct ceph_snap_context *snapc,
195 struct ceph_osd_req_op *ops, 149 unsigned int num_op,
196 bool use_mempool, 150 bool use_mempool,
197 gfp_t gfp_flags, 151 gfp_t gfp_flags)
198 struct page **pages,
199 struct bio *bio)
200{ 152{
201 struct ceph_osd_request *req; 153 struct ceph_osd_request *req;
202 struct ceph_msg *msg; 154 struct ceph_msg *msg;
203 int needs_trail;
204 int num_op = get_num_ops(ops, &needs_trail);
205 size_t msg_size = sizeof(struct ceph_osd_request_head); 155 size_t msg_size = sizeof(struct ceph_osd_request_head);
206 156
207 msg_size += num_op*sizeof(struct ceph_osd_op); 157 msg_size += num_op*sizeof(struct ceph_osd_op);
@@ -228,10 +178,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
228 INIT_LIST_HEAD(&req->r_req_lru_item); 178 INIT_LIST_HEAD(&req->r_req_lru_item);
229 INIT_LIST_HEAD(&req->r_osd_item); 179 INIT_LIST_HEAD(&req->r_osd_item);
230 180
231 req->r_flags = flags;
232
233 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
234
235 /* create reply message */ 181 /* create reply message */
236 if (use_mempool) 182 if (use_mempool)
237 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 183 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
@@ -244,15 +190,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
244 } 190 }
245 req->r_reply = msg; 191 req->r_reply = msg;
246 192
247 /* allocate space for the trailing data */ 193 ceph_pagelist_init(&req->r_trail);
248 if (needs_trail) {
249 req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
250 if (!req->r_trail) {
251 ceph_osdc_put_request(req);
252 return NULL;
253 }
254 ceph_pagelist_init(req->r_trail);
255 }
256 194
257 /* create request message; allow space for oid */ 195 /* create request message; allow space for oid */
258 msg_size += MAX_OBJ_NAME_SIZE; 196 msg_size += MAX_OBJ_NAME_SIZE;
@@ -270,13 +208,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
270 memset(msg->front.iov_base, 0, msg->front.iov_len); 208 memset(msg->front.iov_base, 0, msg->front.iov_len);
271 209
272 req->r_request = msg; 210 req->r_request = msg;
273 req->r_pages = pages;
274#ifdef CONFIG_BLOCK
275 if (bio) {
276 req->r_bio = bio;
277 bio_get(req->r_bio);
278 }
279#endif
280 211
281 return req; 212 return req;
282} 213}
@@ -289,6 +220,8 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
289 dst->op = cpu_to_le16(src->op); 220 dst->op = cpu_to_le16(src->op);
290 221
291 switch (src->op) { 222 switch (src->op) {
223 case CEPH_OSD_OP_STAT:
224 break;
292 case CEPH_OSD_OP_READ: 225 case CEPH_OSD_OP_READ:
293 case CEPH_OSD_OP_WRITE: 226 case CEPH_OSD_OP_WRITE:
294 dst->extent.offset = 227 dst->extent.offset =
@@ -300,52 +233,20 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
300 dst->extent.truncate_seq = 233 dst->extent.truncate_seq =
301 cpu_to_le32(src->extent.truncate_seq); 234 cpu_to_le32(src->extent.truncate_seq);
302 break; 235 break;
303
304 case CEPH_OSD_OP_GETXATTR:
305 case CEPH_OSD_OP_SETXATTR:
306 case CEPH_OSD_OP_CMPXATTR:
307 BUG_ON(!req->r_trail);
308
309 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
310 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
311 dst->xattr.cmp_op = src->xattr.cmp_op;
312 dst->xattr.cmp_mode = src->xattr.cmp_mode;
313 ceph_pagelist_append(req->r_trail, src->xattr.name,
314 src->xattr.name_len);
315 ceph_pagelist_append(req->r_trail, src->xattr.val,
316 src->xattr.value_len);
317 break;
318 case CEPH_OSD_OP_CALL: 236 case CEPH_OSD_OP_CALL:
319 BUG_ON(!req->r_trail);
320
321 dst->cls.class_len = src->cls.class_len; 237 dst->cls.class_len = src->cls.class_len;
322 dst->cls.method_len = src->cls.method_len; 238 dst->cls.method_len = src->cls.method_len;
323 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len); 239 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
324 240
325 ceph_pagelist_append(req->r_trail, src->cls.class_name, 241 ceph_pagelist_append(&req->r_trail, src->cls.class_name,
326 src->cls.class_len); 242 src->cls.class_len);
327 ceph_pagelist_append(req->r_trail, src->cls.method_name, 243 ceph_pagelist_append(&req->r_trail, src->cls.method_name,
328 src->cls.method_len); 244 src->cls.method_len);
329 ceph_pagelist_append(req->r_trail, src->cls.indata, 245 ceph_pagelist_append(&req->r_trail, src->cls.indata,
330 src->cls.indata_len); 246 src->cls.indata_len);
331 break; 247 break;
332 case CEPH_OSD_OP_ROLLBACK:
333 dst->snap.snapid = cpu_to_le64(src->snap.snapid);
334 break;
335 case CEPH_OSD_OP_STARTSYNC: 248 case CEPH_OSD_OP_STARTSYNC:
336 break; 249 break;
337 case CEPH_OSD_OP_NOTIFY:
338 {
339 __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
340 __le32 timeout = cpu_to_le32(src->watch.timeout);
341
342 BUG_ON(!req->r_trail);
343
344 ceph_pagelist_append(req->r_trail,
345 &prot_ver, sizeof(prot_ver));
346 ceph_pagelist_append(req->r_trail,
347 &timeout, sizeof(timeout));
348 }
349 case CEPH_OSD_OP_NOTIFY_ACK: 250 case CEPH_OSD_OP_NOTIFY_ACK:
350 case CEPH_OSD_OP_WATCH: 251 case CEPH_OSD_OP_WATCH:
351 dst->watch.cookie = cpu_to_le64(src->watch.cookie); 252 dst->watch.cookie = cpu_to_le64(src->watch.cookie);
@@ -356,6 +257,64 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
356 pr_err("unrecognized osd opcode %d\n", dst->op); 257 pr_err("unrecognized osd opcode %d\n", dst->op);
357 WARN_ON(1); 258 WARN_ON(1);
358 break; 259 break;
260 case CEPH_OSD_OP_MAPEXT:
261 case CEPH_OSD_OP_MASKTRUNC:
262 case CEPH_OSD_OP_SPARSE_READ:
263 case CEPH_OSD_OP_NOTIFY:
264 case CEPH_OSD_OP_ASSERT_VER:
265 case CEPH_OSD_OP_WRITEFULL:
266 case CEPH_OSD_OP_TRUNCATE:
267 case CEPH_OSD_OP_ZERO:
268 case CEPH_OSD_OP_DELETE:
269 case CEPH_OSD_OP_APPEND:
270 case CEPH_OSD_OP_SETTRUNC:
271 case CEPH_OSD_OP_TRIMTRUNC:
272 case CEPH_OSD_OP_TMAPUP:
273 case CEPH_OSD_OP_TMAPPUT:
274 case CEPH_OSD_OP_TMAPGET:
275 case CEPH_OSD_OP_CREATE:
276 case CEPH_OSD_OP_ROLLBACK:
277 case CEPH_OSD_OP_OMAPGETKEYS:
278 case CEPH_OSD_OP_OMAPGETVALS:
279 case CEPH_OSD_OP_OMAPGETHEADER:
280 case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
281 case CEPH_OSD_OP_MODE_RD:
282 case CEPH_OSD_OP_OMAPSETVALS:
283 case CEPH_OSD_OP_OMAPSETHEADER:
284 case CEPH_OSD_OP_OMAPCLEAR:
285 case CEPH_OSD_OP_OMAPRMKEYS:
286 case CEPH_OSD_OP_OMAP_CMP:
287 case CEPH_OSD_OP_CLONERANGE:
288 case CEPH_OSD_OP_ASSERT_SRC_VERSION:
289 case CEPH_OSD_OP_SRC_CMPXATTR:
290 case CEPH_OSD_OP_GETXATTR:
291 case CEPH_OSD_OP_GETXATTRS:
292 case CEPH_OSD_OP_CMPXATTR:
293 case CEPH_OSD_OP_SETXATTR:
294 case CEPH_OSD_OP_SETXATTRS:
295 case CEPH_OSD_OP_RESETXATTRS:
296 case CEPH_OSD_OP_RMXATTR:
297 case CEPH_OSD_OP_PULL:
298 case CEPH_OSD_OP_PUSH:
299 case CEPH_OSD_OP_BALANCEREADS:
300 case CEPH_OSD_OP_UNBALANCEREADS:
301 case CEPH_OSD_OP_SCRUB:
302 case CEPH_OSD_OP_SCRUB_RESERVE:
303 case CEPH_OSD_OP_SCRUB_UNRESERVE:
304 case CEPH_OSD_OP_SCRUB_STOP:
305 case CEPH_OSD_OP_SCRUB_MAP:
306 case CEPH_OSD_OP_WRLOCK:
307 case CEPH_OSD_OP_WRUNLOCK:
308 case CEPH_OSD_OP_RDLOCK:
309 case CEPH_OSD_OP_RDUNLOCK:
310 case CEPH_OSD_OP_UPLOCK:
311 case CEPH_OSD_OP_DNLOCK:
312 case CEPH_OSD_OP_PGLS:
313 case CEPH_OSD_OP_PGLS_FILTER:
314 pr_err("unsupported osd opcode %s\n",
315 ceph_osd_op_name(dst->op));
316 WARN_ON(1);
317 break;
359 } 318 }
360 dst->payload_len = cpu_to_le32(src->payload_len); 319 dst->payload_len = cpu_to_le32(src->payload_len);
361} 320}
@@ -365,25 +324,25 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
365 * 324 *
366 */ 325 */
367void ceph_osdc_build_request(struct ceph_osd_request *req, 326void ceph_osdc_build_request(struct ceph_osd_request *req,
368 u64 off, u64 *plen, 327 u64 off, u64 len, unsigned int num_op,
369 struct ceph_osd_req_op *src_ops, 328 struct ceph_osd_req_op *src_ops,
370 struct ceph_snap_context *snapc, 329 struct ceph_snap_context *snapc, u64 snap_id,
371 struct timespec *mtime, 330 struct timespec *mtime)
372 const char *oid,
373 int oid_len)
374{ 331{
375 struct ceph_msg *msg = req->r_request; 332 struct ceph_msg *msg = req->r_request;
376 struct ceph_osd_request_head *head; 333 struct ceph_osd_request_head *head;
377 struct ceph_osd_req_op *src_op; 334 struct ceph_osd_req_op *src_op;
378 struct ceph_osd_op *op; 335 struct ceph_osd_op *op;
379 void *p; 336 void *p;
380 int num_op = get_num_ops(src_ops, NULL);
381 size_t msg_size = sizeof(*head) + num_op*sizeof(*op); 337 size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
382 int flags = req->r_flags; 338 int flags = req->r_flags;
383 u64 data_len = 0; 339 u64 data_len;
384 int i; 340 int i;
385 341
342 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
343
386 head = msg->front.iov_base; 344 head = msg->front.iov_base;
345 head->snapid = cpu_to_le64(snap_id);
387 op = (void *)(head + 1); 346 op = (void *)(head + 1);
388 p = (void *)(op + num_op); 347 p = (void *)(op + num_op);
389 348
@@ -393,23 +352,17 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
393 head->flags = cpu_to_le32(flags); 352 head->flags = cpu_to_le32(flags);
394 if (flags & CEPH_OSD_FLAG_WRITE) 353 if (flags & CEPH_OSD_FLAG_WRITE)
395 ceph_encode_timespec(&head->mtime, mtime); 354 ceph_encode_timespec(&head->mtime, mtime);
355 BUG_ON(num_op > (unsigned int) ((u16) -1));
396 head->num_ops = cpu_to_le16(num_op); 356 head->num_ops = cpu_to_le16(num_op);
397 357
398
399 /* fill in oid */ 358 /* fill in oid */
400 head->object_len = cpu_to_le32(oid_len); 359 head->object_len = cpu_to_le32(req->r_oid_len);
401 memcpy(p, oid, oid_len); 360 memcpy(p, req->r_oid, req->r_oid_len);
402 p += oid_len; 361 p += req->r_oid_len;
403 362
404 src_op = src_ops; 363 src_op = src_ops;
405 while (src_op->op) { 364 while (num_op--)
406 osd_req_encode_op(req, op, src_op); 365 osd_req_encode_op(req, op++, src_op++);
407 src_op++;
408 op++;
409 }
410
411 if (req->r_trail)
412 data_len += req->r_trail->length;
413 366
414 if (snapc) { 367 if (snapc) {
415 head->snap_seq = cpu_to_le64(snapc->seq); 368 head->snap_seq = cpu_to_le64(snapc->seq);
@@ -420,14 +373,12 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
420 } 373 }
421 } 374 }
422 375
376 data_len = req->r_trail.length;
423 if (flags & CEPH_OSD_FLAG_WRITE) { 377 if (flags & CEPH_OSD_FLAG_WRITE) {
424 req->r_request->hdr.data_off = cpu_to_le16(off); 378 req->r_request->hdr.data_off = cpu_to_le16(off);
425 req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len); 379 data_len += len;
426 } else if (data_len) {
427 req->r_request->hdr.data_off = 0;
428 req->r_request->hdr.data_len = cpu_to_le32(data_len);
429 } 380 }
430 381 req->r_request->hdr.data_len = cpu_to_le32(data_len);
431 req->r_request->page_alignment = req->r_page_alignment; 382 req->r_request->page_alignment = req->r_page_alignment;
432 383
433 BUG_ON(p > msg->front.iov_base + msg->front.iov_len); 384 BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
@@ -459,34 +410,33 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
459 u32 truncate_seq, 410 u32 truncate_seq,
460 u64 truncate_size, 411 u64 truncate_size,
461 struct timespec *mtime, 412 struct timespec *mtime,
462 bool use_mempool, int num_reply, 413 bool use_mempool,
463 int page_align) 414 int page_align)
464{ 415{
465 struct ceph_osd_req_op ops[3]; 416 struct ceph_osd_req_op ops[2];
466 struct ceph_osd_request *req; 417 struct ceph_osd_request *req;
418 unsigned int num_op = 1;
467 int r; 419 int r;
468 420
421 memset(&ops, 0, sizeof ops);
422
469 ops[0].op = opcode; 423 ops[0].op = opcode;
470 ops[0].extent.truncate_seq = truncate_seq; 424 ops[0].extent.truncate_seq = truncate_seq;
471 ops[0].extent.truncate_size = truncate_size; 425 ops[0].extent.truncate_size = truncate_size;
472 ops[0].payload_len = 0;
473 426
474 if (do_sync) { 427 if (do_sync) {
475 ops[1].op = CEPH_OSD_OP_STARTSYNC; 428 ops[1].op = CEPH_OSD_OP_STARTSYNC;
476 ops[1].payload_len = 0; 429 num_op++;
477 ops[2].op = 0; 430 }
478 } else 431
479 ops[1].op = 0; 432 req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool,
480 433 GFP_NOFS);
481 req = ceph_osdc_alloc_request(osdc, flags,
482 snapc, ops,
483 use_mempool,
484 GFP_NOFS, NULL, NULL);
485 if (!req) 434 if (!req)
486 return ERR_PTR(-ENOMEM); 435 return ERR_PTR(-ENOMEM);
436 req->r_flags = flags;
487 437
488 /* calculate max write size */ 438 /* calculate max write size */
489 r = calc_layout(osdc, vino, layout, off, plen, req, ops); 439 r = calc_layout(vino, layout, off, plen, req, ops);
490 if (r < 0) 440 if (r < 0)
491 return ERR_PTR(r); 441 return ERR_PTR(r);
492 req->r_file_layout = *layout; /* keep a copy */ 442 req->r_file_layout = *layout; /* keep a copy */
@@ -496,10 +446,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
496 req->r_num_pages = calc_pages_for(page_align, *plen); 446 req->r_num_pages = calc_pages_for(page_align, *plen);
497 req->r_page_alignment = page_align; 447 req->r_page_alignment = page_align;
498 448
499 ceph_osdc_build_request(req, off, plen, ops, 449 ceph_osdc_build_request(req, off, *plen, num_op, ops,
500 snapc, 450 snapc, vino.snap, mtime);
501 mtime,
502 req->r_oid, req->r_oid_len);
503 451
504 return req; 452 return req;
505} 453}
@@ -623,8 +571,8 @@ static void osd_reset(struct ceph_connection *con)
623 down_read(&osdc->map_sem); 571 down_read(&osdc->map_sem);
624 mutex_lock(&osdc->request_mutex); 572 mutex_lock(&osdc->request_mutex);
625 __kick_osd_requests(osdc, osd); 573 __kick_osd_requests(osdc, osd);
574 __send_queued(osdc);
626 mutex_unlock(&osdc->request_mutex); 575 mutex_unlock(&osdc->request_mutex);
627 send_queued(osdc);
628 up_read(&osdc->map_sem); 576 up_read(&osdc->map_sem);
629} 577}
630 578
@@ -739,31 +687,35 @@ static void remove_old_osds(struct ceph_osd_client *osdc)
739 */ 687 */
740static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 688static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
741{ 689{
742 struct ceph_osd_request *req; 690 struct ceph_entity_addr *peer_addr;
743 int ret = 0;
744 691
745 dout("__reset_osd %p osd%d\n", osd, osd->o_osd); 692 dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
746 if (list_empty(&osd->o_requests) && 693 if (list_empty(&osd->o_requests) &&
747 list_empty(&osd->o_linger_requests)) { 694 list_empty(&osd->o_linger_requests)) {
748 __remove_osd(osdc, osd); 695 __remove_osd(osdc, osd);
749 ret = -ENODEV; 696
750 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], 697 return -ENODEV;
751 &osd->o_con.peer_addr, 698 }
752 sizeof(osd->o_con.peer_addr)) == 0 && 699
753 !ceph_con_opened(&osd->o_con)) { 700 peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
701 if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
702 !ceph_con_opened(&osd->o_con)) {
703 struct ceph_osd_request *req;
704
754 dout(" osd addr hasn't changed and connection never opened," 705 dout(" osd addr hasn't changed and connection never opened,"
755 " letting msgr retry"); 706 " letting msgr retry");
756 /* touch each r_stamp for handle_timeout()'s benfit */ 707 /* touch each r_stamp for handle_timeout()'s benfit */
757 list_for_each_entry(req, &osd->o_requests, r_osd_item) 708 list_for_each_entry(req, &osd->o_requests, r_osd_item)
758 req->r_stamp = jiffies; 709 req->r_stamp = jiffies;
759 ret = -EAGAIN; 710
760 } else { 711 return -EAGAIN;
761 ceph_con_close(&osd->o_con);
762 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
763 &osdc->osdmap->osd_addr[osd->o_osd]);
764 osd->o_incarnation++;
765 } 712 }
766 return ret; 713
714 ceph_con_close(&osd->o_con);
715 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
716 osd->o_incarnation++;
717
718 return 0;
767} 719}
768 720
769static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new) 721static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
@@ -1062,16 +1014,13 @@ static void __send_request(struct ceph_osd_client *osdc,
1062/* 1014/*
1063 * Send any requests in the queue (req_unsent). 1015 * Send any requests in the queue (req_unsent).
1064 */ 1016 */
1065static void send_queued(struct ceph_osd_client *osdc) 1017static void __send_queued(struct ceph_osd_client *osdc)
1066{ 1018{
1067 struct ceph_osd_request *req, *tmp; 1019 struct ceph_osd_request *req, *tmp;
1068 1020
1069 dout("send_queued\n"); 1021 dout("__send_queued\n");
1070 mutex_lock(&osdc->request_mutex); 1022 list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
1071 list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
1072 __send_request(osdc, req); 1023 __send_request(osdc, req);
1073 }
1074 mutex_unlock(&osdc->request_mutex);
1075} 1024}
1076 1025
1077/* 1026/*
@@ -1123,8 +1072,8 @@ static void handle_timeout(struct work_struct *work)
1123 } 1072 }
1124 1073
1125 __schedule_osd_timeout(osdc); 1074 __schedule_osd_timeout(osdc);
1075 __send_queued(osdc);
1126 mutex_unlock(&osdc->request_mutex); 1076 mutex_unlock(&osdc->request_mutex);
1127 send_queued(osdc);
1128 up_read(&osdc->map_sem); 1077 up_read(&osdc->map_sem);
1129} 1078}
1130 1079
@@ -1462,7 +1411,9 @@ done:
1462 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) 1411 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
1463 ceph_monc_request_next_osdmap(&osdc->client->monc); 1412 ceph_monc_request_next_osdmap(&osdc->client->monc);
1464 1413
1465 send_queued(osdc); 1414 mutex_lock(&osdc->request_mutex);
1415 __send_queued(osdc);
1416 mutex_unlock(&osdc->request_mutex);
1466 up_read(&osdc->map_sem); 1417 up_read(&osdc->map_sem);
1467 wake_up_all(&osdc->client->auth_wq); 1418 wake_up_all(&osdc->client->auth_wq);
1468 return; 1419 return;
@@ -1556,8 +1507,7 @@ static void __remove_event(struct ceph_osd_event *event)
1556 1507
1557int ceph_osdc_create_event(struct ceph_osd_client *osdc, 1508int ceph_osdc_create_event(struct ceph_osd_client *osdc,
1558 void (*event_cb)(u64, u64, u8, void *), 1509 void (*event_cb)(u64, u64, u8, void *),
1559 int one_shot, void *data, 1510 void *data, struct ceph_osd_event **pevent)
1560 struct ceph_osd_event **pevent)
1561{ 1511{
1562 struct ceph_osd_event *event; 1512 struct ceph_osd_event *event;
1563 1513
@@ -1567,14 +1517,13 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc,
1567 1517
1568 dout("create_event %p\n", event); 1518 dout("create_event %p\n", event);
1569 event->cb = event_cb; 1519 event->cb = event_cb;
1570 event->one_shot = one_shot; 1520 event->one_shot = 0;
1571 event->data = data; 1521 event->data = data;
1572 event->osdc = osdc; 1522 event->osdc = osdc;
1573 INIT_LIST_HEAD(&event->osd_node); 1523 INIT_LIST_HEAD(&event->osd_node);
1574 RB_CLEAR_NODE(&event->node); 1524 RB_CLEAR_NODE(&event->node);
1575 kref_init(&event->kref); /* one ref for us */ 1525 kref_init(&event->kref); /* one ref for us */
1576 kref_get(&event->kref); /* one ref for the caller */ 1526 kref_get(&event->kref); /* one ref for the caller */
1577 init_completion(&event->completion);
1578 1527
1579 spin_lock(&osdc->event_lock); 1528 spin_lock(&osdc->event_lock);
1580 event->cookie = ++osdc->event_count; 1529 event->cookie = ++osdc->event_count;
@@ -1610,7 +1559,6 @@ static void do_event_work(struct work_struct *work)
1610 1559
1611 dout("do_event_work completing %p\n", event); 1560 dout("do_event_work completing %p\n", event);
1612 event->cb(ver, notify_id, opcode, event->data); 1561 event->cb(ver, notify_id, opcode, event->data);
1613 complete(&event->completion);
1614 dout("do_event_work completed %p\n", event); 1562 dout("do_event_work completed %p\n", event);
1615 ceph_osdc_put_event(event); 1563 ceph_osdc_put_event(event);
1616 kfree(event_work); 1564 kfree(event_work);
@@ -1620,7 +1568,8 @@ static void do_event_work(struct work_struct *work)
1620/* 1568/*
1621 * Process osd watch notifications 1569 * Process osd watch notifications
1622 */ 1570 */
1623void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg) 1571static void handle_watch_notify(struct ceph_osd_client *osdc,
1572 struct ceph_msg *msg)
1624{ 1573{
1625 void *p, *end; 1574 void *p, *end;
1626 u8 proto_ver; 1575 u8 proto_ver;
@@ -1641,9 +1590,8 @@ void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1641 spin_lock(&osdc->event_lock); 1590 spin_lock(&osdc->event_lock);
1642 event = __find_event(osdc, cookie); 1591 event = __find_event(osdc, cookie);
1643 if (event) { 1592 if (event) {
1593 BUG_ON(event->one_shot);
1644 get_event(event); 1594 get_event(event);
1645 if (event->one_shot)
1646 __remove_event(event);
1647 } 1595 }
1648 spin_unlock(&osdc->event_lock); 1596 spin_unlock(&osdc->event_lock);
1649 dout("handle_watch_notify cookie %lld ver %lld event %p\n", 1597 dout("handle_watch_notify cookie %lld ver %lld event %p\n",
@@ -1668,7 +1616,6 @@ void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1668 return; 1616 return;
1669 1617
1670done_err: 1618done_err:
1671 complete(&event->completion);
1672 ceph_osdc_put_event(event); 1619 ceph_osdc_put_event(event);
1673 return; 1620 return;
1674 1621
@@ -1677,21 +1624,6 @@ bad:
1677 return; 1624 return;
1678} 1625}
1679 1626
1680int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
1681{
1682 int err;
1683
1684 dout("wait_event %p\n", event);
1685 err = wait_for_completion_interruptible_timeout(&event->completion,
1686 timeout * HZ);
1687 ceph_osdc_put_event(event);
1688 if (err > 0)
1689 err = 0;
1690 dout("wait_event %p returns %d\n", event, err);
1691 return err;
1692}
1693EXPORT_SYMBOL(ceph_osdc_wait_event);
1694
1695/* 1627/*
1696 * Register request, send initial attempt. 1628 * Register request, send initial attempt.
1697 */ 1629 */
@@ -1706,7 +1638,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1706#ifdef CONFIG_BLOCK 1638#ifdef CONFIG_BLOCK
1707 req->r_request->bio = req->r_bio; 1639 req->r_request->bio = req->r_bio;
1708#endif 1640#endif
1709 req->r_request->trail = req->r_trail; 1641 req->r_request->trail = &req->r_trail;
1710 1642
1711 register_request(osdc, req); 1643 register_request(osdc, req);
1712 1644
@@ -1865,7 +1797,6 @@ out_mempool:
1865out: 1797out:
1866 return err; 1798 return err;
1867} 1799}
1868EXPORT_SYMBOL(ceph_osdc_init);
1869 1800
1870void ceph_osdc_stop(struct ceph_osd_client *osdc) 1801void ceph_osdc_stop(struct ceph_osd_client *osdc)
1871{ 1802{
@@ -1882,7 +1813,6 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
1882 ceph_msgpool_destroy(&osdc->msgpool_op); 1813 ceph_msgpool_destroy(&osdc->msgpool_op);
1883 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 1814 ceph_msgpool_destroy(&osdc->msgpool_op_reply);
1884} 1815}
1885EXPORT_SYMBOL(ceph_osdc_stop);
1886 1816
1887/* 1817/*
1888 * Read some contiguous pages. If we cross a stripe boundary, shorten 1818 * Read some contiguous pages. If we cross a stripe boundary, shorten
@@ -1902,7 +1832,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1902 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1832 req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1903 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 1833 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1904 NULL, 0, truncate_seq, truncate_size, NULL, 1834 NULL, 0, truncate_seq, truncate_size, NULL,
1905 false, 1, page_align); 1835 false, page_align);
1906 if (IS_ERR(req)) 1836 if (IS_ERR(req))
1907 return PTR_ERR(req); 1837 return PTR_ERR(req);
1908 1838
@@ -1931,8 +1861,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1931 u64 off, u64 len, 1861 u64 off, u64 len,
1932 u32 truncate_seq, u64 truncate_size, 1862 u32 truncate_seq, u64 truncate_size,
1933 struct timespec *mtime, 1863 struct timespec *mtime,
1934 struct page **pages, int num_pages, 1864 struct page **pages, int num_pages)
1935 int flags, int do_sync, bool nofail)
1936{ 1865{
1937 struct ceph_osd_request *req; 1866 struct ceph_osd_request *req;
1938 int rc = 0; 1867 int rc = 0;
@@ -1941,11 +1870,10 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1941 BUG_ON(vino.snap != CEPH_NOSNAP); 1870 BUG_ON(vino.snap != CEPH_NOSNAP);
1942 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1871 req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
1943 CEPH_OSD_OP_WRITE, 1872 CEPH_OSD_OP_WRITE,
1944 flags | CEPH_OSD_FLAG_ONDISK | 1873 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
1945 CEPH_OSD_FLAG_WRITE, 1874 snapc, 0,
1946 snapc, do_sync,
1947 truncate_seq, truncate_size, mtime, 1875 truncate_seq, truncate_size, mtime,
1948 nofail, 1, page_align); 1876 true, page_align);
1949 if (IS_ERR(req)) 1877 if (IS_ERR(req))
1950 return PTR_ERR(req); 1878 return PTR_ERR(req);
1951 1879
@@ -1954,7 +1882,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1954 dout("writepages %llu~%llu (%d pages)\n", off, len, 1882 dout("writepages %llu~%llu (%d pages)\n", off, len,
1955 req->r_num_pages); 1883 req->r_num_pages);
1956 1884
1957 rc = ceph_osdc_start_request(osdc, req, nofail); 1885 rc = ceph_osdc_start_request(osdc, req, true);
1958 if (!rc) 1886 if (!rc)
1959 rc = ceph_osdc_wait_request(osdc, req); 1887 rc = ceph_osdc_wait_request(osdc, req);
1960 1888
@@ -2047,7 +1975,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
2047 if (data_len > 0) { 1975 if (data_len > 0) {
2048 int want = calc_pages_for(req->r_page_alignment, data_len); 1976 int want = calc_pages_for(req->r_page_alignment, data_len);
2049 1977
2050 if (unlikely(req->r_num_pages < want)) { 1978 if (req->r_pages && unlikely(req->r_num_pages < want)) {
2051 pr_warning("tid %lld reply has %d bytes %d pages, we" 1979 pr_warning("tid %lld reply has %d bytes %d pages, we"
2052 " had only %d pages ready\n", tid, data_len, 1980 " had only %d pages ready\n", tid, data_len,
2053 want, req->r_num_pages); 1981 want, req->r_num_pages);
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index de73214b5d26..3c61e21611d3 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -13,26 +13,18 @@
13 13
14char *ceph_osdmap_state_str(char *str, int len, int state) 14char *ceph_osdmap_state_str(char *str, int len, int state)
15{ 15{
16 int flag = 0;
17
18 if (!len) 16 if (!len)
19 goto done; 17 return str;
20 18
21 *str = '\0'; 19 if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP))
22 if (state) { 20 snprintf(str, len, "exists, up");
23 if (state & CEPH_OSD_EXISTS) { 21 else if (state & CEPH_OSD_EXISTS)
24 snprintf(str, len, "exists"); 22 snprintf(str, len, "exists");
25 flag = 1; 23 else if (state & CEPH_OSD_UP)
26 } 24 snprintf(str, len, "up");
27 if (state & CEPH_OSD_UP) { 25 else
28 snprintf(str, len, "%s%s%s", str, (flag ? ", " : ""),
29 "up");
30 flag = 1;
31 }
32 } else {
33 snprintf(str, len, "doesn't exist"); 26 snprintf(str, len, "doesn't exist");
34 } 27
35done:
36 return str; 28 return str;
37} 29}
38 30
@@ -170,6 +162,7 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
170 c->choose_local_tries = 2; 162 c->choose_local_tries = 2;
171 c->choose_local_fallback_tries = 5; 163 c->choose_local_fallback_tries = 5;
172 c->choose_total_tries = 19; 164 c->choose_total_tries = 19;
165 c->chooseleaf_descend_once = 0;
173 166
174 ceph_decode_need(p, end, 4*sizeof(u32), bad); 167 ceph_decode_need(p, end, 4*sizeof(u32), bad);
175 magic = ceph_decode_32(p); 168 magic = ceph_decode_32(p);
@@ -336,6 +329,11 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
336 dout("crush decode tunable choose_total_tries = %d", 329 dout("crush decode tunable choose_total_tries = %d",
337 c->choose_total_tries); 330 c->choose_total_tries);
338 331
332 ceph_decode_need(p, end, sizeof(u32), done);
333 c->chooseleaf_descend_once = ceph_decode_32(p);
334 dout("crush decode tunable chooseleaf_descend_once = %d",
335 c->chooseleaf_descend_once);
336
339done: 337done:
340 dout("crush_decode success\n"); 338 dout("crush_decode success\n");
341 return c; 339 return c;
@@ -1010,7 +1008,7 @@ bad:
1010 * pass a stride back to the caller. 1008 * pass a stride back to the caller.
1011 */ 1009 */
1012int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, 1010int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
1013 u64 off, u64 *plen, 1011 u64 off, u64 len,
1014 u64 *ono, 1012 u64 *ono,
1015 u64 *oxoff, u64 *oxlen) 1013 u64 *oxoff, u64 *oxlen)
1016{ 1014{
@@ -1021,7 +1019,7 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
1021 u32 su_per_object; 1019 u32 su_per_object;
1022 u64 t, su_offset; 1020 u64 t, su_offset;
1023 1021
1024 dout("mapping %llu~%llu osize %u fl_su %u\n", off, *plen, 1022 dout("mapping %llu~%llu osize %u fl_su %u\n", off, len,
1025 osize, su); 1023 osize, su);
1026 if (su == 0 || sc == 0) 1024 if (su == 0 || sc == 0)
1027 goto invalid; 1025 goto invalid;
@@ -1054,11 +1052,10 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
1054 1052
1055 /* 1053 /*
1056 * Calculate the length of the extent being written to the selected 1054 * Calculate the length of the extent being written to the selected
1057 * object. This is the minimum of the full length requested (plen) or 1055 * object. This is the minimum of the full length requested (len) or
1058 * the remainder of the current stripe being written to. 1056 * the remainder of the current stripe being written to.
1059 */ 1057 */
1060 *oxlen = min_t(u64, *plen, su - su_offset); 1058 *oxlen = min_t(u64, len, su - su_offset);
1061 *plen = *oxlen;
1062 1059
1063 dout(" obj extent %llu~%llu\n", *oxoff, *oxlen); 1060 dout(" obj extent %llu~%llu\n", *oxoff, *oxlen);
1064 return 0; 1061 return 0;
diff --git a/net/ceph/pagevec.c b/net/ceph/pagevec.c
index cd9c21df87d1..815a2249cfa9 100644
--- a/net/ceph/pagevec.c
+++ b/net/ceph/pagevec.c
@@ -12,7 +12,7 @@
12/* 12/*
13 * build a vector of user pages 13 * build a vector of user pages
14 */ 14 */
15struct page **ceph_get_direct_page_vector(const char __user *data, 15struct page **ceph_get_direct_page_vector(const void __user *data,
16 int num_pages, bool write_page) 16 int num_pages, bool write_page)
17{ 17{
18 struct page **pages; 18 struct page **pages;
@@ -93,7 +93,7 @@ EXPORT_SYMBOL(ceph_alloc_page_vector);
93 * copy user data into a page vector 93 * copy user data into a page vector
94 */ 94 */
95int ceph_copy_user_to_page_vector(struct page **pages, 95int ceph_copy_user_to_page_vector(struct page **pages,
96 const char __user *data, 96 const void __user *data,
97 loff_t off, size_t len) 97 loff_t off, size_t len)
98{ 98{
99 int i = 0; 99 int i = 0;
@@ -118,17 +118,17 @@ int ceph_copy_user_to_page_vector(struct page **pages,
118} 118}
119EXPORT_SYMBOL(ceph_copy_user_to_page_vector); 119EXPORT_SYMBOL(ceph_copy_user_to_page_vector);
120 120
121int ceph_copy_to_page_vector(struct page **pages, 121void ceph_copy_to_page_vector(struct page **pages,
122 const char *data, 122 const void *data,
123 loff_t off, size_t len) 123 loff_t off, size_t len)
124{ 124{
125 int i = 0; 125 int i = 0;
126 size_t po = off & ~PAGE_CACHE_MASK; 126 size_t po = off & ~PAGE_CACHE_MASK;
127 size_t left = len; 127 size_t left = len;
128 size_t l;
129 128
130 while (left > 0) { 129 while (left > 0) {
131 l = min_t(size_t, PAGE_CACHE_SIZE-po, left); 130 size_t l = min_t(size_t, PAGE_CACHE_SIZE-po, left);
131
132 memcpy(page_address(pages[i]) + po, data, l); 132 memcpy(page_address(pages[i]) + po, data, l);
133 data += l; 133 data += l;
134 left -= l; 134 left -= l;
@@ -138,21 +138,20 @@ int ceph_copy_to_page_vector(struct page **pages,
138 i++; 138 i++;
139 } 139 }
140 } 140 }
141 return len;
142} 141}
143EXPORT_SYMBOL(ceph_copy_to_page_vector); 142EXPORT_SYMBOL(ceph_copy_to_page_vector);
144 143
145int ceph_copy_from_page_vector(struct page **pages, 144void ceph_copy_from_page_vector(struct page **pages,
146 char *data, 145 void *data,
147 loff_t off, size_t len) 146 loff_t off, size_t len)
148{ 147{
149 int i = 0; 148 int i = 0;
150 size_t po = off & ~PAGE_CACHE_MASK; 149 size_t po = off & ~PAGE_CACHE_MASK;
151 size_t left = len; 150 size_t left = len;
152 size_t l;
153 151
154 while (left > 0) { 152 while (left > 0) {
155 l = min_t(size_t, PAGE_CACHE_SIZE-po, left); 153 size_t l = min_t(size_t, PAGE_CACHE_SIZE-po, left);
154
156 memcpy(data, page_address(pages[i]) + po, l); 155 memcpy(data, page_address(pages[i]) + po, l);
157 data += l; 156 data += l;
158 left -= l; 157 left -= l;
@@ -162,7 +161,6 @@ int ceph_copy_from_page_vector(struct page **pages,
162 i++; 161 i++;
163 } 162 }
164 } 163 }
165 return len;
166} 164}
167EXPORT_SYMBOL(ceph_copy_from_page_vector); 165EXPORT_SYMBOL(ceph_copy_from_page_vector);
168 166
@@ -170,7 +168,7 @@ EXPORT_SYMBOL(ceph_copy_from_page_vector);
170 * copy user data from a page vector into a user pointer 168 * copy user data from a page vector into a user pointer
171 */ 169 */
172int ceph_copy_page_vector_to_user(struct page **pages, 170int ceph_copy_page_vector_to_user(struct page **pages,
173 char __user *data, 171 void __user *data,
174 loff_t off, size_t len) 172 loff_t off, size_t len)
175{ 173{
176 int i = 0; 174 int i = 0;