aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph/mon_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/mon_client.c')
-rw-r--r--fs/ceph/mon_client.c264
1 files changed, 155 insertions, 109 deletions
diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c
index 8fdc011ca956..07a539906e67 100644
--- a/fs/ceph/mon_client.c
+++ b/fs/ceph/mon_client.c
@@ -28,7 +28,7 @@
28 * resend any outstanding requests. 28 * resend any outstanding requests.
29 */ 29 */
30 30
31const static struct ceph_connection_operations mon_con_ops; 31static const struct ceph_connection_operations mon_con_ops;
32 32
33static int __validate_auth(struct ceph_mon_client *monc); 33static int __validate_auth(struct ceph_mon_client *monc);
34 34
@@ -104,6 +104,7 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
104 monc->pending_auth = 1; 104 monc->pending_auth = 1;
105 monc->m_auth->front.iov_len = len; 105 monc->m_auth->front.iov_len = len;
106 monc->m_auth->hdr.front_len = cpu_to_le32(len); 106 monc->m_auth->hdr.front_len = cpu_to_le32(len);
107 ceph_con_revoke(monc->con, monc->m_auth);
107 ceph_msg_get(monc->m_auth); /* keep our ref */ 108 ceph_msg_get(monc->m_auth); /* keep our ref */
108 ceph_con_send(monc->con, monc->m_auth); 109 ceph_con_send(monc->con, monc->m_auth);
109} 110}
@@ -187,16 +188,12 @@ static void __send_subscribe(struct ceph_mon_client *monc)
187 monc->want_next_osdmap); 188 monc->want_next_osdmap);
188 if ((__sub_expired(monc) && !monc->sub_sent) || 189 if ((__sub_expired(monc) && !monc->sub_sent) ||
189 monc->want_next_osdmap == 1) { 190 monc->want_next_osdmap == 1) {
190 struct ceph_msg *msg; 191 struct ceph_msg *msg = monc->m_subscribe;
191 struct ceph_mon_subscribe_item *i; 192 struct ceph_mon_subscribe_item *i;
192 void *p, *end; 193 void *p, *end;
193 194
194 msg = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, 0, 0, NULL);
195 if (!msg)
196 return;
197
198 p = msg->front.iov_base; 195 p = msg->front.iov_base;
199 end = p + msg->front.iov_len; 196 end = p + msg->front_max;
200 197
201 dout("__send_subscribe to 'mdsmap' %u+\n", 198 dout("__send_subscribe to 'mdsmap' %u+\n",
202 (unsigned)monc->have_mdsmap); 199 (unsigned)monc->have_mdsmap);
@@ -226,7 +223,8 @@ static void __send_subscribe(struct ceph_mon_client *monc)
226 223
227 msg->front.iov_len = p - msg->front.iov_base; 224 msg->front.iov_len = p - msg->front.iov_base;
228 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 225 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
229 ceph_con_send(monc->con, msg); 226 ceph_con_revoke(monc->con, msg);
227 ceph_con_send(monc->con, ceph_msg_get(msg));
230 228
231 monc->sub_sent = jiffies | 1; /* never 0 */ 229 monc->sub_sent = jiffies | 1; /* never 0 */
232 } 230 }
@@ -353,14 +351,14 @@ out:
353/* 351/*
354 * statfs 352 * statfs
355 */ 353 */
356static struct ceph_mon_statfs_request *__lookup_statfs( 354static struct ceph_mon_generic_request *__lookup_generic_req(
357 struct ceph_mon_client *monc, u64 tid) 355 struct ceph_mon_client *monc, u64 tid)
358{ 356{
359 struct ceph_mon_statfs_request *req; 357 struct ceph_mon_generic_request *req;
360 struct rb_node *n = monc->statfs_request_tree.rb_node; 358 struct rb_node *n = monc->generic_request_tree.rb_node;
361 359
362 while (n) { 360 while (n) {
363 req = rb_entry(n, struct ceph_mon_statfs_request, node); 361 req = rb_entry(n, struct ceph_mon_generic_request, node);
364 if (tid < req->tid) 362 if (tid < req->tid)
365 n = n->rb_left; 363 n = n->rb_left;
366 else if (tid > req->tid) 364 else if (tid > req->tid)
@@ -371,16 +369,16 @@ static struct ceph_mon_statfs_request *__lookup_statfs(
371 return NULL; 369 return NULL;
372} 370}
373 371
374static void __insert_statfs(struct ceph_mon_client *monc, 372static void __insert_generic_request(struct ceph_mon_client *monc,
375 struct ceph_mon_statfs_request *new) 373 struct ceph_mon_generic_request *new)
376{ 374{
377 struct rb_node **p = &monc->statfs_request_tree.rb_node; 375 struct rb_node **p = &monc->generic_request_tree.rb_node;
378 struct rb_node *parent = NULL; 376 struct rb_node *parent = NULL;
379 struct ceph_mon_statfs_request *req = NULL; 377 struct ceph_mon_generic_request *req = NULL;
380 378
381 while (*p) { 379 while (*p) {
382 parent = *p; 380 parent = *p;
383 req = rb_entry(parent, struct ceph_mon_statfs_request, node); 381 req = rb_entry(parent, struct ceph_mon_generic_request, node);
384 if (new->tid < req->tid) 382 if (new->tid < req->tid)
385 p = &(*p)->rb_left; 383 p = &(*p)->rb_left;
386 else if (new->tid > req->tid) 384 else if (new->tid > req->tid)
@@ -390,113 +388,159 @@ static void __insert_statfs(struct ceph_mon_client *monc,
390 } 388 }
391 389
392 rb_link_node(&new->node, parent, p); 390 rb_link_node(&new->node, parent, p);
393 rb_insert_color(&new->node, &monc->statfs_request_tree); 391 rb_insert_color(&new->node, &monc->generic_request_tree);
392}
393
394static void release_generic_request(struct kref *kref)
395{
396 struct ceph_mon_generic_request *req =
397 container_of(kref, struct ceph_mon_generic_request, kref);
398
399 if (req->reply)
400 ceph_msg_put(req->reply);
401 if (req->request)
402 ceph_msg_put(req->request);
403
404 kfree(req);
405}
406
407static void put_generic_request(struct ceph_mon_generic_request *req)
408{
409 kref_put(&req->kref, release_generic_request);
410}
411
412static void get_generic_request(struct ceph_mon_generic_request *req)
413{
414 kref_get(&req->kref);
415}
416
417static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
418 struct ceph_msg_header *hdr,
419 int *skip)
420{
421 struct ceph_mon_client *monc = con->private;
422 struct ceph_mon_generic_request *req;
423 u64 tid = le64_to_cpu(hdr->tid);
424 struct ceph_msg *m;
425
426 mutex_lock(&monc->mutex);
427 req = __lookup_generic_req(monc, tid);
428 if (!req) {
429 dout("get_generic_reply %lld dne\n", tid);
430 *skip = 1;
431 m = NULL;
432 } else {
433 dout("get_generic_reply %lld got %p\n", tid, req->reply);
434 m = ceph_msg_get(req->reply);
435 /*
436 * we don't need to track the connection reading into
437 * this reply because we only have one open connection
438 * at a time, ever.
439 */
440 }
441 mutex_unlock(&monc->mutex);
442 return m;
394} 443}
395 444
396static void handle_statfs_reply(struct ceph_mon_client *monc, 445static void handle_statfs_reply(struct ceph_mon_client *monc,
397 struct ceph_msg *msg) 446 struct ceph_msg *msg)
398{ 447{
399 struct ceph_mon_statfs_request *req; 448 struct ceph_mon_generic_request *req;
400 struct ceph_mon_statfs_reply *reply = msg->front.iov_base; 449 struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
401 u64 tid; 450 u64 tid = le64_to_cpu(msg->hdr.tid);
402 451
403 if (msg->front.iov_len != sizeof(*reply)) 452 if (msg->front.iov_len != sizeof(*reply))
404 goto bad; 453 goto bad;
405 tid = le64_to_cpu(msg->hdr.tid);
406 dout("handle_statfs_reply %p tid %llu\n", msg, tid); 454 dout("handle_statfs_reply %p tid %llu\n", msg, tid);
407 455
408 mutex_lock(&monc->mutex); 456 mutex_lock(&monc->mutex);
409 req = __lookup_statfs(monc, tid); 457 req = __lookup_generic_req(monc, tid);
410 if (req) { 458 if (req) {
411 *req->buf = reply->st; 459 *(struct ceph_statfs *)req->buf = reply->st;
412 req->result = 0; 460 req->result = 0;
461 get_generic_request(req);
413 } 462 }
414 mutex_unlock(&monc->mutex); 463 mutex_unlock(&monc->mutex);
415 if (req) 464 if (req) {
416 complete(&req->completion); 465 complete(&req->completion);
466 put_generic_request(req);
467 }
417 return; 468 return;
418 469
419bad: 470bad:
420 pr_err("corrupt statfs reply, no tid\n"); 471 pr_err("corrupt generic reply, no tid\n");
421 ceph_msg_dump(msg); 472 ceph_msg_dump(msg);
422} 473}
423 474
424/* 475/*
425 * (re)send a statfs request 476 * Do a synchronous statfs().
426 */ 477 */
427static int send_statfs(struct ceph_mon_client *monc, 478int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
428 struct ceph_mon_statfs_request *req)
429{ 479{
430 struct ceph_msg *msg; 480 struct ceph_mon_generic_request *req;
431 struct ceph_mon_statfs *h; 481 struct ceph_mon_statfs *h;
482 int err;
483
484 req = kzalloc(sizeof(*req), GFP_NOFS);
485 if (!req)
486 return -ENOMEM;
487
488 kref_init(&req->kref);
489 req->buf = buf;
490 init_completion(&req->completion);
491
492 err = -ENOMEM;
493 req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS);
494 if (!req->request)
495 goto out;
496 req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS);
497 if (!req->reply)
498 goto out;
432 499
433 dout("send_statfs tid %llu\n", req->tid); 500 /* fill out request */
434 msg = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), 0, 0, NULL); 501 h = req->request->front.iov_base;
435 if (IS_ERR(msg))
436 return PTR_ERR(msg);
437 req->request = msg;
438 msg->hdr.tid = cpu_to_le64(req->tid);
439 h = msg->front.iov_base;
440 h->monhdr.have_version = 0; 502 h->monhdr.have_version = 0;
441 h->monhdr.session_mon = cpu_to_le16(-1); 503 h->monhdr.session_mon = cpu_to_le16(-1);
442 h->monhdr.session_mon_tid = 0; 504 h->monhdr.session_mon_tid = 0;
443 h->fsid = monc->monmap->fsid; 505 h->fsid = monc->monmap->fsid;
444 ceph_con_send(monc->con, msg);
445 return 0;
446}
447
448/*
449 * Do a synchronous statfs().
450 */
451int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
452{
453 struct ceph_mon_statfs_request req;
454 int err;
455
456 req.buf = buf;
457 init_completion(&req.completion);
458
459 /* allocate memory for reply */
460 err = ceph_msgpool_resv(&monc->msgpool_statfs_reply, 1);
461 if (err)
462 return err;
463 506
464 /* register request */ 507 /* register request */
465 mutex_lock(&monc->mutex); 508 mutex_lock(&monc->mutex);
466 req.tid = ++monc->last_tid; 509 req->tid = ++monc->last_tid;
467 req.last_attempt = jiffies; 510 req->request->hdr.tid = cpu_to_le64(req->tid);
468 req.delay = BASE_DELAY_INTERVAL; 511 __insert_generic_request(monc, req);
469 __insert_statfs(monc, &req); 512 monc->num_generic_requests++;
470 monc->num_statfs_requests++;
471 mutex_unlock(&monc->mutex); 513 mutex_unlock(&monc->mutex);
472 514
473 /* send request and wait */ 515 /* send request and wait */
474 err = send_statfs(monc, &req); 516 ceph_con_send(monc->con, ceph_msg_get(req->request));
475 if (!err) 517 err = wait_for_completion_interruptible(&req->completion);
476 err = wait_for_completion_interruptible(&req.completion);
477 518
478 mutex_lock(&monc->mutex); 519 mutex_lock(&monc->mutex);
479 rb_erase(&req.node, &monc->statfs_request_tree); 520 rb_erase(&req->node, &monc->generic_request_tree);
480 monc->num_statfs_requests--; 521 monc->num_generic_requests--;
481 ceph_msgpool_resv(&monc->msgpool_statfs_reply, -1);
482 mutex_unlock(&monc->mutex); 522 mutex_unlock(&monc->mutex);
483 523
484 if (!err) 524 if (!err)
485 err = req.result; 525 err = req->result;
526
527out:
528 kref_put(&req->kref, release_generic_request);
486 return err; 529 return err;
487} 530}
488 531
489/* 532/*
490 * Resend pending statfs requests. 533 * Resend pending statfs requests.
491 */ 534 */
492static void __resend_statfs(struct ceph_mon_client *monc) 535static void __resend_generic_request(struct ceph_mon_client *monc)
493{ 536{
494 struct ceph_mon_statfs_request *req; 537 struct ceph_mon_generic_request *req;
495 struct rb_node *p; 538 struct rb_node *p;
496 539
497 for (p = rb_first(&monc->statfs_request_tree); p; p = rb_next(p)) { 540 for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
498 req = rb_entry(p, struct ceph_mon_statfs_request, node); 541 req = rb_entry(p, struct ceph_mon_generic_request, node);
499 send_statfs(monc, req); 542 ceph_con_revoke(monc->con, req->request);
543 ceph_con_send(monc->con, ceph_msg_get(req->request));
500 } 544 }
501} 545}
502 546
@@ -586,26 +630,26 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
586 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON | 630 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
587 CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS; 631 CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
588 632
589 /* msg pools */ 633 /* msgs */
590 err = ceph_msgpool_init(&monc->msgpool_subscribe_ack, 634 err = -ENOMEM;
591 sizeof(struct ceph_mon_subscribe_ack), 1, false); 635 monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
592 if (err < 0) 636 sizeof(struct ceph_mon_subscribe_ack),
637 GFP_NOFS);
638 if (!monc->m_subscribe_ack)
593 goto out_monmap; 639 goto out_monmap;
594 err = ceph_msgpool_init(&monc->msgpool_statfs_reply, 640
595 sizeof(struct ceph_mon_statfs_reply), 0, false); 641 monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS);
596 if (err < 0) 642 if (!monc->m_subscribe)
597 goto out_pool1; 643 goto out_subscribe_ack;
598 err = ceph_msgpool_init(&monc->msgpool_auth_reply, 4096, 1, false); 644
599 if (err < 0) 645 monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS);
600 goto out_pool2; 646 if (!monc->m_auth_reply)
601 647 goto out_subscribe;
602 monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, 0, 0, NULL); 648
649 monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS);
603 monc->pending_auth = 0; 650 monc->pending_auth = 0;
604 if (IS_ERR(monc->m_auth)) { 651 if (!monc->m_auth)
605 err = PTR_ERR(monc->m_auth); 652 goto out_auth_reply;
606 monc->m_auth = NULL;
607 goto out_pool3;
608 }
609 653
610 monc->cur_mon = -1; 654 monc->cur_mon = -1;
611 monc->hunting = true; 655 monc->hunting = true;
@@ -613,8 +657,8 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
613 monc->sub_sent = 0; 657 monc->sub_sent = 0;
614 658
615 INIT_DELAYED_WORK(&monc->delayed_work, delayed_work); 659 INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
616 monc->statfs_request_tree = RB_ROOT; 660 monc->generic_request_tree = RB_ROOT;
617 monc->num_statfs_requests = 0; 661 monc->num_generic_requests = 0;
618 monc->last_tid = 0; 662 monc->last_tid = 0;
619 663
620 monc->have_mdsmap = 0; 664 monc->have_mdsmap = 0;
@@ -622,12 +666,12 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
622 monc->want_next_osdmap = 1; 666 monc->want_next_osdmap = 1;
623 return 0; 667 return 0;
624 668
625out_pool3: 669out_auth_reply:
626 ceph_msgpool_destroy(&monc->msgpool_auth_reply); 670 ceph_msg_put(monc->m_auth_reply);
627out_pool2: 671out_subscribe:
628 ceph_msgpool_destroy(&monc->msgpool_subscribe_ack); 672 ceph_msg_put(monc->m_subscribe);
629out_pool1: 673out_subscribe_ack:
630 ceph_msgpool_destroy(&monc->msgpool_statfs_reply); 674 ceph_msg_put(monc->m_subscribe_ack);
631out_monmap: 675out_monmap:
632 kfree(monc->monmap); 676 kfree(monc->monmap);
633out: 677out:
@@ -651,9 +695,9 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
651 ceph_auth_destroy(monc->auth); 695 ceph_auth_destroy(monc->auth);
652 696
653 ceph_msg_put(monc->m_auth); 697 ceph_msg_put(monc->m_auth);
654 ceph_msgpool_destroy(&monc->msgpool_subscribe_ack); 698 ceph_msg_put(monc->m_auth_reply);
655 ceph_msgpool_destroy(&monc->msgpool_statfs_reply); 699 ceph_msg_put(monc->m_subscribe);
656 ceph_msgpool_destroy(&monc->msgpool_auth_reply); 700 ceph_msg_put(monc->m_subscribe_ack);
657 701
658 kfree(monc->monmap); 702 kfree(monc->monmap);
659} 703}
@@ -662,8 +706,11 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
662 struct ceph_msg *msg) 706 struct ceph_msg *msg)
663{ 707{
664 int ret; 708 int ret;
709 int was_auth = 0;
665 710
666 mutex_lock(&monc->mutex); 711 mutex_lock(&monc->mutex);
712 if (monc->auth->ops)
713 was_auth = monc->auth->ops->is_authenticated(monc->auth);
667 monc->pending_auth = 0; 714 monc->pending_auth = 0;
668 ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base, 715 ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
669 msg->front.iov_len, 716 msg->front.iov_len,
@@ -674,14 +721,14 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
674 wake_up(&monc->client->auth_wq); 721 wake_up(&monc->client->auth_wq);
675 } else if (ret > 0) { 722 } else if (ret > 0) {
676 __send_prepared_auth_request(monc, ret); 723 __send_prepared_auth_request(monc, ret);
677 } else if (monc->auth->ops->is_authenticated(monc->auth)) { 724 } else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) {
678 dout("authenticated, starting session\n"); 725 dout("authenticated, starting session\n");
679 726
680 monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT; 727 monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
681 monc->client->msgr->inst.name.num = monc->auth->global_id; 728 monc->client->msgr->inst.name.num = monc->auth->global_id;
682 729
683 __send_subscribe(monc); 730 __send_subscribe(monc);
684 __resend_statfs(monc); 731 __resend_generic_request(monc);
685 } 732 }
686 mutex_unlock(&monc->mutex); 733 mutex_unlock(&monc->mutex);
687} 734}
@@ -770,18 +817,17 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
770 817
771 switch (type) { 818 switch (type) {
772 case CEPH_MSG_MON_SUBSCRIBE_ACK: 819 case CEPH_MSG_MON_SUBSCRIBE_ACK:
773 m = ceph_msgpool_get(&monc->msgpool_subscribe_ack, front_len); 820 m = ceph_msg_get(monc->m_subscribe_ack);
774 break; 821 break;
775 case CEPH_MSG_STATFS_REPLY: 822 case CEPH_MSG_STATFS_REPLY:
776 m = ceph_msgpool_get(&monc->msgpool_statfs_reply, front_len); 823 return get_generic_reply(con, hdr, skip);
777 break;
778 case CEPH_MSG_AUTH_REPLY: 824 case CEPH_MSG_AUTH_REPLY:
779 m = ceph_msgpool_get(&monc->msgpool_auth_reply, front_len); 825 m = ceph_msg_get(monc->m_auth_reply);
780 break; 826 break;
781 case CEPH_MSG_MON_MAP: 827 case CEPH_MSG_MON_MAP:
782 case CEPH_MSG_MDS_MAP: 828 case CEPH_MSG_MDS_MAP:
783 case CEPH_MSG_OSD_MAP: 829 case CEPH_MSG_OSD_MAP:
784 m = ceph_msg_new(type, front_len, 0, 0, NULL); 830 m = ceph_msg_new(type, front_len, GFP_NOFS);
785 break; 831 break;
786 } 832 }
787 833
@@ -826,7 +872,7 @@ out:
826 mutex_unlock(&monc->mutex); 872 mutex_unlock(&monc->mutex);
827} 873}
828 874
829const static struct ceph_connection_operations mon_con_ops = { 875static const struct ceph_connection_operations mon_con_ops = {
830 .get = ceph_con_get, 876 .get = ceph_con_get,
831 .put = ceph_con_put, 877 .put = ceph_con_put,
832 .dispatch = dispatch, 878 .dispatch = dispatch,