author    Sage Weil <sage@newdream.net>    2009-10-06 14:31:09 -0400
committer Sage Weil <sage@newdream.net>    2009-10-06 14:31:09 -0400
commit    2f2dc053404febedc9c273452d9d518fb31fde72 (patch)
tree      286ff35153d0b52349e035a69f3f795fdcb0afb6 /fs/ceph/mds_client.c
parent    1d3576fd10f0d7a104204267b81cf84a07028dad (diff)
ceph: MDS client
The MDS (metadata server) client is responsible for submitting requests to the
MDS cluster and parsing the response.  We decide which MDS to submit each
request to based on cached information about the current partition of the
directory hierarchy across the cluster.  A stateful session is opened with
each MDS before we submit requests to it, and a mutex is used to control the
ordering of messages within each session.

An MDS request may generate two responses.  The first indicates the operation
was a success and returns any result.  A second reply is sent when the
operation commits to disk.  Note that locking on the MDS ensures that the
results of updates are visible only to the updating client before the
operation commits.  Requests are linked to the containing directory so that
an fsync will wait for them to commit.

If an MDS fails and/or recovers, we resubmit requests as needed.  We also
reconnect existing capabilities to a recovering MDS to reestablish that
shared session state.  Old dentry leases are invalidated.

Signed-off-by: Sage Weil <sage@newdream.net>
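As a rough usage sketch (illustrative only, not part of this patch; the op
code and field choices shown are placeholders), a caller in the fs layer
drives the request machinery below roughly like so:

	struct ceph_mds_request *req;
	int err;

	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS);
	if (IS_ERR(req))
		return PTR_ERR(req);
	req->r_inode = igrab(inode);	/* do_request takes a CEPH_CAP_PIN ref */
	req->r_num_caps = 1;		/* reserve caps at registration time */
	err = ceph_mdsc_do_request(mdsc, NULL, req);	/* waits for first reply */
	ceph_mdsc_put_request(req);
	return err;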
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r--  fs/ceph/mds_client.c  2912
1 file changed, 2912 insertions(+), 0 deletions(-)
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
new file mode 100644
index 000000000000..de8ba4a242ca
--- /dev/null
+++ b/fs/ceph/mds_client.c
@@ -0,0 +1,2912 @@
1#include "ceph_debug.h"
2
3#include <linux/wait.h>
4#include <linux/sched.h>
5
6#include "mds_client.h"
7#include "mon_client.h"
8#include "super.h"
9#include "messenger.h"
10#include "decode.h"
11
/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */
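
/*
 * Rough session lifecycle, pieced together from the states below (see
 * session_state_name()); treat this as an illustrative sketch:
 *
 *   NEW -> OPENING -> OPEN <-> HUNG -> CLOSING
 *
 * with RECONNECTING entered while a failed mds recovers (see
 * send_mds_reconnect()).
 */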

static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info)
{
	int err = -EIO;

	info->in = *p;
	*p += sizeof(struct ceph_mds_reply_inode) +
		sizeof(*info->in->fragtree.splits) *
		le32_to_cpu(info->in->fragtree.nsplits);

	ceph_decode_32_safe(p, end, info->symlink_len, bad);
	ceph_decode_need(p, end, info->symlink_len, bad);
	info->symlink = *p;
	*p += info->symlink_len;

	ceph_decode_32_safe(p, end, info->xattr_len, bad);
	ceph_decode_need(p, end, info->xattr_len, bad);
	info->xattr_data = *p;
	*p += info->xattr_len;
	return 0;
bad:
	return err;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */
static int parse_reply_info_trace(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info)
{
	int err;

	if (info->head->is_dentry) {
		err = parse_reply_info_in(p, end, &info->diri);
		if (err < 0)
			goto out_bad;

		if (unlikely(*p + sizeof(*info->dirfrag) > end))
			goto bad;
		info->dirfrag = *p;
		*p += sizeof(*info->dirfrag) +
			sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
		if (unlikely(*p > end))
			goto bad;

		ceph_decode_32_safe(p, end, info->dname_len, bad);
		ceph_decode_need(p, end, info->dname_len, bad);
		info->dname = *p;
		*p += info->dname_len;
		info->dlease = *p;
		*p += sizeof(*info->dlease);
	}

	if (info->head->is_target) {
		err = parse_reply_info_in(p, end, &info->targeti);
		if (err < 0)
			goto out_bad;
	}

	if (unlikely(*p != end))
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing mds trace %d\n", err);
	return err;
}

/*
 * parse readdir results
 */
static int parse_reply_info_dir(void **p, void *end,
				struct ceph_mds_reply_info_parsed *info)
{
	u32 num, i = 0;
	int err;

	info->dir_dir = *p;
	if (*p + sizeof(*info->dir_dir) > end)
		goto bad;
	*p += sizeof(*info->dir_dir) +
		sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
	if (*p > end)
		goto bad;

	ceph_decode_need(p, end, sizeof(num) + 2, bad);
	ceph_decode_32(p, num);
	ceph_decode_8(p, info->dir_end);
	ceph_decode_8(p, info->dir_complete);
	if (num == 0)
		goto done;

	/* alloc large array */
	info->dir_nr = num;
	info->dir_in = kcalloc(num, sizeof(*info->dir_in) +
			       sizeof(*info->dir_dname) +
			       sizeof(*info->dir_dname_len) +
			       sizeof(*info->dir_dlease),
			       GFP_NOFS);
	if (info->dir_in == NULL) {
		err = -ENOMEM;
		goto out_bad;
	}
	info->dir_dname = (void *)(info->dir_in + num);
	info->dir_dname_len = (void *)(info->dir_dname + num);
	info->dir_dlease = (void *)(info->dir_dname_len + num);

	while (num) {
		/* dentry */
		ceph_decode_need(p, end, sizeof(u32)*2, bad);
		ceph_decode_32(p, info->dir_dname_len[i]);
		ceph_decode_need(p, end, info->dir_dname_len[i], bad);
		info->dir_dname[i] = *p;
		*p += info->dir_dname_len[i];
		dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
		     info->dir_dname[i]);
		info->dir_dlease[i] = *p;
		*p += sizeof(struct ceph_mds_reply_lease);

		/* inode */
		err = parse_reply_info_in(p, end, &info->dir_in[i]);
		if (err < 0)
			goto out_bad;
		i++;
		num--;
	}

done:
	if (*p != end)
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing dir contents %d\n", err);
	return err;
}

/*
 * parse entire mds reply
 */
static int parse_reply_info(struct ceph_msg *msg,
			    struct ceph_mds_reply_info_parsed *info)
{
	void *p, *end;
	u32 len;
	int err;

	info->head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

	/* trace */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		err = parse_reply_info_trace(&p, p+len, info);
		if (err < 0)
			goto out_bad;
	}

	/* dir content */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		err = parse_reply_info_dir(&p, p+len, info);
		if (err < 0)
			goto out_bad;
	}

	/* snap blob */
	ceph_decode_32_safe(&p, end, len, bad);
	info->snapblob_len = len;
	info->snapblob = p;
	p += len;

	if (p != end)
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("mds parse_reply err %d\n", err);
	return err;
}
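
/*
 * For reference (as parsed above), the reply front is laid out as:
 *
 *   struct ceph_mds_reply_head
 *   u32 trace_len,    trace bytes    (dentry + target inode trace)
 *   u32 dir_len,      dir bytes      (readdir contents, if any)
 *   u32 snapblob_len, snapblob bytes (opaque snap trace)
 */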

static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
	kfree(info->dir_in);
}


/*
 * sessions
 */
static const char *session_state_name(int s)
{
	switch (s) {
	case CEPH_MDS_SESSION_NEW: return "new";
	case CEPH_MDS_SESSION_OPENING: return "opening";
	case CEPH_MDS_SESSION_OPEN: return "open";
	case CEPH_MDS_SESSION_HUNG: return "hung";
	case CEPH_MDS_SESSION_CLOSING: return "closing";
	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
	default: return "???";
	}
}

static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
{
	if (atomic_inc_not_zero(&s->s_ref)) {
		dout("mdsc get_session %p %d -> %d\n", s,
		     atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
		return s;
	} else {
		dout("mdsc get_session %p 0 -- FAIL\n", s);
		return NULL;
	}
}

void ceph_put_mds_session(struct ceph_mds_session *s)
{
	dout("mdsc put_session %p %d -> %d\n", s,
	     atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
	if (atomic_dec_and_test(&s->s_ref)) {
		ceph_con_shutdown(&s->s_con);
		kfree(s);
	}
}

/*
 * called under mdsc->mutex
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
						   int mds)
{
	struct ceph_mds_session *session;

	if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
		return NULL;
	session = mdsc->sessions[mds];
	dout("lookup_mds_session %p %d\n", session,
	     atomic_read(&session->s_ref));
	get_session(session);
	return session;
}

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
	if (mds >= mdsc->max_sessions)
		return false;
	return mdsc->sessions[mds];
}

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
						 int mds)
{
	struct ceph_mds_session *s;

	s = kzalloc(sizeof(*s), GFP_NOFS);
	if (!s)
		return ERR_PTR(-ENOMEM);
	s->s_mdsc = mdsc;
	s->s_mds = mds;
	s->s_state = CEPH_MDS_SESSION_NEW;
	s->s_ttl = 0;
	s->s_seq = 0;
	mutex_init(&s->s_mutex);

	ceph_con_init(mdsc->client->msgr, &s->s_con);
	s->s_con.private = s;
	s->s_con.ops = &mds_con_ops;
	s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS;
	s->s_con.peer_name.num = cpu_to_le64(mds);
	ceph_con_open(&s->s_con, ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	spin_lock_init(&s->s_cap_lock);
	s->s_cap_gen = 0;
	s->s_cap_ttl = 0;
	s->s_renew_requested = 0;
	s->s_renew_seq = 0;
	INIT_LIST_HEAD(&s->s_caps);
	s->s_nr_caps = 0;
	atomic_set(&s->s_ref, 1);
	INIT_LIST_HEAD(&s->s_waiting);
	INIT_LIST_HEAD(&s->s_unsafe);
	s->s_num_cap_releases = 0;
	INIT_LIST_HEAD(&s->s_cap_releases);
	INIT_LIST_HEAD(&s->s_cap_releases_done);
	INIT_LIST_HEAD(&s->s_cap_flushing);
	INIT_LIST_HEAD(&s->s_cap_snaps_flushing);

	dout("register_session mds%d\n", mds);
	if (mds >= mdsc->max_sessions) {
		int newmax = 1 << get_count_order(mds+1);
		struct ceph_mds_session **sa;

		dout("register_session realloc to %d\n", newmax);
		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
		if (sa == NULL)
			return ERR_PTR(-ENOMEM);
		if (mdsc->sessions) {
			memcpy(sa, mdsc->sessions,
			       mdsc->max_sessions * sizeof(void *));
			kfree(mdsc->sessions);
		}
		mdsc->sessions = sa;
		mdsc->max_sessions = newmax;
	}
	mdsc->sessions[mds] = s;
	atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
	return s;
}
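
/*
 * Illustrative sizing note: the sessions array above grows in
 * power-of-two steps, e.g. first contact with mds5 grows it to
 * 1 << get_count_order(6) == 8 slots; mds8 would later grow it to 16.
 */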

/*
 * called under mdsc->mutex
 */
static void unregister_session(struct ceph_mds_client *mdsc, int mds)
{
	dout("unregister_session mds%d %p\n", mds, mdsc->sessions[mds]);
	ceph_put_mds_session(mdsc->sessions[mds]);
	mdsc->sessions[mds] = NULL;
}

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
	if (req->r_session) {
		ceph_put_mds_session(req->r_session);
		req->r_session = NULL;
	}
}

void ceph_mdsc_put_request(struct ceph_mds_request *req)
{
	dout("mdsc put_request %p %d -> %d\n", req,
	     atomic_read(&req->r_ref), atomic_read(&req->r_ref)-1);
	if (atomic_dec_and_test(&req->r_ref)) {
		if (req->r_request)
			ceph_msg_put(req->r_request);
		if (req->r_reply) {
			ceph_msg_put(req->r_reply);
			destroy_reply_info(&req->r_reply_info);
		}
		if (req->r_inode) {
			ceph_put_cap_refs(ceph_inode(req->r_inode),
					  CEPH_CAP_PIN);
			iput(req->r_inode);
		}
		if (req->r_locked_dir)
			ceph_put_cap_refs(ceph_inode(req->r_locked_dir),
					  CEPH_CAP_PIN);
		if (req->r_target_inode)
			iput(req->r_target_inode);
		if (req->r_dentry)
			dput(req->r_dentry);
		if (req->r_old_dentry) {
			ceph_put_cap_refs(
				ceph_inode(req->r_old_dentry->d_parent->d_inode),
				CEPH_CAP_PIN);
			dput(req->r_old_dentry);
		}
		kfree(req->r_path1);
		kfree(req->r_path2);
		put_request_session(req);
		ceph_unreserve_caps(&req->r_caps_reservation);
		kfree(req);
	}
}

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
						 u64 tid)
{
	struct ceph_mds_request *req;
	req = radix_tree_lookup(&mdsc->request_tree, tid);
	if (req)
		ceph_mdsc_get_request(req);
	return req;
}

/*
 * Register an in-flight request, and assign a tid.  Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
 */
static void __register_request(struct ceph_mds_client *mdsc,
			       struct ceph_mds_request *req,
			       struct inode *dir)
{
	req->r_tid = ++mdsc->last_tid;
	if (req->r_num_caps)
		ceph_reserve_caps(&req->r_caps_reservation, req->r_num_caps);
	dout("__register_request %p tid %lld\n", req, req->r_tid);
	ceph_mdsc_get_request(req);
	radix_tree_insert(&mdsc->request_tree, req->r_tid, (void *)req);

	if (dir) {
		struct ceph_inode_info *ci = ceph_inode(dir);

		spin_lock(&ci->i_unsafe_lock);
		req->r_unsafe_dir = dir;
		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
		spin_unlock(&ci->i_unsafe_lock);
	}
}

static void __unregister_request(struct ceph_mds_client *mdsc,
				 struct ceph_mds_request *req)
{
	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
	radix_tree_delete(&mdsc->request_tree, req->r_tid);
	ceph_mdsc_put_request(req);

	if (req->r_unsafe_dir) {
		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);

		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_dir_item);
		spin_unlock(&ci->i_unsafe_lock);
	}
}

/*
 * Choose mds to send request to next.  If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */
static int __choose_mds(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	int mode = req->r_direct_mode;
	int mds = -1;
	u32 hash = req->r_direct_hash;
	bool is_hash = req->r_direct_is_hash;

	/*
	 * is there a specific mds we should try?  ignore hint if we have
	 * no session and the mds is not up (active or recovering).
	 */
	if (req->r_resend_mds >= 0 &&
	    (__have_session(mdsc, req->r_resend_mds) ||
	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
		dout("choose_mds using resend_mds mds%d\n",
		     req->r_resend_mds);
		return req->r_resend_mds;
	}

	if (mode == USE_RANDOM_MDS)
		goto random;

	inode = NULL;
	if (req->r_inode) {
		inode = req->r_inode;
	} else if (req->r_dentry) {
		if (req->r_dentry->d_inode) {
			inode = req->r_dentry->d_inode;
		} else {
			inode = req->r_dentry->d_parent->d_inode;
			hash = req->r_dentry->d_name.hash;
			is_hash = true;
		}
	}
	dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
	     (int)hash, mode);
	if (!inode)
		goto random;
	ci = ceph_inode(inode);

	if (is_hash && S_ISDIR(inode->i_mode)) {
		struct ceph_inode_frag frag;
		int found;

		ceph_choose_frag(ci, hash, &frag, &found);
		if (found) {
			if (mode == USE_ANY_MDS && frag.ndist > 0) {
				u8 r;

				/* choose a random replica */
				get_random_bytes(&r, 1);
				r %= frag.ndist;
				mds = frag.dist[r];
				dout("choose_mds %p %llx.%llx "
				     "frag %u mds%d (%d/%d)\n",
				     inode, ceph_vinop(inode),
				     frag.frag, mds,
				     (int)r, frag.ndist);
				return mds;
			}

			/* since this file/dir wasn't known to be
			 * replicated, look for the authoritative mds. */
			mode = USE_AUTH_MDS;
			if (frag.mds >= 0) {
				/* choose auth mds */
				mds = frag.mds;
				dout("choose_mds %p %llx.%llx "
				     "frag %u mds%d (auth)\n",
				     inode, ceph_vinop(inode), frag.frag, mds);
				return mds;
			}
		}
	}

	spin_lock(&inode->i_lock);
	cap = NULL;
	if (mode == USE_AUTH_MDS)
		cap = ci->i_auth_cap;
	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
	if (!cap) {
		spin_unlock(&inode->i_lock);
		goto random;
	}
	mds = cap->session->s_mds;
	dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
	     inode, ceph_vinop(inode), mds,
	     cap == ci->i_auth_cap ? "auth " : "", cap);
	spin_unlock(&inode->i_lock);
	return mds;

random:
	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
	dout("choose_mds chose random mds%d\n", mds);
	return mds;
}


/*
 * session messages
 */
static struct ceph_msg *create_session_msg(u32 op, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), 0, 0, NULL);
	if (IS_ERR(msg)) {
		pr_err("create_session_msg ENOMEM creating msg\n");
		return ERR_PTR(PTR_ERR(msg));
	}
	h = msg->front.iov_base;
	h->op = cpu_to_le32(op);
	h->seq = cpu_to_le64(seq);
	return msg;
}

/*
 * send session open request.
 *
 * called under mdsc->mutex
 */
static int __open_session(struct ceph_mds_client *mdsc,
			  struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	int mstate;
	int mds = session->s_mds;
	int err = 0;

	/* wait for mds to go active? */
	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
	dout("open_session to mds%d (%s)\n", mds,
	     ceph_mds_state_name(mstate));
	session->s_state = CEPH_MDS_SESSION_OPENING;
	session->s_renew_requested = jiffies;

	/* send connect message */
	msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
	if (IS_ERR(msg)) {
		err = PTR_ERR(msg);
		goto out;
	}
	ceph_con_send(&session->s_con, msg);

out:
	return err;
}

/*
 * session caps
 */

/*
 * Free preallocated cap messages assigned to this session
 */
static void cleanup_cap_releases(struct ceph_mds_session *session)
{
	struct ceph_msg *msg;

	spin_lock(&session->s_cap_lock);
	while (!list_empty(&session->s_cap_releases)) {
		msg = list_first_entry(&session->s_cap_releases,
				       struct ceph_msg, list_head);
		list_del_init(&msg->list_head);
		ceph_msg_put(msg);
	}
	while (!list_empty(&session->s_cap_releases_done)) {
		msg = list_first_entry(&session->s_cap_releases_done,
				       struct ceph_msg, list_head);
		list_del_init(&msg->list_head);
		ceph_msg_put(msg);
	}
	spin_unlock(&session->s_cap_lock);
}

/*
 * Helper to safely iterate over all caps associated with a session.
 *
 * caller must hold session s_mutex
 */
static int iterate_session_caps(struct ceph_mds_session *session,
				int (*cb)(struct inode *, struct ceph_cap *,
					  void *), void *arg)
{
	struct ceph_cap *cap, *ncap;
	struct inode *inode;
	int ret;

	dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
	spin_lock(&session->s_cap_lock);
	list_for_each_entry_safe(cap, ncap, &session->s_caps, session_caps) {
		inode = igrab(&cap->ci->vfs_inode);
		if (!inode)
			continue;
		spin_unlock(&session->s_cap_lock);
		ret = cb(inode, cap, arg);
		iput(inode);
		if (ret < 0)
			return ret;
		spin_lock(&session->s_cap_lock);
	}
	spin_unlock(&session->s_cap_lock);

	return 0;
}
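
/*
 * Note the pattern above: we take an inode reference (igrab) and drop
 * s_cap_lock around each callback, since the callbacks below take
 * i_lock and may sleep (e.g. in iput or d_prune_aliases); the inode
 * reference keeps the cap's inode alive across the unlocked region.
 */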

static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
				  void *arg)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	dout("removing cap %p, ci is %p, inode is %p\n",
	     cap, ci, &ci->vfs_inode);
	ceph_remove_cap(cap);
	return 0;
}

/*
 * caller must hold session s_mutex
 */
static void remove_session_caps(struct ceph_mds_session *session)
{
	dout("remove_session_caps on %p\n", session);
	iterate_session_caps(session, remove_session_caps_cb, NULL);
	BUG_ON(session->s_nr_caps > 0);
	cleanup_cap_releases(session);
}

/*
 * wake up any threads waiting on this session's caps.  if the cap is
 * old (didn't get renewed on the client reconnect), remove it now.
 *
 * caller must hold s_mutex.
 */
static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
			      void *arg)
{
	struct ceph_mds_session *session = arg;

	spin_lock(&inode->i_lock);
	if (cap->gen != session->s_cap_gen) {
		pr_err("failed reconnect %p %llx.%llx cap %p "
		       "(gen %d < session %d)\n", inode, ceph_vinop(inode),
		       cap, cap->gen, session->s_cap_gen);
		__ceph_remove_cap(cap, NULL);
	}
	wake_up(&ceph_inode(inode)->i_cap_wq);
	spin_unlock(&inode->i_lock);
	return 0;
}

static void wake_up_session_caps(struct ceph_mds_session *session)
{
	dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
	iterate_session_caps(session, wake_up_session_cb, session);
}

/*
 * Send periodic message to MDS renewing all currently held caps.  The
 * ack will reset the expiration for all caps from this session.
 *
 * caller holds s_mutex
 */
static int send_renew_caps(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	int state;

	if (time_after_eq(jiffies, session->s_cap_ttl) &&
	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
		pr_info("mds%d caps stale\n", session->s_mds);

	/* do not try to renew caps until a recovering mds has reconnected
	 * with its clients. */
	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
	if (state < CEPH_MDS_STATE_RECONNECT) {
		dout("send_renew_caps ignoring mds%d (%s)\n",
		     session->s_mds, ceph_mds_state_name(state));
		return 0;
	}

	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
	     ceph_mds_state_name(state));
	session->s_renew_requested = jiffies;
	msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
				 ++session->s_renew_seq);
	if (IS_ERR(msg))
		return PTR_ERR(msg);
	ceph_con_send(&session->s_con, msg);
	return 0;
}

/*
 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
 */
static void renewed_caps(struct ceph_mds_client *mdsc,
			 struct ceph_mds_session *session, int is_renew)
{
	int was_stale;
	int wake = 0;

	spin_lock(&session->s_cap_lock);
	was_stale = is_renew && (session->s_cap_ttl == 0 ||
				 time_after_eq(jiffies, session->s_cap_ttl));

	session->s_cap_ttl = session->s_renew_requested +
		mdsc->mdsmap->m_session_timeout*HZ;

	if (was_stale) {
		if (time_before(jiffies, session->s_cap_ttl)) {
			pr_info("mds%d caps renewed\n", session->s_mds);
			wake = 1;
		} else {
			pr_info("mds%d caps still stale\n", session->s_mds);
		}
	}
	dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
	     session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
	     time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
	spin_unlock(&session->s_cap_lock);

	if (wake)
		wake_up_session_caps(session);
}

/*
 * send a session close request
 */
static int request_close_session(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	int err = 0;

	dout("request_close_session mds%d state %s seq %lld\n",
	     session->s_mds, session_state_name(session->s_state),
	     session->s_seq);
	msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
	if (IS_ERR(msg))
		err = PTR_ERR(msg);
	else
		ceph_con_send(&session->s_con, msg);
	return err;
}

/*
 * Called with s_mutex held.
 */
static int __close_session(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
		return 0;
	session->s_state = CEPH_MDS_SESSION_CLOSING;
	return request_close_session(mdsc, session);
}

/*
 * Trim old(er) caps.
 *
 * Because we can't cache an inode without one or more caps, we do
 * this indirectly: if a cap is unused, we prune its aliases, at which
 * point the inode will hopefully get dropped too.
 *
 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
 * memory pressure from the MDS, though, so it needn't be perfect.
 */
static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
{
	struct ceph_mds_session *session = arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	int used, oissued, mine;

	if (session->s_trim_caps <= 0)
		return -1;

	spin_lock(&inode->i_lock);
	mine = cap->issued | cap->implemented;
	used = __ceph_caps_used(ci);
	oissued = __ceph_caps_issued_other(ci, cap);

	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n",
	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
	     ceph_cap_string(used));
	if (ci->i_dirty_caps)
		goto out;	/* dirty caps */
	if ((used & ~oissued) & mine)
		goto out;	/* we need these caps */

	session->s_trim_caps--;
	if (oissued) {
		/* we aren't the only cap.. just remove us */
		__ceph_remove_cap(cap, NULL);
	} else {
		/* try to drop referring dentries */
		spin_unlock(&inode->i_lock);
		d_prune_aliases(inode);
		dout("trim_caps_cb %p cap %p pruned, count now %d\n",
		     inode, cap, atomic_read(&inode->i_count));
		return 0;
	}

out:
	spin_unlock(&inode->i_lock);
	return 0;
}

/*
 * Trim session cap count down to some max number.
 */
static int trim_caps(struct ceph_mds_client *mdsc,
		     struct ceph_mds_session *session,
		     int max_caps)
{
	int trim_caps = session->s_nr_caps - max_caps;

	dout("trim_caps mds%d start: %d / %d, trim %d\n",
	     session->s_mds, session->s_nr_caps, max_caps, trim_caps);
	if (trim_caps > 0) {
		session->s_trim_caps = trim_caps;
		iterate_session_caps(session, trim_caps_cb, session);
		dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
		     session->s_mds, session->s_nr_caps, max_caps,
		     trim_caps - session->s_trim_caps);
	}
	return 0;
}

/*
 * Allocate cap_release messages.  If there is a partially full message
 * in the queue, try to allocate enough to cover its remainder, so that
 * we can send it immediately.
 *
 * Called under s_mutex.
 */
static int add_cap_releases(struct ceph_mds_client *mdsc,
			    struct ceph_mds_session *session,
			    int extra)
{
	struct ceph_msg *msg;
	struct ceph_mds_cap_release *head;
	int err = -ENOMEM;

	if (extra < 0)
		extra = mdsc->client->mount_args.cap_release_safety;

	spin_lock(&session->s_cap_lock);

	if (!list_empty(&session->s_cap_releases)) {
		msg = list_first_entry(&session->s_cap_releases,
				       struct ceph_msg,
				       list_head);
		head = msg->front.iov_base;
		extra += CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
	}

	while (session->s_num_cap_releases < session->s_nr_caps + extra) {
		spin_unlock(&session->s_cap_lock);
		msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
				   0, 0, NULL);
		if (!msg)
			goto out_unlocked;
		dout("add_cap_releases %p msg %p now %d\n", session, msg,
		     (int)msg->front.iov_len);
		head = msg->front.iov_base;
		head->num = cpu_to_le32(0);
		msg->front.iov_len = sizeof(*head);
		spin_lock(&session->s_cap_lock);
		list_add(&msg->list_head, &session->s_cap_releases);
		session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
	}

	if (!list_empty(&session->s_cap_releases)) {
		msg = list_first_entry(&session->s_cap_releases,
				       struct ceph_msg,
				       list_head);
		head = msg->front.iov_base;
		if (head->num) {
			dout(" queueing non-full %p (%d)\n", msg,
			     le32_to_cpu(head->num));
			list_move_tail(&msg->list_head,
				       &session->s_cap_releases_done);
			session->s_num_cap_releases -=
				CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
		}
	}
	err = 0;
	spin_unlock(&session->s_cap_lock);
out_unlocked:
	return err;
}

/*
 * Check whether all dirty inode data has been flushed to disk.
 *
 * returns true if we've flushed through want_flush_seq
 */
static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
{
	int mds, ret = 1;

	dout("check_cap_flush want %lld\n", want_flush_seq);
	mutex_lock(&mdsc->mutex);
	for (mds = 0; ret && mds < mdsc->max_sessions; mds++) {
		struct ceph_mds_session *session = mdsc->sessions[mds];

		if (!session)
			continue;
		get_session(session);
		mutex_unlock(&mdsc->mutex);

		mutex_lock(&session->s_mutex);
		if (!list_empty(&session->s_cap_flushing)) {
			struct ceph_inode_info *ci =
				list_entry(session->s_cap_flushing.next,
					   struct ceph_inode_info,
					   i_flushing_item);
			struct inode *inode = &ci->vfs_inode;

			spin_lock(&inode->i_lock);
			if (ci->i_cap_flush_seq <= want_flush_seq) {
				dout("check_cap_flush still flushing %p "
				     "seq %lld <= %lld to mds%d\n", inode,
				     ci->i_cap_flush_seq, want_flush_seq,
				     session->s_mds);
				ret = 0;
			}
			spin_unlock(&inode->i_lock);
		}
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);

		if (!ret)
			return ret;
		mutex_lock(&mdsc->mutex);
	}

	mutex_unlock(&mdsc->mutex);
	dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
	return ret;
}

/*
 * called under s_mutex
 */
static void send_cap_releases(struct ceph_mds_client *mdsc,
			      struct ceph_mds_session *session)
{
	struct ceph_msg *msg;

	dout("send_cap_releases mds%d\n", session->s_mds);
	while (1) {
		spin_lock(&session->s_cap_lock);
		if (list_empty(&session->s_cap_releases_done))
			break;
		msg = list_first_entry(&session->s_cap_releases_done,
				       struct ceph_msg, list_head);
		list_del_init(&msg->list_head);
		spin_unlock(&session->s_cap_lock);
		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
		ceph_con_send(&session->s_con, msg);
	}
	spin_unlock(&session->s_cap_lock);
}

/*
 * requests
 */

/*
 * Create an mds request.
 */
struct ceph_mds_request *
ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
{
	struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);

	if (!req)
		return ERR_PTR(-ENOMEM);

	req->r_started = jiffies;
	req->r_resend_mds = -1;
	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
	req->r_fmode = -1;
	atomic_set(&req->r_ref, 1);  /* caller's ref; __register_request
					takes one for the request_tree */
	INIT_LIST_HEAD(&req->r_wait);
	init_completion(&req->r_completion);
	init_completion(&req->r_safe_completion);
	INIT_LIST_HEAD(&req->r_unsafe_item);

	req->r_op = op;
	req->r_direct_mode = mode;
	return req;
}

/*
 * return oldest (lowest) tid in request tree, 0 if none.
 *
 * called under mdsc->mutex.
 */
static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
{
	struct ceph_mds_request *first;
	if (radix_tree_gang_lookup(&mdsc->request_tree,
				   (void **)&first, 0, 1) <= 0)
		return 0;
	return first->r_tid;
}

/*
 * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
 * on build_path_from_dentry in fs/cifs/dir.c.
 *
 * If @stop_on_nosnap, generate path relative to the first non-snapped
 * inode.
 *
 * Encode hidden .snap dirs as a double /, i.e.
 *   foo/.snap/bar -> foo//bar
 */
char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
			   int stop_on_nosnap)
{
	struct dentry *temp;
	char *path;
	int len, pos;

	if (dentry == NULL)
		return ERR_PTR(-EINVAL);

retry:
	len = 0;
	for (temp = dentry; !IS_ROOT(temp);) {
		struct inode *inode = temp->d_inode;
		if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
			len++;  /* slash only */
		else if (stop_on_nosnap && inode &&
			 ceph_snap(inode) == CEPH_NOSNAP)
			break;
		else
			len += 1 + temp->d_name.len;
		temp = temp->d_parent;
		if (temp == NULL) {
			pr_err("build_path_dentry corrupt dentry %p\n", dentry);
			return ERR_PTR(-EINVAL);
		}
	}
	if (len)
		len--;  /* no leading '/' */

	path = kmalloc(len+1, GFP_NOFS);
	if (path == NULL)
		return ERR_PTR(-ENOMEM);
	pos = len;
	path[pos] = 0;	/* trailing null */
	for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
		struct inode *inode = temp->d_inode;

		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
			dout("build_path_dentry path+%d: %p SNAPDIR\n",
			     pos, temp);
		} else if (stop_on_nosnap && inode &&
			   ceph_snap(inode) == CEPH_NOSNAP) {
			break;
		} else {
			pos -= temp->d_name.len;
			if (pos < 0)
				break;
			strncpy(path + pos, temp->d_name.name,
				temp->d_name.len);
			dout("build_path_dentry path+%d: %p '%.*s'\n",
			     pos, temp, temp->d_name.len, path + pos);
		}
		if (pos)
			path[--pos] = '/';
		temp = temp->d_parent;
		if (temp == NULL) {
			pr_err("build_path_dentry corrupt dentry\n");
			kfree(path);
			return ERR_PTR(-EINVAL);
		}
	}
	if (pos != 0) {
		pr_err("build_path_dentry did not end path lookup where "
		       "expected, namelen is %d, pos is %d\n", len, pos);
		/* presumably this is only possible if racing with a
		   rename of one of the parent directories (we can not
		   lock the dentries above us to prevent this, but
		   retrying should be harmless) */
		kfree(path);
		goto retry;
	}

	*base = ceph_ino(temp->d_inode);
	*plen = len;
	dout("build_path_dentry on %p %d built %llx '%.*s'\n",
	     dentry, atomic_read(&dentry->d_count), *base, len, path);
	return path;
}
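
/*
 * Worked example (illustrative): with stop_on_nosnap unset, a dentry
 * for foo/.snap/bar is filled in right-to-left as "foo//bar" (the
 * .snap component contributes only its slash), and *base is set to
 * the ino where the walk stopped (the root, or the first non-snapped
 * inode when stop_on_nosnap is set).
 */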

static int build_dentry_path(struct dentry *dentry,
			     const char **ppath, int *ppathlen, u64 *pino,
			     int *pfreepath)
{
	char *path;

	if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) {
		*pino = ceph_ino(dentry->d_parent->d_inode);
		*ppath = dentry->d_name.name;
		*ppathlen = dentry->d_name.len;
		return 0;
	}
	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
	if (IS_ERR(path))
		return PTR_ERR(path);
	*ppath = path;
	*pfreepath = 1;
	return 0;
}

static int build_inode_path(struct inode *inode,
			    const char **ppath, int *ppathlen, u64 *pino,
			    int *pfreepath)
{
	struct dentry *dentry;
	char *path;

	if (ceph_snap(inode) == CEPH_NOSNAP) {
		*pino = ceph_ino(inode);
		*ppathlen = 0;
		return 0;
	}
	dentry = d_find_alias(inode);
	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
	dput(dentry);
	if (IS_ERR(path))
		return PTR_ERR(path);
	*ppath = path;
	*pfreepath = 1;
	return 0;
}

/*
 * request arguments may be specified via an inode *, a dentry *, or
 * an explicit ino+path.
 */
static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
				 const char *rpath, u64 rino,
				 const char **ppath, int *pathlen,
				 u64 *ino, int *freepath)
{
	int r = 0;

	if (rinode) {
		r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
		dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
		     ceph_snap(rinode));
	} else if (rdentry) {
		r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
		dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
		     *ppath);
	} else if (rpath) {
		*ino = rino;
		*ppath = rpath;
		*pathlen = strlen(rpath);
		dout(" path %.*s\n", *pathlen, rpath);
	}

	return r;
}

/*
 * called under mdsc->mutex
 */
static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
					       struct ceph_mds_request *req,
					       int mds)
{
	struct ceph_msg *msg;
	struct ceph_mds_request_head *head;
	const char *path1 = NULL;
	const char *path2 = NULL;
	u64 ino1 = 0, ino2 = 0;
	int pathlen1 = 0, pathlen2 = 0;
	int freepath1 = 0, freepath2 = 0;
	int len;
	u16 releases;
	void *p, *end;
	int ret;

	ret = set_request_path_attr(req->r_inode, req->r_dentry,
				    req->r_path1, req->r_ino1.ino,
				    &path1, &pathlen1, &ino1, &freepath1);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out;
	}

	ret = set_request_path_attr(NULL, req->r_old_dentry,
				    req->r_path2, req->r_ino2.ino,
				    &path2, &pathlen2, &ino2, &freepath2);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out_free1;
	}

	len = sizeof(*head) +
		pathlen1 + pathlen2 + 2*(sizeof(u32) + sizeof(u64));

	/* calculate (max) length for cap releases */
	len += sizeof(struct ceph_mds_request_release) *
		(!!req->r_inode_drop + !!req->r_dentry_drop +
		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
	if (req->r_dentry_drop)
		len += req->r_dentry->d_name.len;
	if (req->r_old_dentry_drop)
		len += req->r_old_dentry->d_name.len;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, 0, 0, NULL);
	if (IS_ERR(msg))
		goto out_free2;

	head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(*head);
	end = msg->front.iov_base + msg->front.iov_len;

	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
	head->op = cpu_to_le32(req->r_op);
	head->caller_uid = cpu_to_le32(current_fsuid());
	head->caller_gid = cpu_to_le32(current_fsgid());
	head->args = req->r_args;

	ceph_encode_filepath(&p, end, ino1, path1);
	ceph_encode_filepath(&p, end, ino2, path2);

	/* cap releases */
	releases = 0;
	if (req->r_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      req->r_inode ? req->r_inode : req->r_dentry->d_inode,
		      mds, req->r_inode_drop, req->r_inode_unless, 0);
	if (req->r_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_dentry,
		       mds, req->r_dentry_drop, req->r_dentry_unless);
	if (req->r_old_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
		       mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
	if (req->r_old_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      req->r_old_dentry->d_inode,
		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
	head->num_releases = cpu_to_le16(releases);

	BUG_ON(p > end);
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	msg->pages = req->r_pages;
	msg->nr_pages = req->r_num_pages;
	msg->hdr.data_len = cpu_to_le32(req->r_data_len);
	msg->hdr.data_off = cpu_to_le16(0);

out_free2:
	if (freepath2)
		kfree((char *)path2);
out_free1:
	if (freepath1)
		kfree((char *)path1);
out:
	return msg;
}

/*
 * called under mdsc->mutex if error, under no mutex if
 * success.
 */
static void complete_request(struct ceph_mds_client *mdsc,
			     struct ceph_mds_request *req)
{
	if (req->r_callback)
		req->r_callback(mdsc, req);
	else
		complete(&req->r_completion);
}

/*
 * called under mdsc->mutex
 */
static int __prepare_send_request(struct ceph_mds_client *mdsc,
				  struct ceph_mds_request *req,
				  int mds)
{
	struct ceph_mds_request_head *rhead;
	struct ceph_msg *msg;
	int flags = 0;

	req->r_mds = mds;
	req->r_attempts++;
	dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);

	if (req->r_request) {
		ceph_msg_put(req->r_request);
		req->r_request = NULL;
	}
	msg = create_request_message(mdsc, req, mds);
	if (IS_ERR(msg)) {
		req->r_reply = ERR_PTR(PTR_ERR(msg));
		complete_request(mdsc, req);
		return PTR_ERR(msg);
	}
	req->r_request = msg;

	rhead = msg->front.iov_base;
	rhead->tid = cpu_to_le64(req->r_tid);
	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
	if (req->r_got_unsafe)
		flags |= CEPH_MDS_FLAG_REPLAY;
	if (req->r_locked_dir)
		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
	rhead->flags = cpu_to_le32(flags);
	rhead->num_fwd = req->r_num_fwd;
	rhead->num_retry = req->r_attempts - 1;

	dout(" r_locked_dir = %p\n", req->r_locked_dir);

	if (req->r_target_inode && req->r_got_unsafe)
		rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
	else
		rhead->ino = 0;
	return 0;
}

/*
 * send request, or put it on the appropriate wait list.
 */
static int __do_request(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req)
{
	struct ceph_mds_session *session = NULL;
	int mds = -1;
	int err = -EAGAIN;

	if (req->r_reply)
		goto out;

	if (req->r_timeout &&
	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
		dout("do_request timed out\n");
		err = -EIO;
		goto finish;
	}

	mds = __choose_mds(mdsc, req);
	if (mds < 0 ||
	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
		dout("do_request no mds or not active, waiting for map\n");
		list_add(&req->r_wait, &mdsc->waiting_for_map);
		goto out;
	}

	/* get, open session */
	session = __ceph_lookup_mds_session(mdsc, mds);
	if (!session)
		session = register_session(mdsc, mds);
	if (IS_ERR(session)) {
		err = PTR_ERR(session);
		goto finish;
	}
	dout("do_request mds%d session %p state %s\n", mds, session,
	     session_state_name(session->s_state));
	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
	    session->s_state != CEPH_MDS_SESSION_HUNG) {
		if (session->s_state == CEPH_MDS_SESSION_NEW ||
		    session->s_state == CEPH_MDS_SESSION_CLOSING)
			__open_session(mdsc, session);
		list_add(&req->r_wait, &session->s_waiting);
		goto out_session;
	}

	/* send request */
	req->r_session = get_session(session);
	req->r_resend_mds = -1;	/* forget any previous mds hint */

	if (req->r_request_started == 0)   /* note request start time */
		req->r_request_started = jiffies;

	err = __prepare_send_request(mdsc, req, mds);
	if (!err) {
		ceph_msg_get(req->r_request);
		ceph_con_send(&session->s_con, req->r_request);
	}

out_session:
	ceph_put_mds_session(session);
out:
	return err;

finish:
	req->r_reply = ERR_PTR(err);
	complete_request(mdsc, req);
	goto out;
}

/*
 * called under mdsc->mutex
 */
static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head)
{
	struct ceph_mds_request *req, *nreq;

	list_for_each_entry_safe(req, nreq, head, r_wait) {
		list_del_init(&req->r_wait);
		__do_request(mdsc, req);
	}
}

/*
 * Wake up threads with requests pending for @mds, so that they can
 * resubmit their requests to a possibly different mds.  If @all is set,
 * wake up if their requests have been forwarded to @mds, too.
 */
static void kick_requests(struct ceph_mds_client *mdsc, int mds, int all)
{
	struct ceph_mds_request *reqs[10];
	u64 nexttid = 0;
	int i, got;

	dout("kick_requests mds%d\n", mds);
	while (nexttid <= mdsc->last_tid) {
		got = radix_tree_gang_lookup(&mdsc->request_tree,
					     (void **)&reqs, nexttid, 10);
		if (got == 0)
			break;
		nexttid = reqs[got-1]->r_tid + 1;
		for (i = 0; i < got; i++) {
			if (reqs[i]->r_got_unsafe)
				continue;
			if (reqs[i]->r_session &&
			    reqs[i]->r_session->s_mds == mds) {
				dout(" kicking tid %llu\n", reqs[i]->r_tid);
				put_request_session(reqs[i]);
				__do_request(mdsc, reqs[i]);
			}
		}
	}
}

void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
			      struct ceph_mds_request *req)
{
	dout("submit_request on %p\n", req);
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, NULL);
	__do_request(mdsc, req);
	mutex_unlock(&mdsc->mutex);
}

/*
 * Synchronously perform an mds request.  Take care of all of the
 * session setup, forwarding, and retry details.
 */
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
			 struct inode *dir,
			 struct ceph_mds_request *req)
{
	int err;

	dout("do_request on %p\n", req);

	/* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
	if (req->r_inode)
		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
	if (req->r_locked_dir)
		ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
	if (req->r_old_dentry)
		ceph_get_cap_refs(
			ceph_inode(req->r_old_dentry->d_parent->d_inode),
			CEPH_CAP_PIN);

	/* issue */
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, dir);
	__do_request(mdsc, req);

	/* wait */
	if (!req->r_reply) {
		mutex_unlock(&mdsc->mutex);
		if (req->r_timeout) {
			err = wait_for_completion_timeout(&req->r_completion,
							  req->r_timeout);
			if (err > 0)
				err = 0;
			else if (err == 0)
				req->r_reply = ERR_PTR(-EIO);
		} else {
			wait_for_completion(&req->r_completion);
		}
		mutex_lock(&mdsc->mutex);
	}

	if (IS_ERR(req->r_reply)) {
		err = PTR_ERR(req->r_reply);
		req->r_reply = NULL;

		/* clean up */
		__unregister_request(mdsc, req);
		if (!list_empty(&req->r_unsafe_item))
			list_del_init(&req->r_unsafe_item);
		complete(&req->r_safe_completion);
	} else if (req->r_err) {
		err = req->r_err;
	} else {
		err = le32_to_cpu(req->r_reply_info.head->result);
	}
	mutex_unlock(&mdsc->mutex);

	dout("do_request %p done, result %d\n", req, err);
	return err;
}
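
/*
 * Note that this returns after the first (unsafe) reply.  Callers that
 * need commit-to-disk semantics wait on r_safe_completion instead,
 * which handle_reply() below completes when the safe reply arrives;
 * this is what lets an fsync on the parent directory wait for its
 * r_unsafe_dir requests to commit.
 */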

/*
 * Handle mds reply.
 *
 * We take the session mutex and parse and process the reply immediately.
 * This preserves the logical ordering of replies, capabilities, etc., sent
 * by the MDS as they are applied to our local cache.
 */
static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_mds_request *req;
	struct ceph_mds_reply_head *head = msg->front.iov_base;
	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
	u64 tid;
	int err, result;
	int mds;

	if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
		return;
	if (msg->front.iov_len < sizeof(*head)) {
		pr_err("mdsc_handle_reply got corrupt (short) reply\n");
		return;
	}

	/* get request, session */
	tid = le64_to_cpu(head->tid);
	mutex_lock(&mdsc->mutex);
	req = __lookup_request(mdsc, tid);
	if (!req) {
		dout("handle_reply on unknown tid %llu\n", tid);
		mutex_unlock(&mdsc->mutex);
		return;
	}
	dout("handle_reply %p\n", req);
	mds = le64_to_cpu(msg->hdr.src.name.num);

	/* correct session? */
	if (req->r_session != session) {
		pr_err("mdsc_handle_reply got %llu on session mds%d"
		       " not mds%d\n", tid, session->s_mds,
		       req->r_session ? req->r_session->s_mds : -1);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}

	/* dup? */
	if ((req->r_got_unsafe && !head->safe) ||
	    (req->r_got_safe && head->safe)) {
		pr_warning("got a dup %s reply on %llu from mds%d\n",
			   head->safe ? "safe" : "unsafe", tid, mds);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}

	result = le32_to_cpu(head->result);

	/*
	 * Tolerate 2 consecutive ESTALEs from the same mds.
	 * FIXME: we should be looking at the cap migrate_seq.
	 */
	if (result == -ESTALE) {
		req->r_direct_mode = USE_AUTH_MDS;
		req->r_num_stale++;
		if (req->r_num_stale <= 2) {
			__do_request(mdsc, req);
			mutex_unlock(&mdsc->mutex);
			goto out;
		}
	} else {
		req->r_num_stale = 0;
	}

	if (head->safe) {
		req->r_got_safe = true;
		__unregister_request(mdsc, req);
		complete(&req->r_safe_completion);

		if (req->r_got_unsafe) {
			/*
			 * We already handled the unsafe response, now do the
			 * cleanup.  No need to examine the response; the MDS
			 * doesn't include any result info in the safe
			 * response.  And even if it did, there is nothing
			 * useful we could do with a revised return value.
			 */
			dout("got safe reply %llu, mds%d\n", tid, mds);
			list_del_init(&req->r_unsafe_item);

			/* last unsafe request during umount? */
			if (mdsc->stopping && !__get_oldest_tid(mdsc))
				complete(&mdsc->safe_umount_waiters);
			mutex_unlock(&mdsc->mutex);
			goto out;
		}
	}

	BUG_ON(req->r_reply);

	if (!head->safe) {
		req->r_got_unsafe = true;
		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
	}

	dout("handle_reply tid %lld result %d\n", tid, result);
	rinfo = &req->r_reply_info;
	err = parse_reply_info(msg, rinfo);
	mutex_unlock(&mdsc->mutex);

	mutex_lock(&session->s_mutex);
	if (err < 0) {
		pr_err("mdsc_handle_reply got corrupt reply mds%d\n", mds);
		goto out_err;
	}

	/* snap trace */
	if (rinfo->snapblob_len) {
		down_write(&mdsc->snap_rwsem);
		ceph_update_snap_trace(mdsc, rinfo->snapblob,
			       rinfo->snapblob + rinfo->snapblob_len,
			       le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
		downgrade_write(&mdsc->snap_rwsem);
	} else {
		down_read(&mdsc->snap_rwsem);
	}

	/* insert trace into our cache */
	err = ceph_fill_trace(mdsc->client->sb, req, req->r_session);
	if (err == 0) {
		if (result == 0 && rinfo->dir_nr)
			ceph_readdir_prepopulate(req, req->r_session);
		ceph_unreserve_caps(&req->r_caps_reservation);
	}

	up_read(&mdsc->snap_rwsem);
out_err:
	if (err) {
		req->r_err = err;
	} else {
		req->r_reply = msg;
		ceph_msg_get(msg);
	}

	add_cap_releases(mdsc, req->r_session, -1);
	mutex_unlock(&session->s_mutex);

	/* kick calling process */
	complete_request(mdsc, req);
out:
	ceph_mdsc_put_request(req);
	return;
}



/*
 * handle mds notification that our request has been forwarded.
 */
static void handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	struct ceph_mds_request *req;
	u64 tid;
	u32 next_mds;
	u32 fwd_seq;
	u8 must_resend;
	int err = -EINVAL;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	int from_mds, state;

	if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
		goto bad;
	from_mds = le64_to_cpu(msg->hdr.src.name.num);

	ceph_decode_need(&p, end, sizeof(u64)+2*sizeof(u32), bad);
	ceph_decode_64(&p, tid);
	ceph_decode_32(&p, next_mds);
	ceph_decode_32(&p, fwd_seq);
	ceph_decode_8(&p, must_resend);

	WARN_ON(must_resend);  /* shouldn't happen. */

	mutex_lock(&mdsc->mutex);
	req = __lookup_request(mdsc, tid);
	if (!req) {
		dout("forward %llu dne\n", tid);
		goto out;  /* dup reply? */
	}

	state = mdsc->sessions[next_mds]->s_state;
	if (fwd_seq <= req->r_num_fwd) {
		dout("forward %llu to mds%d - old seq %d <= %d\n",
		     tid, next_mds, req->r_num_fwd, fwd_seq);
	} else {
		/* resend. forward race not possible; mds would drop */
		dout("forward %llu to mds%d (we resend)\n", tid, next_mds);
		req->r_num_fwd = fwd_seq;
		req->r_resend_mds = next_mds;
		put_request_session(req);
		__do_request(mdsc, req);
	}
	ceph_mdsc_put_request(req);
out:
	mutex_unlock(&mdsc->mutex);
	return;

bad:
	pr_err("mdsc_handle_forward decode error err=%d\n", err);
}

/*
 * handle a mds session control message
 */
static void handle_session(struct ceph_mds_session *session,
			   struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	u32 op;
	u64 seq;
	int mds;
	struct ceph_mds_session_head *h = msg->front.iov_base;
	int wake = 0;

	if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
		return;
	mds = le64_to_cpu(msg->hdr.src.name.num);

	/* decode */
	if (msg->front.iov_len != sizeof(*h))
		goto bad;
	op = le32_to_cpu(h->op);
	seq = le64_to_cpu(h->seq);

	mutex_lock(&mdsc->mutex);
	/* FIXME: this ttl calculation is generous */
	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
	mutex_unlock(&mdsc->mutex);

	mutex_lock(&session->s_mutex);

	dout("handle_session mds%d %s %p state %s seq %llu\n",
	     mds, ceph_session_op_name(op), session,
	     session_state_name(session->s_state), seq);

	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
		session->s_state = CEPH_MDS_SESSION_OPEN;
		pr_info("mds%d came back\n", session->s_mds);
	}

	switch (op) {
	case CEPH_SESSION_OPEN:
		session->s_state = CEPH_MDS_SESSION_OPEN;
		renewed_caps(mdsc, session, 0);
		wake = 1;
		if (mdsc->stopping)
			__close_session(mdsc, session);
		break;

	case CEPH_SESSION_RENEWCAPS:
		if (session->s_renew_seq == seq)
			renewed_caps(mdsc, session, 1);
		break;

	case CEPH_SESSION_CLOSE:
		unregister_session(mdsc, mds);
		remove_session_caps(session);
		wake = 1;  /* for good measure */
		complete(&mdsc->session_close_waiters);
		kick_requests(mdsc, mds, 0);  /* cur only */
		break;

	case CEPH_SESSION_STALE:
		pr_info("mds%d caps went stale, renewing\n",
			session->s_mds);
		spin_lock(&session->s_cap_lock);
		session->s_cap_gen++;
		session->s_cap_ttl = 0;
		spin_unlock(&session->s_cap_lock);
		send_renew_caps(mdsc, session);
		break;

	case CEPH_SESSION_RECALL_STATE:
		trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
		break;

	default:
		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
		WARN_ON(1);
	}

	mutex_unlock(&session->s_mutex);
	if (wake) {
		mutex_lock(&mdsc->mutex);
		__wake_requests(mdsc, &session->s_waiting);
		mutex_unlock(&mdsc->mutex);
	}
	return;

bad:
	pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
	       (int)msg->front.iov_len);
	return;
}


/*
 * called under session->mutex.
 */
static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
				   struct ceph_mds_session *session)
{
	struct ceph_mds_request *req, *nreq;
	int err;

	dout("replay_unsafe_requests mds%d\n", session->s_mds);

	mutex_lock(&mdsc->mutex);
	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
		err = __prepare_send_request(mdsc, req, session->s_mds);
		if (!err) {
			ceph_msg_get(req->r_request);
			ceph_con_send(&session->s_con, req->r_request);
		}
	}
	mutex_unlock(&mdsc->mutex);
}
1945
1946/*
1947 * Encode information about a cap for a reconnect with the MDS.
1948 */
1949struct encode_caps_data {
1950 void **pp;
1951 void *end;
1952 int *num_caps;
1953};
1954
1955static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
1956 void *arg)
1957{
1958 struct ceph_mds_cap_reconnect *rec;
1959 struct ceph_inode_info *ci;
1960 struct encode_caps_data *data = (struct encode_caps_data *)arg;
1961 void *p = *(data->pp);
1962 void *end = data->end;
1963 char *path;
1964 int pathlen, err;
1965 u64 pathbase;
1966 struct dentry *dentry;
1967
1968 ci = cap->ci;
1969
1970 dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
1971 inode, ceph_vinop(inode), cap, cap->cap_id,
1972 ceph_cap_string(cap->issued));
1973 ceph_decode_need(&p, end, sizeof(u64), needmore);
1974 ceph_encode_64(&p, ceph_ino(inode));
1975
1976 dentry = d_find_alias(inode);
1977 if (dentry) {
1978 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
1979 if (IS_ERR(path)) {
1980 err = PTR_ERR(path);
1981 BUG_ON(err);
1982 }
1983 } else {
1984 path = NULL;
1985 pathlen = 0;
1986 }
1987 ceph_decode_need(&p, end, pathlen+4, needmore);
1988 ceph_encode_string(&p, end, path, pathlen);
1989
1990 ceph_decode_need(&p, end, sizeof(*rec), needmore);
1991 rec = p;
1992 p += sizeof(*rec);
1993 BUG_ON(p > end);
1994 spin_lock(&inode->i_lock);
1995 cap->seq = 0; /* reset cap seq */
1996 cap->issue_seq = 0; /* and issue_seq */
1997 rec->cap_id = cpu_to_le64(cap->cap_id);
1998 rec->pathbase = cpu_to_le64(pathbase);
1999 rec->wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2000 rec->issued = cpu_to_le32(cap->issued);
2001 rec->size = cpu_to_le64(inode->i_size);
2002 ceph_encode_timespec(&rec->mtime, &inode->i_mtime);
2003 ceph_encode_timespec(&rec->atime, &inode->i_atime);
2004 rec->snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2005 spin_unlock(&inode->i_lock);
2006
2007 kfree(path);
2008 dput(dentry);
2009 (*data->num_caps)++;
2010 *(data->pp) = p;
2011 return 0;
2012needmore:
2013 return -ENOSPC;
2014}
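
/*
 * A minimal userspace sketch of the check/write/advance pattern
 * encode_caps_cb follows: verify there is room, copy the little-endian
 * value, then bump the cursor.  demo_encode_64 is a stand-in for
 * ceph_encode_64 plus the ceph_decode_need room check that precedes it;
 * the caller is assumed to keep *p <= end.
 */
#include <stdint.h>
#include <string.h>
#include <endian.h>

static int demo_encode_64(unsigned char **p, unsigned char *end, uint64_t v)
{
        uint64_t le = htole64(v);

        if ((size_t)(end - *p) < sizeof(le))
                return -1;      /* caller grows the buffer and retries */
        memcpy(*p, &le, sizeof(le));
        *p += sizeof(le);
        return 0;
}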
2015
2016
2017/*
2018 * If an MDS fails and recovers, clients need to reconnect in order to
2019 * reestablish shared state. This includes all caps issued through
2020 * this session _and_ the snap_realm hierarchy. Because it's not
2021 * clear which snap realms the mds cares about, we send everything we
2022 * know about; that ensures we'll then get any new info the
2023 * recovering MDS might have.
2024 *
2025 * This is a relatively heavyweight operation, but it's rare.
2026 *
2027 * called with mdsc->mutex held.
2028 */
2029static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
2030{
2031 struct ceph_mds_session *session;
2032 struct ceph_msg *reply;
2033 int newlen, len = 4 + 1; /* u8 closed flag + u32 cap count */
2034 void *p, *end;
2035 int err;
2036 int num_caps, num_realms = 0;
2037 int got;
2038 u64 next_snap_ino = 0;
2039 __le32 *pnum_caps, *pnum_realms;
2040 struct encode_caps_data iter_args;
2041
2042 pr_info("reconnect to recovering mds%d\n", mds);
2043
2044 /* find session */
2045 session = __ceph_lookup_mds_session(mdsc, mds);
2046 mutex_unlock(&mdsc->mutex); /* drop lock for duration */
2047
2048 if (session) {
2049 mutex_lock(&session->s_mutex);
2050
2051 session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2052 session->s_seq = 0;
2053
2054 ceph_con_open(&session->s_con,
2055 ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2056
2057 /* replay unsafe requests */
2058 replay_unsafe_requests(mdsc, session);
2059
2060 /* estimate needed space */
2061 len += session->s_nr_caps *
2062 (100+sizeof(struct ceph_mds_cap_reconnect));
2063 pr_info("estimating i need %d bytes for %d caps\n",
2064 len, session->s_nr_caps);
2065 } else {
2066 dout("no session for mds%d, will send short reconnect\n",
2067 mds);
2068 }
2069
2070 down_read(&mdsc->snap_rwsem);
2071
2072retry:
2073 /* build reply */
2074 reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, len, 0, 0, NULL);
2075 if (IS_ERR(reply)) {
2076 err = PTR_ERR(reply);
2077 pr_err("send_mds_reconnect ENOMEM on %d for mds%d\n",
2078 len, mds);
2079 goto out;
2080 }
2081 p = reply->front.iov_base;
2082 end = p + len;
2083
2084 if (!session) {
2085 ceph_encode_8(&p, 1); /* session was closed */
2086 ceph_encode_32(&p, 0);
2087 goto send;
2088 }
2089 dout("session %p state %s\n", session,
2090 session_state_name(session->s_state));
2091
2092 /* traverse this session's caps */
2093 ceph_encode_8(&p, 0);
2094 pnum_caps = p;
2095 ceph_encode_32(&p, session->s_nr_caps);
2096 num_caps = 0;
2097
2098 iter_args.pp = &p;
2099 iter_args.end = end;
2100 iter_args.num_caps = &num_caps;
2101 err = iterate_session_caps(session, encode_caps_cb, &iter_args);
2102 if (err == -ENOSPC)
2103 goto needmore;
2104 if (err < 0)
2105 goto out;
2106 *pnum_caps = cpu_to_le32(num_caps);
2107
2108 /*
2109 * snaprealms. we provide mds with the ino, seq (version), and
2110 * parent for all of our realms. If the mds has any newer info,
2111 * it will tell us.
2112 */
2113 next_snap_ino = 0;
2114 /* save some space for the snaprealm count */
2115 pnum_realms = p;
2116 ceph_decode_need(&p, end, sizeof(*pnum_realms), needmore);
2117 p += sizeof(*pnum_realms);
2118 num_realms = 0;
2119 while (1) {
2120 struct ceph_snap_realm *realm;
2121 struct ceph_mds_snaprealm_reconnect *sr_rec;
2122 got = radix_tree_gang_lookup(&mdsc->snap_realms,
2123 (void **)&realm, next_snap_ino, 1);
2124 if (!got)
2125 break;
2126
2127 dout(" adding snap realm %llx seq %lld parent %llx\n",
2128 realm->ino, realm->seq, realm->parent_ino);
2129 ceph_decode_need(&p, end, sizeof(*sr_rec), needmore);
2130 sr_rec = p;
2131 sr_rec->ino = cpu_to_le64(realm->ino);
2132 sr_rec->seq = cpu_to_le64(realm->seq);
2133 sr_rec->parent = cpu_to_le64(realm->parent_ino);
2134 p += sizeof(*sr_rec);
2135 num_realms++;
2136 next_snap_ino = realm->ino + 1;
2137 }
2138 *pnum_realms = cpu_to_le32(num_realms);
2139
2140send:
2141 reply->front.iov_len = p - reply->front.iov_base;
2142 reply->hdr.front_len = cpu_to_le32(reply->front.iov_len);
2143 dout("final len was %u (guessed %d)\n",
2144 (unsigned)reply->front.iov_len, len);
2145 if (session) {
2146 ceph_con_send(&session->s_con, reply);
2147 session->s_state = CEPH_MDS_SESSION_OPEN;
2148 __wake_requests(mdsc, &session->s_waiting);
2149 } else {
2150 ceph_msg_put(reply); /* no session, so no connection to send on */
2151 }
2152out:
2153 up_read(&mdsc->snap_rwsem);
2154 if (session) {
2155 mutex_unlock(&session->s_mutex);
2156 ceph_put_mds_session(session);
2157 }
2158 mutex_lock(&mdsc->mutex);
2159 return;
2160
2161needmore:
2162 /*
2163 * we need a larger buffer. this estimate doesn't factor in
2164 * snap realms very accurately, but it errs large, so it's safe.
2165 */
2166 num_caps += num_realms;
2167 newlen = len * ((100 * (session->s_nr_caps+3)) / (num_caps + 1)) / 100;
2168 pr_info("i guessed %d, and did %d of %d caps, retrying with %d\n",
2169 len, num_caps, session->s_nr_caps, newlen);
2170 len = newlen;
2171 ceph_msg_put(reply);
2172 goto retry;
2173}
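
/*
 * A standalone sketch of the regrowth arithmetic in the needmore path
 * of send_mds_reconnect above, with a worked example: if a 4096 byte
 * buffer held 49 of 97 caps, the retry asks for
 * 4096 * ((100 * (97 + 3)) / (49 + 1)) / 100 = 4096 * 200 / 100 = 8192.
 * demo_next_len is illustrative only; note the real caller folds the
 * encoded snap realms into done_items first.
 */
static int demo_next_len(int len, int total_caps, int done_items)
{
        /* scale by roughly (total+3)/(done+1); working in percent
         * keeps a little integer precision, exactly as above */
        return len * ((100 * (total_caps + 3)) / (done_items + 1)) / 100;
}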
2174
2175
2176/*
2177 * compare old and new mdsmaps, kicking requests
2178 * and closing out old connections as necessary
2179 *
2180 * called under mdsc->mutex.
2181 */
2182static void check_new_map(struct ceph_mds_client *mdsc,
2183 struct ceph_mdsmap *newmap,
2184 struct ceph_mdsmap *oldmap)
2185{
2186 int i;
2187 int oldstate, newstate;
2188 struct ceph_mds_session *s;
2189
2190 dout("check_new_map new %u old %u\n",
2191 newmap->m_epoch, oldmap->m_epoch);
2192
2193 for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
2194 if (mdsc->sessions[i] == NULL)
2195 continue;
2196 s = mdsc->sessions[i];
2197 oldstate = ceph_mdsmap_get_state(oldmap, i);
2198 newstate = ceph_mdsmap_get_state(newmap, i);
2199
2200 dout("check_new_map mds%d state %s -> %s (session %s)\n",
2201 i, ceph_mds_state_name(oldstate),
2202 ceph_mds_state_name(newstate),
2203 session_state_name(s->s_state));
2204
2205 if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
2206 ceph_mdsmap_get_addr(newmap, i),
2207 sizeof(struct ceph_entity_addr))) {
2208 if (s->s_state == CEPH_MDS_SESSION_OPENING) {
2209 /* the session never opened, just close it
2210 * out now */
2211 __wake_requests(mdsc, &s->s_waiting);
2212 unregister_session(mdsc, i);
2213 } else {
2214 /* just close it */
2215 mutex_unlock(&mdsc->mutex);
2216 mutex_lock(&s->s_mutex);
2217 mutex_lock(&mdsc->mutex);
2218 ceph_con_close(&s->s_con);
2219 mutex_unlock(&s->s_mutex);
2220 s->s_state = CEPH_MDS_SESSION_RESTARTING;
2221 }
2222
2223 /* kick any requests waiting on the recovering mds */
2224 kick_requests(mdsc, i, 1);
2225 } else if (oldstate == newstate) {
2226 continue; /* nothing new with this mds */
2227 }
2228
2229 /*
2230 * send reconnect?
2231 */
2232 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
2233 newstate >= CEPH_MDS_STATE_RECONNECT)
2234 send_mds_reconnect(mdsc, i);
2235
2236 /*
2237 * kick requests on any mds that has gone active.
2238 *
2239 * kick requests on cur or forwarder: we may have sent
2240 * the request to mds1, mds1 told us it forwarded it
2241 * to mds2, but then we learn mds1 failed and can't be
2242 * sure it successfully forwarded our request before
2243 * it died.
2244 */
2245 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
2246 newstate >= CEPH_MDS_STATE_ACTIVE) {
2247 kick_requests(mdsc, i, 1);
2248 ceph_kick_flushing_caps(mdsc, s);
2249 }
2250 }
2251}
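
/*
 * A small illustrative predicate for the reconnect decision in
 * check_new_map above.  The demo enums are stand-ins ordered like the
 * corresponding CEPH_MDS_* values; only the ordering matters here.
 */
enum demo_session_state { DEMO_SESSION_OPEN, DEMO_SESSION_RESTARTING };
enum demo_mds_state { DEMO_MDS_REPLAY, DEMO_MDS_RECONNECT, DEMO_MDS_ACTIVE };

static int demo_should_reconnect(enum demo_session_state session_state,
                                 enum demo_mds_state new_mds_state)
{
        /* only sessions we marked RESTARTING, and only once the mds
         * has recovered far enough to accept reconnects */
        return session_state == DEMO_SESSION_RESTARTING &&
               new_mds_state >= DEMO_MDS_RECONNECT;
}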
2252
2253
2254
2255/*
2256 * leases
2257 */
2258
2259/*
2260 * caller must hold session s_mutex, dentry->d_lock
2261 */
2262void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
2263{
2264 struct ceph_dentry_info *di = ceph_dentry(dentry);
2265
2266 ceph_put_mds_session(di->lease_session);
2267 di->lease_session = NULL;
2268}
2269
2270static void handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
2271{
2272 struct super_block *sb = mdsc->client->sb;
2273 struct inode *inode;
2274 struct ceph_mds_session *session;
2275 struct ceph_inode_info *ci;
2276 struct dentry *parent, *dentry;
2277 struct ceph_dentry_info *di;
2278 int mds;
2279 struct ceph_mds_lease *h = msg->front.iov_base;
2280 struct ceph_vino vino;
2281 int mask;
2282 struct qstr dname;
2283 int release = 0;
2284
2285 if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
2286 return;
2287 mds = le64_to_cpu(msg->hdr.src.name.num);
2288 dout("handle_lease from mds%d\n", mds);
2289
2290 /* decode */
2291 if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
2292 goto bad;
2293 vino.ino = le64_to_cpu(h->ino);
2294 vino.snap = CEPH_NOSNAP;
2295 mask = le16_to_cpu(h->mask);
2296 dname.name = (void *)h + sizeof(*h) + sizeof(u32);
2297 dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
2298 if (dname.len != get_unaligned_le32(h+1))
2299 goto bad;
2300
2301 /* find session */
2302 mutex_lock(&mdsc->mutex);
2303 session = __ceph_lookup_mds_session(mdsc, mds);
2304 mutex_unlock(&mdsc->mutex);
2305 if (!session) {
2306 pr_err("handle_lease got lease but no session mds%d\n", mds);
2307 return;
2308 }
2309
2310 mutex_lock(&session->s_mutex);
2311 session->s_seq++;
2312
2313 /* lookup inode */
2314 inode = ceph_find_inode(sb, vino);
2315 dout("handle_lease '%s', mask %d, ino %llx %p\n",
2316 ceph_lease_op_name(h->action), mask, vino.ino, inode);
2317 if (inode == NULL) {
2318 dout("handle_lease no inode %llx\n", vino.ino);
2319 goto release;
2320 }
2321 ci = ceph_inode(inode);
2322
2323 /* dentry */
2324 parent = d_find_alias(inode);
2325 if (!parent) {
2326 dout("no parent dentry on inode %p\n", inode);
2327 WARN_ON(1);
2328 goto release; /* hrm... */
2329 }
2330 dname.hash = full_name_hash(dname.name, dname.len);
2331 dentry = d_lookup(parent, &dname);
2332 dput(parent);
2333 if (!dentry)
2334 goto release;
2335
2336 spin_lock(&dentry->d_lock);
2337 di = ceph_dentry(dentry);
2338 switch (h->action) {
2339 case CEPH_MDS_LEASE_REVOKE:
2340 if (di && di->lease_session == session) {
2341 h->seq = cpu_to_le32(di->lease_seq);
2342 __ceph_mdsc_drop_dentry_lease(dentry);
2343 }
2344 release = 1;
2345 break;
2346
2347 case CEPH_MDS_LEASE_RENEW:
2348 if (di && di->lease_session == session &&
2349 di->lease_gen == session->s_cap_gen &&
2350 di->lease_renew_from &&
2351 di->lease_renew_after == 0) {
2352 unsigned long duration =
2353 le32_to_cpu(h->duration_ms) * HZ / 1000;
2354
2355 di->lease_seq = le32_to_cpu(h->seq);
2356 dentry->d_time = di->lease_renew_from + duration;
2357 di->lease_renew_after = di->lease_renew_from +
2358 (duration >> 1);
2359 di->lease_renew_from = 0;
2360 }
2361 break;
2362 }
2363 spin_unlock(&dentry->d_lock);
2364 dput(dentry);
2365
2366 if (!release)
2367 goto out;
2368
2369release:
2370 /* let's just reuse the same message */
2371 h->action = CEPH_MDS_LEASE_REVOKE_ACK;
2372 ceph_msg_get(msg);
2373 ceph_con_send(&session->s_con, msg);
2374
2375out:
2376 iput(inode);
2377 mutex_unlock(&session->s_mutex);
2378 ceph_put_mds_session(session);
2379 return;
2380
2381bad:
2382 pr_err("corrupt lease message\n");
2383}
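
/*
 * A standalone sketch of the renew timing in the LEASE_RENEW arm
 * above: the lease runs a full duration from when the renew was sent,
 * and the next renew fires at the halfway point.  The names and the
 * ticks_per_sec parameter are illustrative stand-ins for jiffies/HZ.
 */
#include <stdint.h>

struct demo_lease_times {
        unsigned long expires;
        unsigned long renew_after;
};

static struct demo_lease_times
demo_compute_lease_times(unsigned long renew_sent_at, uint32_t duration_ms,
                         unsigned long ticks_per_sec)
{
        unsigned long duration = duration_ms * ticks_per_sec / 1000;
        struct demo_lease_times t = {
                .expires     = renew_sent_at + duration,
                .renew_after = renew_sent_at + (duration >> 1),
        };
        return t;
}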
2384
2385void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2386 struct inode *inode,
2387 struct dentry *dentry, char action,
2388 u32 seq)
2389{
2390 struct ceph_msg *msg;
2391 struct ceph_mds_lease *lease;
2392 int len = sizeof(*lease) + sizeof(u32);
2393 int dnamelen = 0;
2394
2395 dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
2396 inode, dentry, ceph_lease_op_name(action), session->s_mds);
2397 dnamelen = dentry->d_name.len;
2398 len += dnamelen;
2399
2400 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, 0, 0, NULL);
2401 if (IS_ERR(msg))
2402 return;
2403 lease = msg->front.iov_base;
2404 lease->action = action;
2405 lease->mask = cpu_to_le16(CEPH_LOCK_DN);
2406 lease->ino = cpu_to_le64(ceph_vino(inode).ino);
2407 lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
2408 lease->seq = cpu_to_le32(seq);
2409 put_unaligned_le32(dnamelen, lease + 1);
2410 memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
2411
2412 /*
2413 * if this is a preemptive lease RELEASE, no need to
2414 * flush request stream, since the actual request will
2415 * soon follow.
2416 */
2417 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
2418
2419 ceph_con_send(&session->s_con, msg);
2420}
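
/*
 * A minimal sketch of the trailing name encoding used by the lease
 * messages above: a 32-bit little-endian length followed by the raw
 * (not nul-terminated) bytes, which is why both sides use the
 * unaligned accessors.  demo_append_name is illustrative only and
 * assumes the caller sized the buffer.
 */
#include <stdint.h>
#include <string.h>
#include <endian.h>

static size_t demo_append_name(unsigned char *p, const char *name,
                               uint32_t namelen)
{
        uint32_t le = htole32(namelen);

        memcpy(p, &le, sizeof(le));             /* may land unaligned */
        memcpy(p + sizeof(le), name, namelen);
        return sizeof(le) + namelen;
}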
2421
2422/*
2423 * Preemptively release a lease we expect to invalidate anyway.
2424 * Both @inode and @dentry are required; only CEPH_LOCK_DN is supported.
2425 */
2426void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
2427 struct dentry *dentry, int mask)
2428{
2429 struct ceph_dentry_info *di;
2430 struct ceph_mds_session *session;
2431 u32 seq;
2432
2433 BUG_ON(inode == NULL);
2434 BUG_ON(dentry == NULL);
2435 BUG_ON(mask != CEPH_LOCK_DN);
2436
2437 /* is dentry lease valid? */
2438 spin_lock(&dentry->d_lock);
2439 di = ceph_dentry(dentry);
2440 if (!di || !di->lease_session ||
2441 di->lease_session->s_mds < 0 ||
2442 di->lease_gen != di->lease_session->s_cap_gen ||
2443 !time_before(jiffies, dentry->d_time)) {
2444 dout("lease_release inode %p dentry %p -- "
2445 "no lease on %d\n",
2446 inode, dentry, mask);
2447 spin_unlock(&dentry->d_lock);
2448 return;
2449 }
2450
2451 /* we do have a lease on this dentry; note mds and seq */
2452 session = ceph_get_mds_session(di->lease_session);
2453 seq = di->lease_seq;
2454 __ceph_mdsc_drop_dentry_lease(dentry);
2455 spin_unlock(&dentry->d_lock);
2456
2457 dout("lease_release inode %p dentry %p mask %d to mds%d\n",
2458 inode, dentry, mask, session->s_mds);
2459 ceph_mdsc_lease_send_msg(session, inode, dentry,
2460 CEPH_MDS_LEASE_RELEASE, seq);
2461 ceph_put_mds_session(session);
2462}
2463
2464/*
2465 * drop all leases (and dentry refs) in preparation for umount
2466 */
2467static void drop_leases(struct ceph_mds_client *mdsc)
2468{
2469 int i;
2470
2471 dout("drop_leases\n");
2472 mutex_lock(&mdsc->mutex);
2473 for (i = 0; i < mdsc->max_sessions; i++) {
2474 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2475 if (!s)
2476 continue;
2477 mutex_unlock(&mdsc->mutex);
2478 mutex_lock(&s->s_mutex);
2479 mutex_unlock(&s->s_mutex);
2480 ceph_put_mds_session(s);
2481 mutex_lock(&mdsc->mutex);
2482 }
2483 mutex_unlock(&mdsc->mutex);
2484}
2485
2486
2487
2488/*
2489 * delayed work -- periodically trim expired leases, renew caps with mds
2490 */
2491static void schedule_delayed(struct ceph_mds_client *mdsc)
2492{
2493 int delay = 5; /* seconds */
2494 unsigned hz = round_jiffies_relative(HZ * delay);
2495 schedule_delayed_work(&mdsc->delayed_work, hz);
2496}
2497
2498static void delayed_work(struct work_struct *work)
2499{
2500 int i;
2501 struct ceph_mds_client *mdsc =
2502 container_of(work, struct ceph_mds_client, delayed_work.work);
2503 int renew_interval;
2504 int renew_caps;
2505
2506 dout("mdsc delayed_work\n");
2507 ceph_check_delayed_caps(mdsc, 0);
2508
2509 mutex_lock(&mdsc->mutex);
2510 renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
2511 renew_caps = time_after_eq(jiffies, HZ*renew_interval +
2512 mdsc->last_renew_caps);
2513 if (renew_caps)
2514 mdsc->last_renew_caps = jiffies;
2515
2516 for (i = 0; i < mdsc->max_sessions; i++) {
2517 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2518 if (s == NULL)
2519 continue;
2520 if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
2521 dout("resending session close request for mds%d\n",
2522 s->s_mds);
2523 request_close_session(mdsc, s);
2524 ceph_put_mds_session(s);
2525 continue;
2526 }
2527 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
2528 if (s->s_state == CEPH_MDS_SESSION_OPEN) {
2529 s->s_state = CEPH_MDS_SESSION_HUNG;
2530 pr_info("mds%d hung\n", s->s_mds);
2531 }
2532 }
2533 if (s->s_state < CEPH_MDS_SESSION_OPEN) {
2534 /* this mds is failed or recovering, just wait */
2535 ceph_put_mds_session(s);
2536 continue;
2537 }
2538 mutex_unlock(&mdsc->mutex);
2539
2540 mutex_lock(&s->s_mutex);
2541 if (renew_caps)
2542 send_renew_caps(mdsc, s);
2543 else
2544 ceph_con_keepalive(&s->s_con);
2545 add_cap_releases(mdsc, s, -1);
2546 send_cap_releases(mdsc, s);
2547 mutex_unlock(&s->s_mutex);
2548 ceph_put_mds_session(s);
2549
2550 mutex_lock(&mdsc->mutex);
2551 }
2552 mutex_unlock(&mdsc->mutex);
2553
2554 schedule_delayed(mdsc);
2555}
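
/*
 * An illustrative restatement of the renew cadence in delayed_work
 * above: renew caps once a quarter of the mds session timeout has
 * passed since the last renewal.  Plain unsigned subtraction stands in
 * for time_after_eq and wraps the same way jiffies comparisons do; the
 * timeout is assumed already converted to ticks.
 */
static int demo_should_renew(unsigned long now, unsigned long last_renew,
                             unsigned long session_timeout_ticks)
{
        return now - last_renew >= session_timeout_ticks / 4;
}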
2556
2557
2558void ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
2559{
2560 mdsc->client = client;
2561 mutex_init(&mdsc->mutex);
2562 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
2563 init_completion(&mdsc->safe_umount_waiters);
2564 init_completion(&mdsc->session_close_waiters);
2565 INIT_LIST_HEAD(&mdsc->waiting_for_map);
2566 mdsc->sessions = NULL;
2567 mdsc->max_sessions = 0;
2568 mdsc->stopping = 0;
2569 init_rwsem(&mdsc->snap_rwsem);
2570 INIT_RADIX_TREE(&mdsc->snap_realms, GFP_NOFS);
2571 INIT_LIST_HEAD(&mdsc->snap_empty);
2572 spin_lock_init(&mdsc->snap_empty_lock);
2573 mdsc->last_tid = 0;
2574 INIT_RADIX_TREE(&mdsc->request_tree, GFP_NOFS);
2575 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
2576 mdsc->last_renew_caps = jiffies;
2577 INIT_LIST_HEAD(&mdsc->cap_delay_list);
2578 spin_lock_init(&mdsc->cap_delay_lock);
2579 INIT_LIST_HEAD(&mdsc->snap_flush_list);
2580 spin_lock_init(&mdsc->snap_flush_lock);
2581 mdsc->cap_flush_seq = 0;
2582 INIT_LIST_HEAD(&mdsc->cap_dirty);
2583 mdsc->num_cap_flushing = 0;
2584 spin_lock_init(&mdsc->cap_dirty_lock);
2585 init_waitqueue_head(&mdsc->cap_flushing_wq);
2586 spin_lock_init(&mdsc->dentry_lru_lock);
2587 INIT_LIST_HEAD(&mdsc->dentry_lru);
2588}
2589
2590/*
2591 * Wait for safe replies on open mds requests. If we time out, drop
2592 * all requests from the tree to avoid dangling dentry refs.
2593 */
2594static void wait_requests(struct ceph_mds_client *mdsc)
2595{
2596 struct ceph_mds_request *req;
2597 struct ceph_client *client = mdsc->client;
2598
2599 mutex_lock(&mdsc->mutex);
2600 if (__get_oldest_tid(mdsc)) {
2601 mutex_unlock(&mdsc->mutex);
2602 dout("wait_requests waiting for requests\n");
2603 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
2604 client->mount_args.mount_timeout * HZ);
2605 mutex_lock(&mdsc->mutex);
2606
2607 /* tear down remaining requests */
2608 while (radix_tree_gang_lookup(&mdsc->request_tree,
2609 (void **)&req, 0, 1)) {
2610 dout("wait_requests timed out on tid %llu\n",
2611 req->r_tid);
2612 radix_tree_delete(&mdsc->request_tree, req->r_tid);
2613 ceph_mdsc_put_request(req);
2614 }
2615 }
2616 mutex_unlock(&mdsc->mutex);
2617 dout("wait_requests done\n");
2618}
2619
2620/*
2621 * called before mount is ro, and before dentries are torn down.
2622 * (hmm, does this still race with new lookups?)
2623 */
2624void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
2625{
2626 dout("pre_umount\n");
2627 mdsc->stopping = 1;
2628
2629 drop_leases(mdsc);
2630 ceph_check_delayed_caps(mdsc, 1);
2631 wait_requests(mdsc);
2632}
2633
2634/*
2635 * wait for all write mds requests to flush.
2636 */
2637static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
2638{
2639 struct ceph_mds_request *req;
2640 u64 next_tid = 0;
2641 int got;
2642
2643 mutex_lock(&mdsc->mutex);
2644 dout("wait_unsafe_requests want %lld\n", want_tid);
2645 while (1) {
2646 got = radix_tree_gang_lookup(&mdsc->request_tree, (void **)&req,
2647 next_tid, 1);
2648 if (!got)
2649 break;
2650 if (req->r_tid > want_tid)
2651 break;
2652
2653 next_tid = req->r_tid + 1;
2654 if ((req->r_op & CEPH_MDS_OP_WRITE) == 0)
2655 continue; /* not a write op */
2656
2657 ceph_mdsc_get_request(req);
2658 mutex_unlock(&mdsc->mutex);
2659 dout("wait_unsafe_requests wait on %llu (want %llu)\n",
2660 req->r_tid, want_tid);
2661 wait_for_completion(&req->r_safe_completion);
2662 mutex_lock(&mdsc->mutex);
2663 ceph_mdsc_put_request(req);
2664 }
2665 mutex_unlock(&mdsc->mutex);
2666 dout("wait_unsafe_requests done\n");
2667}
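
/*
 * A small illustrative predicate for the filter wait_unsafe_requests
 * applies above: modifying ops carry a WRITE flag bit folded into the
 * opcode.  The bit value below is made up for the example; only the
 * masking pattern matches the code.
 */
#include <stdint.h>

#define DEMO_MDS_OP_WRITE 0x1000        /* stand-in for CEPH_MDS_OP_WRITE */

static int demo_is_write_op(uint32_t op)
{
        return (op & DEMO_MDS_OP_WRITE) != 0;
}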
2668
2669void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
2670{
2671 u64 want_tid, want_flush;
2672
2673 dout("sync\n");
2674 mutex_lock(&mdsc->mutex);
2675 want_tid = mdsc->last_tid;
2676 want_flush = mdsc->cap_flush_seq;
2677 mutex_unlock(&mdsc->mutex);
2678 dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
2679
2680 ceph_check_delayed_caps(mdsc, 1);
2681
2682 wait_unsafe_requests(mdsc, want_tid);
2683 wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
2684}
2685
2686
2687/*
2688 * called after sb is ro.
2689 */
2690void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
2691{
2692 struct ceph_mds_session *session;
2693 int i;
2694 int n;
2695 struct ceph_client *client = mdsc->client;
2696 unsigned long started, timeout = client->mount_args.mount_timeout * HZ;
2697
2698 dout("close_sessions\n");
2699
2700 mutex_lock(&mdsc->mutex);
2701
2702 /* close sessions */
2703 started = jiffies;
2704 while (time_before(jiffies, started + timeout)) {
2705 dout("closing sessions\n");
2706 n = 0;
2707 for (i = 0; i < mdsc->max_sessions; i++) {
2708 session = __ceph_lookup_mds_session(mdsc, i);
2709 if (!session)
2710 continue;
2711 mutex_unlock(&mdsc->mutex);
2712 mutex_lock(&session->s_mutex);
2713 __close_session(mdsc, session);
2714 mutex_unlock(&session->s_mutex);
2715 ceph_put_mds_session(session);
2716 mutex_lock(&mdsc->mutex);
2717 n++;
2718 }
2719 if (n == 0)
2720 break;
2721
2722 if (client->mount_state == CEPH_MOUNT_SHUTDOWN)
2723 break;
2724
2725 dout("waiting for sessions to close\n");
2726 mutex_unlock(&mdsc->mutex);
2727 wait_for_completion_timeout(&mdsc->session_close_waiters,
2728 timeout);
2729 mutex_lock(&mdsc->mutex);
2730 }
2731
2732 /* tear down remaining sessions */
2733 for (i = 0; i < mdsc->max_sessions; i++) {
2734 if (mdsc->sessions[i]) {
2735 session = get_session(mdsc->sessions[i]);
2736 unregister_session(mdsc, i);
2737 mutex_unlock(&mdsc->mutex);
2738 mutex_lock(&session->s_mutex);
2739 remove_session_caps(session);
2740 mutex_unlock(&session->s_mutex);
2741 ceph_put_mds_session(session);
2742 mutex_lock(&mdsc->mutex);
2743 }
2744 }
2745
2746 WARN_ON(!list_empty(&mdsc->cap_delay_list));
2747
2748 mutex_unlock(&mdsc->mutex);
2749
2750 ceph_cleanup_empty_realms(mdsc);
2751
2752 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
2753
2754 dout("stopped\n");
2755}
2756
2757void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
2758{
2759 dout("stop\n");
2760 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
2761 if (mdsc->mdsmap)
2762 ceph_mdsmap_destroy(mdsc->mdsmap);
2763 kfree(mdsc->sessions);
2764}
2765
2766
2767/*
2768 * handle mds map update.
2769 */
2770void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
2771{
2772 u32 epoch;
2773 u32 maplen;
2774 void *p = msg->front.iov_base;
2775 void *end = p + msg->front.iov_len;
2776 struct ceph_mdsmap *newmap, *oldmap;
2777 struct ceph_fsid fsid;
2778 int err = -EINVAL;
2779
2780 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
2781 ceph_decode_copy(&p, &fsid, sizeof(fsid));
2782 if (ceph_fsid_compare(&fsid, &mdsc->client->monc.monmap->fsid)) {
2783 pr_err("got mdsmap with wrong fsid\n");
2784 return;
2785 }
2786 ceph_decode_32(&p, epoch);
2787 ceph_decode_32(&p, maplen);
2788 dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
2789
2790 /* do we need it? */
2791 ceph_monc_got_mdsmap(&mdsc->client->monc, epoch);
2792 mutex_lock(&mdsc->mutex);
2793 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
2794 dout("handle_map epoch %u <= our %u\n",
2795 epoch, mdsc->mdsmap->m_epoch);
2796 mutex_unlock(&mdsc->mutex);
2797 return;
2798 }
2799
2800 newmap = ceph_mdsmap_decode(&p, end);
2801 if (IS_ERR(newmap)) {
2802 err = PTR_ERR(newmap);
2803 goto bad_unlock;
2804 }
2805
2806 /* swap into place */
2807 if (mdsc->mdsmap) {
2808 oldmap = mdsc->mdsmap;
2809 mdsc->mdsmap = newmap;
2810 check_new_map(mdsc, newmap, oldmap);
2811 ceph_mdsmap_destroy(oldmap);
2812 } else {
2813 mdsc->mdsmap = newmap; /* first mds map */
2814 }
2815 mdsc->client->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
2816
2817 __wake_requests(mdsc, &mdsc->waiting_for_map);
2818
2819 mutex_unlock(&mdsc->mutex);
2820 schedule_delayed(mdsc);
2821 return;
2822
2823bad_unlock:
2824 mutex_unlock(&mdsc->mutex);
2825bad:
2826 pr_err("error decoding mdsmap %d\n", err);
2827 return;
2828}
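
/*
 * A userspace sketch of the fixed prefix ceph_mdsc_handle_map decodes
 * above: an fsid blob, then two little-endian u32s (epoch and map
 * length).  The 16-byte fsid size and all demo_* names are assumptions
 * for the example.
 */
#include <stdint.h>
#include <string.h>
#include <endian.h>

struct demo_map_prefix {
        unsigned char fsid[16];
        uint32_t epoch;
        uint32_t maplen;
};

static int demo_decode_map_prefix(const unsigned char *p, size_t len,
                                  struct demo_map_prefix *out)
{
        uint32_t v;

        /* same bounds test the ceph_decode_need above performs */
        if (len < sizeof(out->fsid) + 2 * sizeof(uint32_t))
                return -1;
        memcpy(out->fsid, p, sizeof(out->fsid));
        p += sizeof(out->fsid);
        memcpy(&v, p, sizeof(v));
        out->epoch = le32toh(v);
        p += sizeof(v);
        memcpy(&v, p, sizeof(v));
        out->maplen = le32toh(v);
        return 0;
}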
2829
2830static struct ceph_connection *con_get(struct ceph_connection *con)
2831{
2832 struct ceph_mds_session *s = con->private;
2833
2834 if (get_session(s)) {
2835 dout("mdsc con_get %p %d -> %d\n", s,
2836 atomic_read(&s->s_ref) - 1, atomic_read(&s->s_ref));
2837 return con;
2838 }
2839 dout("mdsc con_get %p FAIL\n", s);
2840 return NULL;
2841}
2842
2843static void con_put(struct ceph_connection *con)
2844{
2845 struct ceph_mds_session *s = con->private;
2846
2847 dout("mdsc con_put %p %d -> %d\n", s, atomic_read(&s->s_ref),
2848 atomic_read(&s->s_ref) - 1);
2849 ceph_put_mds_session(s);
2850}
2851
2852/*
2853 * if the client is unresponsive for long enough, the mds will kill
2854 * the session entirely.
2855 */
2856static void peer_reset(struct ceph_connection *con)
2857{
2858 struct ceph_mds_session *s = con->private;
2859
2860 pr_err("mds%d gave us the boot. IMPLEMENT RECONNECT.\n",
2861 s->s_mds);
2862}
2863
2864static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
2865{
2866 struct ceph_mds_session *s = con->private;
2867 struct ceph_mds_client *mdsc = s->s_mdsc;
2868 int type = le16_to_cpu(msg->hdr.type);
2869
2870 switch (type) {
2871 case CEPH_MSG_MDS_MAP:
2872 ceph_mdsc_handle_map(mdsc, msg);
2873 break;
2874 case CEPH_MSG_CLIENT_SESSION:
2875 handle_session(s, msg);
2876 break;
2877 case CEPH_MSG_CLIENT_REPLY:
2878 handle_reply(s, msg);
2879 break;
2880 case CEPH_MSG_CLIENT_REQUEST_FORWARD:
2881 handle_forward(mdsc, msg);
2882 break;
2883 case CEPH_MSG_CLIENT_CAPS:
2884 ceph_handle_caps(s, msg);
2885 break;
2886 case CEPH_MSG_CLIENT_SNAP:
2887 ceph_handle_snap(mdsc, msg);
2888 break;
2889 case CEPH_MSG_CLIENT_LEASE:
2890 handle_lease(mdsc, msg);
2891 break;
2892
2893 default:
2894 pr_err("received unknown message type %d %s\n", type,
2895 ceph_msg_type_name(type));
2896 }
2897 ceph_msg_put(msg);
2898}
2899
2900static const struct ceph_connection_operations mds_con_ops = {
2901 .get = con_get,
2902 .put = con_put,
2903 .dispatch = dispatch,
2904 .peer_reset = peer_reset,
2905 .alloc_msg = ceph_alloc_msg,
2906 .alloc_middle = ceph_alloc_middle,
2907};
2908
2909
2910
2911
2912/* eof */