path: root/net/ceph/osd_client.c
Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r--  net/ceph/osd_client.c | 638
 1 file changed, 498 insertions(+), 140 deletions(-)
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 3e20a122ffa2..6b5dda1cb5df 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -22,10 +22,15 @@
22#define OSD_OPREPLY_FRONT_LEN 512 22#define OSD_OPREPLY_FRONT_LEN 512
23 23
24static const struct ceph_connection_operations osd_con_ops; 24static const struct ceph_connection_operations osd_con_ops;
25static int __kick_requests(struct ceph_osd_client *osdc,
26 struct ceph_osd *kickosd);
27 25
28static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); 26static void send_queued(struct ceph_osd_client *osdc);
27static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
28static void __register_request(struct ceph_osd_client *osdc,
29 struct ceph_osd_request *req);
30static void __unregister_linger_request(struct ceph_osd_client *osdc,
31 struct ceph_osd_request *req);
32static int __send_request(struct ceph_osd_client *osdc,
33 struct ceph_osd_request *req);
29 34
30static int op_needs_trail(int op) 35static int op_needs_trail(int op)
31{ 36{
@@ -34,6 +39,7 @@ static int op_needs_trail(int op)
34 case CEPH_OSD_OP_SETXATTR: 39 case CEPH_OSD_OP_SETXATTR:
35 case CEPH_OSD_OP_CMPXATTR: 40 case CEPH_OSD_OP_CMPXATTR:
36 case CEPH_OSD_OP_CALL: 41 case CEPH_OSD_OP_CALL:
42 case CEPH_OSD_OP_NOTIFY:
37 return 1; 43 return 1;
38 default: 44 default:
39 return 0; 45 return 0;
@@ -209,6 +215,8 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
209 init_completion(&req->r_completion); 215 init_completion(&req->r_completion);
210 init_completion(&req->r_safe_completion); 216 init_completion(&req->r_safe_completion);
211 INIT_LIST_HEAD(&req->r_unsafe_item); 217 INIT_LIST_HEAD(&req->r_unsafe_item);
218 INIT_LIST_HEAD(&req->r_linger_item);
219 INIT_LIST_HEAD(&req->r_linger_osd);
212 req->r_flags = flags; 220 req->r_flags = flags;
213 221
214 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); 222 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
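The two list heads initialized above (r_linger_item and r_linger_osd), together with the r_linger flag tested later in this patch, are added to struct ceph_osd_request by the companion header change, which is not part of this file's diff. Purely as an illustrative sketch, with field names inferred from their use in the hunks below (exact types and placement may differ):

/*
 * Illustrative excerpt only -- the real definition lives in
 * include/linux/ceph/osd_client.h and may differ.
 */
struct ceph_osd_request {
	/* ... existing fields ... */
	struct list_head r_linger_item;   /* on osdc->req_linger */
	struct list_head r_linger_osd;    /* on r_osd->o_linger_requests */
	int              r_linger;        /* resend across maps and osd resets */
	/* ... */
};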
@@ -315,6 +323,24 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
315 break; 323 break;
316 case CEPH_OSD_OP_STARTSYNC: 324 case CEPH_OSD_OP_STARTSYNC:
317 break; 325 break;
326 case CEPH_OSD_OP_NOTIFY:
327 {
328 __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
329 __le32 timeout = cpu_to_le32(src->watch.timeout);
330
331 BUG_ON(!req->r_trail);
332
333 ceph_pagelist_append(req->r_trail,
334 &prot_ver, sizeof(prot_ver));
335 ceph_pagelist_append(req->r_trail,
336 &timeout, sizeof(timeout));
337 }
338 case CEPH_OSD_OP_NOTIFY_ACK:
339 case CEPH_OSD_OP_WATCH:
340 dst->watch.cookie = cpu_to_le64(src->watch.cookie);
341 dst->watch.ver = cpu_to_le64(src->watch.ver);
342 dst->watch.flag = src->watch.flag;
343 break;
318 default: 344 default:
319 pr_err("unrecognized osd opcode %d\n", dst->op); 345 pr_err("unrecognized osd opcode %d\n", dst->op);
320 WARN_ON(1); 346 WARN_ON(1);
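Note that the CEPH_OSD_OP_NOTIFY case above intentionally has no break: it falls through to the WATCH/NOTIFY_ACK case so that cookie, ver and flag are encoded for notify operations as well. The extra notify payload travels in the request's trail pagelist; purely as a visual aid (this struct is not a definition from the ceph headers), the appended bytes look like:

/* illustrative wire layout of the notify trail payload, little-endian */
struct example_notify_trail {
	__le32 prot_ver;   /* watch/notify protocol version */
	__le32 timeout;    /* notify timeout, in seconds */
} __packed;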
@@ -444,8 +470,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
444 snapc, ops, 470 snapc, ops,
445 use_mempool, 471 use_mempool,
446 GFP_NOFS, NULL, NULL); 472 GFP_NOFS, NULL, NULL);
447 if (IS_ERR(req)) 473 if (!req)
448 return req; 474 return NULL;
449 475
450 /* calculate max write size */ 476 /* calculate max write size */
451 calc_layout(osdc, vino, layout, off, plen, req, ops); 477 calc_layout(osdc, vino, layout, off, plen, req, ops);
@@ -529,6 +555,51 @@ __lookup_request_ge(struct ceph_osd_client *osdc,
529 return NULL; 555 return NULL;
530} 556}
531 557
558/*
559 * Resubmit requests pending on the given osd.
560 */
561static void __kick_osd_requests(struct ceph_osd_client *osdc,
562 struct ceph_osd *osd)
563{
564 struct ceph_osd_request *req, *nreq;
565 int err;
566
567 dout("__kick_osd_requests osd%d\n", osd->o_osd);
568 err = __reset_osd(osdc, osd);
569 if (err == -EAGAIN)
570 return;
571
572 list_for_each_entry(req, &osd->o_requests, r_osd_item) {
573 list_move(&req->r_req_lru_item, &osdc->req_unsent);
574 dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
575 osd->o_osd);
576 if (!req->r_linger)
577 req->r_flags |= CEPH_OSD_FLAG_RETRY;
578 }
579
580 list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
581 r_linger_osd) {
582 /*
583 * reregister request prior to unregistering linger so
584 * that r_osd is preserved.
585 */
586 BUG_ON(!list_empty(&req->r_req_lru_item));
587 __register_request(osdc, req);
588 list_add(&req->r_req_lru_item, &osdc->req_unsent);
589 list_add(&req->r_osd_item, &req->r_osd->o_requests);
590 __unregister_linger_request(osdc, req);
591 dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
592 osd->o_osd);
593 }
594}
595
596static void kick_osd_requests(struct ceph_osd_client *osdc,
597 struct ceph_osd *kickosd)
598{
599 mutex_lock(&osdc->request_mutex);
600 __kick_osd_requests(osdc, kickosd);
601 mutex_unlock(&osdc->request_mutex);
602}
532 603
533/* 604/*
534 * If the osd connection drops, we need to resubmit all requests. 605 * If the osd connection drops, we need to resubmit all requests.
@@ -543,7 +614,8 @@ static void osd_reset(struct ceph_connection *con)
543 dout("osd_reset osd%d\n", osd->o_osd); 614 dout("osd_reset osd%d\n", osd->o_osd);
544 osdc = osd->o_osdc; 615 osdc = osd->o_osdc;
545 down_read(&osdc->map_sem); 616 down_read(&osdc->map_sem);
546 kick_requests(osdc, osd); 617 kick_osd_requests(osdc, osd);
618 send_queued(osdc);
547 up_read(&osdc->map_sem); 619 up_read(&osdc->map_sem);
548} 620}
549 621
@@ -561,6 +633,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
561 atomic_set(&osd->o_ref, 1); 633 atomic_set(&osd->o_ref, 1);
562 osd->o_osdc = osdc; 634 osd->o_osdc = osdc;
563 INIT_LIST_HEAD(&osd->o_requests); 635 INIT_LIST_HEAD(&osd->o_requests);
636 INIT_LIST_HEAD(&osd->o_linger_requests);
564 INIT_LIST_HEAD(&osd->o_osd_lru); 637 INIT_LIST_HEAD(&osd->o_osd_lru);
565 osd->o_incarnation = 1; 638 osd->o_incarnation = 1;
566 639
@@ -650,7 +723,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
650 int ret = 0; 723 int ret = 0;
651 724
652 dout("__reset_osd %p osd%d\n", osd, osd->o_osd); 725 dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
653 if (list_empty(&osd->o_requests)) { 726 if (list_empty(&osd->o_requests) &&
727 list_empty(&osd->o_linger_requests)) {
654 __remove_osd(osdc, osd); 728 __remove_osd(osdc, osd);
655 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], 729 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
656 &osd->o_con.peer_addr, 730 &osd->o_con.peer_addr,
@@ -723,15 +797,14 @@ static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
723 * Register request, assign tid. If this is the first request, set up 797 * Register request, assign tid. If this is the first request, set up
724 * the timeout event. 798 * the timeout event.
725 */ 799 */
726static void register_request(struct ceph_osd_client *osdc, 800static void __register_request(struct ceph_osd_client *osdc,
727 struct ceph_osd_request *req) 801 struct ceph_osd_request *req)
728{ 802{
729 mutex_lock(&osdc->request_mutex);
730 req->r_tid = ++osdc->last_tid; 803 req->r_tid = ++osdc->last_tid;
731 req->r_request->hdr.tid = cpu_to_le64(req->r_tid); 804 req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
732 INIT_LIST_HEAD(&req->r_req_lru_item); 805 INIT_LIST_HEAD(&req->r_req_lru_item);
733 806
734 dout("register_request %p tid %lld\n", req, req->r_tid); 807 dout("__register_request %p tid %lld\n", req, req->r_tid);
735 __insert_request(osdc, req); 808 __insert_request(osdc, req);
736 ceph_osdc_get_request(req); 809 ceph_osdc_get_request(req);
737 osdc->num_requests++; 810 osdc->num_requests++;
@@ -740,6 +813,13 @@ static void register_request(struct ceph_osd_client *osdc,
740 dout(" first request, scheduling timeout\n"); 813 dout(" first request, scheduling timeout\n");
741 __schedule_osd_timeout(osdc); 814 __schedule_osd_timeout(osdc);
742 } 815 }
816}
817
818static void register_request(struct ceph_osd_client *osdc,
819 struct ceph_osd_request *req)
820{
821 mutex_lock(&osdc->request_mutex);
822 __register_request(osdc, req);
743 mutex_unlock(&osdc->request_mutex); 823 mutex_unlock(&osdc->request_mutex);
744} 824}
745 825
@@ -758,9 +838,13 @@ static void __unregister_request(struct ceph_osd_client *osdc,
758 ceph_con_revoke(&req->r_osd->o_con, req->r_request); 838 ceph_con_revoke(&req->r_osd->o_con, req->r_request);
759 839
760 list_del_init(&req->r_osd_item); 840 list_del_init(&req->r_osd_item);
761 if (list_empty(&req->r_osd->o_requests)) 841 if (list_empty(&req->r_osd->o_requests) &&
842 list_empty(&req->r_osd->o_linger_requests)) {
843 dout("moving osd to %p lru\n", req->r_osd);
762 __move_osd_to_lru(osdc, req->r_osd); 844 __move_osd_to_lru(osdc, req->r_osd);
763 req->r_osd = NULL; 845 }
846 if (list_empty(&req->r_linger_item))
847 req->r_osd = NULL;
764 } 848 }
765 849
766 ceph_osdc_put_request(req); 850 ceph_osdc_put_request(req);
@@ -781,20 +865,73 @@ static void __cancel_request(struct ceph_osd_request *req)
781 ceph_con_revoke(&req->r_osd->o_con, req->r_request); 865 ceph_con_revoke(&req->r_osd->o_con, req->r_request);
782 req->r_sent = 0; 866 req->r_sent = 0;
783 } 867 }
784 list_del_init(&req->r_req_lru_item);
785} 868}
786 869
870static void __register_linger_request(struct ceph_osd_client *osdc,
871 struct ceph_osd_request *req)
872{
873 dout("__register_linger_request %p\n", req);
874 list_add_tail(&req->r_linger_item, &osdc->req_linger);
875 list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests);
876}
877
878static void __unregister_linger_request(struct ceph_osd_client *osdc,
879 struct ceph_osd_request *req)
880{
881 dout("__unregister_linger_request %p\n", req);
882 if (req->r_osd) {
883 list_del_init(&req->r_linger_item);
884 list_del_init(&req->r_linger_osd);
885
886 if (list_empty(&req->r_osd->o_requests) &&
887 list_empty(&req->r_osd->o_linger_requests)) {
888 dout("moving osd to %p lru\n", req->r_osd);
889 __move_osd_to_lru(osdc, req->r_osd);
890 }
891 if (list_empty(&req->r_osd_item))
892 req->r_osd = NULL;
893 }
894}
895
896void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
897 struct ceph_osd_request *req)
898{
899 mutex_lock(&osdc->request_mutex);
900 if (req->r_linger) {
901 __unregister_linger_request(osdc, req);
902 ceph_osdc_put_request(req);
903 }
904 mutex_unlock(&osdc->request_mutex);
905}
906EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);
907
908void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
909 struct ceph_osd_request *req)
910{
911 if (!req->r_linger) {
912 dout("set_request_linger %p\n", req);
913 req->r_linger = 1;
914 /*
915 * caller is now responsible for calling
916 * unregister_linger_request
917 */
918 ceph_osdc_get_request(req);
919 }
920}
921EXPORT_SYMBOL(ceph_osdc_set_request_linger);
922
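As a minimal usage sketch (not part of the patch): a caller that has already built a CEPH_OSD_OP_WATCH request could keep it alive across osdmap changes and osd resets as below. The third argument to ceph_osdc_start_request() (nofail) is assumed from its use elsewhere in this file.

static int example_start_watch(struct ceph_osd_client *osdc,
			       struct ceph_osd_request *req)
{
	int ret;

	/* take the linger ref; we must unregister the linger ourselves later */
	ceph_osdc_set_request_linger(osdc, req);

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0) {
		/* drop the ref taken by set_request_linger */
		ceph_osdc_unregister_linger_request(osdc, req);
	}
	return ret;
}

Once the write is acked on disk, handle_reply() (below) moves such a request onto the linger lists, and kick_requests() resubmits it whenever its osd mapping changes.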
787/* 923/*
788 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct 924 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
789 * (as needed), and set the request r_osd appropriately. If there is 925 * (as needed), and set the request r_osd appropriately. If there is
790 * no up osd, set r_osd to NULL. 926 * no up osd, set r_osd to NULL. Move the request to the appropriate list
927 * (unsent, homeless) or leave on in-flight lru.
791 * 928 *
792 * Return 0 if unchanged, 1 if changed, or negative on error. 929 * Return 0 if unchanged, 1 if changed, or negative on error.
793 * 930 *
794 * Caller should hold map_sem for read and request_mutex. 931 * Caller should hold map_sem for read and request_mutex.
795 */ 932 */
796static int __map_osds(struct ceph_osd_client *osdc, 933static int __map_request(struct ceph_osd_client *osdc,
797 struct ceph_osd_request *req) 934 struct ceph_osd_request *req)
798{ 935{
799 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; 936 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
800 struct ceph_pg pgid; 937 struct ceph_pg pgid;
@@ -802,11 +939,13 @@ static int __map_osds(struct ceph_osd_client *osdc,
802 int o = -1, num = 0; 939 int o = -1, num = 0;
803 int err; 940 int err;
804 941
805 dout("map_osds %p tid %lld\n", req, req->r_tid); 942 dout("map_request %p tid %lld\n", req, req->r_tid);
806 err = ceph_calc_object_layout(&reqhead->layout, req->r_oid, 943 err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
807 &req->r_file_layout, osdc->osdmap); 944 &req->r_file_layout, osdc->osdmap);
808 if (err) 945 if (err) {
946 list_move(&req->r_req_lru_item, &osdc->req_notarget);
809 return err; 947 return err;
948 }
810 pgid = reqhead->layout.ol_pgid; 949 pgid = reqhead->layout.ol_pgid;
811 req->r_pgid = pgid; 950 req->r_pgid = pgid;
812 951
@@ -823,7 +962,7 @@ static int __map_osds(struct ceph_osd_client *osdc,
823 (req->r_osd == NULL && o == -1)) 962 (req->r_osd == NULL && o == -1))
824 return 0; /* no change */ 963 return 0; /* no change */
825 964
826 dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n", 965 dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",
827 req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o, 966 req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
828 req->r_osd ? req->r_osd->o_osd : -1); 967 req->r_osd ? req->r_osd->o_osd : -1);
829 968
@@ -841,10 +980,12 @@ static int __map_osds(struct ceph_osd_client *osdc,
841 if (!req->r_osd && o >= 0) { 980 if (!req->r_osd && o >= 0) {
842 err = -ENOMEM; 981 err = -ENOMEM;
843 req->r_osd = create_osd(osdc); 982 req->r_osd = create_osd(osdc);
844 if (!req->r_osd) 983 if (!req->r_osd) {
984 list_move(&req->r_req_lru_item, &osdc->req_notarget);
845 goto out; 985 goto out;
986 }
846 987
847 dout("map_osds osd %p is osd%d\n", req->r_osd, o); 988 dout("map_request osd %p is osd%d\n", req->r_osd, o);
848 req->r_osd->o_osd = o; 989 req->r_osd->o_osd = o;
849 req->r_osd->o_con.peer_name.num = cpu_to_le64(o); 990 req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
850 __insert_osd(osdc, req->r_osd); 991 __insert_osd(osdc, req->r_osd);
@@ -855,6 +996,9 @@ static int __map_osds(struct ceph_osd_client *osdc,
855 if (req->r_osd) { 996 if (req->r_osd) {
856 __remove_osd_from_lru(req->r_osd); 997 __remove_osd_from_lru(req->r_osd);
857 list_add(&req->r_osd_item, &req->r_osd->o_requests); 998 list_add(&req->r_osd_item, &req->r_osd->o_requests);
999 list_move(&req->r_req_lru_item, &osdc->req_unsent);
1000 } else {
1001 list_move(&req->r_req_lru_item, &osdc->req_notarget);
858 } 1002 }
859 err = 1; /* osd or pg changed */ 1003 err = 1; /* osd or pg changed */
860 1004
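For clarity, an illustrative summary (not in the patch) of where __map_request() leaves a request when its mapping changes; if nothing changed, the request stays on whichever list it was already on.

static struct list_head *example_map_target(struct ceph_osd_client *osdc,
					    struct ceph_osd_request *req,
					    int map_err)
{
	if (map_err < 0 || req->r_osd == NULL)
		return &osdc->req_notarget;   /* no usable osd for now */
	return &osdc->req_unsent;             /* picked up by send_queued() */
}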
@@ -869,16 +1013,6 @@ static int __send_request(struct ceph_osd_client *osdc,
869 struct ceph_osd_request *req) 1013 struct ceph_osd_request *req)
870{ 1014{
871 struct ceph_osd_request_head *reqhead; 1015 struct ceph_osd_request_head *reqhead;
872 int err;
873
874 err = __map_osds(osdc, req);
875 if (err < 0)
876 return err;
877 if (req->r_osd == NULL) {
878 dout("send_request %p no up osds in pg\n", req);
879 ceph_monc_request_next_osdmap(&osdc->client->monc);
880 return 0;
881 }
882 1016
883 dout("send_request %p tid %llu to osd%d flags %d\n", 1017 dout("send_request %p tid %llu to osd%d flags %d\n",
884 req, req->r_tid, req->r_osd->o_osd, req->r_flags); 1018 req, req->r_tid, req->r_osd->o_osd, req->r_flags);
@@ -898,6 +1032,21 @@ static int __send_request(struct ceph_osd_client *osdc,
898} 1032}
899 1033
900/* 1034/*
1035 * Send any requests in the queue (req_unsent).
1036 */
1037static void send_queued(struct ceph_osd_client *osdc)
1038{
1039 struct ceph_osd_request *req, *tmp;
1040
1041 dout("send_queued\n");
1042 mutex_lock(&osdc->request_mutex);
1043 list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
1044 __send_request(osdc, req);
1045 }
1046 mutex_unlock(&osdc->request_mutex);
1047}
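Since the mapping step was removed from __send_request() (previous hunk), callers now map first and send second. A rough sketch of the resulting pipeline, with request_mutex assumed held as for the other __-prefixed helpers:

static int example_submit(struct ceph_osd_client *osdc,
			  struct ceph_osd_request *req)
{
	int err;

	err = __map_request(osdc, req);      /* pick an osd, queue on req_unsent */
	if (err < 0)
		return err;                  /* left on req_notarget */
	if (req->r_osd == NULL)
		return 0;                    /* no up osd; wait for a newer map */
	return __send_request(osdc, req);    /* hand the message to the osd */
}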
1048
1049/*
901 * Timeout callback, called every N seconds when 1 or more osd 1050 * Timeout callback, called every N seconds when 1 or more osd
902 * requests has been active for more than N seconds. When this 1051 * requests has been active for more than N seconds. When this
903 * happens, we ping all OSDs with requests who have timed out to 1052 * happens, we ping all OSDs with requests who have timed out to
@@ -916,30 +1065,13 @@ static void handle_timeout(struct work_struct *work)
916 unsigned long keepalive = 1065 unsigned long keepalive =
917 osdc->client->options->osd_keepalive_timeout * HZ; 1066 osdc->client->options->osd_keepalive_timeout * HZ;
918 unsigned long last_stamp = 0; 1067 unsigned long last_stamp = 0;
919 struct rb_node *p;
920 struct list_head slow_osds; 1068 struct list_head slow_osds;
921
922 dout("timeout\n"); 1069 dout("timeout\n");
923 down_read(&osdc->map_sem); 1070 down_read(&osdc->map_sem);
924 1071
925 ceph_monc_request_next_osdmap(&osdc->client->monc); 1072 ceph_monc_request_next_osdmap(&osdc->client->monc);
926 1073
927 mutex_lock(&osdc->request_mutex); 1074 mutex_lock(&osdc->request_mutex);
928 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
929 req = rb_entry(p, struct ceph_osd_request, r_node);
930
931 if (req->r_resend) {
932 int err;
933
934 dout("osdc resending prev failed %lld\n", req->r_tid);
935 err = __send_request(osdc, req);
936 if (err)
937 dout("osdc failed again on %lld\n", req->r_tid);
938 else
939 req->r_resend = false;
940 continue;
941 }
942 }
943 1075
944 /* 1076 /*
945 * reset osds that appear to be _really_ unresponsive. this 1077 * reset osds that appear to be _really_ unresponsive. this
@@ -963,7 +1095,7 @@ static void handle_timeout(struct work_struct *work)
963 BUG_ON(!osd); 1095 BUG_ON(!osd);
964 pr_warning(" tid %llu timed out on osd%d, will reset osd\n", 1096 pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
965 req->r_tid, osd->o_osd); 1097 req->r_tid, osd->o_osd);
966 __kick_requests(osdc, osd); 1098 __kick_osd_requests(osdc, osd);
967 } 1099 }
968 1100
969 /* 1101 /*
@@ -991,7 +1123,7 @@ static void handle_timeout(struct work_struct *work)
991 1123
992 __schedule_osd_timeout(osdc); 1124 __schedule_osd_timeout(osdc);
993 mutex_unlock(&osdc->request_mutex); 1125 mutex_unlock(&osdc->request_mutex);
994 1126 send_queued(osdc);
995 up_read(&osdc->map_sem); 1127 up_read(&osdc->map_sem);
996} 1128}
997 1129
@@ -1035,7 +1167,6 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1035 numops * sizeof(struct ceph_osd_op)) 1167 numops * sizeof(struct ceph_osd_op))
1036 goto bad; 1168 goto bad;
1037 dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result); 1169 dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
1038
1039 /* lookup */ 1170 /* lookup */
1040 mutex_lock(&osdc->request_mutex); 1171 mutex_lock(&osdc->request_mutex);
1041 req = __lookup_request(osdc, tid); 1172 req = __lookup_request(osdc, tid);
@@ -1079,6 +1210,9 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1079 1210
1080 dout("handle_reply tid %llu flags %d\n", tid, flags); 1211 dout("handle_reply tid %llu flags %d\n", tid, flags);
1081 1212
1213 if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
1214 __register_linger_request(osdc, req);
1215
1082 /* either this is a read, or we got the safe response */ 1216 /* either this is a read, or we got the safe response */
1083 if (result < 0 || 1217 if (result < 0 ||
1084 (flags & CEPH_OSD_FLAG_ONDISK) || 1218 (flags & CEPH_OSD_FLAG_ONDISK) ||
@@ -1099,6 +1233,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1099 } 1233 }
1100 1234
1101done: 1235done:
1236 dout("req=%p req->r_linger=%d\n", req, req->r_linger);
1102 ceph_osdc_put_request(req); 1237 ceph_osdc_put_request(req);
1103 return; 1238 return;
1104 1239
@@ -1109,108 +1244,83 @@ bad:
1109 ceph_msg_dump(msg); 1244 ceph_msg_dump(msg);
1110} 1245}
1111 1246
1112 1247static void reset_changed_osds(struct ceph_osd_client *osdc)
1113static int __kick_requests(struct ceph_osd_client *osdc,
1114 struct ceph_osd *kickosd)
1115{ 1248{
1116 struct ceph_osd_request *req;
1117 struct rb_node *p, *n; 1249 struct rb_node *p, *n;
1118 int needmap = 0;
1119 int err;
1120 1250
1121 dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1); 1251 for (p = rb_first(&osdc->osds); p; p = n) {
1122 if (kickosd) { 1252 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
1123 err = __reset_osd(osdc, kickosd); 1253
1124 if (err == -EAGAIN) 1254 n = rb_next(p);
1125 return 1; 1255 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
1126 } else { 1256 memcmp(&osd->o_con.peer_addr,
1127 for (p = rb_first(&osdc->osds); p; p = n) { 1257 ceph_osd_addr(osdc->osdmap,
1128 struct ceph_osd *osd = 1258 osd->o_osd),
1129 rb_entry(p, struct ceph_osd, o_node); 1259 sizeof(struct ceph_entity_addr)) != 0)
1130 1260 __reset_osd(osdc, osd);
1131 n = rb_next(p);
1132 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
1133 memcmp(&osd->o_con.peer_addr,
1134 ceph_osd_addr(osdc->osdmap,
1135 osd->o_osd),
1136 sizeof(struct ceph_entity_addr)) != 0)
1137 __reset_osd(osdc, osd);
1138 }
1139 } 1261 }
1262}
1263
1264/*
1265 * Requeue requests whose mapping to an OSD has changed. If requests map to
1266 * no osd, request a new map.
1267 *
1268 * Caller should hold map_sem for read and request_mutex.
1269 */
1270static void kick_requests(struct ceph_osd_client *osdc)
1271{
1272 struct ceph_osd_request *req, *nreq;
1273 struct rb_node *p;
1274 int needmap = 0;
1275 int err;
1140 1276
1277 dout("kick_requests\n");
1278 mutex_lock(&osdc->request_mutex);
1141 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { 1279 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
1142 req = rb_entry(p, struct ceph_osd_request, r_node); 1280 req = rb_entry(p, struct ceph_osd_request, r_node);
1143 1281 err = __map_request(osdc, req);
1144 if (req->r_resend) { 1282 if (err < 0)
1145 dout(" r_resend set on tid %llu\n", req->r_tid); 1283 continue; /* error */
1146 __cancel_request(req); 1284 if (req->r_osd == NULL) {
1147 goto kick; 1285 dout("%p tid %llu maps to no osd\n", req, req->r_tid);
1148 } 1286 needmap++; /* request a newer map */
1149 if (req->r_osd && kickosd == req->r_osd) { 1287 } else if (err > 0) {
1150 __cancel_request(req); 1288 dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
1151 goto kick; 1289 req->r_osd ? req->r_osd->o_osd : -1);
1290 if (!req->r_linger)
1291 req->r_flags |= CEPH_OSD_FLAG_RETRY;
1152 } 1292 }
1293 }
1153 1294
1154 err = __map_osds(osdc, req); 1295 list_for_each_entry_safe(req, nreq, &osdc->req_linger,
1296 r_linger_item) {
1297 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
1298
1299 err = __map_request(osdc, req);
1155 if (err == 0) 1300 if (err == 0)
1156 continue; /* no change */ 1301 continue; /* no change and no osd was specified */
1157 if (err < 0) { 1302 if (err < 0)
1158 /* 1303 continue; /* hrm! */
1159 * FIXME: really, we should set the request
1160 * error and fail if this isn't a 'nofail'
1161 * request, but that's a fair bit more
1162 * complicated to do. So retry!
1163 */
1164 dout(" setting r_resend on %llu\n", req->r_tid);
1165 req->r_resend = true;
1166 continue;
1167 }
1168 if (req->r_osd == NULL) { 1304 if (req->r_osd == NULL) {
1169 dout("tid %llu maps to no valid osd\n", req->r_tid); 1305 dout("tid %llu maps to no valid osd\n", req->r_tid);
1170 needmap++; /* request a newer map */ 1306 needmap++; /* request a newer map */
1171 continue; 1307 continue;
1172 } 1308 }
1173 1309
1174kick: 1310 dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
1175 dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
1176 req->r_osd ? req->r_osd->o_osd : -1); 1311 req->r_osd ? req->r_osd->o_osd : -1);
1177 req->r_flags |= CEPH_OSD_FLAG_RETRY; 1312 __unregister_linger_request(osdc, req);
1178 err = __send_request(osdc, req); 1313 __register_request(osdc, req);
1179 if (err) {
1180 dout(" setting r_resend on %llu\n", req->r_tid);
1181 req->r_resend = true;
1182 }
1183 } 1314 }
1184
1185 return needmap;
1186}
1187
1188/*
1189 * Resubmit osd requests whose osd or osd address has changed. Request
1190 * a new osd map if osds are down, or we are otherwise unable to determine
1191 * how to direct a request.
1192 *
1193 * Close connections to down osds.
1194 *
1195 * If @who is specified, resubmit requests for that specific osd.
1196 *
1197 * Caller should hold map_sem for read and request_mutex.
1198 */
1199static void kick_requests(struct ceph_osd_client *osdc,
1200 struct ceph_osd *kickosd)
1201{
1202 int needmap;
1203
1204 mutex_lock(&osdc->request_mutex);
1205 needmap = __kick_requests(osdc, kickosd);
1206 mutex_unlock(&osdc->request_mutex); 1315 mutex_unlock(&osdc->request_mutex);
1207 1316
1208 if (needmap) { 1317 if (needmap) {
1209 dout("%d requests for down osds, need new map\n", needmap); 1318 dout("%d requests for down osds, need new map\n", needmap);
1210 ceph_monc_request_next_osdmap(&osdc->client->monc); 1319 ceph_monc_request_next_osdmap(&osdc->client->monc);
1211 } 1320 }
1212
1213} 1321}
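Putting the pieces together, an illustrative composition only (see ceph_osdc_handle_map() below; map_sem handling omitted): an osdmap update remaps every request, resets connections to osds whose address or up/down state changed, and then flushes whatever became sendable.

static void example_process_map_change(struct ceph_osd_client *osdc)
{
	kick_requests(osdc);         /* __map_request() each request */
	reset_changed_osds(osdc);    /* __reset_osd() for changed/down osds */
	send_queued(osdc);           /* __send_request() everything on req_unsent */
}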
1322
1323
1214/* 1324/*
1215 * Process updated osd map. 1325 * Process updated osd map.
1216 * 1326 *
@@ -1263,6 +1373,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1263 ceph_osdmap_destroy(osdc->osdmap); 1373 ceph_osdmap_destroy(osdc->osdmap);
1264 osdc->osdmap = newmap; 1374 osdc->osdmap = newmap;
1265 } 1375 }
1376 kick_requests(osdc);
1377 reset_changed_osds(osdc);
1266 } else { 1378 } else {
1267 dout("ignoring incremental map %u len %d\n", 1379 dout("ignoring incremental map %u len %d\n",
1268 epoch, maplen); 1380 epoch, maplen);
@@ -1300,6 +1412,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1300 osdc->osdmap = newmap; 1412 osdc->osdmap = newmap;
1301 if (oldmap) 1413 if (oldmap)
1302 ceph_osdmap_destroy(oldmap); 1414 ceph_osdmap_destroy(oldmap);
1415 kick_requests(osdc);
1303 } 1416 }
1304 p += maplen; 1417 p += maplen;
1305 nr_maps--; 1418 nr_maps--;
@@ -1308,8 +1421,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1308done: 1421done:
1309 downgrade_write(&osdc->map_sem); 1422 downgrade_write(&osdc->map_sem);
1310 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); 1423 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
1311 if (newmap) 1424 send_queued(osdc);
1312 kick_requests(osdc, NULL);
1313 up_read(&osdc->map_sem); 1425 up_read(&osdc->map_sem);
1314 wake_up_all(&osdc->client->auth_wq); 1426 wake_up_all(&osdc->client->auth_wq);
1315 return; 1427 return;
@@ -1322,6 +1434,223 @@ bad:
1322} 1434}
1323 1435
1324/* 1436/*
1437 * watch/notify callback event infrastructure
1438 *
1439 * These callbacks are used both for watch and notify operations.
1440 */
1441static void __release_event(struct kref *kref)
1442{
1443 struct ceph_osd_event *event =
1444 container_of(kref, struct ceph_osd_event, kref);
1445
1446 dout("__release_event %p\n", event);
1447 kfree(event);
1448}
1449
1450static void get_event(struct ceph_osd_event *event)
1451{
1452 kref_get(&event->kref);
1453}
1454
1455void ceph_osdc_put_event(struct ceph_osd_event *event)
1456{
1457 kref_put(&event->kref, __release_event);
1458}
1459EXPORT_SYMBOL(ceph_osdc_put_event);
1460
1461static void __insert_event(struct ceph_osd_client *osdc,
1462 struct ceph_osd_event *new)
1463{
1464 struct rb_node **p = &osdc->event_tree.rb_node;
1465 struct rb_node *parent = NULL;
1466 struct ceph_osd_event *event = NULL;
1467
1468 while (*p) {
1469 parent = *p;
1470 event = rb_entry(parent, struct ceph_osd_event, node);
1471 if (new->cookie < event->cookie)
1472 p = &(*p)->rb_left;
1473 else if (new->cookie > event->cookie)
1474 p = &(*p)->rb_right;
1475 else
1476 BUG();
1477 }
1478
1479 rb_link_node(&new->node, parent, p);
1480 rb_insert_color(&new->node, &osdc->event_tree);
1481}
1482
1483static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
1484 u64 cookie)
1485{
1486 struct rb_node **p = &osdc->event_tree.rb_node;
1487 struct rb_node *parent = NULL;
1488 struct ceph_osd_event *event = NULL;
1489
1490 while (*p) {
1491 parent = *p;
1492 event = rb_entry(parent, struct ceph_osd_event, node);
1493 if (cookie < event->cookie)
1494 p = &(*p)->rb_left;
1495 else if (cookie > event->cookie)
1496 p = &(*p)->rb_right;
1497 else
1498 return event;
1499 }
1500 return NULL;
1501}
1502
1503static void __remove_event(struct ceph_osd_event *event)
1504{
1505 struct ceph_osd_client *osdc = event->osdc;
1506
1507 if (!RB_EMPTY_NODE(&event->node)) {
1508 dout("__remove_event removed %p\n", event);
1509 rb_erase(&event->node, &osdc->event_tree);
1510 ceph_osdc_put_event(event);
1511 } else {
1512 dout("__remove_event didn't remove %p\n", event);
1513 }
1514}
1515
1516int ceph_osdc_create_event(struct ceph_osd_client *osdc,
1517 void (*event_cb)(u64, u64, u8, void *),
1518 int one_shot, void *data,
1519 struct ceph_osd_event **pevent)
1520{
1521 struct ceph_osd_event *event;
1522
1523 event = kmalloc(sizeof(*event), GFP_NOIO);
1524 if (!event)
1525 return -ENOMEM;
1526
1527 dout("create_event %p\n", event);
1528 event->cb = event_cb;
1529 event->one_shot = one_shot;
1530 event->data = data;
1531 event->osdc = osdc;
1532 INIT_LIST_HEAD(&event->osd_node);
1533 kref_init(&event->kref); /* one ref for us */
1534 kref_get(&event->kref); /* one ref for the caller */
1535 init_completion(&event->completion);
1536
1537 spin_lock(&osdc->event_lock);
1538 event->cookie = ++osdc->event_count;
1539 __insert_event(osdc, event);
1540 spin_unlock(&osdc->event_lock);
1541
1542 *pevent = event;
1543 return 0;
1544}
1545EXPORT_SYMBOL(ceph_osdc_create_event);
1546
1547void ceph_osdc_cancel_event(struct ceph_osd_event *event)
1548{
1549 struct ceph_osd_client *osdc = event->osdc;
1550
1551 dout("cancel_event %p\n", event);
1552 spin_lock(&osdc->event_lock);
1553 __remove_event(event);
1554 spin_unlock(&osdc->event_lock);
1555 ceph_osdc_put_event(event); /* caller's */
1556}
1557EXPORT_SYMBOL(ceph_osdc_cancel_event);
1558
1559
1560static void do_event_work(struct work_struct *work)
1561{
1562 struct ceph_osd_event_work *event_work =
1563 container_of(work, struct ceph_osd_event_work, work);
1564 struct ceph_osd_event *event = event_work->event;
1565 u64 ver = event_work->ver;
1566 u64 notify_id = event_work->notify_id;
1567 u8 opcode = event_work->opcode;
1568
1569 dout("do_event_work completing %p\n", event);
1570 event->cb(ver, notify_id, opcode, event->data);
1571 complete(&event->completion);
1572 dout("do_event_work completed %p\n", event);
1573 ceph_osdc_put_event(event);
1574 kfree(event_work);
1575}
1576
1577
1578/*
1579 * Process osd watch notifications
1580 */
1581void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1582{
1583 void *p, *end;
1584 u8 proto_ver;
1585 u64 cookie, ver, notify_id;
1586 u8 opcode;
1587 struct ceph_osd_event *event;
1588 struct ceph_osd_event_work *event_work;
1589
1590 p = msg->front.iov_base;
1591 end = p + msg->front.iov_len;
1592
1593 ceph_decode_8_safe(&p, end, proto_ver, bad);
1594 ceph_decode_8_safe(&p, end, opcode, bad);
1595 ceph_decode_64_safe(&p, end, cookie, bad);
1596 ceph_decode_64_safe(&p, end, ver, bad);
1597 ceph_decode_64_safe(&p, end, notify_id, bad);
1598
1599 spin_lock(&osdc->event_lock);
1600 event = __find_event(osdc, cookie);
1601 if (event) {
1602 get_event(event);
1603 if (event->one_shot)
1604 __remove_event(event);
1605 }
1606 spin_unlock(&osdc->event_lock);
1607 dout("handle_watch_notify cookie %lld ver %lld event %p\n",
1608 cookie, ver, event);
1609 if (event) {
1610 event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
1611 if (!event_work) {
1612 dout("ERROR: could not allocate event_work\n");
1613 goto done_err;
1614 }
1615 INIT_WORK(&event_work->work, do_event_work);
1616 event_work->event = event;
1617 event_work->ver = ver;
1618 event_work->notify_id = notify_id;
1619 event_work->opcode = opcode;
1620 if (!queue_work(osdc->notify_wq, &event_work->work)) {
1621 dout("WARNING: failed to queue notify event work\n");
1622 goto done_err;
1623 }
1624 }
1625
1626 return;
1627
1628done_err:
1629 complete(&event->completion);
1630 ceph_osdc_put_event(event);
1631 return;
1632
1633bad:
1634 pr_err("osdc handle_watch_notify corrupt msg\n");
1635 return;
1636}
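The decode above implies the following front-section layout for a watch-notify message (little-endian on the wire). The kernel uses ceph_decode_*_safe() rather than a struct, so this is only a visual aid, not a definition from the ceph headers:

struct example_watch_notify_front {
	u8     proto_ver;   /* protocol version of the notification */
	u8     opcode;      /* watch/notify opcode from the osd */
	__le64 cookie;      /* matches ceph_osd_event->cookie */
	__le64 ver;         /* object version */
	__le64 notify_id;   /* identifies this notification instance */
} __packed;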
1637
1638int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
1639{
1640 int err;
1641
1642 dout("wait_event %p\n", event);
1643 err = wait_for_completion_interruptible_timeout(&event->completion,
1644 timeout * HZ);
1645 ceph_osdc_put_event(event);
1646 if (err > 0)
1647 err = 0;
1648 dout("wait_event %p returns %d\n", event, err);
1649 return err;
1650}
1651EXPORT_SYMBOL(ceph_osdc_wait_event);
1652
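A minimal calling sketch for the new event API (not part of the patch): a one-shot event is created, its cookie is carried by a CEPH_OSD_OP_NOTIFY request built separately, and the caller waits for handle_watch_notify() to fire the callback via notify_wq. A persistent watch would instead pass one_shot=0 and tear the event down later with ceph_osdc_cancel_event().

static void example_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	dout("notify ver %llu notify_id %llu opcode %u\n", ver, notify_id,
	     (unsigned int)opcode);
}

static int example_notify_and_wait(struct ceph_osd_client *osdc)
{
	struct ceph_osd_event *event;
	int ret;

	ret = ceph_osdc_create_event(osdc, example_notify_cb,
				     1 /* one_shot */, NULL, &event);
	if (ret < 0)
		return ret;

	/*
	 * ... build and start an osd request whose watch.cookie is
	 * event->cookie here (omitted) ...
	 */

	/* waits for the callback; also drops our reference to the event */
	return ceph_osdc_wait_event(event, 10 /* seconds */);
}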
1653/*
1325 * Register request, send initial attempt. 1654 * Register request, send initial attempt.
1326 */ 1655 */
1327int ceph_osdc_start_request(struct ceph_osd_client *osdc, 1656int ceph_osdc_start_request(struct ceph_osd_client *osdc,
@@ -1347,18 +1676,27 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1347 * the request still han't been touched yet. 1676 * the request still han't been touched yet.
1348 */ 1677 */
1349 if (req->r_sent == 0) { 1678 if (req->r_sent == 0) {
1350 rc = __send_request(osdc, req); 1679 rc = __map_request(osdc, req);
1351 if (rc) { 1680 if (rc < 0)
1352 if (nofail) { 1681 goto out_unlock;
1353 dout("osdc_start_request failed send, " 1682 if (req->r_osd == NULL) {
1354 " marking %lld\n", req->r_tid); 1683 dout("send_request %p no up osds in pg\n", req);
1355 req->r_resend = true; 1684 ceph_monc_request_next_osdmap(&osdc->client->monc);
1356 rc = 0; 1685 } else {
1357 } else { 1686 rc = __send_request(osdc, req);
1358 __unregister_request(osdc, req); 1687 if (rc) {
1688 if (nofail) {
1689 dout("osdc_start_request failed send, "
1690 " will retry %lld\n", req->r_tid);
1691 rc = 0;
1692 } else {
1693 __unregister_request(osdc, req);
1694 }
1359 } 1695 }
1360 } 1696 }
1361 } 1697 }
1698
1699out_unlock:
1362 mutex_unlock(&osdc->request_mutex); 1700 mutex_unlock(&osdc->request_mutex);
1363 up_read(&osdc->map_sem); 1701 up_read(&osdc->map_sem);
1364 return rc; 1702 return rc;
@@ -1441,9 +1779,15 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1441 INIT_LIST_HEAD(&osdc->osd_lru); 1779 INIT_LIST_HEAD(&osdc->osd_lru);
1442 osdc->requests = RB_ROOT; 1780 osdc->requests = RB_ROOT;
1443 INIT_LIST_HEAD(&osdc->req_lru); 1781 INIT_LIST_HEAD(&osdc->req_lru);
1782 INIT_LIST_HEAD(&osdc->req_unsent);
1783 INIT_LIST_HEAD(&osdc->req_notarget);
1784 INIT_LIST_HEAD(&osdc->req_linger);
1444 osdc->num_requests = 0; 1785 osdc->num_requests = 0;
1445 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 1786 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
1446 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 1787 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
1788 spin_lock_init(&osdc->event_lock);
1789 osdc->event_tree = RB_ROOT;
1790 osdc->event_count = 0;
1447 1791
1448 schedule_delayed_work(&osdc->osds_timeout_work, 1792 schedule_delayed_work(&osdc->osds_timeout_work,
1449 round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ)); 1793 round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
@@ -1463,6 +1807,13 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1463 "osd_op_reply"); 1807 "osd_op_reply");
1464 if (err < 0) 1808 if (err < 0)
1465 goto out_msgpool; 1809 goto out_msgpool;
1810
1811 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
1812 if (IS_ERR(osdc->notify_wq)) {
1813 err = PTR_ERR(osdc->notify_wq);
1814 osdc->notify_wq = NULL;
1815 goto out_msgpool;
1816 }
1466 return 0; 1817 return 0;
1467 1818
1468out_msgpool: 1819out_msgpool:
@@ -1476,6 +1827,8 @@ EXPORT_SYMBOL(ceph_osdc_init);
1476 1827
1477void ceph_osdc_stop(struct ceph_osd_client *osdc) 1828void ceph_osdc_stop(struct ceph_osd_client *osdc)
1478{ 1829{
1830 flush_workqueue(osdc->notify_wq);
1831 destroy_workqueue(osdc->notify_wq);
1479 cancel_delayed_work_sync(&osdc->timeout_work); 1832 cancel_delayed_work_sync(&osdc->timeout_work);
1480 cancel_delayed_work_sync(&osdc->osds_timeout_work); 1833 cancel_delayed_work_sync(&osdc->osds_timeout_work);
1481 if (osdc->osdmap) { 1834 if (osdc->osdmap) {
@@ -1483,6 +1836,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
1483 osdc->osdmap = NULL; 1836 osdc->osdmap = NULL;
1484 } 1837 }
1485 remove_old_osds(osdc, 1); 1838 remove_old_osds(osdc, 1);
1839 WARN_ON(!RB_EMPTY_ROOT(&osdc->osds));
1486 mempool_destroy(osdc->req_mempool); 1840 mempool_destroy(osdc->req_mempool);
1487 ceph_msgpool_destroy(&osdc->msgpool_op); 1841 ceph_msgpool_destroy(&osdc->msgpool_op);
1488 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 1842 ceph_msgpool_destroy(&osdc->msgpool_op_reply);
@@ -1591,6 +1945,9 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1591 case CEPH_MSG_OSD_OPREPLY: 1945 case CEPH_MSG_OSD_OPREPLY:
1592 handle_reply(osdc, msg, con); 1946 handle_reply(osdc, msg, con);
1593 break; 1947 break;
1948 case CEPH_MSG_WATCH_NOTIFY:
1949 handle_watch_notify(osdc, msg);
1950 break;
1594 1951
1595 default: 1952 default:
1596 pr_err("received unknown message type %d %s\n", type, 1953 pr_err("received unknown message type %d %s\n", type,
@@ -1684,6 +2041,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
1684 2041
1685 switch (type) { 2042 switch (type) {
1686 case CEPH_MSG_OSD_MAP: 2043 case CEPH_MSG_OSD_MAP:
2044 case CEPH_MSG_WATCH_NOTIFY:
1687 return ceph_msg_new(type, front, GFP_NOFS); 2045 return ceph_msg_new(type, front, GFP_NOFS);
1688 case CEPH_MSG_OSD_OPREPLY: 2046 case CEPH_MSG_OSD_OPREPLY:
1689 return get_reply(con, hdr, skip); 2047 return get_reply(con, hdr, skip);