aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@hq.newdream.net>2011-03-21 18:07:16 -0400
committerSage Weil <sage@newdream.net>2011-03-22 14:33:55 -0400
commita40c4f10e3fb96030358e49abd010c1f08446fa3 (patch)
tree1aa1f6ca618cd021d944f7da7caeb5b182beaee4 /net
parent55b00bae111030bd0dfcc898a920e54725aed1bf (diff)
libceph: add lingering request and watch/notify event framework
Lingering requests are requests that are sent to the OSD normally but tracked also after we get a successful request. This keeps the OSD connection open and resends the original request if the object moves to another OSD. The OSD can then send notification messages back to us if another client initiates a notify. This framework will be used by RBD so that the client gets notification when a snapshot is created by another node or tool. Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net> Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'net')
-rw-r--r--net/ceph/ceph_common.c1
-rw-r--r--net/ceph/osd_client.c385
2 files changed, 374 insertions, 12 deletions
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index f3e4a13fea0c..95f96ab94bba 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -62,6 +62,7 @@ const char *ceph_msg_type_name(int type)
62 case CEPH_MSG_OSD_MAP: return "osd_map"; 62 case CEPH_MSG_OSD_MAP: return "osd_map";
63 case CEPH_MSG_OSD_OP: return "osd_op"; 63 case CEPH_MSG_OSD_OP: return "osd_op";
64 case CEPH_MSG_OSD_OPREPLY: return "osd_opreply"; 64 case CEPH_MSG_OSD_OPREPLY: return "osd_opreply";
65 case CEPH_MSG_WATCH_NOTIFY: return "watch_notify";
65 default: return "unknown"; 66 default: return "unknown";
66 } 67 }
67} 68}
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index b85ed5a5503d..02212ed50852 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -25,6 +25,12 @@ static const struct ceph_connection_operations osd_con_ops;
25 25
26static void send_queued(struct ceph_osd_client *osdc); 26static void send_queued(struct ceph_osd_client *osdc);
27static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); 27static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
28static void __register_request(struct ceph_osd_client *osdc,
29 struct ceph_osd_request *req);
30static void __unregister_linger_request(struct ceph_osd_client *osdc,
31 struct ceph_osd_request *req);
32static int __send_request(struct ceph_osd_client *osdc,
33 struct ceph_osd_request *req);
28 34
29static int op_needs_trail(int op) 35static int op_needs_trail(int op)
30{ 36{
@@ -33,6 +39,7 @@ static int op_needs_trail(int op)
33 case CEPH_OSD_OP_SETXATTR: 39 case CEPH_OSD_OP_SETXATTR:
34 case CEPH_OSD_OP_CMPXATTR: 40 case CEPH_OSD_OP_CMPXATTR:
35 case CEPH_OSD_OP_CALL: 41 case CEPH_OSD_OP_CALL:
42 case CEPH_OSD_OP_NOTIFY:
36 return 1; 43 return 1;
37 default: 44 default:
38 return 0; 45 return 0;
@@ -208,6 +215,8 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
208 init_completion(&req->r_completion); 215 init_completion(&req->r_completion);
209 init_completion(&req->r_safe_completion); 216 init_completion(&req->r_safe_completion);
210 INIT_LIST_HEAD(&req->r_unsafe_item); 217 INIT_LIST_HEAD(&req->r_unsafe_item);
218 INIT_LIST_HEAD(&req->r_linger_item);
219 INIT_LIST_HEAD(&req->r_linger_osd);
211 req->r_flags = flags; 220 req->r_flags = flags;
212 221
213 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); 222 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
@@ -314,6 +323,24 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
314 break; 323 break;
315 case CEPH_OSD_OP_STARTSYNC: 324 case CEPH_OSD_OP_STARTSYNC:
316 break; 325 break;
326 case CEPH_OSD_OP_NOTIFY:
327 {
328 __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
329 __le32 timeout = cpu_to_le32(src->watch.timeout);
330
331 BUG_ON(!req->r_trail);
332
333 ceph_pagelist_append(req->r_trail,
334 &prot_ver, sizeof(prot_ver));
335 ceph_pagelist_append(req->r_trail,
336 &timeout, sizeof(timeout));
337 }
338 case CEPH_OSD_OP_NOTIFY_ACK:
339 case CEPH_OSD_OP_WATCH:
340 dst->watch.cookie = cpu_to_le64(src->watch.cookie);
341 dst->watch.ver = cpu_to_le64(src->watch.ver);
342 dst->watch.flag = src->watch.flag;
343 break;
317 default: 344 default:
318 pr_err("unrecognized osd opcode %d\n", dst->op); 345 pr_err("unrecognized osd opcode %d\n", dst->op);
319 WARN_ON(1); 346 WARN_ON(1);
@@ -534,7 +561,7 @@ __lookup_request_ge(struct ceph_osd_client *osdc,
534static void __kick_osd_requests(struct ceph_osd_client *osdc, 561static void __kick_osd_requests(struct ceph_osd_client *osdc,
535 struct ceph_osd *osd) 562 struct ceph_osd *osd)
536{ 563{
537 struct ceph_osd_request *req; 564 struct ceph_osd_request *req, *nreq;
538 int err; 565 int err;
539 566
540 dout("__kick_osd_requests osd%d\n", osd->o_osd); 567 dout("__kick_osd_requests osd%d\n", osd->o_osd);
@@ -546,7 +573,17 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
546 list_move(&req->r_req_lru_item, &osdc->req_unsent); 573 list_move(&req->r_req_lru_item, &osdc->req_unsent);
547 dout("requeued %p tid %llu osd%d\n", req, req->r_tid, 574 dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
548 osd->o_osd); 575 osd->o_osd);
549 req->r_flags |= CEPH_OSD_FLAG_RETRY; 576 if (!req->r_linger)
577 req->r_flags |= CEPH_OSD_FLAG_RETRY;
578 }
579
580 list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
581 r_linger_osd) {
582 __unregister_linger_request(osdc, req);
583 __register_request(osdc, req);
584 list_move(&req->r_req_lru_item, &osdc->req_unsent);
585 dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
586 osd->o_osd);
550 } 587 }
551} 588}
552 589
@@ -590,6 +627,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
590 atomic_set(&osd->o_ref, 1); 627 atomic_set(&osd->o_ref, 1);
591 osd->o_osdc = osdc; 628 osd->o_osdc = osdc;
592 INIT_LIST_HEAD(&osd->o_requests); 629 INIT_LIST_HEAD(&osd->o_requests);
630 INIT_LIST_HEAD(&osd->o_linger_requests);
593 INIT_LIST_HEAD(&osd->o_osd_lru); 631 INIT_LIST_HEAD(&osd->o_osd_lru);
594 osd->o_incarnation = 1; 632 osd->o_incarnation = 1;
595 633
@@ -679,7 +717,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
679 int ret = 0; 717 int ret = 0;
680 718
681 dout("__reset_osd %p osd%d\n", osd, osd->o_osd); 719 dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
682 if (list_empty(&osd->o_requests)) { 720 if (list_empty(&osd->o_requests) &&
721 list_empty(&osd->o_linger_requests)) {
683 __remove_osd(osdc, osd); 722 __remove_osd(osdc, osd);
684 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], 723 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
685 &osd->o_con.peer_addr, 724 &osd->o_con.peer_addr,
@@ -752,10 +791,9 @@ static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
752 * Register request, assign tid. If this is the first request, set up 791 * Register request, assign tid. If this is the first request, set up
753 * the timeout event. 792 * the timeout event.
754 */ 793 */
755static void register_request(struct ceph_osd_client *osdc, 794static void __register_request(struct ceph_osd_client *osdc,
756 struct ceph_osd_request *req) 795 struct ceph_osd_request *req)
757{ 796{
758 mutex_lock(&osdc->request_mutex);
759 req->r_tid = ++osdc->last_tid; 797 req->r_tid = ++osdc->last_tid;
760 req->r_request->hdr.tid = cpu_to_le64(req->r_tid); 798 req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
761 INIT_LIST_HEAD(&req->r_req_lru_item); 799 INIT_LIST_HEAD(&req->r_req_lru_item);
@@ -769,6 +807,13 @@ static void register_request(struct ceph_osd_client *osdc,
769 dout(" first request, scheduling timeout\n"); 807 dout(" first request, scheduling timeout\n");
770 __schedule_osd_timeout(osdc); 808 __schedule_osd_timeout(osdc);
771 } 809 }
810}
811
812static void register_request(struct ceph_osd_client *osdc,
813 struct ceph_osd_request *req)
814{
815 mutex_lock(&osdc->request_mutex);
816 __register_request(osdc, req);
772 mutex_unlock(&osdc->request_mutex); 817 mutex_unlock(&osdc->request_mutex);
773} 818}
774 819
@@ -787,9 +832,14 @@ static void __unregister_request(struct ceph_osd_client *osdc,
787 ceph_con_revoke(&req->r_osd->o_con, req->r_request); 832 ceph_con_revoke(&req->r_osd->o_con, req->r_request);
788 833
789 list_del_init(&req->r_osd_item); 834 list_del_init(&req->r_osd_item);
790 if (list_empty(&req->r_osd->o_requests)) 835 if (list_empty(&req->r_osd->o_requests) &&
836 list_empty(&req->r_osd->o_linger_requests)) {
837 dout("moving osd to %p lru\n", req->r_osd);
791 __move_osd_to_lru(osdc, req->r_osd); 838 __move_osd_to_lru(osdc, req->r_osd);
792 req->r_osd = NULL; 839 }
840 if (list_empty(&req->r_osd_item) &&
841 list_empty(&req->r_linger_item))
842 req->r_osd = NULL;
793 } 843 }
794 844
795 ceph_osdc_put_request(req); 845 ceph_osdc_put_request(req);
@@ -812,6 +862,58 @@ static void __cancel_request(struct ceph_osd_request *req)
812 } 862 }
813} 863}
814 864
865static void __register_linger_request(struct ceph_osd_client *osdc,
866 struct ceph_osd_request *req)
867{
868 dout("__register_linger_request %p\n", req);
869 list_add_tail(&req->r_linger_item, &osdc->req_linger);
870 list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests);
871}
872
873static void __unregister_linger_request(struct ceph_osd_client *osdc,
874 struct ceph_osd_request *req)
875{
876 dout("__unregister_linger_request %p\n", req);
877 if (req->r_osd) {
878 list_del_init(&req->r_linger_item);
879 list_del_init(&req->r_linger_osd);
880
881 if (list_empty(&req->r_osd->o_requests) &&
882 list_empty(&req->r_osd->o_linger_requests)) {
883 dout("moving osd to %p lru\n", req->r_osd);
884 __move_osd_to_lru(osdc, req->r_osd);
885 }
886 req->r_osd = NULL;
887 }
888}
889
890void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
891 struct ceph_osd_request *req)
892{
893 mutex_lock(&osdc->request_mutex);
894 if (req->r_linger) {
895 __unregister_linger_request(osdc, req);
896 ceph_osdc_put_request(req);
897 }
898 mutex_unlock(&osdc->request_mutex);
899}
900EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);
901
902void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
903 struct ceph_osd_request *req)
904{
905 if (!req->r_linger) {
906 dout("set_request_linger %p\n", req);
907 req->r_linger = 1;
908 /*
909 * caller is now responsible for calling
910 * unregister_linger_request
911 */
912 ceph_osdc_get_request(req);
913 }
914}
915EXPORT_SYMBOL(ceph_osdc_set_request_linger);
916
815/* 917/*
816 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct 918 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
817 * (as needed), and set the request r_osd appropriately. If there is 919 * (as needed), and set the request r_osd appropriately. If there is
@@ -958,7 +1060,6 @@ static void handle_timeout(struct work_struct *work)
958 osdc->client->options->osd_keepalive_timeout * HZ; 1060 osdc->client->options->osd_keepalive_timeout * HZ;
959 unsigned long last_stamp = 0; 1061 unsigned long last_stamp = 0;
960 struct list_head slow_osds; 1062 struct list_head slow_osds;
961
962 dout("timeout\n"); 1063 dout("timeout\n");
963 down_read(&osdc->map_sem); 1064 down_read(&osdc->map_sem);
964 1065
@@ -1060,7 +1161,6 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1060 numops * sizeof(struct ceph_osd_op)) 1161 numops * sizeof(struct ceph_osd_op))
1061 goto bad; 1162 goto bad;
1062 dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result); 1163 dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
1063
1064 /* lookup */ 1164 /* lookup */
1065 mutex_lock(&osdc->request_mutex); 1165 mutex_lock(&osdc->request_mutex);
1066 req = __lookup_request(osdc, tid); 1166 req = __lookup_request(osdc, tid);
@@ -1104,6 +1204,9 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1104 1204
1105 dout("handle_reply tid %llu flags %d\n", tid, flags); 1205 dout("handle_reply tid %llu flags %d\n", tid, flags);
1106 1206
1207 if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
1208 __register_linger_request(osdc, req);
1209
1107 /* either this is a read, or we got the safe response */ 1210 /* either this is a read, or we got the safe response */
1108 if (result < 0 || 1211 if (result < 0 ||
1109 (flags & CEPH_OSD_FLAG_ONDISK) || 1212 (flags & CEPH_OSD_FLAG_ONDISK) ||
@@ -1124,6 +1227,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1124 } 1227 }
1125 1228
1126done: 1229done:
1230 dout("req=%p req->r_linger=%d\n", req, req->r_linger);
1127 ceph_osdc_put_request(req); 1231 ceph_osdc_put_request(req);
1128 return; 1232 return;
1129 1233
@@ -1159,7 +1263,7 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
1159 */ 1263 */
1160static void kick_requests(struct ceph_osd_client *osdc) 1264static void kick_requests(struct ceph_osd_client *osdc)
1161{ 1265{
1162 struct ceph_osd_request *req; 1266 struct ceph_osd_request *req, *nreq;
1163 struct rb_node *p; 1267 struct rb_node *p;
1164 int needmap = 0; 1268 int needmap = 0;
1165 int err; 1269 int err;
@@ -1177,8 +1281,30 @@ static void kick_requests(struct ceph_osd_client *osdc)
1177 } else if (err > 0) { 1281 } else if (err > 0) {
1178 dout("%p tid %llu requeued on osd%d\n", req, req->r_tid, 1282 dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
1179 req->r_osd ? req->r_osd->o_osd : -1); 1283 req->r_osd ? req->r_osd->o_osd : -1);
1180 req->r_flags |= CEPH_OSD_FLAG_RETRY; 1284 if (!req->r_linger)
1285 req->r_flags |= CEPH_OSD_FLAG_RETRY;
1286 }
1287 }
1288
1289 list_for_each_entry_safe(req, nreq, &osdc->req_linger,
1290 r_linger_item) {
1291 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
1292
1293 err = __map_request(osdc, req);
1294 if (err == 0)
1295 continue; /* no change and no osd was specified */
1296 if (err < 0)
1297 continue; /* hrm! */
1298 if (req->r_osd == NULL) {
1299 dout("tid %llu maps to no valid osd\n", req->r_tid);
1300 needmap++; /* request a newer map */
1301 continue;
1181 } 1302 }
1303
1304 dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
1305 req->r_osd ? req->r_osd->o_osd : -1);
1306 __unregister_linger_request(osdc, req);
1307 __register_request(osdc, req);
1182 } 1308 }
1183 mutex_unlock(&osdc->request_mutex); 1309 mutex_unlock(&osdc->request_mutex);
1184 1310
@@ -1302,6 +1428,223 @@ bad:
1302} 1428}
1303 1429
1304/* 1430/*
1431 * watch/notify callback event infrastructure
1432 *
1433 * These callbacks are used both for watch and notify operations.
1434 */
1435static void __release_event(struct kref *kref)
1436{
1437 struct ceph_osd_event *event =
1438 container_of(kref, struct ceph_osd_event, kref);
1439
1440 dout("__release_event %p\n", event);
1441 kfree(event);
1442}
1443
1444static void get_event(struct ceph_osd_event *event)
1445{
1446 kref_get(&event->kref);
1447}
1448
1449void ceph_osdc_put_event(struct ceph_osd_event *event)
1450{
1451 kref_put(&event->kref, __release_event);
1452}
1453EXPORT_SYMBOL(ceph_osdc_put_event);
1454
1455static void __insert_event(struct ceph_osd_client *osdc,
1456 struct ceph_osd_event *new)
1457{
1458 struct rb_node **p = &osdc->event_tree.rb_node;
1459 struct rb_node *parent = NULL;
1460 struct ceph_osd_event *event = NULL;
1461
1462 while (*p) {
1463 parent = *p;
1464 event = rb_entry(parent, struct ceph_osd_event, node);
1465 if (new->cookie < event->cookie)
1466 p = &(*p)->rb_left;
1467 else if (new->cookie > event->cookie)
1468 p = &(*p)->rb_right;
1469 else
1470 BUG();
1471 }
1472
1473 rb_link_node(&new->node, parent, p);
1474 rb_insert_color(&new->node, &osdc->event_tree);
1475}
1476
1477static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
1478 u64 cookie)
1479{
1480 struct rb_node **p = &osdc->event_tree.rb_node;
1481 struct rb_node *parent = NULL;
1482 struct ceph_osd_event *event = NULL;
1483
1484 while (*p) {
1485 parent = *p;
1486 event = rb_entry(parent, struct ceph_osd_event, node);
1487 if (cookie < event->cookie)
1488 p = &(*p)->rb_left;
1489 else if (cookie > event->cookie)
1490 p = &(*p)->rb_right;
1491 else
1492 return event;
1493 }
1494 return NULL;
1495}
1496
1497static void __remove_event(struct ceph_osd_event *event)
1498{
1499 struct ceph_osd_client *osdc = event->osdc;
1500
1501 if (!RB_EMPTY_NODE(&event->node)) {
1502 dout("__remove_event removed %p\n", event);
1503 rb_erase(&event->node, &osdc->event_tree);
1504 ceph_osdc_put_event(event);
1505 } else {
1506 dout("__remove_event didn't remove %p\n", event);
1507 }
1508}
1509
1510int ceph_osdc_create_event(struct ceph_osd_client *osdc,
1511 void (*event_cb)(u64, u64, u8, void *),
1512 int one_shot, void *data,
1513 struct ceph_osd_event **pevent)
1514{
1515 struct ceph_osd_event *event;
1516
1517 event = kmalloc(sizeof(*event), GFP_NOIO);
1518 if (!event)
1519 return -ENOMEM;
1520
1521 dout("create_event %p\n", event);
1522 event->cb = event_cb;
1523 event->one_shot = one_shot;
1524 event->data = data;
1525 event->osdc = osdc;
1526 INIT_LIST_HEAD(&event->osd_node);
1527 kref_init(&event->kref); /* one ref for us */
1528 kref_get(&event->kref); /* one ref for the caller */
1529 init_completion(&event->completion);
1530
1531 spin_lock(&osdc->event_lock);
1532 event->cookie = ++osdc->event_count;
1533 __insert_event(osdc, event);
1534 spin_unlock(&osdc->event_lock);
1535
1536 *pevent = event;
1537 return 0;
1538}
1539EXPORT_SYMBOL(ceph_osdc_create_event);
1540
1541void ceph_osdc_cancel_event(struct ceph_osd_event *event)
1542{
1543 struct ceph_osd_client *osdc = event->osdc;
1544
1545 dout("cancel_event %p\n", event);
1546 spin_lock(&osdc->event_lock);
1547 __remove_event(event);
1548 spin_unlock(&osdc->event_lock);
1549 ceph_osdc_put_event(event); /* caller's */
1550}
1551EXPORT_SYMBOL(ceph_osdc_cancel_event);
1552
1553
1554static void do_event_work(struct work_struct *work)
1555{
1556 struct ceph_osd_event_work *event_work =
1557 container_of(work, struct ceph_osd_event_work, work);
1558 struct ceph_osd_event *event = event_work->event;
1559 u64 ver = event_work->ver;
1560 u64 notify_id = event_work->notify_id;
1561 u8 opcode = event_work->opcode;
1562
1563 dout("do_event_work completing %p\n", event);
1564 event->cb(ver, notify_id, opcode, event->data);
1565 complete(&event->completion);
1566 dout("do_event_work completed %p\n", event);
1567 ceph_osdc_put_event(event);
1568 kfree(event_work);
1569}
1570
1571
1572/*
1573 * Process osd watch notifications
1574 */
1575void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1576{
1577 void *p, *end;
1578 u8 proto_ver;
1579 u64 cookie, ver, notify_id;
1580 u8 opcode;
1581 struct ceph_osd_event *event;
1582 struct ceph_osd_event_work *event_work;
1583
1584 p = msg->front.iov_base;
1585 end = p + msg->front.iov_len;
1586
1587 ceph_decode_8_safe(&p, end, proto_ver, bad);
1588 ceph_decode_8_safe(&p, end, opcode, bad);
1589 ceph_decode_64_safe(&p, end, cookie, bad);
1590 ceph_decode_64_safe(&p, end, ver, bad);
1591 ceph_decode_64_safe(&p, end, notify_id, bad);
1592
1593 spin_lock(&osdc->event_lock);
1594 event = __find_event(osdc, cookie);
1595 if (event) {
1596 get_event(event);
1597 if (event->one_shot)
1598 __remove_event(event);
1599 }
1600 spin_unlock(&osdc->event_lock);
1601 dout("handle_watch_notify cookie %lld ver %lld event %p\n",
1602 cookie, ver, event);
1603 if (event) {
1604 event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
1605 INIT_WORK(&event_work->work, do_event_work);
1606 if (!event_work) {
1607 dout("ERROR: could not allocate event_work\n");
1608 goto done_err;
1609 }
1610 event_work->event = event;
1611 event_work->ver = ver;
1612 event_work->notify_id = notify_id;
1613 event_work->opcode = opcode;
1614 if (!queue_work(osdc->notify_wq, &event_work->work)) {
1615 dout("WARNING: failed to queue notify event work\n");
1616 goto done_err;
1617 }
1618 }
1619
1620 return;
1621
1622done_err:
1623 complete(&event->completion);
1624 ceph_osdc_put_event(event);
1625 return;
1626
1627bad:
1628 pr_err("osdc handle_watch_notify corrupt msg\n");
1629 return;
1630}
1631
1632int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
1633{
1634 int err;
1635
1636 dout("wait_event %p\n", event);
1637 err = wait_for_completion_interruptible_timeout(&event->completion,
1638 timeout * HZ);
1639 ceph_osdc_put_event(event);
1640 if (err > 0)
1641 err = 0;
1642 dout("wait_event %p returns %d\n", event, err);
1643 return err;
1644}
1645EXPORT_SYMBOL(ceph_osdc_wait_event);
1646
1647/*
1305 * Register request, send initial attempt. 1648 * Register request, send initial attempt.
1306 */ 1649 */
1307int ceph_osdc_start_request(struct ceph_osd_client *osdc, 1650int ceph_osdc_start_request(struct ceph_osd_client *osdc,
@@ -1430,9 +1773,13 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1430 INIT_LIST_HEAD(&osdc->req_lru); 1773 INIT_LIST_HEAD(&osdc->req_lru);
1431 INIT_LIST_HEAD(&osdc->req_unsent); 1774 INIT_LIST_HEAD(&osdc->req_unsent);
1432 INIT_LIST_HEAD(&osdc->req_notarget); 1775 INIT_LIST_HEAD(&osdc->req_notarget);
1776 INIT_LIST_HEAD(&osdc->req_linger);
1433 osdc->num_requests = 0; 1777 osdc->num_requests = 0;
1434 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 1778 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
1435 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 1779 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
1780 spin_lock_init(&osdc->event_lock);
1781 osdc->event_tree = RB_ROOT;
1782 osdc->event_count = 0;
1436 1783
1437 schedule_delayed_work(&osdc->osds_timeout_work, 1784 schedule_delayed_work(&osdc->osds_timeout_work,
1438 round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ)); 1785 round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
@@ -1452,6 +1799,13 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1452 "osd_op_reply"); 1799 "osd_op_reply");
1453 if (err < 0) 1800 if (err < 0)
1454 goto out_msgpool; 1801 goto out_msgpool;
1802
1803 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
1804 if (IS_ERR(osdc->notify_wq)) {
1805 err = PTR_ERR(osdc->notify_wq);
1806 osdc->notify_wq = NULL;
1807 goto out_msgpool;
1808 }
1455 return 0; 1809 return 0;
1456 1810
1457out_msgpool: 1811out_msgpool:
@@ -1465,6 +1819,8 @@ EXPORT_SYMBOL(ceph_osdc_init);
1465 1819
1466void ceph_osdc_stop(struct ceph_osd_client *osdc) 1820void ceph_osdc_stop(struct ceph_osd_client *osdc)
1467{ 1821{
1822 flush_workqueue(osdc->notify_wq);
1823 destroy_workqueue(osdc->notify_wq);
1468 cancel_delayed_work_sync(&osdc->timeout_work); 1824 cancel_delayed_work_sync(&osdc->timeout_work);
1469 cancel_delayed_work_sync(&osdc->osds_timeout_work); 1825 cancel_delayed_work_sync(&osdc->osds_timeout_work);
1470 if (osdc->osdmap) { 1826 if (osdc->osdmap) {
@@ -1472,6 +1828,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
1472 osdc->osdmap = NULL; 1828 osdc->osdmap = NULL;
1473 } 1829 }
1474 remove_old_osds(osdc, 1); 1830 remove_old_osds(osdc, 1);
1831 WARN_ON(!RB_EMPTY_ROOT(&osdc->osds));
1475 mempool_destroy(osdc->req_mempool); 1832 mempool_destroy(osdc->req_mempool);
1476 ceph_msgpool_destroy(&osdc->msgpool_op); 1833 ceph_msgpool_destroy(&osdc->msgpool_op);
1477 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 1834 ceph_msgpool_destroy(&osdc->msgpool_op_reply);
@@ -1580,6 +1937,9 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1580 case CEPH_MSG_OSD_OPREPLY: 1937 case CEPH_MSG_OSD_OPREPLY:
1581 handle_reply(osdc, msg, con); 1938 handle_reply(osdc, msg, con);
1582 break; 1939 break;
1940 case CEPH_MSG_WATCH_NOTIFY:
1941 handle_watch_notify(osdc, msg);
1942 break;
1583 1943
1584 default: 1944 default:
1585 pr_err("received unknown message type %d %s\n", type, 1945 pr_err("received unknown message type %d %s\n", type,
@@ -1673,6 +2033,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
1673 2033
1674 switch (type) { 2034 switch (type) {
1675 case CEPH_MSG_OSD_MAP: 2035 case CEPH_MSG_OSD_MAP:
2036 case CEPH_MSG_WATCH_NOTIFY:
1676 return ceph_msg_new(type, front, GFP_NOFS); 2037 return ceph_msg_new(type, front, GFP_NOFS);
1677 case CEPH_MSG_OSD_OPREPLY: 2038 case CEPH_MSG_OSD_OPREPLY:
1678 return get_reply(con, hdr, skip); 2039 return get_reply(con, hdr, skip);