aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--include/linux/ceph/osd_client.h52
-rw-r--r--net/ceph/ceph_common.c1
-rw-r--r--net/ceph/osd_client.c385
3 files changed, 426 insertions, 12 deletions
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index e791b8e46353..f88eacb111d4 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -32,6 +32,7 @@ struct ceph_osd {
32 struct rb_node o_node; 32 struct rb_node o_node;
33 struct ceph_connection o_con; 33 struct ceph_connection o_con;
34 struct list_head o_requests; 34 struct list_head o_requests;
35 struct list_head o_linger_requests;
35 struct list_head o_osd_lru; 36 struct list_head o_osd_lru;
36 struct ceph_authorizer *o_authorizer; 37 struct ceph_authorizer *o_authorizer;
37 void *o_authorizer_buf, *o_authorizer_reply_buf; 38 void *o_authorizer_buf, *o_authorizer_reply_buf;
@@ -47,6 +48,8 @@ struct ceph_osd_request {
47 struct rb_node r_node; 48 struct rb_node r_node;
48 struct list_head r_req_lru_item; 49 struct list_head r_req_lru_item;
49 struct list_head r_osd_item; 50 struct list_head r_osd_item;
51 struct list_head r_linger_item;
52 struct list_head r_linger_osd;
50 struct ceph_osd *r_osd; 53 struct ceph_osd *r_osd;
51 struct ceph_pg r_pgid; 54 struct ceph_pg r_pgid;
52 int r_pg_osds[CEPH_PG_MAX_SIZE]; 55 int r_pg_osds[CEPH_PG_MAX_SIZE];
@@ -59,6 +62,7 @@ struct ceph_osd_request {
59 int r_flags; /* any additional flags for the osd */ 62 int r_flags; /* any additional flags for the osd */
60 u32 r_sent; /* >0 if r_request is sending/sent */ 63 u32 r_sent; /* >0 if r_request is sending/sent */
61 int r_got_reply; 64 int r_got_reply;
65 int r_linger;
62 66
63 struct ceph_osd_client *r_osdc; 67 struct ceph_osd_client *r_osdc;
64 struct kref r_kref; 68 struct kref r_kref;
@@ -89,6 +93,26 @@ struct ceph_osd_request {
89 struct ceph_pagelist *r_trail; /* trailing part of the data */ 93 struct ceph_pagelist *r_trail; /* trailing part of the data */
90}; 94};
91 95
96struct ceph_osd_event {
97 u64 cookie;
98 int one_shot;
99 struct ceph_osd_client *osdc;
100 void (*cb)(u64, u64, u8, void *);
101 void *data;
102 struct rb_node node;
103 struct list_head osd_node;
104 struct kref kref;
105 struct completion completion;
106};
107
108struct ceph_osd_event_work {
109 struct work_struct work;
110 struct ceph_osd_event *event;
111 u64 ver;
112 u64 notify_id;
113 u8 opcode;
114};
115
92struct ceph_osd_client { 116struct ceph_osd_client {
93 struct ceph_client *client; 117 struct ceph_client *client;
94 118
@@ -106,6 +130,7 @@ struct ceph_osd_client {
106 struct list_head req_lru; /* in-flight lru */ 130 struct list_head req_lru; /* in-flight lru */
107 struct list_head req_unsent; /* unsent/need-resend queue */ 131 struct list_head req_unsent; /* unsent/need-resend queue */
108 struct list_head req_notarget; /* map to no osd */ 132 struct list_head req_notarget; /* map to no osd */
133 struct list_head req_linger; /* lingering requests */
109 int num_requests; 134 int num_requests;
110 struct delayed_work timeout_work; 135 struct delayed_work timeout_work;
111 struct delayed_work osds_timeout_work; 136 struct delayed_work osds_timeout_work;
@@ -117,6 +142,12 @@ struct ceph_osd_client {
117 142
118 struct ceph_msgpool msgpool_op; 143 struct ceph_msgpool msgpool_op;
119 struct ceph_msgpool msgpool_op_reply; 144 struct ceph_msgpool msgpool_op_reply;
145
146 spinlock_t event_lock;
147 struct rb_root event_tree;
148 u64 event_count;
149
150 struct workqueue_struct *notify_wq;
120}; 151};
121 152
122struct ceph_osd_req_op { 153struct ceph_osd_req_op {
@@ -151,6 +182,13 @@ struct ceph_osd_req_op {
151 struct { 182 struct {
152 u64 snapid; 183 u64 snapid;
153 } snap; 184 } snap;
185 struct {
186 u64 cookie;
187 u64 ver;
188 __u8 flag;
189 u32 prot_ver;
190 u32 timeout;
191 } watch;
154 }; 192 };
155 u32 payload_len; 193 u32 payload_len;
156}; 194};
@@ -199,6 +237,11 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
199 bool use_mempool, int num_reply, 237 bool use_mempool, int num_reply,
200 int page_align); 238 int page_align);
201 239
240extern void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
241 struct ceph_osd_request *req);
242extern void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
243 struct ceph_osd_request *req);
244
202static inline void ceph_osdc_get_request(struct ceph_osd_request *req) 245static inline void ceph_osdc_get_request(struct ceph_osd_request *req)
203{ 246{
204 kref_get(&req->r_kref); 247 kref_get(&req->r_kref);
@@ -234,5 +277,14 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
234 struct page **pages, int nr_pages, 277 struct page **pages, int nr_pages,
235 int flags, int do_sync, bool nofail); 278 int flags, int do_sync, bool nofail);
236 279
280/* watch/notify events */
281extern int ceph_osdc_create_event(struct ceph_osd_client *osdc,
282 void (*event_cb)(u64, u64, u8, void *),
283 int one_shot, void *data,
284 struct ceph_osd_event **pevent);
285extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
286extern int ceph_osdc_wait_event(struct ceph_osd_event *event,
287 unsigned long timeout);
288extern void ceph_osdc_put_event(struct ceph_osd_event *event);
237#endif 289#endif
238 290
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index f3e4a13fea0c..95f96ab94bba 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -62,6 +62,7 @@ const char *ceph_msg_type_name(int type)
62 case CEPH_MSG_OSD_MAP: return "osd_map"; 62 case CEPH_MSG_OSD_MAP: return "osd_map";
63 case CEPH_MSG_OSD_OP: return "osd_op"; 63 case CEPH_MSG_OSD_OP: return "osd_op";
64 case CEPH_MSG_OSD_OPREPLY: return "osd_opreply"; 64 case CEPH_MSG_OSD_OPREPLY: return "osd_opreply";
65 case CEPH_MSG_WATCH_NOTIFY: return "watch_notify";
65 default: return "unknown"; 66 default: return "unknown";
66 } 67 }
67} 68}
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index b85ed5a5503d..02212ed50852 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -25,6 +25,12 @@ static const struct ceph_connection_operations osd_con_ops;
25 25
26static void send_queued(struct ceph_osd_client *osdc); 26static void send_queued(struct ceph_osd_client *osdc);
27static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); 27static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
28static void __register_request(struct ceph_osd_client *osdc,
29 struct ceph_osd_request *req);
30static void __unregister_linger_request(struct ceph_osd_client *osdc,
31 struct ceph_osd_request *req);
32static int __send_request(struct ceph_osd_client *osdc,
33 struct ceph_osd_request *req);
28 34
29static int op_needs_trail(int op) 35static int op_needs_trail(int op)
30{ 36{
@@ -33,6 +39,7 @@ static int op_needs_trail(int op)
33 case CEPH_OSD_OP_SETXATTR: 39 case CEPH_OSD_OP_SETXATTR:
34 case CEPH_OSD_OP_CMPXATTR: 40 case CEPH_OSD_OP_CMPXATTR:
35 case CEPH_OSD_OP_CALL: 41 case CEPH_OSD_OP_CALL:
42 case CEPH_OSD_OP_NOTIFY:
36 return 1; 43 return 1;
37 default: 44 default:
38 return 0; 45 return 0;
@@ -208,6 +215,8 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
208 init_completion(&req->r_completion); 215 init_completion(&req->r_completion);
209 init_completion(&req->r_safe_completion); 216 init_completion(&req->r_safe_completion);
210 INIT_LIST_HEAD(&req->r_unsafe_item); 217 INIT_LIST_HEAD(&req->r_unsafe_item);
218 INIT_LIST_HEAD(&req->r_linger_item);
219 INIT_LIST_HEAD(&req->r_linger_osd);
211 req->r_flags = flags; 220 req->r_flags = flags;
212 221
213 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); 222 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
@@ -314,6 +323,24 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
314 break; 323 break;
315 case CEPH_OSD_OP_STARTSYNC: 324 case CEPH_OSD_OP_STARTSYNC:
316 break; 325 break;
326 case CEPH_OSD_OP_NOTIFY:
327 {
328 __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
329 __le32 timeout = cpu_to_le32(src->watch.timeout);
330
331 BUG_ON(!req->r_trail);
332
333 ceph_pagelist_append(req->r_trail,
334 &prot_ver, sizeof(prot_ver));
335 ceph_pagelist_append(req->r_trail,
336 &timeout, sizeof(timeout));
337 }
338 case CEPH_OSD_OP_NOTIFY_ACK:
339 case CEPH_OSD_OP_WATCH:
340 dst->watch.cookie = cpu_to_le64(src->watch.cookie);
341 dst->watch.ver = cpu_to_le64(src->watch.ver);
342 dst->watch.flag = src->watch.flag;
343 break;
317 default: 344 default:
318 pr_err("unrecognized osd opcode %d\n", dst->op); 345 pr_err("unrecognized osd opcode %d\n", dst->op);
319 WARN_ON(1); 346 WARN_ON(1);
@@ -534,7 +561,7 @@ __lookup_request_ge(struct ceph_osd_client *osdc,
534static void __kick_osd_requests(struct ceph_osd_client *osdc, 561static void __kick_osd_requests(struct ceph_osd_client *osdc,
535 struct ceph_osd *osd) 562 struct ceph_osd *osd)
536{ 563{
537 struct ceph_osd_request *req; 564 struct ceph_osd_request *req, *nreq;
538 int err; 565 int err;
539 566
540 dout("__kick_osd_requests osd%d\n", osd->o_osd); 567 dout("__kick_osd_requests osd%d\n", osd->o_osd);
@@ -546,7 +573,17 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
546 list_move(&req->r_req_lru_item, &osdc->req_unsent); 573 list_move(&req->r_req_lru_item, &osdc->req_unsent);
547 dout("requeued %p tid %llu osd%d\n", req, req->r_tid, 574 dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
548 osd->o_osd); 575 osd->o_osd);
549 req->r_flags |= CEPH_OSD_FLAG_RETRY; 576 if (!req->r_linger)
577 req->r_flags |= CEPH_OSD_FLAG_RETRY;
578 }
579
580 list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
581 r_linger_osd) {
582 __unregister_linger_request(osdc, req);
583 __register_request(osdc, req);
584 list_move(&req->r_req_lru_item, &osdc->req_unsent);
585 dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
586 osd->o_osd);
550 } 587 }
551} 588}
552 589
@@ -590,6 +627,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
590 atomic_set(&osd->o_ref, 1); 627 atomic_set(&osd->o_ref, 1);
591 osd->o_osdc = osdc; 628 osd->o_osdc = osdc;
592 INIT_LIST_HEAD(&osd->o_requests); 629 INIT_LIST_HEAD(&osd->o_requests);
630 INIT_LIST_HEAD(&osd->o_linger_requests);
593 INIT_LIST_HEAD(&osd->o_osd_lru); 631 INIT_LIST_HEAD(&osd->o_osd_lru);
594 osd->o_incarnation = 1; 632 osd->o_incarnation = 1;
595 633
@@ -679,7 +717,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
679 int ret = 0; 717 int ret = 0;
680 718
681 dout("__reset_osd %p osd%d\n", osd, osd->o_osd); 719 dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
682 if (list_empty(&osd->o_requests)) { 720 if (list_empty(&osd->o_requests) &&
721 list_empty(&osd->o_linger_requests)) {
683 __remove_osd(osdc, osd); 722 __remove_osd(osdc, osd);
684 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], 723 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
685 &osd->o_con.peer_addr, 724 &osd->o_con.peer_addr,
@@ -752,10 +791,9 @@ static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
752 * Register request, assign tid. If this is the first request, set up 791 * Register request, assign tid. If this is the first request, set up
753 * the timeout event. 792 * the timeout event.
754 */ 793 */
755static void register_request(struct ceph_osd_client *osdc, 794static void __register_request(struct ceph_osd_client *osdc,
756 struct ceph_osd_request *req) 795 struct ceph_osd_request *req)
757{ 796{
758 mutex_lock(&osdc->request_mutex);
759 req->r_tid = ++osdc->last_tid; 797 req->r_tid = ++osdc->last_tid;
760 req->r_request->hdr.tid = cpu_to_le64(req->r_tid); 798 req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
761 INIT_LIST_HEAD(&req->r_req_lru_item); 799 INIT_LIST_HEAD(&req->r_req_lru_item);
@@ -769,6 +807,13 @@ static void register_request(struct ceph_osd_client *osdc,
769 dout(" first request, scheduling timeout\n"); 807 dout(" first request, scheduling timeout\n");
770 __schedule_osd_timeout(osdc); 808 __schedule_osd_timeout(osdc);
771 } 809 }
810}
811
812static void register_request(struct ceph_osd_client *osdc,
813 struct ceph_osd_request *req)
814{
815 mutex_lock(&osdc->request_mutex);
816 __register_request(osdc, req);
772 mutex_unlock(&osdc->request_mutex); 817 mutex_unlock(&osdc->request_mutex);
773} 818}
774 819
@@ -787,9 +832,14 @@ static void __unregister_request(struct ceph_osd_client *osdc,
787 ceph_con_revoke(&req->r_osd->o_con, req->r_request); 832 ceph_con_revoke(&req->r_osd->o_con, req->r_request);
788 833
789 list_del_init(&req->r_osd_item); 834 list_del_init(&req->r_osd_item);
790 if (list_empty(&req->r_osd->o_requests)) 835 if (list_empty(&req->r_osd->o_requests) &&
836 list_empty(&req->r_osd->o_linger_requests)) {
837 dout("moving osd to %p lru\n", req->r_osd);
791 __move_osd_to_lru(osdc, req->r_osd); 838 __move_osd_to_lru(osdc, req->r_osd);
792 req->r_osd = NULL; 839 }
840 if (list_empty(&req->r_osd_item) &&
841 list_empty(&req->r_linger_item))
842 req->r_osd = NULL;
793 } 843 }
794 844
795 ceph_osdc_put_request(req); 845 ceph_osdc_put_request(req);
@@ -812,6 +862,58 @@ static void __cancel_request(struct ceph_osd_request *req)
812 } 862 }
813} 863}
814 864
865static void __register_linger_request(struct ceph_osd_client *osdc,
866 struct ceph_osd_request *req)
867{
868 dout("__register_linger_request %p\n", req);
869 list_add_tail(&req->r_linger_item, &osdc->req_linger);
870 list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests);
871}
872
873static void __unregister_linger_request(struct ceph_osd_client *osdc,
874 struct ceph_osd_request *req)
875{
876 dout("__unregister_linger_request %p\n", req);
877 if (req->r_osd) {
878 list_del_init(&req->r_linger_item);
879 list_del_init(&req->r_linger_osd);
880
881 if (list_empty(&req->r_osd->o_requests) &&
882 list_empty(&req->r_osd->o_linger_requests)) {
883 dout("moving osd to %p lru\n", req->r_osd);
884 __move_osd_to_lru(osdc, req->r_osd);
885 }
886 req->r_osd = NULL;
887 }
888}
889
890void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
891 struct ceph_osd_request *req)
892{
893 mutex_lock(&osdc->request_mutex);
894 if (req->r_linger) {
895 __unregister_linger_request(osdc, req);
896 ceph_osdc_put_request(req);
897 }
898 mutex_unlock(&osdc->request_mutex);
899}
900EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);
901
902void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
903 struct ceph_osd_request *req)
904{
905 if (!req->r_linger) {
906 dout("set_request_linger %p\n", req);
907 req->r_linger = 1;
908 /*
909 * caller is now responsible for calling
910 * unregister_linger_request
911 */
912 ceph_osdc_get_request(req);
913 }
914}
915EXPORT_SYMBOL(ceph_osdc_set_request_linger);
916
815/* 917/*
816 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct 918 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
817 * (as needed), and set the request r_osd appropriately. If there is 919 * (as needed), and set the request r_osd appropriately. If there is
@@ -958,7 +1060,6 @@ static void handle_timeout(struct work_struct *work)
958 osdc->client->options->osd_keepalive_timeout * HZ; 1060 osdc->client->options->osd_keepalive_timeout * HZ;
959 unsigned long last_stamp = 0; 1061 unsigned long last_stamp = 0;
960 struct list_head slow_osds; 1062 struct list_head slow_osds;
961
962 dout("timeout\n"); 1063 dout("timeout\n");
963 down_read(&osdc->map_sem); 1064 down_read(&osdc->map_sem);
964 1065
@@ -1060,7 +1161,6 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1060 numops * sizeof(struct ceph_osd_op)) 1161 numops * sizeof(struct ceph_osd_op))
1061 goto bad; 1162 goto bad;
1062 dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result); 1163 dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
1063
1064 /* lookup */ 1164 /* lookup */
1065 mutex_lock(&osdc->request_mutex); 1165 mutex_lock(&osdc->request_mutex);
1066 req = __lookup_request(osdc, tid); 1166 req = __lookup_request(osdc, tid);
@@ -1104,6 +1204,9 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1104 1204
1105 dout("handle_reply tid %llu flags %d\n", tid, flags); 1205 dout("handle_reply tid %llu flags %d\n", tid, flags);
1106 1206
1207 if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
1208 __register_linger_request(osdc, req);
1209
1107 /* either this is a read, or we got the safe response */ 1210 /* either this is a read, or we got the safe response */
1108 if (result < 0 || 1211 if (result < 0 ||
1109 (flags & CEPH_OSD_FLAG_ONDISK) || 1212 (flags & CEPH_OSD_FLAG_ONDISK) ||
@@ -1124,6 +1227,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1124 } 1227 }
1125 1228
1126done: 1229done:
1230 dout("req=%p req->r_linger=%d\n", req, req->r_linger);
1127 ceph_osdc_put_request(req); 1231 ceph_osdc_put_request(req);
1128 return; 1232 return;
1129 1233
@@ -1159,7 +1263,7 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
1159 */ 1263 */
1160static void kick_requests(struct ceph_osd_client *osdc) 1264static void kick_requests(struct ceph_osd_client *osdc)
1161{ 1265{
1162 struct ceph_osd_request *req; 1266 struct ceph_osd_request *req, *nreq;
1163 struct rb_node *p; 1267 struct rb_node *p;
1164 int needmap = 0; 1268 int needmap = 0;
1165 int err; 1269 int err;
@@ -1177,8 +1281,30 @@ static void kick_requests(struct ceph_osd_client *osdc)
1177 } else if (err > 0) { 1281 } else if (err > 0) {
1178 dout("%p tid %llu requeued on osd%d\n", req, req->r_tid, 1282 dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
1179 req->r_osd ? req->r_osd->o_osd : -1); 1283 req->r_osd ? req->r_osd->o_osd : -1);
1180 req->r_flags |= CEPH_OSD_FLAG_RETRY; 1284 if (!req->r_linger)
1285 req->r_flags |= CEPH_OSD_FLAG_RETRY;
1286 }
1287 }
1288
1289 list_for_each_entry_safe(req, nreq, &osdc->req_linger,
1290 r_linger_item) {
1291 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
1292
1293 err = __map_request(osdc, req);
1294 if (err == 0)
1295 continue; /* no change and no osd was specified */
1296 if (err < 0)
1297 continue; /* hrm! */
1298 if (req->r_osd == NULL) {
1299 dout("tid %llu maps to no valid osd\n", req->r_tid);
1300 needmap++; /* request a newer map */
1301 continue;
1181 } 1302 }
1303
1304 dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
1305 req->r_osd ? req->r_osd->o_osd : -1);
1306 __unregister_linger_request(osdc, req);
1307 __register_request(osdc, req);
1182 } 1308 }
1183 mutex_unlock(&osdc->request_mutex); 1309 mutex_unlock(&osdc->request_mutex);
1184 1310
@@ -1302,6 +1428,223 @@ bad:
1302} 1428}
1303 1429
1304/* 1430/*
1431 * watch/notify callback event infrastructure
1432 *
1433 * These callbacks are used both for watch and notify operations.
1434 */
1435static void __release_event(struct kref *kref)
1436{
1437 struct ceph_osd_event *event =
1438 container_of(kref, struct ceph_osd_event, kref);
1439
1440 dout("__release_event %p\n", event);
1441 kfree(event);
1442}
1443
1444static void get_event(struct ceph_osd_event *event)
1445{
1446 kref_get(&event->kref);
1447}
1448
1449void ceph_osdc_put_event(struct ceph_osd_event *event)
1450{
1451 kref_put(&event->kref, __release_event);
1452}
1453EXPORT_SYMBOL(ceph_osdc_put_event);
1454
1455static void __insert_event(struct ceph_osd_client *osdc,
1456 struct ceph_osd_event *new)
1457{
1458 struct rb_node **p = &osdc->event_tree.rb_node;
1459 struct rb_node *parent = NULL;
1460 struct ceph_osd_event *event = NULL;
1461
1462 while (*p) {
1463 parent = *p;
1464 event = rb_entry(parent, struct ceph_osd_event, node);
1465 if (new->cookie < event->cookie)
1466 p = &(*p)->rb_left;
1467 else if (new->cookie > event->cookie)
1468 p = &(*p)->rb_right;
1469 else
1470 BUG();
1471 }
1472
1473 rb_link_node(&new->node, parent, p);
1474 rb_insert_color(&new->node, &osdc->event_tree);
1475}
1476
1477static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
1478 u64 cookie)
1479{
1480 struct rb_node **p = &osdc->event_tree.rb_node;
1481 struct rb_node *parent = NULL;
1482 struct ceph_osd_event *event = NULL;
1483
1484 while (*p) {
1485 parent = *p;
1486 event = rb_entry(parent, struct ceph_osd_event, node);
1487 if (cookie < event->cookie)
1488 p = &(*p)->rb_left;
1489 else if (cookie > event->cookie)
1490 p = &(*p)->rb_right;
1491 else
1492 return event;
1493 }
1494 return NULL;
1495}
1496
1497static void __remove_event(struct ceph_osd_event *event)
1498{
1499 struct ceph_osd_client *osdc = event->osdc;
1500
1501 if (!RB_EMPTY_NODE(&event->node)) {
1502 dout("__remove_event removed %p\n", event);
1503 rb_erase(&event->node, &osdc->event_tree);
1504 ceph_osdc_put_event(event);
1505 } else {
1506 dout("__remove_event didn't remove %p\n", event);
1507 }
1508}
1509
1510int ceph_osdc_create_event(struct ceph_osd_client *osdc,
1511 void (*event_cb)(u64, u64, u8, void *),
1512 int one_shot, void *data,
1513 struct ceph_osd_event **pevent)
1514{
1515 struct ceph_osd_event *event;
1516
1517 event = kmalloc(sizeof(*event), GFP_NOIO);
1518 if (!event)
1519 return -ENOMEM;
1520
1521 dout("create_event %p\n", event);
1522 event->cb = event_cb;
1523 event->one_shot = one_shot;
1524 event->data = data;
1525 event->osdc = osdc;
1526 INIT_LIST_HEAD(&event->osd_node);
1527 kref_init(&event->kref); /* one ref for us */
1528 kref_get(&event->kref); /* one ref for the caller */
1529 init_completion(&event->completion);
1530
1531 spin_lock(&osdc->event_lock);
1532 event->cookie = ++osdc->event_count;
1533 __insert_event(osdc, event);
1534 spin_unlock(&osdc->event_lock);
1535
1536 *pevent = event;
1537 return 0;
1538}
1539EXPORT_SYMBOL(ceph_osdc_create_event);
1540
1541void ceph_osdc_cancel_event(struct ceph_osd_event *event)
1542{
1543 struct ceph_osd_client *osdc = event->osdc;
1544
1545 dout("cancel_event %p\n", event);
1546 spin_lock(&osdc->event_lock);
1547 __remove_event(event);
1548 spin_unlock(&osdc->event_lock);
1549 ceph_osdc_put_event(event); /* caller's */
1550}
1551EXPORT_SYMBOL(ceph_osdc_cancel_event);
1552
1553
1554static void do_event_work(struct work_struct *work)
1555{
1556 struct ceph_osd_event_work *event_work =
1557 container_of(work, struct ceph_osd_event_work, work);
1558 struct ceph_osd_event *event = event_work->event;
1559 u64 ver = event_work->ver;
1560 u64 notify_id = event_work->notify_id;
1561 u8 opcode = event_work->opcode;
1562
1563 dout("do_event_work completing %p\n", event);
1564 event->cb(ver, notify_id, opcode, event->data);
1565 complete(&event->completion);
1566 dout("do_event_work completed %p\n", event);
1567 ceph_osdc_put_event(event);
1568 kfree(event_work);
1569}
1570
1571
1572/*
1573 * Process osd watch notifications
1574 */
1575void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1576{
1577 void *p, *end;
1578 u8 proto_ver;
1579 u64 cookie, ver, notify_id;
1580 u8 opcode;
1581 struct ceph_osd_event *event;
1582 struct ceph_osd_event_work *event_work;
1583
1584 p = msg->front.iov_base;
1585 end = p + msg->front.iov_len;
1586
1587 ceph_decode_8_safe(&p, end, proto_ver, bad);
1588 ceph_decode_8_safe(&p, end, opcode, bad);
1589 ceph_decode_64_safe(&p, end, cookie, bad);
1590 ceph_decode_64_safe(&p, end, ver, bad);
1591 ceph_decode_64_safe(&p, end, notify_id, bad);
1592
1593 spin_lock(&osdc->event_lock);
1594 event = __find_event(osdc, cookie);
1595 if (event) {
1596 get_event(event);
1597 if (event->one_shot)
1598 __remove_event(event);
1599 }
1600 spin_unlock(&osdc->event_lock);
1601 dout("handle_watch_notify cookie %lld ver %lld event %p\n",
1602 cookie, ver, event);
1603 if (event) {
1604 event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
1605 INIT_WORK(&event_work->work, do_event_work);
1606 if (!event_work) {
1607 dout("ERROR: could not allocate event_work\n");
1608 goto done_err;
1609 }
1610 event_work->event = event;
1611 event_work->ver = ver;
1612 event_work->notify_id = notify_id;
1613 event_work->opcode = opcode;
1614 if (!queue_work(osdc->notify_wq, &event_work->work)) {
1615 dout("WARNING: failed to queue notify event work\n");
1616 goto done_err;
1617 }
1618 }
1619
1620 return;
1621
1622done_err:
1623 complete(&event->completion);
1624 ceph_osdc_put_event(event);
1625 return;
1626
1627bad:
1628 pr_err("osdc handle_watch_notify corrupt msg\n");
1629 return;
1630}
1631
1632int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
1633{
1634 int err;
1635
1636 dout("wait_event %p\n", event);
1637 err = wait_for_completion_interruptible_timeout(&event->completion,
1638 timeout * HZ);
1639 ceph_osdc_put_event(event);
1640 if (err > 0)
1641 err = 0;
1642 dout("wait_event %p returns %d\n", event, err);
1643 return err;
1644}
1645EXPORT_SYMBOL(ceph_osdc_wait_event);
1646
1647/*
1305 * Register request, send initial attempt. 1648 * Register request, send initial attempt.
1306 */ 1649 */
1307int ceph_osdc_start_request(struct ceph_osd_client *osdc, 1650int ceph_osdc_start_request(struct ceph_osd_client *osdc,
@@ -1430,9 +1773,13 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1430 INIT_LIST_HEAD(&osdc->req_lru); 1773 INIT_LIST_HEAD(&osdc->req_lru);
1431 INIT_LIST_HEAD(&osdc->req_unsent); 1774 INIT_LIST_HEAD(&osdc->req_unsent);
1432 INIT_LIST_HEAD(&osdc->req_notarget); 1775 INIT_LIST_HEAD(&osdc->req_notarget);
1776 INIT_LIST_HEAD(&osdc->req_linger);
1433 osdc->num_requests = 0; 1777 osdc->num_requests = 0;
1434 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 1778 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
1435 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 1779 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
1780 spin_lock_init(&osdc->event_lock);
1781 osdc->event_tree = RB_ROOT;
1782 osdc->event_count = 0;
1436 1783
1437 schedule_delayed_work(&osdc->osds_timeout_work, 1784 schedule_delayed_work(&osdc->osds_timeout_work,
1438 round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ)); 1785 round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
@@ -1452,6 +1799,13 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1452 "osd_op_reply"); 1799 "osd_op_reply");
1453 if (err < 0) 1800 if (err < 0)
1454 goto out_msgpool; 1801 goto out_msgpool;
1802
1803 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
1804 if (IS_ERR(osdc->notify_wq)) {
1805 err = PTR_ERR(osdc->notify_wq);
1806 osdc->notify_wq = NULL;
1807 goto out_msgpool;
1808 }
1455 return 0; 1809 return 0;
1456 1810
1457out_msgpool: 1811out_msgpool:
@@ -1465,6 +1819,8 @@ EXPORT_SYMBOL(ceph_osdc_init);
1465 1819
1466void ceph_osdc_stop(struct ceph_osd_client *osdc) 1820void ceph_osdc_stop(struct ceph_osd_client *osdc)
1467{ 1821{
1822 flush_workqueue(osdc->notify_wq);
1823 destroy_workqueue(osdc->notify_wq);
1468 cancel_delayed_work_sync(&osdc->timeout_work); 1824 cancel_delayed_work_sync(&osdc->timeout_work);
1469 cancel_delayed_work_sync(&osdc->osds_timeout_work); 1825 cancel_delayed_work_sync(&osdc->osds_timeout_work);
1470 if (osdc->osdmap) { 1826 if (osdc->osdmap) {
@@ -1472,6 +1828,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
1472 osdc->osdmap = NULL; 1828 osdc->osdmap = NULL;
1473 } 1829 }
1474 remove_old_osds(osdc, 1); 1830 remove_old_osds(osdc, 1);
1831 WARN_ON(!RB_EMPTY_ROOT(&osdc->osds));
1475 mempool_destroy(osdc->req_mempool); 1832 mempool_destroy(osdc->req_mempool);
1476 ceph_msgpool_destroy(&osdc->msgpool_op); 1833 ceph_msgpool_destroy(&osdc->msgpool_op);
1477 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 1834 ceph_msgpool_destroy(&osdc->msgpool_op_reply);
@@ -1580,6 +1937,9 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1580 case CEPH_MSG_OSD_OPREPLY: 1937 case CEPH_MSG_OSD_OPREPLY:
1581 handle_reply(osdc, msg, con); 1938 handle_reply(osdc, msg, con);
1582 break; 1939 break;
1940 case CEPH_MSG_WATCH_NOTIFY:
1941 handle_watch_notify(osdc, msg);
1942 break;
1583 1943
1584 default: 1944 default:
1585 pr_err("received unknown message type %d %s\n", type, 1945 pr_err("received unknown message type %d %s\n", type,
@@ -1673,6 +2033,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
1673 2033
1674 switch (type) { 2034 switch (type) {
1675 case CEPH_MSG_OSD_MAP: 2035 case CEPH_MSG_OSD_MAP:
2036 case CEPH_MSG_WATCH_NOTIFY:
1676 return ceph_msg_new(type, front, GFP_NOFS); 2037 return ceph_msg_new(type, front, GFP_NOFS);
1677 case CEPH_MSG_OSD_OPREPLY: 2038 case CEPH_MSG_OSD_OPREPLY:
1678 return get_reply(con, hdr, skip); 2039 return get_reply(con, hdr, skip);