author	Linus Torvalds <torvalds@linux-foundation.org>	2016-10-10 16:52:05 -0400
committer	Linus Torvalds <torvalds@linux-foundation.org>	2016-10-10 16:52:05 -0400
commit	8dfb790b15e779232d5d4e3f0102af2bea21ca55 (patch)
tree	7208241fc93d39f769dcec0c227c8582f117dfce /net
parent	fed41f7d039bad02f94cad9059e4b14cd81d13f2 (diff)
parent	64f77566e1c84990d6c448bb3960f899521c0b7d (diff)
Merge tag 'ceph-for-4.9-rc1' of git://github.com/ceph/ceph-client
Pull Ceph updates from Ilya Dryomov:
 "The big ticket item here is support for rbd exclusive-lock feature,
  with maintenance operations offloaded to userspace (Douglas Fuller,
  Mike Christie and myself). Another block device bullet is a series
  fixing up layering error paths (myself).

  On the filesystem side, we've got patches that improve our handling
  of buffered vs dio write races (Neil Brown) and a few assorted fixes
  from Zheng. Also included a couple of random cleanups and a minor
  CRUSH update"

* tag 'ceph-for-4.9-rc1' of git://github.com/ceph/ceph-client: (39 commits)
  crush: remove redundant local variable
  crush: don't normalize input of crush_ln iteratively
  libceph: ceph_build_auth() doesn't need ceph_auth_build_hello()
  libceph: use CEPH_AUTH_UNKNOWN in ceph_auth_build_hello()
  ceph: fix description for rsize and rasize mount options
  rbd: use kmalloc_array() in rbd_header_from_disk()
  ceph: use list_move instead of list_del/list_add
  ceph: handle CEPH_SESSION_REJECT message
  ceph: avoid accessing / when mounting a subpath
  ceph: fix mandatory flock check
  ceph: remove warning when ceph_releasepage() is called on dirty page
  ceph: ignore error from invalidate_inode_pages2_range() in direct write
  ceph: fix error handling of start_read()
  rbd: add rbd_obj_request_error() helper
  rbd: img_data requests don't own their page array
  rbd: don't call rbd_osd_req_format_read() for !img_data requests
  rbd: rework rbd_img_obj_exists_submit() error paths
  rbd: don't crash or leak on errors in rbd_img_obj_parent_read_full_callback()
  rbd: move bumping img_request refcount into rbd_obj_request_submit()
  rbd: mark the original request as done if stat request fails
  ...
Diffstat (limited to 'net')
-rw-r--r--	net/ceph/Makefile	1
-rw-r--r--	net/ceph/auth.c	13
-rw-r--r--	net/ceph/auth_none.c	2
-rw-r--r--	net/ceph/ceph_common.c	13
-rw-r--r--	net/ceph/ceph_strings.c	1
-rw-r--r--	net/ceph/cls_lock_client.c	325
-rw-r--r--	net/ceph/crush/mapper.c	17
-rw-r--r--	net/ceph/mon_client.c	82
-rw-r--r--	net/ceph/osd_client.c	169
9 files changed, 607 insertions, 16 deletions
diff --git a/net/ceph/Makefile b/net/ceph/Makefile
index 84cbed630c4b..6a5180903e7b 100644
--- a/net/ceph/Makefile
+++ b/net/ceph/Makefile
@@ -5,6 +5,7 @@ obj-$(CONFIG_CEPH_LIB) += libceph.o
 
 libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
 	mon_client.o \
+	cls_lock_client.o \
 	osd_client.o osdmap.o crush/crush.o crush/mapper.o crush/hash.o \
 	debugfs.o \
 	auth.o auth_none.o \
diff --git a/net/ceph/auth.c b/net/ceph/auth.c
index 2bc5965fdd1e..c822b3ae1bd3 100644
--- a/net/ceph/auth.c
+++ b/net/ceph/auth.c
@@ -82,7 +82,10 @@ void ceph_auth_reset(struct ceph_auth_client *ac)
 	mutex_unlock(&ac->mutex);
 }
 
-int ceph_entity_name_encode(const char *name, void **p, void *end)
+/*
+ * EntityName, not to be confused with entity_name_t
+ */
+int ceph_auth_entity_name_encode(const char *name, void **p, void *end)
 {
 	int len = strlen(name);
 
@@ -111,7 +114,7 @@ int ceph_auth_build_hello(struct ceph_auth_client *ac, void *buf, size_t len)
 	monhdr->session_mon = cpu_to_le16(-1);
 	monhdr->session_mon_tid = 0;
 
-	ceph_encode_32(&p, 0);  /* no protocol, yet */
+	ceph_encode_32(&p, CEPH_AUTH_UNKNOWN);  /* no protocol, yet */
 
 	lenp = p;
 	p += sizeof(u32);
@@ -124,7 +127,7 @@ int ceph_auth_build_hello(struct ceph_auth_client *ac, void *buf, size_t len)
 	for (i = 0; i < num; i++)
 		ceph_encode_32(&p, supported_protocols[i]);
 
-	ret = ceph_entity_name_encode(ac->name, &p, end);
+	ret = ceph_auth_entity_name_encode(ac->name, &p, end);
 	if (ret < 0)
 		goto out;
 	ceph_decode_need(&p, end, sizeof(u64), bad);
@@ -259,9 +262,7 @@ int ceph_build_auth(struct ceph_auth_client *ac,
 	int ret = 0;
 
 	mutex_lock(&ac->mutex);
-	if (!ac->protocol)
-		ret = ceph_auth_build_hello(ac, msg_buf, msg_len);
-	else if (ac->ops->should_authenticate(ac))
+	if (ac->ops->should_authenticate(ac))
 		ret = ceph_build_auth_request(ac, msg_buf, msg_len);
 	mutex_unlock(&ac->mutex);
 	return ret;
diff --git a/net/ceph/auth_none.c b/net/ceph/auth_none.c
index 5f836f02ae36..df45e467c81f 100644
--- a/net/ceph/auth_none.c
+++ b/net/ceph/auth_none.c
@@ -46,7 +46,7 @@ static int ceph_auth_none_build_authorizer(struct ceph_auth_client *ac,
 	int ret;
 
 	ceph_encode_8_safe(&p, end, 1, e_range);
-	ret = ceph_entity_name_encode(ac->name, &p, end);
+	ret = ceph_auth_entity_name_encode(ac->name, &p, end);
 	if (ret < 0)
 		return ret;
 
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index bddfcf6f09c2..464e88599b9d 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -566,11 +566,17 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client)
 }
 EXPORT_SYMBOL(ceph_print_client_options);
 
-u64 ceph_client_id(struct ceph_client *client)
+struct ceph_entity_addr *ceph_client_addr(struct ceph_client *client)
+{
+	return &client->msgr.inst.addr;
+}
+EXPORT_SYMBOL(ceph_client_addr);
+
+u64 ceph_client_gid(struct ceph_client *client)
 {
 	return client->monc.auth->global_id;
 }
-EXPORT_SYMBOL(ceph_client_id);
+EXPORT_SYMBOL(ceph_client_gid);
 
 /*
  * create a fresh client instance
@@ -685,7 +691,8 @@ int __ceph_open_session(struct ceph_client *client, unsigned long started)
 		return client->auth_err;
 	}
 
-	pr_info("client%llu fsid %pU\n", ceph_client_id(client), &client->fsid);
+	pr_info("client%llu fsid %pU\n", ceph_client_gid(client),
+		&client->fsid);
 	ceph_debugfs_client_init(client);
 
 	return 0;
diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c
index 3773a4fa11e3..19b7d8aa915c 100644
--- a/net/ceph/ceph_strings.c
+++ b/net/ceph/ceph_strings.c
@@ -15,6 +15,7 @@ const char *ceph_entity_type_name(int type)
 	default: return "unknown";
 	}
 }
+EXPORT_SYMBOL(ceph_entity_type_name);
 
 const char *ceph_osd_op_name(int op)
 {
diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c
new file mode 100644
index 000000000000..50f040fdb2a9
--- /dev/null
+++ b/net/ceph/cls_lock_client.c
@@ -0,0 +1,325 @@
+#include <linux/ceph/ceph_debug.h>
+
+#include <linux/types.h>
+#include <linux/slab.h>
+
+#include <linux/ceph/cls_lock_client.h>
+#include <linux/ceph/decode.h>
+
+/**
+ * ceph_cls_lock - grab rados lock for object
+ * @oid, @oloc: object to lock
+ * @lock_name: the name of the lock
+ * @type: lock type (CEPH_CLS_LOCK_EXCLUSIVE or CEPH_CLS_LOCK_SHARED)
+ * @cookie: user-defined identifier for this instance of the lock
+ * @tag: user-defined tag
+ * @desc: user-defined lock description
+ * @flags: lock flags
+ *
+ * All operations on the same lock should use the same tag.
+ */
+int ceph_cls_lock(struct ceph_osd_client *osdc,
+		  struct ceph_object_id *oid,
+		  struct ceph_object_locator *oloc,
+		  char *lock_name, u8 type, char *cookie,
+		  char *tag, char *desc, u8 flags)
+{
+	int lock_op_buf_size;
+	int name_len = strlen(lock_name);
+	int cookie_len = strlen(cookie);
+	int tag_len = strlen(tag);
+	int desc_len = strlen(desc);
+	void *p, *end;
+	struct page *lock_op_page;
+	struct timespec mtime;
+	int ret;
+
+	lock_op_buf_size = name_len + sizeof(__le32) +
+			   cookie_len + sizeof(__le32) +
+			   tag_len + sizeof(__le32) +
+			   desc_len + sizeof(__le32) +
+			   sizeof(struct ceph_timespec) +
+			   /* flag and type */
+			   sizeof(u8) + sizeof(u8) +
+			   CEPH_ENCODING_START_BLK_LEN;
+	if (lock_op_buf_size > PAGE_SIZE)
+		return -E2BIG;
+
+	lock_op_page = alloc_page(GFP_NOIO);
+	if (!lock_op_page)
+		return -ENOMEM;
+
+	p = page_address(lock_op_page);
+	end = p + lock_op_buf_size;
+
+	/* encode cls_lock_lock_op struct */
+	ceph_start_encoding(&p, 1, 1,
+			    lock_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
+	ceph_encode_string(&p, end, lock_name, name_len);
+	ceph_encode_8(&p, type);
+	ceph_encode_string(&p, end, cookie, cookie_len);
+	ceph_encode_string(&p, end, tag, tag_len);
+	ceph_encode_string(&p, end, desc, desc_len);
+	/* only support infinite duration */
+	memset(&mtime, 0, sizeof(mtime));
+	ceph_encode_timespec(p, &mtime);
+	p += sizeof(struct ceph_timespec);
+	ceph_encode_8(&p, flags);
+
+	dout("%s lock_name %s type %d cookie %s tag %s desc %s flags 0x%x\n",
+	     __func__, lock_name, type, cookie, tag, desc, flags);
+	ret = ceph_osdc_call(osdc, oid, oloc, "lock", "lock",
+			     CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+			     lock_op_page, lock_op_buf_size, NULL, NULL);
+
+	dout("%s: status %d\n", __func__, ret);
+	__free_page(lock_op_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_lock);
+
+/**
+ * ceph_cls_unlock - release rados lock for object
+ * @oid, @oloc: object to lock
+ * @lock_name: the name of the lock
+ * @cookie: user-defined identifier for this instance of the lock
+ */
+int ceph_cls_unlock(struct ceph_osd_client *osdc,
+		    struct ceph_object_id *oid,
+		    struct ceph_object_locator *oloc,
+		    char *lock_name, char *cookie)
+{
+	int unlock_op_buf_size;
+	int name_len = strlen(lock_name);
+	int cookie_len = strlen(cookie);
+	void *p, *end;
+	struct page *unlock_op_page;
+	int ret;
+
+	unlock_op_buf_size = name_len + sizeof(__le32) +
+			     cookie_len + sizeof(__le32) +
+			     CEPH_ENCODING_START_BLK_LEN;
+	if (unlock_op_buf_size > PAGE_SIZE)
+		return -E2BIG;
+
+	unlock_op_page = alloc_page(GFP_NOIO);
+	if (!unlock_op_page)
+		return -ENOMEM;
+
+	p = page_address(unlock_op_page);
+	end = p + unlock_op_buf_size;
+
+	/* encode cls_lock_unlock_op struct */
+	ceph_start_encoding(&p, 1, 1,
+			    unlock_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
+	ceph_encode_string(&p, end, lock_name, name_len);
+	ceph_encode_string(&p, end, cookie, cookie_len);
+
+	dout("%s lock_name %s cookie %s\n", __func__, lock_name, cookie);
+	ret = ceph_osdc_call(osdc, oid, oloc, "lock", "unlock",
+			     CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+			     unlock_op_page, unlock_op_buf_size, NULL, NULL);
+
+	dout("%s: status %d\n", __func__, ret);
+	__free_page(unlock_op_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_unlock);
+
+/**
+ * ceph_cls_break_lock - release rados lock for object for specified client
+ * @oid, @oloc: object to lock
+ * @lock_name: the name of the lock
+ * @cookie: user-defined identifier for this instance of the lock
+ * @locker: current lock owner
+ */
+int ceph_cls_break_lock(struct ceph_osd_client *osdc,
+			struct ceph_object_id *oid,
+			struct ceph_object_locator *oloc,
+			char *lock_name, char *cookie,
+			struct ceph_entity_name *locker)
+{
+	int break_op_buf_size;
+	int name_len = strlen(lock_name);
+	int cookie_len = strlen(cookie);
+	struct page *break_op_page;
+	void *p, *end;
+	int ret;
+
+	break_op_buf_size = name_len + sizeof(__le32) +
+			    cookie_len + sizeof(__le32) +
+			    sizeof(u8) + sizeof(__le64) +
+			    CEPH_ENCODING_START_BLK_LEN;
+	if (break_op_buf_size > PAGE_SIZE)
+		return -E2BIG;
+
+	break_op_page = alloc_page(GFP_NOIO);
+	if (!break_op_page)
+		return -ENOMEM;
+
+	p = page_address(break_op_page);
+	end = p + break_op_buf_size;
+
+	/* encode cls_lock_break_op struct */
+	ceph_start_encoding(&p, 1, 1,
+			    break_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
+	ceph_encode_string(&p, end, lock_name, name_len);
+	ceph_encode_copy(&p, locker, sizeof(*locker));
+	ceph_encode_string(&p, end, cookie, cookie_len);
+
+	dout("%s lock_name %s cookie %s locker %s%llu\n", __func__, lock_name,
+	     cookie, ENTITY_NAME(*locker));
+	ret = ceph_osdc_call(osdc, oid, oloc, "lock", "break_lock",
+			     CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+			     break_op_page, break_op_buf_size, NULL, NULL);
+
+	dout("%s: status %d\n", __func__, ret);
+	__free_page(break_op_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_break_lock);
+
+void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers)
+{
+	int i;
+
+	for (i = 0; i < num_lockers; i++)
+		kfree(lockers[i].id.cookie);
+	kfree(lockers);
+}
+EXPORT_SYMBOL(ceph_free_lockers);
+
+static int decode_locker(void **p, void *end, struct ceph_locker *locker)
+{
+	u8 struct_v;
+	u32 len;
+	char *s;
+	int ret;
+
+	ret = ceph_start_decoding(p, end, 1, "locker_id_t", &struct_v, &len);
+	if (ret)
+		return ret;
+
+	ceph_decode_copy(p, &locker->id.name, sizeof(locker->id.name));
+	s = ceph_extract_encoded_string(p, end, NULL, GFP_NOIO);
+	if (IS_ERR(s))
+		return PTR_ERR(s);
+
+	locker->id.cookie = s;
+
+	ret = ceph_start_decoding(p, end, 1, "locker_info_t", &struct_v, &len);
+	if (ret)
+		return ret;
+
+	*p += sizeof(struct ceph_timespec);	/* skip expiration */
+	ceph_decode_copy(p, &locker->info.addr, sizeof(locker->info.addr));
+	ceph_decode_addr(&locker->info.addr);
+	len = ceph_decode_32(p);
+	*p += len;	/* skip description */
+
+	dout("%s %s%llu cookie %s addr %s\n", __func__,
+	     ENTITY_NAME(locker->id.name), locker->id.cookie,
+	     ceph_pr_addr(&locker->info.addr.in_addr));
+	return 0;
+}
+
+static int decode_lockers(void **p, void *end, u8 *type, char **tag,
+			  struct ceph_locker **lockers, u32 *num_lockers)
+{
+	u8 struct_v;
+	u32 struct_len;
+	char *s;
+	int i;
+	int ret;
+
+	ret = ceph_start_decoding(p, end, 1, "cls_lock_get_info_reply",
+				  &struct_v, &struct_len);
+	if (ret)
+		return ret;
+
+	*num_lockers = ceph_decode_32(p);
+	*lockers = kcalloc(*num_lockers, sizeof(**lockers), GFP_NOIO);
+	if (!*lockers)
+		return -ENOMEM;
+
+	for (i = 0; i < *num_lockers; i++) {
+		ret = decode_locker(p, end, *lockers + i);
+		if (ret)
+			goto err_free_lockers;
+	}
+
+	*type = ceph_decode_8(p);
+	s = ceph_extract_encoded_string(p, end, NULL, GFP_NOIO);
+	if (IS_ERR(s)) {
+		ret = PTR_ERR(s);
+		goto err_free_lockers;
+	}
+
+	*tag = s;
+	return 0;
+
+err_free_lockers:
+	ceph_free_lockers(*lockers, *num_lockers);
+	return ret;
+}
+
+/*
+ * On success, the caller is responsible for:
+ *
+ *     kfree(tag);
+ *     ceph_free_lockers(lockers, num_lockers);
+ */
+int ceph_cls_lock_info(struct ceph_osd_client *osdc,
+		       struct ceph_object_id *oid,
+		       struct ceph_object_locator *oloc,
+		       char *lock_name, u8 *type, char **tag,
+		       struct ceph_locker **lockers, u32 *num_lockers)
+{
+	int get_info_op_buf_size;
+	int name_len = strlen(lock_name);
+	struct page *get_info_op_page, *reply_page;
+	size_t reply_len;
+	void *p, *end;
+	int ret;
+
+	get_info_op_buf_size = name_len + sizeof(__le32) +
+			       CEPH_ENCODING_START_BLK_LEN;
+	if (get_info_op_buf_size > PAGE_SIZE)
+		return -E2BIG;
+
+	get_info_op_page = alloc_page(GFP_NOIO);
+	if (!get_info_op_page)
+		return -ENOMEM;
+
+	reply_page = alloc_page(GFP_NOIO);
+	if (!reply_page) {
+		__free_page(get_info_op_page);
+		return -ENOMEM;
+	}
+
+	p = page_address(get_info_op_page);
+	end = p + get_info_op_buf_size;
+
+	/* encode cls_lock_get_info_op struct */
+	ceph_start_encoding(&p, 1, 1,
+			    get_info_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
+	ceph_encode_string(&p, end, lock_name, name_len);
+
+	dout("%s lock_name %s\n", __func__, lock_name);
+	ret = ceph_osdc_call(osdc, oid, oloc, "lock", "get_info",
+			     CEPH_OSD_FLAG_READ, get_info_op_page,
+			     get_info_op_buf_size, reply_page, &reply_len);
+
+	dout("%s: status %d\n", __func__, ret);
+	if (ret >= 0) {
+		p = page_address(reply_page);
+		end = p + reply_len;
+
+		ret = decode_lockers(&p, end, type, tag, lockers, num_lockers);
+	}
+
+	__free_page(get_info_op_page);
+	__free_page(reply_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_lock_info);
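
Taken together with ceph_osdc_call() (added to net/ceph/osd_client.c below), this new file gives kernel clients the rados lock primitives that the rbd exclusive-lock feature is built on. A minimal sketch of a caller taking and releasing an exclusive lock follows; the lock name, cookie and tag here are illustrative placeholders, not values this commit itself uses:

	/* Hedged sketch (illustrative names): take and drop an exclusive lock. */
	static int example_exclusive_lock(struct ceph_osd_client *osdc,
					  struct ceph_object_id *oid,
					  struct ceph_object_locator *oloc)
	{
		int ret;

		/* per the kernel-doc above, all users of one lock share the tag */
		ret = ceph_cls_lock(osdc, oid, oloc, "example-lock",
				    CEPH_CLS_LOCK_EXCLUSIVE, "example-cookie",
				    "example-tag", "demo", 0);
		if (ret)
			return ret;

		/* ... object is exclusively locked here ... */

		return ceph_cls_unlock(osdc, oid, oloc, "example-lock",
				       "example-cookie");
	}

The cookie distinguishes one instance of a lock from another, so the same value passed to ceph_cls_lock() must be passed to ceph_cls_unlock().
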
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index 5fcfb98f309e..a421e905331a 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -245,7 +245,7 @@ static int bucket_straw_choose(struct crush_bucket_straw *bucket,
 /* compute 2^44*log2(input+1) */
 static __u64 crush_ln(unsigned int xin)
 {
-	unsigned int x = xin, x1;
+	unsigned int x = xin;
 	int iexpon, index1, index2;
 	__u64 RH, LH, LL, xl64, result;
 
@@ -253,9 +253,15 @@ static __u64 crush_ln(unsigned int xin)
 
 	/* normalize input */
 	iexpon = 15;
-	while (!(x & 0x18000)) {
-		x <<= 1;
-		iexpon--;
+
+	/*
+	 * figure out number of bits we need to shift and
+	 * do it in one step instead of iteratively
+	 */
+	if (!(x & 0x18000)) {
+		int bits = __builtin_clz(x & 0x1FFFF) - 16;
+		x <<= bits;
+		iexpon = 15 - bits;
 	}
 
 	index1 = (x >> 8) << 1;
@@ -267,12 +273,11 @@ static __u64 crush_ln(unsigned int xin)
 	/* RH*x ~ 2^48 * (2^15 + xf), xf<2^8 */
 	xl64 = (__s64)x * RH;
 	xl64 >>= 48;
-	x1 = xl64;
 
 	result = iexpon;
 	result <<= (12 + 32);
 
-	index2 = x1 & 0xff;
+	index2 = xl64 & 0xff;
 	/* LL ~ 2^48*log2(1.0+index2/2^15) */
 	LL = __LL_tbl[index2];
 
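
The rewritten normalization deserves a note: on a 32-bit unsigned int, __builtin_clz(x & 0x1FFFF) - 16 equals 15 minus the position of the highest set bit of the (nonzero) 17-bit input, which is exactly how many single-bit shifts the removed while loop performed before bit 15 or 16 became set. A standalone userspace check of that equivalence (assumptions: gcc/clang builtins, nonzero inputs as crush_ln() guarantees) might be:

	/* Userspace check, not kernel code: the one-step clz shift matches
	 * the old iterative normalization for every nonzero 17-bit input. */
	#include <assert.h>

	static unsigned int norm_loop(unsigned int x, int *iexpon)
	{
		*iexpon = 15;
		while (!(x & 0x18000)) {	/* bits 15 and 16 both clear */
			x <<= 1;
			(*iexpon)--;
		}
		return x;
	}

	static unsigned int norm_clz(unsigned int x, int *iexpon)
	{
		*iexpon = 15;
		if (!(x & 0x18000)) {
			int bits = __builtin_clz(x & 0x1FFFF) - 16;

			x <<= bits;
			*iexpon = 15 - bits;
		}
		return x;
	}

	int main(void)
	{
		unsigned int x;

		/* x == 0 would be undefined for clz; crush_ln() never passes 0 */
		for (x = 1; x <= 0x1FFFF; x++) {
			int e1, e2;

			assert(norm_loop(x, &e1) == norm_clz(x, &e2));
			assert(e1 == e2);
		}
		return 0;
	}
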
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index ef34a02719d7..a8effc8b7280 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -835,6 +835,83 @@ int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
 }
 EXPORT_SYMBOL(ceph_monc_get_version_async);
 
+static void handle_command_ack(struct ceph_mon_client *monc,
+			       struct ceph_msg *msg)
+{
+	struct ceph_mon_generic_request *req;
+	void *p = msg->front.iov_base;
+	void *const end = p + msg->front_alloc_len;
+	u64 tid = le64_to_cpu(msg->hdr.tid);
+
+	dout("%s msg %p tid %llu\n", __func__, msg, tid);
+
+	ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
+			 sizeof(u32), bad);
+	p += sizeof(struct ceph_mon_request_header);
+
+	mutex_lock(&monc->mutex);
+	req = lookup_generic_request(&monc->generic_request_tree, tid);
+	if (!req) {
+		mutex_unlock(&monc->mutex);
+		return;
+	}
+
+	req->result = ceph_decode_32(&p);
+	__finish_generic_request(req);
+	mutex_unlock(&monc->mutex);
+
+	complete_generic_request(req);
+	return;
+
+bad:
+	pr_err("corrupt mon_command ack, tid %llu\n", tid);
+	ceph_msg_dump(msg);
+}
+
+int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
+			    struct ceph_entity_addr *client_addr)
+{
+	struct ceph_mon_generic_request *req;
+	struct ceph_mon_command *h;
+	int ret = -ENOMEM;
+	int len;
+
+	req = alloc_generic_request(monc, GFP_NOIO);
+	if (!req)
+		goto out;
+
+	req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
+	if (!req->request)
+		goto out;
+
+	req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
+				  true);
+	if (!req->reply)
+		goto out;
+
+	mutex_lock(&monc->mutex);
+	register_generic_request(req);
+	h = req->request->front.iov_base;
+	h->monhdr.have_version = 0;
+	h->monhdr.session_mon = cpu_to_le16(-1);
+	h->monhdr.session_mon_tid = 0;
+	h->fsid = monc->monmap->fsid;
+	h->num_strs = cpu_to_le32(1);
+	len = sprintf(h->str, "{ \"prefix\": \"osd blacklist\", \
+		\"blacklistop\": \"add\", \
+		\"addr\": \"%pISpc/%u\" }",
+		      &client_addr->in_addr, le32_to_cpu(client_addr->nonce));
+	h->str_len = cpu_to_le32(len);
+	send_generic_request(monc, req);
+	mutex_unlock(&monc->mutex);
+
+	ret = wait_generic_request(req);
+out:
+	put_generic_request(req);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_monc_blacklist_add);
+
 /*
  * Resend pending generic requests.
  */
@@ -1139,6 +1216,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
 		handle_get_version_reply(monc, msg);
 		break;
 
+	case CEPH_MSG_MON_COMMAND_ACK:
+		handle_command_ack(monc, msg);
+		break;
+
 	case CEPH_MSG_MON_MAP:
 		ceph_monc_handle_map(monc, msg);
 		break;
@@ -1178,6 +1259,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
 		m = ceph_msg_get(monc->m_subscribe_ack);
 		break;
 	case CEPH_MSG_STATFS_REPLY:
+	case CEPH_MSG_MON_COMMAND_ACK:
 		return get_generic_reply(con, hdr, skip);
 	case CEPH_MSG_AUTH_REPLY:
 		m = ceph_msg_get(monc->m_auth_reply);
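
For reference, the sprintf() in ceph_monc_blacklist_add() above yields a monitor command payload of the following shape (address and nonce invented for illustration):

	{ "prefix": "osd blacklist", "blacklistop": "add", "addr": "192.168.1.10:0/1234567" }

This is effectively the JSON that the `ceph osd blacklist add` CLI sends, and it is what lets a new exclusive-lock owner fence a dead client's address before breaking its lock.
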
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index a97e7b506612..d9bf7a1d0a58 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -338,6 +338,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
 		ceph_osd_data_release(&op->notify.request_data);
 		ceph_osd_data_release(&op->notify.response_data);
 		break;
+	case CEPH_OSD_OP_LIST_WATCHERS:
+		ceph_osd_data_release(&op->list_watchers.response_data);
+		break;
 	default:
 		break;
 	}
@@ -863,6 +866,8 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
 	case CEPH_OSD_OP_NOTIFY:
 		dst->notify.cookie = cpu_to_le64(src->notify.cookie);
 		break;
+	case CEPH_OSD_OP_LIST_WATCHERS:
+		break;
 	case CEPH_OSD_OP_SETALLOCHINT:
 		dst->alloc_hint.expected_object_size =
 		    cpu_to_le64(src->alloc_hint.expected_object_size);
@@ -1445,6 +1450,10 @@ static void setup_request_data(struct ceph_osd_request *req,
 			ceph_osdc_msg_data_add(req->r_reply,
 					       &op->extent.osd_data);
 			break;
+		case CEPH_OSD_OP_LIST_WATCHERS:
+			ceph_osdc_msg_data_add(req->r_reply,
+					       &op->list_watchers.response_data);
+			break;
 
 		/* both */
 		case CEPH_OSD_OP_CALL:
@@ -3891,12 +3900,121 @@ int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
 	return ret;
 }
 
+static int decode_watcher(void **p, void *end, struct ceph_watch_item *item)
+{
+	u8 struct_v;
+	u32 struct_len;
+	int ret;
+
+	ret = ceph_start_decoding(p, end, 2, "watch_item_t",
+				  &struct_v, &struct_len);
+	if (ret)
+		return ret;
+
+	ceph_decode_copy(p, &item->name, sizeof(item->name));
+	item->cookie = ceph_decode_64(p);
+	*p += 4;	/* skip timeout_seconds */
+	if (struct_v >= 2) {
+		ceph_decode_copy(p, &item->addr, sizeof(item->addr));
+		ceph_decode_addr(&item->addr);
+	}
+
+	dout("%s %s%llu cookie %llu addr %s\n", __func__,
+	     ENTITY_NAME(item->name), item->cookie,
+	     ceph_pr_addr(&item->addr.in_addr));
+	return 0;
+}
+
+static int decode_watchers(void **p, void *end,
+			   struct ceph_watch_item **watchers,
+			   u32 *num_watchers)
+{
+	u8 struct_v;
+	u32 struct_len;
+	int i;
+	int ret;
+
+	ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t",
+				  &struct_v, &struct_len);
+	if (ret)
+		return ret;
+
+	*num_watchers = ceph_decode_32(p);
+	*watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO);
+	if (!*watchers)
+		return -ENOMEM;
+
+	for (i = 0; i < *num_watchers; i++) {
+		ret = decode_watcher(p, end, *watchers + i);
+		if (ret) {
+			kfree(*watchers);
+			return ret;
+		}
+	}
+
+	return 0;
+}
+
+/*
+ * On success, the caller is responsible for:
+ *
+ *     kfree(watchers);
+ */
+int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
+			    struct ceph_object_id *oid,
+			    struct ceph_object_locator *oloc,
+			    struct ceph_watch_item **watchers,
+			    u32 *num_watchers)
+{
+	struct ceph_osd_request *req;
+	struct page **pages;
+	int ret;
+
+	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
+	if (!req)
+		return -ENOMEM;
+
+	ceph_oid_copy(&req->r_base_oid, oid);
+	ceph_oloc_copy(&req->r_base_oloc, oloc);
+	req->r_flags = CEPH_OSD_FLAG_READ;
+
+	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+	if (ret)
+		goto out_put_req;
+
+	pages = ceph_alloc_page_vector(1, GFP_NOIO);
+	if (IS_ERR(pages)) {
+		ret = PTR_ERR(pages);
+		goto out_put_req;
+	}
+
+	osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0);
+	ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers,
+						 response_data),
+				 pages, PAGE_SIZE, 0, false, true);
+
+	ceph_osdc_start_request(osdc, req, false);
+	ret = ceph_osdc_wait_request(osdc, req);
+	if (ret >= 0) {
+		void *p = page_address(pages[0]);
+		void *const end = p + req->r_ops[0].outdata_len;
+
+		ret = decode_watchers(&p, end, watchers, num_watchers);
+	}
+
+out_put_req:
+	ceph_osdc_put_request(req);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_osdc_list_watchers);
+
 /*
  * Call all pending notify callbacks - for use after a watch is
  * unregistered, to make sure no more callbacks for it will be invoked
 */
 void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
 {
+	dout("%s osdc %p\n", __func__, osdc);
 	flush_workqueue(osdc->notify_wq);
 }
 EXPORT_SYMBOL(ceph_osdc_flush_notifies);
@@ -3910,6 +4028,57 @@ void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc)
 EXPORT_SYMBOL(ceph_osdc_maybe_request_map);
 
 /*
+ * Execute an OSD class method on an object.
+ *
+ * @flags: CEPH_OSD_FLAG_*
+ * @resp_len: out param for reply length
+ */
+int ceph_osdc_call(struct ceph_osd_client *osdc,
+		   struct ceph_object_id *oid,
+		   struct ceph_object_locator *oloc,
+		   const char *class, const char *method,
+		   unsigned int flags,
+		   struct page *req_page, size_t req_len,
+		   struct page *resp_page, size_t *resp_len)
+{
+	struct ceph_osd_request *req;
+	int ret;
+
+	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
+	if (!req)
+		return -ENOMEM;
+
+	ceph_oid_copy(&req->r_base_oid, oid);
+	ceph_oloc_copy(&req->r_base_oloc, oloc);
+	req->r_flags = flags;
+
+	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+	if (ret)
+		goto out_put_req;
+
+	osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
+	if (req_page)
+		osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
+						  0, false, false);
+	if (resp_page)
+		osd_req_op_cls_response_data_pages(req, 0, &resp_page,
+						   PAGE_SIZE, 0, false, false);
+
+	ceph_osdc_start_request(osdc, req, false);
+	ret = ceph_osdc_wait_request(osdc, req);
+	if (ret >= 0) {
+		ret = req->r_ops[0].rval;
+		if (resp_page)
+			*resp_len = req->r_ops[0].outdata_len;
+	}
+
+out_put_req:
+	ceph_osdc_put_request(req);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_osdc_call);
+
+/*
  * init, shutdown
  */
 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
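
Finally, a hedged sketch (not code in this commit) of how a caller drives the new ceph_osdc_call() helper, modeled on the cls_lock_client.c users above; the class/method names "foo"/"bar" and the wrapper itself are placeholders:

	/* Hedged sketch: call OSD class method "foo.bar" with a one-page
	 * request payload and a one-page reply buffer. */
	static int example_cls_call(struct ceph_osd_client *osdc,
				    struct ceph_object_id *oid,
				    struct ceph_object_locator *oloc,
				    const void *req_buf, size_t req_len)
	{
		struct page *req_page, *reply_page;
		size_t reply_len;
		int ret;

		if (req_len > PAGE_SIZE)
			return -E2BIG;	/* ceph_osdc_call() takes single pages */

		req_page = alloc_page(GFP_NOIO);
		if (!req_page)
			return -ENOMEM;
		reply_page = alloc_page(GFP_NOIO);
		if (!reply_page) {
			__free_page(req_page);
			return -ENOMEM;
		}

		memcpy(page_address(req_page), req_buf, req_len);
		ret = ceph_osdc_call(osdc, oid, oloc, "foo", "bar",
				     CEPH_OSD_FLAG_READ, req_page, req_len,
				     reply_page, &reply_len);
		if (ret >= 0) {
			/* reply_len bytes of output at page_address(reply_page) */
		}

		__free_page(req_page);
		__free_page(reply_page);
		return ret;
	}

Note that on success ceph_osdc_call() returns the class method's own rval, which is why the cls_lock_client.c callers treat any non-negative return as success.
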