diff options
author | Sage Weil <sage@newdream.net> | 2009-10-06 14:31:11 -0400 |
---|---|---|
committer | Sage Weil <sage@newdream.net> | 2009-10-06 14:31:11 -0400 |
commit | ba75bb98cfb93b62c54af25bf67ff90857264bbe (patch) | |
tree | 4a966828835dd4639d25442b932072914e77fc9c | |
parent | 5ecc0a0f8128b1876e8614638deaed49cc8b174c (diff) |
ceph: monitor client
The monitor cluster is responsible for managing cluster membership
and state. The monitor client handles what minimal interaction
the Ceph client has with it: checking for updated versions of the
MDS and OSD maps, getting statfs() information, and unmounting.
Signed-off-by: Sage Weil <sage@newdream.net>
-rw-r--r-- | fs/ceph/mon_client.c | 694 | ||||
-rw-r--r-- | fs/ceph/mon_client.h | 109 |
2 files changed, 803 insertions, 0 deletions
diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c new file mode 100644 index 000000000000..b0c95cec5df8 --- /dev/null +++ b/fs/ceph/mon_client.c | |||
@@ -0,0 +1,694 @@ | |||
1 | #include "ceph_debug.h" | ||
2 | |||
3 | #include <linux/types.h> | ||
4 | #include <linux/random.h> | ||
5 | #include <linux/sched.h> | ||
6 | |||
7 | #include "mon_client.h" | ||
8 | #include "super.h" | ||
9 | #include "decode.h" | ||
10 | |||
11 | /* | ||
12 | * Interact with Ceph monitor cluster. Handle requests for new map | ||
13 | * versions, and periodically resend as needed. Also implement | ||
14 | * statfs() and umount(). | ||
15 | * | ||
16 | * A small cluster of Ceph "monitors" are responsible for managing critical | ||
17 | * cluster configuration and state information. An odd number (e.g., 3, 5) | ||
18 | * of cmon daemons use a modified version of the Paxos part-time parliament | ||
19 | * algorithm to manage the MDS map (mds cluster membership), OSD map, and | ||
20 | * list of clients who have mounted the file system. | ||
21 | * | ||
22 | * We maintain an open, active session with a monitor at all times in order to | ||
23 | * receive timely MDSMap updates. We periodically send a keepalive byte on the | ||
24 | * TCP socket to ensure we detect a failure. If the connection does break, we | ||
25 | * randomly hunt for a new monitor. Once the connection is reestablished, we | ||
26 | * resend any outstanding requests. | ||
27 | */ | ||
28 | |||
29 | const static struct ceph_connection_operations mon_con_ops; | ||
30 | |||
31 | /* | ||
32 | * Decode a monmap blob (e.g., during mount). | ||
33 | */ | ||
34 | struct ceph_monmap *ceph_monmap_decode(void *p, void *end) | ||
35 | { | ||
36 | struct ceph_monmap *m = NULL; | ||
37 | int i, err = -EINVAL; | ||
38 | struct ceph_fsid fsid; | ||
39 | u32 epoch, num_mon; | ||
40 | u16 version; | ||
41 | |||
42 | dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p)); | ||
43 | |||
44 | ceph_decode_16_safe(&p, end, version, bad); | ||
45 | |||
46 | ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad); | ||
47 | ceph_decode_copy(&p, &fsid, sizeof(fsid)); | ||
48 | ceph_decode_32(&p, epoch); | ||
49 | |||
50 | ceph_decode_32(&p, num_mon); | ||
51 | ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad); | ||
52 | |||
53 | if (num_mon >= CEPH_MAX_MON) | ||
54 | goto bad; | ||
55 | m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS); | ||
56 | if (m == NULL) | ||
57 | return ERR_PTR(-ENOMEM); | ||
58 | m->fsid = fsid; | ||
59 | m->epoch = epoch; | ||
60 | m->num_mon = num_mon; | ||
61 | ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0])); | ||
62 | |||
63 | if (p != end) | ||
64 | goto bad; | ||
65 | |||
66 | dout("monmap_decode epoch %d, num_mon %d\n", m->epoch, | ||
67 | m->num_mon); | ||
68 | for (i = 0; i < m->num_mon; i++) | ||
69 | dout("monmap_decode mon%d is %s\n", i, | ||
70 | pr_addr(&m->mon_inst[i].addr.in_addr)); | ||
71 | return m; | ||
72 | |||
73 | bad: | ||
74 | dout("monmap_decode failed with %d\n", err); | ||
75 | kfree(m); | ||
76 | return ERR_PTR(err); | ||
77 | } | ||
78 | |||
79 | /* | ||
80 | * return true if *addr is included in the monmap. | ||
81 | */ | ||
82 | int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr) | ||
83 | { | ||
84 | int i; | ||
85 | |||
86 | for (i = 0; i < m->num_mon; i++) | ||
87 | if (ceph_entity_addr_equal(addr, &m->mon_inst[i].addr)) | ||
88 | return 1; | ||
89 | return 0; | ||
90 | } | ||
91 | |||
92 | /* | ||
93 | * Close monitor session, if any. | ||
94 | */ | ||
95 | static void __close_session(struct ceph_mon_client *monc) | ||
96 | { | ||
97 | if (monc->con) { | ||
98 | dout("__close_session closing mon%d\n", monc->cur_mon); | ||
99 | ceph_con_close(monc->con); | ||
100 | monc->cur_mon = -1; | ||
101 | } | ||
102 | } | ||
103 | |||
104 | /* | ||
105 | * Open a session with a (new) monitor. | ||
106 | */ | ||
107 | static int __open_session(struct ceph_mon_client *monc) | ||
108 | { | ||
109 | char r; | ||
110 | |||
111 | if (monc->cur_mon < 0) { | ||
112 | get_random_bytes(&r, 1); | ||
113 | monc->cur_mon = r % monc->monmap->num_mon; | ||
114 | dout("open_session num=%d r=%d -> mon%d\n", | ||
115 | monc->monmap->num_mon, r, monc->cur_mon); | ||
116 | monc->sub_sent = 0; | ||
117 | monc->sub_renew_after = jiffies; /* i.e., expired */ | ||
118 | monc->want_next_osdmap = !!monc->want_next_osdmap; | ||
119 | |||
120 | dout("open_session mon%d opening\n", monc->cur_mon); | ||
121 | monc->con->peer_name.type = CEPH_ENTITY_TYPE_MON; | ||
122 | monc->con->peer_name.num = cpu_to_le64(monc->cur_mon); | ||
123 | ceph_con_open(monc->con, | ||
124 | &monc->monmap->mon_inst[monc->cur_mon].addr); | ||
125 | } else { | ||
126 | dout("open_session mon%d already open\n", monc->cur_mon); | ||
127 | } | ||
128 | return 0; | ||
129 | } | ||
130 | |||
131 | static bool __sub_expired(struct ceph_mon_client *monc) | ||
132 | { | ||
133 | return time_after_eq(jiffies, monc->sub_renew_after); | ||
134 | } | ||
135 | |||
136 | /* | ||
137 | * Reschedule delayed work timer. | ||
138 | */ | ||
139 | static void __schedule_delayed(struct ceph_mon_client *monc) | ||
140 | { | ||
141 | unsigned delay; | ||
142 | |||
143 | if (monc->cur_mon < 0 || monc->want_mount || __sub_expired(monc)) | ||
144 | delay = 10 * HZ; | ||
145 | else | ||
146 | delay = 20 * HZ; | ||
147 | dout("__schedule_delayed after %u\n", delay); | ||
148 | schedule_delayed_work(&monc->delayed_work, delay); | ||
149 | } | ||
150 | |||
151 | /* | ||
152 | * Send subscribe request for mdsmap and/or osdmap. | ||
153 | */ | ||
154 | static void __send_subscribe(struct ceph_mon_client *monc) | ||
155 | { | ||
156 | dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n", | ||
157 | (unsigned)monc->sub_sent, __sub_expired(monc), | ||
158 | monc->want_next_osdmap); | ||
159 | if ((__sub_expired(monc) && !monc->sub_sent) || | ||
160 | monc->want_next_osdmap == 1) { | ||
161 | struct ceph_msg *msg; | ||
162 | struct ceph_mon_subscribe_item *i; | ||
163 | void *p, *end; | ||
164 | |||
165 | msg = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 64, 0, 0, NULL); | ||
166 | if (!msg) | ||
167 | return; | ||
168 | |||
169 | p = msg->front.iov_base; | ||
170 | end = p + msg->front.iov_len; | ||
171 | |||
172 | dout("__send_subscribe to 'mdsmap' %u+\n", | ||
173 | (unsigned)monc->have_mdsmap); | ||
174 | if (monc->want_next_osdmap) { | ||
175 | dout("__send_subscribe to 'osdmap' %u\n", | ||
176 | (unsigned)monc->have_osdmap); | ||
177 | ceph_encode_32(&p, 2); | ||
178 | ceph_encode_string(&p, end, "osdmap", 6); | ||
179 | i = p; | ||
180 | i->have = cpu_to_le64(monc->have_osdmap); | ||
181 | i->onetime = 1; | ||
182 | p += sizeof(*i); | ||
183 | monc->want_next_osdmap = 2; /* requested */ | ||
184 | } else { | ||
185 | ceph_encode_32(&p, 1); | ||
186 | } | ||
187 | ceph_encode_string(&p, end, "mdsmap", 6); | ||
188 | i = p; | ||
189 | i->have = cpu_to_le64(monc->have_mdsmap); | ||
190 | i->onetime = 0; | ||
191 | p += sizeof(*i); | ||
192 | |||
193 | msg->front.iov_len = p - msg->front.iov_base; | ||
194 | msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); | ||
195 | ceph_con_send(monc->con, msg); | ||
196 | |||
197 | monc->sub_sent = jiffies | 1; /* never 0 */ | ||
198 | } | ||
199 | } | ||
200 | |||
201 | static void handle_subscribe_ack(struct ceph_mon_client *monc, | ||
202 | struct ceph_msg *msg) | ||
203 | { | ||
204 | unsigned seconds; | ||
205 | void *p = msg->front.iov_base; | ||
206 | void *end = p + msg->front.iov_len; | ||
207 | |||
208 | ceph_decode_32_safe(&p, end, seconds, bad); | ||
209 | mutex_lock(&monc->mutex); | ||
210 | if (monc->hunting) { | ||
211 | pr_info("mon%d %s session established\n", | ||
212 | monc->cur_mon, pr_addr(&monc->con->peer_addr.in_addr)); | ||
213 | monc->hunting = false; | ||
214 | } | ||
215 | dout("handle_subscribe_ack after %d seconds\n", seconds); | ||
216 | monc->sub_renew_after = monc->sub_sent + seconds*HZ - 1; | ||
217 | monc->sub_sent = 0; | ||
218 | mutex_unlock(&monc->mutex); | ||
219 | return; | ||
220 | bad: | ||
221 | pr_err("got corrupt subscribe-ack msg\n"); | ||
222 | } | ||
223 | |||
224 | /* | ||
225 | * Keep track of which maps we have | ||
226 | */ | ||
227 | int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got) | ||
228 | { | ||
229 | mutex_lock(&monc->mutex); | ||
230 | monc->have_mdsmap = got; | ||
231 | mutex_unlock(&monc->mutex); | ||
232 | return 0; | ||
233 | } | ||
234 | |||
235 | int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got) | ||
236 | { | ||
237 | mutex_lock(&monc->mutex); | ||
238 | monc->have_osdmap = got; | ||
239 | monc->want_next_osdmap = 0; | ||
240 | mutex_unlock(&monc->mutex); | ||
241 | return 0; | ||
242 | } | ||
243 | |||
244 | /* | ||
245 | * Register interest in the next osdmap | ||
246 | */ | ||
247 | void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc) | ||
248 | { | ||
249 | dout("request_next_osdmap have %u\n", monc->have_osdmap); | ||
250 | mutex_lock(&monc->mutex); | ||
251 | if (!monc->want_next_osdmap) | ||
252 | monc->want_next_osdmap = 1; | ||
253 | if (monc->want_next_osdmap < 2) | ||
254 | __send_subscribe(monc); | ||
255 | mutex_unlock(&monc->mutex); | ||
256 | } | ||
257 | |||
258 | |||
259 | /* | ||
260 | * mount | ||
261 | */ | ||
262 | static void __request_mount(struct ceph_mon_client *monc) | ||
263 | { | ||
264 | struct ceph_msg *msg; | ||
265 | struct ceph_client_mount *h; | ||
266 | int err; | ||
267 | |||
268 | dout("__request_mount\n"); | ||
269 | err = __open_session(monc); | ||
270 | if (err) | ||
271 | return; | ||
272 | msg = ceph_msg_new(CEPH_MSG_CLIENT_MOUNT, sizeof(*h), 0, 0, NULL); | ||
273 | if (IS_ERR(msg)) | ||
274 | return; | ||
275 | h = msg->front.iov_base; | ||
276 | h->have_version = 0; | ||
277 | ceph_con_send(monc->con, msg); | ||
278 | } | ||
279 | |||
280 | int ceph_monc_request_mount(struct ceph_mon_client *monc) | ||
281 | { | ||
282 | if (!monc->con) { | ||
283 | monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL); | ||
284 | if (!monc->con) | ||
285 | return -ENOMEM; | ||
286 | ceph_con_init(monc->client->msgr, monc->con); | ||
287 | monc->con->private = monc; | ||
288 | monc->con->ops = &mon_con_ops; | ||
289 | } | ||
290 | |||
291 | mutex_lock(&monc->mutex); | ||
292 | __request_mount(monc); | ||
293 | __schedule_delayed(monc); | ||
294 | mutex_unlock(&monc->mutex); | ||
295 | return 0; | ||
296 | } | ||
297 | |||
298 | /* | ||
299 | * The monitor responds with mount ack indicate mount success. The | ||
300 | * included client ticket allows the client to talk to MDSs and OSDs. | ||
301 | */ | ||
302 | static void handle_mount_ack(struct ceph_mon_client *monc, struct ceph_msg *msg) | ||
303 | { | ||
304 | struct ceph_client *client = monc->client; | ||
305 | struct ceph_monmap *monmap = NULL, *old = monc->monmap; | ||
306 | void *p, *end; | ||
307 | s32 result; | ||
308 | u32 len; | ||
309 | s64 cnum; | ||
310 | int err = -EINVAL; | ||
311 | |||
312 | if (client->whoami >= 0) { | ||
313 | dout("handle_mount_ack - already mounted\n"); | ||
314 | return; | ||
315 | } | ||
316 | |||
317 | mutex_lock(&monc->mutex); | ||
318 | |||
319 | dout("handle_mount_ack\n"); | ||
320 | p = msg->front.iov_base; | ||
321 | end = p + msg->front.iov_len; | ||
322 | |||
323 | ceph_decode_64_safe(&p, end, cnum, bad); | ||
324 | ceph_decode_32_safe(&p, end, result, bad); | ||
325 | ceph_decode_32_safe(&p, end, len, bad); | ||
326 | if (result) { | ||
327 | pr_err("mount denied: %.*s (%d)\n", len, (char *)p, | ||
328 | result); | ||
329 | err = result; | ||
330 | goto out; | ||
331 | } | ||
332 | p += len; | ||
333 | |||
334 | ceph_decode_32_safe(&p, end, len, bad); | ||
335 | ceph_decode_need(&p, end, len, bad); | ||
336 | monmap = ceph_monmap_decode(p, p + len); | ||
337 | if (IS_ERR(monmap)) { | ||
338 | pr_err("problem decoding monmap, %d\n", | ||
339 | (int)PTR_ERR(monmap)); | ||
340 | err = -EINVAL; | ||
341 | goto out; | ||
342 | } | ||
343 | p += len; | ||
344 | |||
345 | client->monc.monmap = monmap; | ||
346 | kfree(old); | ||
347 | |||
348 | client->signed_ticket = NULL; | ||
349 | client->signed_ticket_len = 0; | ||
350 | |||
351 | monc->want_mount = false; | ||
352 | |||
353 | client->whoami = cnum; | ||
354 | client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT; | ||
355 | client->msgr->inst.name.num = cpu_to_le64(cnum); | ||
356 | pr_info("client%lld fsid " FSID_FORMAT "\n", | ||
357 | client->whoami, PR_FSID(&client->monc.monmap->fsid)); | ||
358 | |||
359 | ceph_debugfs_client_init(client); | ||
360 | __send_subscribe(monc); | ||
361 | |||
362 | err = 0; | ||
363 | goto out; | ||
364 | |||
365 | bad: | ||
366 | pr_err("error decoding mount_ack message\n"); | ||
367 | out: | ||
368 | client->mount_err = err; | ||
369 | mutex_unlock(&monc->mutex); | ||
370 | wake_up(&client->mount_wq); | ||
371 | } | ||
372 | |||
373 | |||
374 | |||
375 | |||
376 | /* | ||
377 | * statfs | ||
378 | */ | ||
379 | static void handle_statfs_reply(struct ceph_mon_client *monc, | ||
380 | struct ceph_msg *msg) | ||
381 | { | ||
382 | struct ceph_mon_statfs_request *req; | ||
383 | struct ceph_mon_statfs_reply *reply = msg->front.iov_base; | ||
384 | u64 tid; | ||
385 | |||
386 | if (msg->front.iov_len != sizeof(*reply)) | ||
387 | goto bad; | ||
388 | tid = le64_to_cpu(reply->tid); | ||
389 | dout("handle_statfs_reply %p tid %llu\n", msg, tid); | ||
390 | |||
391 | mutex_lock(&monc->mutex); | ||
392 | req = radix_tree_lookup(&monc->statfs_request_tree, tid); | ||
393 | if (req) { | ||
394 | *req->buf = reply->st; | ||
395 | req->result = 0; | ||
396 | } | ||
397 | mutex_unlock(&monc->mutex); | ||
398 | if (req) | ||
399 | complete(&req->completion); | ||
400 | return; | ||
401 | |||
402 | bad: | ||
403 | pr_err("corrupt statfs reply, no tid\n"); | ||
404 | } | ||
405 | |||
406 | /* | ||
407 | * (re)send a statfs request | ||
408 | */ | ||
409 | static int send_statfs(struct ceph_mon_client *monc, | ||
410 | struct ceph_mon_statfs_request *req) | ||
411 | { | ||
412 | struct ceph_msg *msg; | ||
413 | struct ceph_mon_statfs *h; | ||
414 | int err; | ||
415 | |||
416 | dout("send_statfs tid %llu\n", req->tid); | ||
417 | err = __open_session(monc); | ||
418 | if (err) | ||
419 | return err; | ||
420 | msg = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), 0, 0, NULL); | ||
421 | if (IS_ERR(msg)) | ||
422 | return PTR_ERR(msg); | ||
423 | req->request = msg; | ||
424 | h = msg->front.iov_base; | ||
425 | h->have_version = 0; | ||
426 | h->fsid = monc->monmap->fsid; | ||
427 | h->tid = cpu_to_le64(req->tid); | ||
428 | ceph_con_send(monc->con, msg); | ||
429 | return 0; | ||
430 | } | ||
431 | |||
432 | /* | ||
433 | * Do a synchronous statfs(). | ||
434 | */ | ||
435 | int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) | ||
436 | { | ||
437 | struct ceph_mon_statfs_request req; | ||
438 | int err; | ||
439 | |||
440 | req.buf = buf; | ||
441 | init_completion(&req.completion); | ||
442 | |||
443 | /* allocate memory for reply */ | ||
444 | err = ceph_msgpool_resv(&monc->msgpool_statfs_reply, 1); | ||
445 | if (err) | ||
446 | return err; | ||
447 | |||
448 | /* register request */ | ||
449 | mutex_lock(&monc->mutex); | ||
450 | req.tid = ++monc->last_tid; | ||
451 | req.last_attempt = jiffies; | ||
452 | req.delay = BASE_DELAY_INTERVAL; | ||
453 | if (radix_tree_insert(&monc->statfs_request_tree, req.tid, &req) < 0) { | ||
454 | mutex_unlock(&monc->mutex); | ||
455 | pr_err("ENOMEM in do_statfs\n"); | ||
456 | return -ENOMEM; | ||
457 | } | ||
458 | monc->num_statfs_requests++; | ||
459 | mutex_unlock(&monc->mutex); | ||
460 | |||
461 | /* send request and wait */ | ||
462 | err = send_statfs(monc, &req); | ||
463 | if (!err) | ||
464 | err = wait_for_completion_interruptible(&req.completion); | ||
465 | |||
466 | mutex_lock(&monc->mutex); | ||
467 | radix_tree_delete(&monc->statfs_request_tree, req.tid); | ||
468 | monc->num_statfs_requests--; | ||
469 | ceph_msgpool_resv(&monc->msgpool_statfs_reply, -1); | ||
470 | mutex_unlock(&monc->mutex); | ||
471 | |||
472 | if (!err) | ||
473 | err = req.result; | ||
474 | return err; | ||
475 | } | ||
476 | |||
477 | /* | ||
478 | * Resend pending statfs requests. | ||
479 | */ | ||
480 | static void __resend_statfs(struct ceph_mon_client *monc) | ||
481 | { | ||
482 | u64 next_tid = 0; | ||
483 | int got; | ||
484 | int did = 0; | ||
485 | struct ceph_mon_statfs_request *req; | ||
486 | |||
487 | while (1) { | ||
488 | got = radix_tree_gang_lookup(&monc->statfs_request_tree, | ||
489 | (void **)&req, | ||
490 | next_tid, 1); | ||
491 | if (got == 0) | ||
492 | break; | ||
493 | did++; | ||
494 | next_tid = req->tid + 1; | ||
495 | |||
496 | send_statfs(monc, req); | ||
497 | } | ||
498 | } | ||
499 | |||
500 | /* | ||
501 | * Delayed work. If we haven't mounted yet, retry. Otherwise, | ||
502 | * renew/retry subscription as needed (in case it is timing out, or we | ||
503 | * got an ENOMEM). And keep the monitor connection alive. | ||
504 | */ | ||
505 | static void delayed_work(struct work_struct *work) | ||
506 | { | ||
507 | struct ceph_mon_client *monc = | ||
508 | container_of(work, struct ceph_mon_client, delayed_work.work); | ||
509 | |||
510 | dout("monc delayed_work\n"); | ||
511 | mutex_lock(&monc->mutex); | ||
512 | if (monc->want_mount) { | ||
513 | __request_mount(monc); | ||
514 | } else { | ||
515 | if (__sub_expired(monc)) { | ||
516 | __close_session(monc); | ||
517 | __open_session(monc); /* continue hunting */ | ||
518 | } else { | ||
519 | ceph_con_keepalive(monc->con); | ||
520 | } | ||
521 | } | ||
522 | __send_subscribe(monc); | ||
523 | __schedule_delayed(monc); | ||
524 | mutex_unlock(&monc->mutex); | ||
525 | } | ||
526 | |||
527 | int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) | ||
528 | { | ||
529 | int err = 0; | ||
530 | |||
531 | dout("init\n"); | ||
532 | memset(monc, 0, sizeof(*monc)); | ||
533 | monc->client = cl; | ||
534 | monc->monmap = NULL; | ||
535 | mutex_init(&monc->mutex); | ||
536 | |||
537 | monc->con = NULL; | ||
538 | |||
539 | /* msg pools */ | ||
540 | err = ceph_msgpool_init(&monc->msgpool_mount_ack, 4096, 1, false); | ||
541 | if (err < 0) | ||
542 | goto out; | ||
543 | err = ceph_msgpool_init(&monc->msgpool_subscribe_ack, 8, 1, false); | ||
544 | if (err < 0) | ||
545 | goto out; | ||
546 | err = ceph_msgpool_init(&monc->msgpool_statfs_reply, | ||
547 | sizeof(struct ceph_mon_statfs_reply), 0, false); | ||
548 | if (err < 0) | ||
549 | goto out; | ||
550 | |||
551 | monc->cur_mon = -1; | ||
552 | monc->hunting = false; /* not really */ | ||
553 | monc->sub_renew_after = jiffies; | ||
554 | monc->sub_sent = 0; | ||
555 | |||
556 | INIT_DELAYED_WORK(&monc->delayed_work, delayed_work); | ||
557 | INIT_RADIX_TREE(&monc->statfs_request_tree, GFP_NOFS); | ||
558 | monc->num_statfs_requests = 0; | ||
559 | monc->last_tid = 0; | ||
560 | |||
561 | monc->have_mdsmap = 0; | ||
562 | monc->have_osdmap = 0; | ||
563 | monc->want_next_osdmap = 1; | ||
564 | monc->want_mount = true; | ||
565 | out: | ||
566 | return err; | ||
567 | } | ||
568 | |||
569 | void ceph_monc_stop(struct ceph_mon_client *monc) | ||
570 | { | ||
571 | dout("stop\n"); | ||
572 | cancel_delayed_work_sync(&monc->delayed_work); | ||
573 | |||
574 | mutex_lock(&monc->mutex); | ||
575 | __close_session(monc); | ||
576 | if (monc->con) { | ||
577 | monc->con->private = NULL; | ||
578 | monc->con->ops->put(monc->con); | ||
579 | monc->con = NULL; | ||
580 | } | ||
581 | mutex_unlock(&monc->mutex); | ||
582 | |||
583 | ceph_msgpool_destroy(&monc->msgpool_mount_ack); | ||
584 | ceph_msgpool_destroy(&monc->msgpool_subscribe_ack); | ||
585 | ceph_msgpool_destroy(&monc->msgpool_statfs_reply); | ||
586 | |||
587 | kfree(monc->monmap); | ||
588 | } | ||
589 | |||
590 | |||
591 | /* | ||
592 | * handle incoming message | ||
593 | */ | ||
594 | static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) | ||
595 | { | ||
596 | struct ceph_mon_client *monc = con->private; | ||
597 | int type = le16_to_cpu(msg->hdr.type); | ||
598 | |||
599 | if (!monc) | ||
600 | return; | ||
601 | |||
602 | switch (type) { | ||
603 | case CEPH_MSG_CLIENT_MOUNT_ACK: | ||
604 | handle_mount_ack(monc, msg); | ||
605 | break; | ||
606 | |||
607 | case CEPH_MSG_MON_SUBSCRIBE_ACK: | ||
608 | handle_subscribe_ack(monc, msg); | ||
609 | break; | ||
610 | |||
611 | case CEPH_MSG_STATFS_REPLY: | ||
612 | handle_statfs_reply(monc, msg); | ||
613 | break; | ||
614 | |||
615 | case CEPH_MSG_MDS_MAP: | ||
616 | ceph_mdsc_handle_map(&monc->client->mdsc, msg); | ||
617 | break; | ||
618 | |||
619 | case CEPH_MSG_OSD_MAP: | ||
620 | ceph_osdc_handle_map(&monc->client->osdc, msg); | ||
621 | break; | ||
622 | |||
623 | default: | ||
624 | pr_err("received unknown message type %d %s\n", type, | ||
625 | ceph_msg_type_name(type)); | ||
626 | } | ||
627 | ceph_msg_put(msg); | ||
628 | } | ||
629 | |||
630 | /* | ||
631 | * Allocate memory for incoming message | ||
632 | */ | ||
633 | static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, | ||
634 | struct ceph_msg_header *hdr) | ||
635 | { | ||
636 | struct ceph_mon_client *monc = con->private; | ||
637 | int type = le16_to_cpu(hdr->type); | ||
638 | |||
639 | switch (type) { | ||
640 | case CEPH_MSG_CLIENT_MOUNT_ACK: | ||
641 | return ceph_msgpool_get(&monc->msgpool_mount_ack); | ||
642 | case CEPH_MSG_MON_SUBSCRIBE_ACK: | ||
643 | return ceph_msgpool_get(&monc->msgpool_subscribe_ack); | ||
644 | case CEPH_MSG_STATFS_REPLY: | ||
645 | return ceph_msgpool_get(&monc->msgpool_statfs_reply); | ||
646 | } | ||
647 | return ceph_alloc_msg(con, hdr); | ||
648 | } | ||
649 | |||
650 | /* | ||
651 | * If the monitor connection resets, pick a new monitor and resubmit | ||
652 | * any pending requests. | ||
653 | */ | ||
654 | static void mon_fault(struct ceph_connection *con) | ||
655 | { | ||
656 | struct ceph_mon_client *monc = con->private; | ||
657 | |||
658 | if (!monc) | ||
659 | return; | ||
660 | |||
661 | dout("mon_fault\n"); | ||
662 | mutex_lock(&monc->mutex); | ||
663 | if (!con->private) | ||
664 | goto out; | ||
665 | |||
666 | if (monc->con && !monc->hunting) | ||
667 | pr_info("mon%d %s session lost, " | ||
668 | "hunting for new mon\n", monc->cur_mon, | ||
669 | pr_addr(&monc->con->peer_addr.in_addr)); | ||
670 | |||
671 | __close_session(monc); | ||
672 | if (!monc->hunting) { | ||
673 | /* start hunting */ | ||
674 | monc->hunting = true; | ||
675 | if (__open_session(monc) == 0) { | ||
676 | __send_subscribe(monc); | ||
677 | __resend_statfs(monc); | ||
678 | } | ||
679 | } else { | ||
680 | /* already hunting, let's wait a bit */ | ||
681 | __schedule_delayed(monc); | ||
682 | } | ||
683 | out: | ||
684 | mutex_unlock(&monc->mutex); | ||
685 | } | ||
686 | |||
687 | const static struct ceph_connection_operations mon_con_ops = { | ||
688 | .get = ceph_con_get, | ||
689 | .put = ceph_con_put, | ||
690 | .dispatch = dispatch, | ||
691 | .fault = mon_fault, | ||
692 | .alloc_msg = mon_alloc_msg, | ||
693 | .alloc_middle = ceph_alloc_middle, | ||
694 | }; | ||
diff --git a/fs/ceph/mon_client.h b/fs/ceph/mon_client.h new file mode 100644 index 000000000000..5258c5693b03 --- /dev/null +++ b/fs/ceph/mon_client.h | |||
@@ -0,0 +1,109 @@ | |||
1 | #ifndef _FS_CEPH_MON_CLIENT_H | ||
2 | #define _FS_CEPH_MON_CLIENT_H | ||
3 | |||
4 | #include <linux/completion.h> | ||
5 | #include <linux/radix-tree.h> | ||
6 | |||
7 | #include "messenger.h" | ||
8 | #include "msgpool.h" | ||
9 | |||
10 | struct ceph_client; | ||
11 | struct ceph_mount_args; | ||
12 | |||
13 | /* | ||
14 | * The monitor map enumerates the set of all monitors. | ||
15 | */ | ||
16 | struct ceph_monmap { | ||
17 | struct ceph_fsid fsid; | ||
18 | u32 epoch; | ||
19 | u32 num_mon; | ||
20 | struct ceph_entity_inst mon_inst[0]; | ||
21 | }; | ||
22 | |||
23 | struct ceph_mon_client; | ||
24 | struct ceph_mon_statfs_request; | ||
25 | |||
26 | |||
27 | /* | ||
28 | * Generic mechanism for resending monitor requests. | ||
29 | */ | ||
30 | typedef void (*ceph_monc_request_func_t)(struct ceph_mon_client *monc, | ||
31 | int newmon); | ||
32 | |||
33 | /* a pending monitor request */ | ||
34 | struct ceph_mon_request { | ||
35 | struct ceph_mon_client *monc; | ||
36 | struct delayed_work delayed_work; | ||
37 | unsigned long delay; | ||
38 | ceph_monc_request_func_t do_request; | ||
39 | }; | ||
40 | |||
41 | /* | ||
42 | * statfs() is done a bit differently because we need to get data back | ||
43 | * to the caller | ||
44 | */ | ||
45 | struct ceph_mon_statfs_request { | ||
46 | u64 tid; | ||
47 | int result; | ||
48 | struct ceph_statfs *buf; | ||
49 | struct completion completion; | ||
50 | unsigned long last_attempt, delay; /* jiffies */ | ||
51 | struct ceph_msg *request; /* original request */ | ||
52 | }; | ||
53 | |||
54 | struct ceph_mon_client { | ||
55 | struct ceph_client *client; | ||
56 | struct ceph_monmap *monmap; | ||
57 | |||
58 | struct mutex mutex; | ||
59 | struct delayed_work delayed_work; | ||
60 | |||
61 | bool hunting; | ||
62 | int cur_mon; /* last monitor i contacted */ | ||
63 | unsigned long sub_sent, sub_renew_after; | ||
64 | struct ceph_connection *con; | ||
65 | |||
66 | /* msg pools */ | ||
67 | struct ceph_msgpool msgpool_mount_ack; | ||
68 | struct ceph_msgpool msgpool_subscribe_ack; | ||
69 | struct ceph_msgpool msgpool_statfs_reply; | ||
70 | |||
71 | /* pending statfs requests */ | ||
72 | struct radix_tree_root statfs_request_tree; | ||
73 | int num_statfs_requests; | ||
74 | u64 last_tid; | ||
75 | |||
76 | /* mds/osd map or mount requests */ | ||
77 | bool want_mount; | ||
78 | int want_next_osdmap; /* 1 = want, 2 = want+asked */ | ||
79 | u32 have_osdmap, have_mdsmap; | ||
80 | |||
81 | struct dentry *debugfs_file; | ||
82 | }; | ||
83 | |||
84 | extern struct ceph_monmap *ceph_monmap_decode(void *p, void *end); | ||
85 | extern int ceph_monmap_contains(struct ceph_monmap *m, | ||
86 | struct ceph_entity_addr *addr); | ||
87 | |||
88 | extern int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl); | ||
89 | extern void ceph_monc_stop(struct ceph_mon_client *monc); | ||
90 | |||
91 | /* | ||
92 | * The model here is to indicate that we need a new map of at least | ||
93 | * epoch @want, and also call in when we receive a map. We will | ||
94 | * periodically rerequest the map from the monitor cluster until we | ||
95 | * get what we want. | ||
96 | */ | ||
97 | extern int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 have); | ||
98 | extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 have); | ||
99 | |||
100 | extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc); | ||
101 | |||
102 | extern int ceph_monc_request_mount(struct ceph_mon_client *monc); | ||
103 | |||
104 | extern int ceph_monc_do_statfs(struct ceph_mon_client *monc, | ||
105 | struct ceph_statfs *buf); | ||
106 | |||
107 | |||
108 | |||
109 | #endif | ||