aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph/osd_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/osd_client.c')
-rw-r--r--fs/ceph/osd_client.c55
1 files changed, 41 insertions, 14 deletions
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index dbe63db9762f..3514f71ff85f 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -413,11 +413,22 @@ static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
413 */ 413 */
414static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 414static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
415{ 415{
416 struct ceph_osd_request *req;
416 int ret = 0; 417 int ret = 0;
417 418
418 dout("__reset_osd %p osd%d\n", osd, osd->o_osd); 419 dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
419 if (list_empty(&osd->o_requests)) { 420 if (list_empty(&osd->o_requests)) {
420 __remove_osd(osdc, osd); 421 __remove_osd(osdc, osd);
422 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
423 &osd->o_con.peer_addr,
424 sizeof(osd->o_con.peer_addr)) == 0 &&
425 !ceph_con_opened(&osd->o_con)) {
426 dout(" osd addr hasn't changed and connection never opened,"
427 " letting msgr retry");
428 /* touch each r_stamp for handle_timeout()'s benfit */
429 list_for_each_entry(req, &osd->o_requests, r_osd_item)
430 req->r_stamp = jiffies;
431 ret = -EAGAIN;
421 } else { 432 } else {
422 ceph_con_close(&osd->o_con); 433 ceph_con_close(&osd->o_con);
423 ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]); 434 ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
@@ -554,7 +565,8 @@ static int __map_osds(struct ceph_osd_client *osdc,
554{ 565{
555 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; 566 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
556 struct ceph_pg pgid; 567 struct ceph_pg pgid;
557 int o = -1; 568 int acting[CEPH_PG_MAX_SIZE];
569 int o = -1, num = 0;
558 int err; 570 int err;
559 571
560 dout("map_osds %p tid %lld\n", req, req->r_tid); 572 dout("map_osds %p tid %lld\n", req, req->r_tid);
@@ -565,10 +577,16 @@ static int __map_osds(struct ceph_osd_client *osdc,
565 pgid = reqhead->layout.ol_pgid; 577 pgid = reqhead->layout.ol_pgid;
566 req->r_pgid = pgid; 578 req->r_pgid = pgid;
567 579
568 o = ceph_calc_pg_primary(osdc->osdmap, pgid); 580 err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
581 if (err > 0) {
582 o = acting[0];
583 num = err;
584 }
569 585
570 if ((req->r_osd && req->r_osd->o_osd == o && 586 if ((req->r_osd && req->r_osd->o_osd == o &&
571 req->r_sent >= req->r_osd->o_incarnation) || 587 req->r_sent >= req->r_osd->o_incarnation &&
588 req->r_num_pg_osds == num &&
589 memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
572 (req->r_osd == NULL && o == -1)) 590 (req->r_osd == NULL && o == -1))
573 return 0; /* no change */ 591 return 0; /* no change */
574 592
@@ -576,6 +594,10 @@ static int __map_osds(struct ceph_osd_client *osdc,
576 req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o, 594 req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
577 req->r_osd ? req->r_osd->o_osd : -1); 595 req->r_osd ? req->r_osd->o_osd : -1);
578 596
597 /* record full pg acting set */
598 memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
599 req->r_num_pg_osds = num;
600
579 if (req->r_osd) { 601 if (req->r_osd) {
580 __cancel_request(req); 602 __cancel_request(req);
581 list_del_init(&req->r_osd_item); 603 list_del_init(&req->r_osd_item);
@@ -601,7 +623,7 @@ static int __map_osds(struct ceph_osd_client *osdc,
601 __remove_osd_from_lru(req->r_osd); 623 __remove_osd_from_lru(req->r_osd);
602 list_add(&req->r_osd_item, &req->r_osd->o_requests); 624 list_add(&req->r_osd_item, &req->r_osd->o_requests);
603 } 625 }
604 err = 1; /* osd changed */ 626 err = 1; /* osd or pg changed */
605 627
606out: 628out:
607 return err; 629 return err;
@@ -633,7 +655,7 @@ static int __send_request(struct ceph_osd_client *osdc,
633 reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */ 655 reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */
634 reqhead->reassert_version = req->r_reassert_version; 656 reqhead->reassert_version = req->r_reassert_version;
635 657
636 req->r_sent_stamp = jiffies; 658 req->r_stamp = jiffies;
637 list_move_tail(&osdc->req_lru, &req->r_req_lru_item); 659 list_move_tail(&osdc->req_lru, &req->r_req_lru_item);
638 660
639 ceph_msg_get(req->r_request); /* send consumes a ref */ 661 ceph_msg_get(req->r_request); /* send consumes a ref */
@@ -660,7 +682,7 @@ static void handle_timeout(struct work_struct *work)
660 unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ; 682 unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ;
661 unsigned long keepalive = 683 unsigned long keepalive =
662 osdc->client->mount_args->osd_keepalive_timeout * HZ; 684 osdc->client->mount_args->osd_keepalive_timeout * HZ;
663 unsigned long last_sent = 0; 685 unsigned long last_stamp = 0;
664 struct rb_node *p; 686 struct rb_node *p;
665 struct list_head slow_osds; 687 struct list_head slow_osds;
666 688
@@ -697,12 +719,12 @@ static void handle_timeout(struct work_struct *work)
697 req = list_entry(osdc->req_lru.next, struct ceph_osd_request, 719 req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
698 r_req_lru_item); 720 r_req_lru_item);
699 721
700 if (time_before(jiffies, req->r_sent_stamp + timeout)) 722 if (time_before(jiffies, req->r_stamp + timeout))
701 break; 723 break;
702 724
703 BUG_ON(req == last_req && req->r_sent_stamp == last_sent); 725 BUG_ON(req == last_req && req->r_stamp == last_stamp);
704 last_req = req; 726 last_req = req;
705 last_sent = req->r_sent_stamp; 727 last_stamp = req->r_stamp;
706 728
707 osd = req->r_osd; 729 osd = req->r_osd;
708 BUG_ON(!osd); 730 BUG_ON(!osd);
@@ -718,7 +740,7 @@ static void handle_timeout(struct work_struct *work)
718 */ 740 */
719 INIT_LIST_HEAD(&slow_osds); 741 INIT_LIST_HEAD(&slow_osds);
720 list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) { 742 list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
721 if (time_before(jiffies, req->r_sent_stamp + keepalive)) 743 if (time_before(jiffies, req->r_stamp + keepalive))
722 break; 744 break;
723 745
724 osd = req->r_osd; 746 osd = req->r_osd;
@@ -768,16 +790,18 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
768 struct ceph_osd_request *req; 790 struct ceph_osd_request *req;
769 u64 tid; 791 u64 tid;
770 int numops, object_len, flags; 792 int numops, object_len, flags;
793 s32 result;
771 794
772 tid = le64_to_cpu(msg->hdr.tid); 795 tid = le64_to_cpu(msg->hdr.tid);
773 if (msg->front.iov_len < sizeof(*rhead)) 796 if (msg->front.iov_len < sizeof(*rhead))
774 goto bad; 797 goto bad;
775 numops = le32_to_cpu(rhead->num_ops); 798 numops = le32_to_cpu(rhead->num_ops);
776 object_len = le32_to_cpu(rhead->object_len); 799 object_len = le32_to_cpu(rhead->object_len);
800 result = le32_to_cpu(rhead->result);
777 if (msg->front.iov_len != sizeof(*rhead) + object_len + 801 if (msg->front.iov_len != sizeof(*rhead) + object_len +
778 numops * sizeof(struct ceph_osd_op)) 802 numops * sizeof(struct ceph_osd_op))
779 goto bad; 803 goto bad;
780 dout("handle_reply %p tid %llu\n", msg, tid); 804 dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
781 805
782 /* lookup */ 806 /* lookup */
783 mutex_lock(&osdc->request_mutex); 807 mutex_lock(&osdc->request_mutex);
@@ -823,7 +847,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
823 dout("handle_reply tid %llu flags %d\n", tid, flags); 847 dout("handle_reply tid %llu flags %d\n", tid, flags);
824 848
825 /* either this is a read, or we got the safe response */ 849 /* either this is a read, or we got the safe response */
826 if ((flags & CEPH_OSD_FLAG_ONDISK) || 850 if (result < 0 ||
851 (flags & CEPH_OSD_FLAG_ONDISK) ||
827 ((flags & CEPH_OSD_FLAG_WRITE) == 0)) 852 ((flags & CEPH_OSD_FLAG_WRITE) == 0))
828 __unregister_request(osdc, req); 853 __unregister_request(osdc, req);
829 854
@@ -862,7 +887,9 @@ static int __kick_requests(struct ceph_osd_client *osdc,
862 887
863 dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1); 888 dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
864 if (kickosd) { 889 if (kickosd) {
865 __reset_osd(osdc, kickosd); 890 err = __reset_osd(osdc, kickosd);
891 if (err == -EAGAIN)
892 return 1;
866 } else { 893 } else {
867 for (p = rb_first(&osdc->osds); p; p = n) { 894 for (p = rb_first(&osdc->osds); p; p = n) {
868 struct ceph_osd *osd = 895 struct ceph_osd *osd =
@@ -913,7 +940,7 @@ static int __kick_requests(struct ceph_osd_client *osdc,
913 940
914kick: 941kick:
915 dout("kicking %p tid %llu osd%d\n", req, req->r_tid, 942 dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
916 req->r_osd->o_osd); 943 req->r_osd ? req->r_osd->o_osd : -1);
917 req->r_flags |= CEPH_OSD_FLAG_RETRY; 944 req->r_flags |= CEPH_OSD_FLAG_RETRY;
918 err = __send_request(osdc, req); 945 err = __send_request(osdc, req);
919 if (err) { 946 if (err) {