aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorIlya Dryomov <idryomov@gmail.com>2016-05-25 19:15:02 -0400
committerIlya Dryomov <idryomov@gmail.com>2016-05-25 19:15:02 -0400
commit922dab6134178cae317ae00de86376cba59f3147 (patch)
treea7047a5950b6a8505cc1e6852e4532656064fede /net
parentc525f03601f52c83ded046624138f2a45e0ba56c (diff)
libceph, rbd: ceph_osd_linger_request, watch/notify v2
This adds support and switches rbd to a new, more reliable version of watch/notify protocol. As with the OSD client update, this is mostly about getting the right structures linked into the right places so that reconnects are properly sent when needed.

watch/notify v2 also requires sending regular pings to the OSDs - send_linger_ping().

A major change from the old watch/notify implementation is the introduction of ceph_osd_linger_request - linger requests no longer piggy back on ceph_osd_request. ceph_osd_event has been merged into ceph_osd_linger_request.

All the details are now hidden within libceph, the interface consists of a simple pair of watch/unwatch functions and ceph_osdc_notify_ack(). ceph_osdc_watch() does return ceph_osd_linger_request, but only to keep the lifetime management simple.

ceph_osdc_notify_ack() accepts an optional data payload, which is relayed back to the notifier.

Portions of this patch are loosely based on work by Douglas Fuller <dfuller@redhat.com> and Mike Christie <michaelc@cs.wisc.edu>.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Diffstat (limited to 'net')
-rw-r--r--net/ceph/ceph_strings.c16
-rw-r--r--net/ceph/debugfs.c36
-rw-r--r--net/ceph/osd_client.c1148
3 files changed, 951 insertions, 249 deletions
diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c
index 139a9cb19b0c..3773a4fa11e3 100644
--- a/net/ceph/ceph_strings.c
+++ b/net/ceph/ceph_strings.c
@@ -27,6 +27,22 @@ __CEPH_FORALL_OSD_OPS(GENERATE_CASE)
27 } 27 }
28} 28}
29 29
30const char *ceph_osd_watch_op_name(int o)
31{
32 switch (o) {
33 case CEPH_OSD_WATCH_OP_UNWATCH:
34 return "unwatch";
35 case CEPH_OSD_WATCH_OP_WATCH:
36 return "watch";
37 case CEPH_OSD_WATCH_OP_RECONNECT:
38 return "reconnect";
39 case CEPH_OSD_WATCH_OP_PING:
40 return "ping";
41 default:
42 return "???";
43 }
44}
45
30const char *ceph_osd_state_name(int s) 46const char *ceph_osd_state_name(int s)
31{ 47{
32 switch (s) { 48 switch (s) {
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 61dbd9de4650..e64cb8583533 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -177,6 +177,9 @@ static void dump_request(struct seq_file *s, struct ceph_osd_request *req)
177 177
178 seq_printf(s, "%s%s", (i == 0 ? "\t" : ","), 178 seq_printf(s, "%s%s", (i == 0 ? "\t" : ","),
179 ceph_osd_op_name(op->op)); 179 ceph_osd_op_name(op->op));
180 if (op->op == CEPH_OSD_OP_WATCH)
181 seq_printf(s, "-%s",
182 ceph_osd_watch_op_name(op->watch.op));
180 } 183 }
181 184
182 seq_putc(s, '\n'); 185 seq_putc(s, '\n');
@@ -197,6 +200,31 @@ static void dump_requests(struct seq_file *s, struct ceph_osd *osd)
197 mutex_unlock(&osd->lock); 200 mutex_unlock(&osd->lock);
198} 201}
199 202
203static void dump_linger_request(struct seq_file *s,
204 struct ceph_osd_linger_request *lreq)
205{
206 seq_printf(s, "%llu\t", lreq->linger_id);
207 dump_target(s, &lreq->t);
208
209 seq_printf(s, "\t%u\t%s/%d\n", lreq->register_gen,
210 lreq->committed ? "C" : "", lreq->last_error);
211}
212
213static void dump_linger_requests(struct seq_file *s, struct ceph_osd *osd)
214{
215 struct rb_node *n;
216
217 mutex_lock(&osd->lock);
218 for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) {
219 struct ceph_osd_linger_request *lreq =
220 rb_entry(n, struct ceph_osd_linger_request, node);
221
222 dump_linger_request(s, lreq);
223 }
224
225 mutex_unlock(&osd->lock);
226}
227
200static int osdc_show(struct seq_file *s, void *pp) 228static int osdc_show(struct seq_file *s, void *pp)
201{ 229{
202 struct ceph_client *client = s->private; 230 struct ceph_client *client = s->private;
@@ -214,6 +242,14 @@ static int osdc_show(struct seq_file *s, void *pp)
214 } 242 }
215 dump_requests(s, &osdc->homeless_osd); 243 dump_requests(s, &osdc->homeless_osd);
216 244
245 seq_puts(s, "LINGER REQUESTS\n");
246 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
247 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
248
249 dump_linger_requests(s, osd);
250 }
251 dump_linger_requests(s, &osdc->homeless_osd);
252
217 up_read(&osdc->lock); 253 up_read(&osdc->lock);
218 return 0; 254 return 0;
219} 255}
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index ef1bcbe9af2d..ca0a7b58ba4f 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -45,6 +45,10 @@ static const struct ceph_connection_operations osd_con_ops;
45 45
46static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req); 46static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
47static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req); 47static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
48static void link_linger(struct ceph_osd *osd,
49 struct ceph_osd_linger_request *lreq);
50static void unlink_linger(struct ceph_osd *osd,
51 struct ceph_osd_linger_request *lreq);
48 52
49#if 1 53#if 1
50static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem) 54static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
@@ -74,10 +78,15 @@ static inline void verify_osd_locked(struct ceph_osd *osd)
74 rwsem_is_locked(&osdc->lock)) && 78 rwsem_is_locked(&osdc->lock)) &&
75 !rwsem_is_wrlocked(&osdc->lock)); 79 !rwsem_is_wrlocked(&osdc->lock));
76} 80}
81static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq)
82{
83 WARN_ON(!mutex_is_locked(&lreq->lock));
84}
77#else 85#else
78static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { } 86static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
79static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { } 87static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
80static inline void verify_osd_locked(struct ceph_osd *osd) { } 88static inline void verify_osd_locked(struct ceph_osd *osd) { }
89static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { }
81#endif 90#endif
82 91
83/* 92/*
@@ -322,6 +331,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
322 case CEPH_OSD_OP_STAT: 331 case CEPH_OSD_OP_STAT:
323 ceph_osd_data_release(&op->raw_data_in); 332 ceph_osd_data_release(&op->raw_data_in);
324 break; 333 break;
334 case CEPH_OSD_OP_NOTIFY_ACK:
335 ceph_osd_data_release(&op->notify_ack.request_data);
336 break;
325 default: 337 default:
326 break; 338 break;
327 } 339 }
@@ -345,6 +357,29 @@ static void target_init(struct ceph_osd_request_target *t)
345 t->osd = CEPH_HOMELESS_OSD; 357 t->osd = CEPH_HOMELESS_OSD;
346} 358}
347 359
360static void target_copy(struct ceph_osd_request_target *dest,
361 const struct ceph_osd_request_target *src)
362{
363 ceph_oid_copy(&dest->base_oid, &src->base_oid);
364 ceph_oloc_copy(&dest->base_oloc, &src->base_oloc);
365 ceph_oid_copy(&dest->target_oid, &src->target_oid);
366 ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);
367
368 dest->pgid = src->pgid; /* struct */
369 dest->pg_num = src->pg_num;
370 dest->pg_num_mask = src->pg_num_mask;
371 ceph_osds_copy(&dest->acting, &src->acting);
372 ceph_osds_copy(&dest->up, &src->up);
373 dest->size = src->size;
374 dest->min_size = src->min_size;
375 dest->sort_bitwise = src->sort_bitwise;
376
377 dest->flags = src->flags;
378 dest->paused = src->paused;
379
380 dest->osd = src->osd;
381}
382
348static void target_destroy(struct ceph_osd_request_target *t) 383static void target_destroy(struct ceph_osd_request_target *t)
349{ 384{
350 ceph_oid_destroy(&t->base_oid); 385 ceph_oid_destroy(&t->base_oid);
@@ -357,8 +392,6 @@ static void target_destroy(struct ceph_osd_request_target *t)
357static void request_release_checks(struct ceph_osd_request *req) 392static void request_release_checks(struct ceph_osd_request *req)
358{ 393{
359 WARN_ON(!RB_EMPTY_NODE(&req->r_node)); 394 WARN_ON(!RB_EMPTY_NODE(&req->r_node));
360 WARN_ON(!list_empty(&req->r_linger_item));
361 WARN_ON(!list_empty(&req->r_linger_osd_item));
362 WARN_ON(!list_empty(&req->r_unsafe_item)); 395 WARN_ON(!list_empty(&req->r_unsafe_item));
363 WARN_ON(req->r_osd); 396 WARN_ON(req->r_osd);
364} 397}
@@ -419,13 +452,48 @@ static void request_init(struct ceph_osd_request *req)
419 init_completion(&req->r_completion); 452 init_completion(&req->r_completion);
420 init_completion(&req->r_safe_completion); 453 init_completion(&req->r_safe_completion);
421 RB_CLEAR_NODE(&req->r_node); 454 RB_CLEAR_NODE(&req->r_node);
422 INIT_LIST_HEAD(&req->r_linger_item);
423 INIT_LIST_HEAD(&req->r_linger_osd_item);
424 INIT_LIST_HEAD(&req->r_unsafe_item); 455 INIT_LIST_HEAD(&req->r_unsafe_item);
425 456
426 target_init(&req->r_t); 457 target_init(&req->r_t);
427} 458}
428 459
460/*
461 * This is ugly, but it allows us to reuse linger registration and ping
462 * requests, keeping the structure of the code around send_linger{_ping}()
463 * reasonable. Setting up a min_nr=2 mempool for each linger request
464 * and dealing with copying ops (this blasts req only, watch op remains
465 * intact) isn't any better.
466 */
467static void request_reinit(struct ceph_osd_request *req)
468{
469 struct ceph_osd_client *osdc = req->r_osdc;
470 bool mempool = req->r_mempool;
471 unsigned int num_ops = req->r_num_ops;
472 u64 snapid = req->r_snapid;
473 struct ceph_snap_context *snapc = req->r_snapc;
474 bool linger = req->r_linger;
475 struct ceph_msg *request_msg = req->r_request;
476 struct ceph_msg *reply_msg = req->r_reply;
477
478 dout("%s req %p\n", __func__, req);
479 WARN_ON(atomic_read(&req->r_kref.refcount) != 1);
480 request_release_checks(req);
481
482 WARN_ON(atomic_read(&request_msg->kref.refcount) != 1);
483 WARN_ON(atomic_read(&reply_msg->kref.refcount) != 1);
484 target_destroy(&req->r_t);
485
486 request_init(req);
487 req->r_osdc = osdc;
488 req->r_mempool = mempool;
489 req->r_num_ops = num_ops;
490 req->r_snapid = snapid;
491 req->r_snapc = snapc;
492 req->r_linger = linger;
493 req->r_request = request_msg;
494 req->r_reply = reply_msg;
495}
496
429struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 497struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
430 struct ceph_snap_context *snapc, 498 struct ceph_snap_context *snapc,
431 unsigned int num_ops, 499 unsigned int num_ops,
@@ -681,21 +749,19 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
681} 749}
682EXPORT_SYMBOL(osd_req_op_xattr_init); 750EXPORT_SYMBOL(osd_req_op_xattr_init);
683 751
684void osd_req_op_watch_init(struct ceph_osd_request *osd_req, 752/*
685 unsigned int which, u16 opcode, 753 * @watch_opcode: CEPH_OSD_WATCH_OP_*
686 u64 cookie, u64 version, int flag) 754 */
755static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
756 u64 cookie, u8 watch_opcode)
687{ 757{
688 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 758 struct ceph_osd_req_op *op;
689 opcode, 0);
690
691 BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
692 759
760 op = _osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
693 op->watch.cookie = cookie; 761 op->watch.cookie = cookie;
694 op->watch.ver = version; 762 op->watch.op = watch_opcode;
695 if (opcode == CEPH_OSD_OP_WATCH && flag) 763 op->watch.gen = 0;
696 op->watch.flag = (u8)1;
697} 764}
698EXPORT_SYMBOL(osd_req_op_watch_init);
699 765
700void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req, 766void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
701 unsigned int which, 767 unsigned int which,
@@ -771,11 +837,13 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
771 break; 837 break;
772 case CEPH_OSD_OP_STARTSYNC: 838 case CEPH_OSD_OP_STARTSYNC:
773 break; 839 break;
774 case CEPH_OSD_OP_NOTIFY_ACK:
775 case CEPH_OSD_OP_WATCH: 840 case CEPH_OSD_OP_WATCH:
776 dst->watch.cookie = cpu_to_le64(src->watch.cookie); 841 dst->watch.cookie = cpu_to_le64(src->watch.cookie);
777 dst->watch.ver = cpu_to_le64(src->watch.ver); 842 dst->watch.ver = cpu_to_le64(0);
778 dst->watch.flag = src->watch.flag; 843 dst->watch.op = src->watch.op;
844 dst->watch.gen = cpu_to_le32(src->watch.gen);
845 break;
846 case CEPH_OSD_OP_NOTIFY_ACK:
779 break; 847 break;
780 case CEPH_OSD_OP_SETALLOCHINT: 848 case CEPH_OSD_OP_SETALLOCHINT:
781 dst->alloc_hint.expected_object_size = 849 dst->alloc_hint.expected_object_size =
@@ -915,7 +983,7 @@ static void osd_init(struct ceph_osd *osd)
915 atomic_set(&osd->o_ref, 1); 983 atomic_set(&osd->o_ref, 1);
916 RB_CLEAR_NODE(&osd->o_node); 984 RB_CLEAR_NODE(&osd->o_node);
917 osd->o_requests = RB_ROOT; 985 osd->o_requests = RB_ROOT;
918 INIT_LIST_HEAD(&osd->o_linger_requests); 986 osd->o_linger_requests = RB_ROOT;
919 INIT_LIST_HEAD(&osd->o_osd_lru); 987 INIT_LIST_HEAD(&osd->o_osd_lru);
920 INIT_LIST_HEAD(&osd->o_keepalive_item); 988 INIT_LIST_HEAD(&osd->o_keepalive_item);
921 osd->o_incarnation = 1; 989 osd->o_incarnation = 1;
@@ -926,7 +994,7 @@ static void osd_cleanup(struct ceph_osd *osd)
926{ 994{
927 WARN_ON(!RB_EMPTY_NODE(&osd->o_node)); 995 WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
928 WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests)); 996 WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
929 WARN_ON(!list_empty(&osd->o_linger_requests)); 997 WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
930 WARN_ON(!list_empty(&osd->o_osd_lru)); 998 WARN_ON(!list_empty(&osd->o_osd_lru));
931 WARN_ON(!list_empty(&osd->o_keepalive_item)); 999 WARN_ON(!list_empty(&osd->o_keepalive_item));
932 1000
@@ -996,7 +1064,7 @@ static void __move_osd_to_lru(struct ceph_osd *osd)
996static void maybe_move_osd_to_lru(struct ceph_osd *osd) 1064static void maybe_move_osd_to_lru(struct ceph_osd *osd)
997{ 1065{
998 if (RB_EMPTY_ROOT(&osd->o_requests) && 1066 if (RB_EMPTY_ROOT(&osd->o_requests) &&
999 list_empty(&osd->o_linger_requests)) 1067 RB_EMPTY_ROOT(&osd->o_linger_requests))
1000 __move_osd_to_lru(osd); 1068 __move_osd_to_lru(osd);
1001} 1069}
1002 1070
@@ -1036,6 +1104,17 @@ static void close_osd(struct ceph_osd *osd)
1036 unlink_request(osd, req); 1104 unlink_request(osd, req);
1037 link_request(&osdc->homeless_osd, req); 1105 link_request(&osdc->homeless_osd, req);
1038 } 1106 }
1107 for (n = rb_first(&osd->o_linger_requests); n; ) {
1108 struct ceph_osd_linger_request *lreq =
1109 rb_entry(n, struct ceph_osd_linger_request, node);
1110
1111 n = rb_next(n); /* unlink_linger() */
1112
1113 dout(" reassigning lreq %p linger_id %llu\n", lreq,
1114 lreq->linger_id);
1115 unlink_linger(osd, lreq);
1116 link_linger(&osdc->homeless_osd, lreq);
1117 }
1039 1118
1040 __remove_osd_from_lru(osd); 1119 __remove_osd_from_lru(osd);
1041 erase_osd(&osdc->osds, osd); 1120 erase_osd(&osdc->osds, osd);
@@ -1052,7 +1131,7 @@ static int reopen_osd(struct ceph_osd *osd)
1052 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd); 1131 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1053 1132
1054 if (RB_EMPTY_ROOT(&osd->o_requests) && 1133 if (RB_EMPTY_ROOT(&osd->o_requests) &&
1055 list_empty(&osd->o_linger_requests)) { 1134 RB_EMPTY_ROOT(&osd->o_linger_requests)) {
1056 close_osd(osd); 1135 close_osd(osd);
1057 return -ENODEV; 1136 return -ENODEV;
1058 } 1137 }
@@ -1148,52 +1227,6 @@ static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
1148 atomic_dec(&osd->o_osdc->num_homeless); 1227 atomic_dec(&osd->o_osdc->num_homeless);
1149} 1228}
1150 1229
1151static void __register_linger_request(struct ceph_osd *osd,
1152 struct ceph_osd_request *req)
1153{
1154 dout("%s %p tid %llu\n", __func__, req, req->r_tid);
1155 WARN_ON(!req->r_linger);
1156
1157 ceph_osdc_get_request(req);
1158 list_add_tail(&req->r_linger_item, &osd->o_osdc->req_linger);
1159 list_add_tail(&req->r_linger_osd_item, &osd->o_linger_requests);
1160 __remove_osd_from_lru(osd);
1161 req->r_osd = osd;
1162}
1163
1164static void __unregister_linger_request(struct ceph_osd_client *osdc,
1165 struct ceph_osd_request *req)
1166{
1167 WARN_ON(!req->r_linger);
1168
1169 if (list_empty(&req->r_linger_item)) {
1170 dout("%s %p tid %llu not registered\n", __func__, req,
1171 req->r_tid);
1172 return;
1173 }
1174
1175 dout("%s %p tid %llu\n", __func__, req, req->r_tid);
1176 list_del_init(&req->r_linger_item);
1177
1178 if (req->r_osd) {
1179 list_del_init(&req->r_linger_osd_item);
1180 maybe_move_osd_to_lru(req->r_osd);
1181 if (RB_EMPTY_ROOT(&req->r_osd->o_requests))
1182 req->r_osd = NULL;
1183 }
1184 ceph_osdc_put_request(req);
1185}
1186
1187void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
1188 struct ceph_osd_request *req)
1189{
1190 if (!req->r_linger) {
1191 dout("set_request_linger %p\n", req);
1192 req->r_linger = 1;
1193 }
1194}
1195EXPORT_SYMBOL(ceph_osdc_set_request_linger);
1196
1197static bool __pool_full(struct ceph_pg_pool_info *pi) 1230static bool __pool_full(struct ceph_pg_pool_info *pi)
1198{ 1231{
1199 return pi->flags & CEPH_POOL_FLAG_FULL; 1232 return pi->flags & CEPH_POOL_FLAG_FULL;
@@ -1379,6 +1412,10 @@ static void setup_request_data(struct ceph_osd_request *req,
1379 op->xattr.value_len); 1412 op->xattr.value_len);
1380 ceph_osdc_msg_data_add(msg, &op->xattr.osd_data); 1413 ceph_osdc_msg_data_add(msg, &op->xattr.osd_data);
1381 break; 1414 break;
1415 case CEPH_OSD_OP_NOTIFY_ACK:
1416 ceph_osdc_msg_data_add(msg,
1417 &op->notify_ack.request_data);
1418 break;
1382 1419
1383 /* reply */ 1420 /* reply */
1384 case CEPH_OSD_OP_STAT: 1421 case CEPH_OSD_OP_STAT:
@@ -1684,6 +1721,460 @@ static void cancel_request(struct ceph_osd_request *req)
1684} 1721}
1685 1722
1686/* 1723/*
1724 * lingering requests, watch/notify v2 infrastructure
1725 */
1726static void linger_release(struct kref *kref)
1727{
1728 struct ceph_osd_linger_request *lreq =
1729 container_of(kref, struct ceph_osd_linger_request, kref);
1730
1731 dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq,
1732 lreq->reg_req, lreq->ping_req);
1733 WARN_ON(!RB_EMPTY_NODE(&lreq->node));
1734 WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node));
1735 WARN_ON(!list_empty(&lreq->scan_item));
1736 WARN_ON(lreq->osd);
1737
1738 if (lreq->reg_req)
1739 ceph_osdc_put_request(lreq->reg_req);
1740 if (lreq->ping_req)
1741 ceph_osdc_put_request(lreq->ping_req);
1742 target_destroy(&lreq->t);
1743 kfree(lreq);
1744}
1745
1746static void linger_put(struct ceph_osd_linger_request *lreq)
1747{
1748 if (lreq)
1749 kref_put(&lreq->kref, linger_release);
1750}
1751
1752static struct ceph_osd_linger_request *
1753linger_get(struct ceph_osd_linger_request *lreq)
1754{
1755 kref_get(&lreq->kref);
1756 return lreq;
1757}
1758
1759static struct ceph_osd_linger_request *
1760linger_alloc(struct ceph_osd_client *osdc)
1761{
1762 struct ceph_osd_linger_request *lreq;
1763
1764 lreq = kzalloc(sizeof(*lreq), GFP_NOIO);
1765 if (!lreq)
1766 return NULL;
1767
1768 kref_init(&lreq->kref);
1769 mutex_init(&lreq->lock);
1770 RB_CLEAR_NODE(&lreq->node);
1771 RB_CLEAR_NODE(&lreq->osdc_node);
1772 INIT_LIST_HEAD(&lreq->scan_item);
1773 init_completion(&lreq->reg_commit_wait);
1774
1775 lreq->osdc = osdc;
1776 target_init(&lreq->t);
1777
1778 dout("%s lreq %p\n", __func__, lreq);
1779 return lreq;
1780}
1781
1782DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node)
1783DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node)
1784
1785/*
1786 * Create linger request <-> OSD session relation.
1787 *
1788 * @lreq has to be registered, @osd may be homeless.
1789 */
1790static void link_linger(struct ceph_osd *osd,
1791 struct ceph_osd_linger_request *lreq)
1792{
1793 verify_osd_locked(osd);
1794 WARN_ON(!lreq->linger_id || lreq->osd);
1795 dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
1796 osd->o_osd, lreq, lreq->linger_id);
1797
1798 if (!osd_homeless(osd))
1799 __remove_osd_from_lru(osd);
1800 else
1801 atomic_inc(&osd->o_osdc->num_homeless);
1802
1803 get_osd(osd);
1804 insert_linger(&osd->o_linger_requests, lreq);
1805 lreq->osd = osd;
1806}
1807
1808static void unlink_linger(struct ceph_osd *osd,
1809 struct ceph_osd_linger_request *lreq)
1810{
1811 verify_osd_locked(osd);
1812 WARN_ON(lreq->osd != osd);
1813 dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
1814 osd->o_osd, lreq, lreq->linger_id);
1815
1816 lreq->osd = NULL;
1817 erase_linger(&osd->o_linger_requests, lreq);
1818 put_osd(osd);
1819
1820 if (!osd_homeless(osd))
1821 maybe_move_osd_to_lru(osd);
1822 else
1823 atomic_dec(&osd->o_osdc->num_homeless);
1824}
1825
1826static bool __linger_registered(struct ceph_osd_linger_request *lreq)
1827{
1828 verify_osdc_locked(lreq->osdc);
1829
1830 return !RB_EMPTY_NODE(&lreq->osdc_node);
1831}
1832
1833static bool linger_registered(struct ceph_osd_linger_request *lreq)
1834{
1835 struct ceph_osd_client *osdc = lreq->osdc;
1836 bool registered;
1837
1838 down_read(&osdc->lock);
1839 registered = __linger_registered(lreq);
1840 up_read(&osdc->lock);
1841
1842 return registered;
1843}
1844
1845static void linger_register(struct ceph_osd_linger_request *lreq)
1846{
1847 struct ceph_osd_client *osdc = lreq->osdc;
1848
1849 verify_osdc_wrlocked(osdc);
1850 WARN_ON(lreq->linger_id);
1851
1852 linger_get(lreq);
1853 lreq->linger_id = ++osdc->last_linger_id;
1854 insert_linger_osdc(&osdc->linger_requests, lreq);
1855}
1856
1857static void linger_unregister(struct ceph_osd_linger_request *lreq)
1858{
1859 struct ceph_osd_client *osdc = lreq->osdc;
1860
1861 verify_osdc_wrlocked(osdc);
1862
1863 erase_linger_osdc(&osdc->linger_requests, lreq);
1864 linger_put(lreq);
1865}
1866
1867static void cancel_linger_request(struct ceph_osd_request *req)
1868{
1869 struct ceph_osd_linger_request *lreq = req->r_priv;
1870
1871 WARN_ON(!req->r_linger);
1872 cancel_request(req);
1873 linger_put(lreq);
1874}
1875
1876struct linger_work {
1877 struct work_struct work;
1878 struct ceph_osd_linger_request *lreq;
1879
1880 union {
1881 struct {
1882 u64 notify_id;
1883 u64 notifier_id;
1884 void *payload; /* points into @msg front */
1885 size_t payload_len;
1886
1887 struct ceph_msg *msg; /* for ceph_msg_put() */
1888 } notify;
1889 struct {
1890 int err;
1891 } error;
1892 };
1893};
1894
1895static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq,
1896 work_func_t workfn)
1897{
1898 struct linger_work *lwork;
1899
1900 lwork = kzalloc(sizeof(*lwork), GFP_NOIO);
1901 if (!lwork)
1902 return NULL;
1903
1904 INIT_WORK(&lwork->work, workfn);
1905 lwork->lreq = linger_get(lreq);
1906
1907 return lwork;
1908}
1909
1910static void lwork_free(struct linger_work *lwork)
1911{
1912 struct ceph_osd_linger_request *lreq = lwork->lreq;
1913
1914 linger_put(lreq);
1915 kfree(lwork);
1916}
1917
1918static void lwork_queue(struct linger_work *lwork)
1919{
1920 struct ceph_osd_linger_request *lreq = lwork->lreq;
1921 struct ceph_osd_client *osdc = lreq->osdc;
1922
1923 verify_lreq_locked(lreq);
1924 queue_work(osdc->notify_wq, &lwork->work);
1925}
1926
1927static void do_watch_notify(struct work_struct *w)
1928{
1929 struct linger_work *lwork = container_of(w, struct linger_work, work);
1930 struct ceph_osd_linger_request *lreq = lwork->lreq;
1931
1932 if (!linger_registered(lreq)) {
1933 dout("%s lreq %p not registered\n", __func__, lreq);
1934 goto out;
1935 }
1936
1937 dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n",
1938 __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id,
1939 lwork->notify.payload_len);
1940 lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id,
1941 lwork->notify.notifier_id, lwork->notify.payload,
1942 lwork->notify.payload_len);
1943
1944out:
1945 ceph_msg_put(lwork->notify.msg);
1946 lwork_free(lwork);
1947}
1948
1949static void do_watch_error(struct work_struct *w)
1950{
1951 struct linger_work *lwork = container_of(w, struct linger_work, work);
1952 struct ceph_osd_linger_request *lreq = lwork->lreq;
1953
1954 if (!linger_registered(lreq)) {
1955 dout("%s lreq %p not registered\n", __func__, lreq);
1956 goto out;
1957 }
1958
1959 dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err);
1960 lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err);
1961
1962out:
1963 lwork_free(lwork);
1964}
1965
1966static void queue_watch_error(struct ceph_osd_linger_request *lreq)
1967{
1968 struct linger_work *lwork;
1969
1970 lwork = lwork_alloc(lreq, do_watch_error);
1971 if (!lwork) {
1972 pr_err("failed to allocate error-lwork\n");
1973 return;
1974 }
1975
1976 lwork->error.err = lreq->last_error;
1977 lwork_queue(lwork);
1978}
1979
1980static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq,
1981 int result)
1982{
1983 if (!completion_done(&lreq->reg_commit_wait)) {
1984 lreq->reg_commit_error = (result <= 0 ? result : 0);
1985 complete_all(&lreq->reg_commit_wait);
1986 }
1987}
1988
1989static void linger_commit_cb(struct ceph_osd_request *req)
1990{
1991 struct ceph_osd_linger_request *lreq = req->r_priv;
1992
1993 mutex_lock(&lreq->lock);
1994 dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq,
1995 lreq->linger_id, req->r_result);
1996 WARN_ON(!__linger_registered(lreq));
1997 linger_reg_commit_complete(lreq, req->r_result);
1998 lreq->committed = true;
1999
2000 mutex_unlock(&lreq->lock);
2001 linger_put(lreq);
2002}
2003
2004static int normalize_watch_error(int err)
2005{
2006 /*
2007 * Translate ENOENT -> ENOTCONN so that a delete->disconnection
2008 * notification and a failure to reconnect because we raced with
2009 * the delete appear the same to the user.
2010 */
2011 if (err == -ENOENT)
2012 err = -ENOTCONN;
2013
2014 return err;
2015}
2016
2017static void linger_reconnect_cb(struct ceph_osd_request *req)
2018{
2019 struct ceph_osd_linger_request *lreq = req->r_priv;
2020
2021 mutex_lock(&lreq->lock);
2022 dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__,
2023 lreq, lreq->linger_id, req->r_result, lreq->last_error);
2024 if (req->r_result < 0) {
2025 if (!lreq->last_error) {
2026 lreq->last_error = normalize_watch_error(req->r_result);
2027 queue_watch_error(lreq);
2028 }
2029 }
2030
2031 mutex_unlock(&lreq->lock);
2032 linger_put(lreq);
2033}
2034
2035static void send_linger(struct ceph_osd_linger_request *lreq)
2036{
2037 struct ceph_osd_request *req = lreq->reg_req;
2038 struct ceph_osd_req_op *op = &req->r_ops[0];
2039
2040 verify_osdc_wrlocked(req->r_osdc);
2041 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
2042
2043 if (req->r_osd)
2044 cancel_linger_request(req);
2045
2046 request_reinit(req);
2047 ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
2048 ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
2049 req->r_flags = lreq->t.flags;
2050 req->r_mtime = lreq->mtime;
2051
2052 mutex_lock(&lreq->lock);
2053 if (lreq->committed) {
2054 WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
2055 op->watch.cookie != lreq->linger_id);
2056 op->watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
2057 op->watch.gen = ++lreq->register_gen;
2058 dout("lreq %p reconnect register_gen %u\n", lreq,
2059 op->watch.gen);
2060 req->r_callback = linger_reconnect_cb;
2061 } else {
2062 WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH);
2063 dout("lreq %p register\n", lreq);
2064 req->r_callback = linger_commit_cb;
2065 }
2066 mutex_unlock(&lreq->lock);
2067
2068 req->r_priv = linger_get(lreq);
2069 req->r_linger = true;
2070
2071 submit_request(req, true);
2072}
2073
2074static void linger_ping_cb(struct ceph_osd_request *req)
2075{
2076 struct ceph_osd_linger_request *lreq = req->r_priv;
2077
2078 mutex_lock(&lreq->lock);
2079 dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n",
2080 __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent,
2081 lreq->last_error);
2082 if (lreq->register_gen == req->r_ops[0].watch.gen) {
2083 if (req->r_result && !lreq->last_error) {
2084 lreq->last_error = normalize_watch_error(req->r_result);
2085 queue_watch_error(lreq);
2086 }
2087 } else {
2088 dout("lreq %p register_gen %u ignoring old pong %u\n", lreq,
2089 lreq->register_gen, req->r_ops[0].watch.gen);
2090 }
2091
2092 mutex_unlock(&lreq->lock);
2093 linger_put(lreq);
2094}
2095
2096static void send_linger_ping(struct ceph_osd_linger_request *lreq)
2097{
2098 struct ceph_osd_client *osdc = lreq->osdc;
2099 struct ceph_osd_request *req = lreq->ping_req;
2100 struct ceph_osd_req_op *op = &req->r_ops[0];
2101
2102 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD)) {
2103 dout("%s PAUSERD\n", __func__);
2104 return;
2105 }
2106
2107 lreq->ping_sent = jiffies;
2108 dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n",
2109 __func__, lreq, lreq->linger_id, lreq->ping_sent,
2110 lreq->register_gen);
2111
2112 if (req->r_osd)
2113 cancel_linger_request(req);
2114
2115 request_reinit(req);
2116 target_copy(&req->r_t, &lreq->t);
2117
2118 WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
2119 op->watch.cookie != lreq->linger_id ||
2120 op->watch.op != CEPH_OSD_WATCH_OP_PING);
2121 op->watch.gen = lreq->register_gen;
2122 req->r_callback = linger_ping_cb;
2123 req->r_priv = linger_get(lreq);
2124 req->r_linger = true;
2125
2126 ceph_osdc_get_request(req);
2127 account_request(req);
2128 req->r_tid = atomic64_inc_return(&osdc->last_tid);
2129 link_request(lreq->osd, req);
2130 send_request(req);
2131}
2132
2133static void linger_submit(struct ceph_osd_linger_request *lreq)
2134{
2135 struct ceph_osd_client *osdc = lreq->osdc;
2136 struct ceph_osd *osd;
2137
2138 calc_target(osdc, &lreq->t, &lreq->last_force_resend, false);
2139 osd = lookup_create_osd(osdc, lreq->t.osd, true);
2140 link_linger(osd, lreq);
2141
2142 send_linger(lreq);
2143}
2144
2145/*
2146 * @lreq has to be both registered and linked.
2147 */
2148static void __linger_cancel(struct ceph_osd_linger_request *lreq)
2149{
2150 if (lreq->ping_req->r_osd)
2151 cancel_linger_request(lreq->ping_req);
2152 if (lreq->reg_req->r_osd)
2153 cancel_linger_request(lreq->reg_req);
2154 unlink_linger(lreq->osd, lreq);
2155 linger_unregister(lreq);
2156}
2157
2158static void linger_cancel(struct ceph_osd_linger_request *lreq)
2159{
2160 struct ceph_osd_client *osdc = lreq->osdc;
2161
2162 down_write(&osdc->lock);
2163 if (__linger_registered(lreq))
2164 __linger_cancel(lreq);
2165 up_write(&osdc->lock);
2166}
2167
2168static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq)
2169{
2170 int ret;
2171
2172 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
2173 ret = wait_for_completion_interruptible(&lreq->reg_commit_wait);
2174 return ret ?: lreq->reg_commit_error;
2175}
2176
2177/*
1687 * Timeout callback, called every N seconds. When 1 or more OSD 2178 * Timeout callback, called every N seconds. When 1 or more OSD
1688 * requests has been active for more than N seconds, we send a keepalive 2179 * requests has been active for more than N seconds, we send a keepalive
1689 * (tag + timestamp) to its OSD to ensure any communications channel 2180 * (tag + timestamp) to its OSD to ensure any communications channel
@@ -1720,6 +2211,19 @@ static void handle_timeout(struct work_struct *work)
1720 found = true; 2211 found = true;
1721 } 2212 }
1722 } 2213 }
2214 for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) {
2215 struct ceph_osd_linger_request *lreq =
2216 rb_entry(p, struct ceph_osd_linger_request, node);
2217
2218 dout(" lreq %p linger_id %llu is served by osd%d\n",
2219 lreq, lreq->linger_id, osd->o_osd);
2220 found = true;
2221
2222 mutex_lock(&lreq->lock);
2223 if (lreq->committed && !lreq->last_error)
2224 send_linger_ping(lreq);
2225 mutex_unlock(&lreq->lock);
2226 }
1723 2227
1724 if (found) 2228 if (found)
1725 list_move_tail(&osd->o_keepalive_item, &slow_osds); 2229 list_move_tail(&osd->o_keepalive_item, &slow_osds);
@@ -1756,7 +2260,7 @@ static void handle_osds_timeout(struct work_struct *work)
1756 break; 2260 break;
1757 2261
1758 WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests)); 2262 WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
1759 WARN_ON(!list_empty(&osd->o_linger_requests)); 2263 WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
1760 close_osd(osd); 2264 close_osd(osd);
1761 } 2265 }
1762 2266
@@ -2082,7 +2586,8 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
2082 __finish_request(req); 2586 __finish_request(req);
2083 if (req->r_linger) { 2587 if (req->r_linger) {
2084 WARN_ON(req->r_unsafe_callback); 2588 WARN_ON(req->r_unsafe_callback);
2085 __register_linger_request(osd, req); 2589 dout("req %p tid %llu cb (locked)\n", req, req->r_tid);
2590 __complete_request(req);
2086 } 2591 }
2087 } 2592 }
2088 2593
@@ -2093,7 +2598,7 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
2093 if (already_acked && req->r_unsafe_callback) { 2598 if (already_acked && req->r_unsafe_callback) {
2094 dout("req %p tid %llu safe-cb\n", req, req->r_tid); 2599 dout("req %p tid %llu safe-cb\n", req, req->r_tid);
2095 req->r_unsafe_callback(req, false); 2600 req->r_unsafe_callback(req, false);
2096 } else { 2601 } else if (!req->r_linger) {
2097 dout("req %p tid %llu cb\n", req, req->r_tid); 2602 dout("req %p tid %llu cb\n", req, req->r_tid);
2098 __complete_request(req); 2603 __complete_request(req);
2099 } 2604 }
@@ -2145,6 +2650,26 @@ static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id)
2145 return pi->was_full && !__pool_full(pi); 2650 return pi->was_full && !__pool_full(pi);
2146} 2651}
2147 2652
2653static enum calc_target_result
2654recalc_linger_target(struct ceph_osd_linger_request *lreq)
2655{
2656 struct ceph_osd_client *osdc = lreq->osdc;
2657 enum calc_target_result ct_res;
2658
2659 ct_res = calc_target(osdc, &lreq->t, &lreq->last_force_resend, true);
2660 if (ct_res == CALC_TARGET_NEED_RESEND) {
2661 struct ceph_osd *osd;
2662
2663 osd = lookup_create_osd(osdc, lreq->t.osd, true);
2664 if (osd != lreq->osd) {
2665 unlink_linger(lreq->osd, lreq);
2666 link_linger(osd, lreq);
2667 }
2668 }
2669
2670 return ct_res;
2671}
2672
2148/* 2673/*
2149 * Requeue requests whose mapping to an OSD has changed. 2674 * Requeue requests whose mapping to an OSD has changed.
2150 */ 2675 */
@@ -2159,6 +2684,39 @@ static void scan_requests(struct ceph_osd *osd,
2159 struct rb_node *n; 2684 struct rb_node *n;
2160 bool force_resend_writes; 2685 bool force_resend_writes;
2161 2686
2687 for (n = rb_first(&osd->o_linger_requests); n; ) {
2688 struct ceph_osd_linger_request *lreq =
2689 rb_entry(n, struct ceph_osd_linger_request, node);
2690 enum calc_target_result ct_res;
2691
2692 n = rb_next(n); /* recalc_linger_target() */
2693
2694 dout("%s lreq %p linger_id %llu\n", __func__, lreq,
2695 lreq->linger_id);
2696 ct_res = recalc_linger_target(lreq);
2697 switch (ct_res) {
2698 case CALC_TARGET_NO_ACTION:
2699 force_resend_writes = cleared_full ||
2700 (check_pool_cleared_full &&
2701 pool_cleared_full(osdc, lreq->t.base_oloc.pool));
2702 if (!force_resend && !force_resend_writes)
2703 break;
2704
2705 /* fall through */
2706 case CALC_TARGET_NEED_RESEND:
2707 /*
2708 * scan_requests() for the previous epoch(s)
2709 * may have already added it to the list, since
2710 * it's not unlinked here.
2711 */
2712 if (list_empty(&lreq->scan_item))
2713 list_add_tail(&lreq->scan_item, need_resend_linger);
2714 break;
2715 case CALC_TARGET_POOL_DNE:
2716 break;
2717 }
2718 }
2719
2162 for (n = rb_first(&osd->o_requests); n; ) { 2720 for (n = rb_first(&osd->o_requests); n; ) {
2163 struct ceph_osd_request *req = 2721 struct ceph_osd_request *req =
2164 rb_entry(n, struct ceph_osd_request, r_node); 2722 rb_entry(n, struct ceph_osd_request, r_node);
@@ -2263,6 +2821,7 @@ static void kick_requests(struct ceph_osd_client *osdc,
2263 struct rb_root *need_resend, 2821 struct rb_root *need_resend,
2264 struct list_head *need_resend_linger) 2822 struct list_head *need_resend_linger)
2265{ 2823{
2824 struct ceph_osd_linger_request *lreq, *nlreq;
2266 struct rb_node *n; 2825 struct rb_node *n;
2267 2826
2268 for (n = rb_first(need_resend); n; ) { 2827 for (n = rb_first(need_resend); n; ) {
@@ -2280,8 +2839,17 @@ static void kick_requests(struct ceph_osd_client *osdc,
2280 if (!req->r_linger) { 2839 if (!req->r_linger) {
2281 if (!osd_homeless(osd) && !req->r_t.paused) 2840 if (!osd_homeless(osd) && !req->r_t.paused)
2282 send_request(req); 2841 send_request(req);
2842 } else {
2843 cancel_linger_request(req);
2283 } 2844 }
2284 } 2845 }
2846
2847 list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) {
2848 if (!osd_homeless(lreq->osd))
2849 send_linger(lreq);
2850
2851 list_del_init(&lreq->scan_item);
2852 }
2285} 2853}
2286 2854
2287/* 2855/*
@@ -2406,15 +2974,25 @@ static void kick_osd_requests(struct ceph_osd *osd)
2406{ 2974{
2407 struct rb_node *n; 2975 struct rb_node *n;
2408 2976
2409 for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) { 2977 for (n = rb_first(&osd->o_requests); n; ) {
2410 struct ceph_osd_request *req = 2978 struct ceph_osd_request *req =
2411 rb_entry(n, struct ceph_osd_request, r_node); 2979 rb_entry(n, struct ceph_osd_request, r_node);
2412 2980
2981 n = rb_next(n); /* cancel_linger_request() */
2982
2413 if (!req->r_linger) { 2983 if (!req->r_linger) {
2414 if (!req->r_t.paused) 2984 if (!req->r_t.paused)
2415 send_request(req); 2985 send_request(req);
2986 } else {
2987 cancel_linger_request(req);
2416 } 2988 }
2417 } 2989 }
2990 for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) {
2991 struct ceph_osd_linger_request *lreq =
2992 rb_entry(n, struct ceph_osd_linger_request, node);
2993
2994 send_linger(lreq);
2995 }
2418} 2996}
2419 2997
2420/* 2998/*
@@ -2442,192 +3020,76 @@ out_unlock:
2442} 3020}
2443 3021
2444/* 3022/*
2445 * watch/notify callback event infrastructure
2446 *
2447 * These callbacks are used both for watch and notify operations.
2448 */
2449static void __release_event(struct kref *kref)
2450{
2451 struct ceph_osd_event *event =
2452 container_of(kref, struct ceph_osd_event, kref);
2453
2454 dout("__release_event %p\n", event);
2455 kfree(event);
2456}
2457
2458static void get_event(struct ceph_osd_event *event)
2459{
2460 kref_get(&event->kref);
2461}
2462
2463void ceph_osdc_put_event(struct ceph_osd_event *event)
2464{
2465 kref_put(&event->kref, __release_event);
2466}
2467EXPORT_SYMBOL(ceph_osdc_put_event);
2468
2469static void __insert_event(struct ceph_osd_client *osdc,
2470 struct ceph_osd_event *new)
2471{
2472 struct rb_node **p = &osdc->event_tree.rb_node;
2473 struct rb_node *parent = NULL;
2474 struct ceph_osd_event *event = NULL;
2475
2476 while (*p) {
2477 parent = *p;
2478 event = rb_entry(parent, struct ceph_osd_event, node);
2479 if (new->cookie < event->cookie)
2480 p = &(*p)->rb_left;
2481 else if (new->cookie > event->cookie)
2482 p = &(*p)->rb_right;
2483 else
2484 BUG();
2485 }
2486
2487 rb_link_node(&new->node, parent, p);
2488 rb_insert_color(&new->node, &osdc->event_tree);
2489}
2490
2491static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
2492 u64 cookie)
2493{
2494 struct rb_node **p = &osdc->event_tree.rb_node;
2495 struct rb_node *parent = NULL;
2496 struct ceph_osd_event *event = NULL;
2497
2498 while (*p) {
2499 parent = *p;
2500 event = rb_entry(parent, struct ceph_osd_event, node);
2501 if (cookie < event->cookie)
2502 p = &(*p)->rb_left;
2503 else if (cookie > event->cookie)
2504 p = &(*p)->rb_right;
2505 else
2506 return event;
2507 }
2508 return NULL;
2509}
2510
2511static void __remove_event(struct ceph_osd_event *event)
2512{
2513 struct ceph_osd_client *osdc = event->osdc;
2514
2515 if (!RB_EMPTY_NODE(&event->node)) {
2516 dout("__remove_event removed %p\n", event);
2517 rb_erase(&event->node, &osdc->event_tree);
2518 ceph_osdc_put_event(event);
2519 } else {
2520 dout("__remove_event didn't remove %p\n", event);
2521 }
2522}
2523
2524int ceph_osdc_create_event(struct ceph_osd_client *osdc,
2525 void (*event_cb)(u64, u64, u8, void *),
2526 void *data, struct ceph_osd_event **pevent)
2527{
2528 struct ceph_osd_event *event;
2529
2530 event = kmalloc(sizeof(*event), GFP_NOIO);
2531 if (!event)
2532 return -ENOMEM;
2533
2534 dout("create_event %p\n", event);
2535 event->cb = event_cb;
2536 event->one_shot = 0;
2537 event->data = data;
2538 event->osdc = osdc;
2539 INIT_LIST_HEAD(&event->osd_node);
2540 RB_CLEAR_NODE(&event->node);
2541 kref_init(&event->kref); /* one ref for us */
2542 kref_get(&event->kref); /* one ref for the caller */
2543
2544 spin_lock(&osdc->event_lock);
2545 event->cookie = ++osdc->event_count;
2546 __insert_event(osdc, event);
2547 spin_unlock(&osdc->event_lock);
2548
2549 *pevent = event;
2550 return 0;
2551}
2552EXPORT_SYMBOL(ceph_osdc_create_event);
2553
2554void ceph_osdc_cancel_event(struct ceph_osd_event *event)
2555{
2556 struct ceph_osd_client *osdc = event->osdc;
2557
2558 dout("cancel_event %p\n", event);
2559 spin_lock(&osdc->event_lock);
2560 __remove_event(event);
2561 spin_unlock(&osdc->event_lock);
2562 ceph_osdc_put_event(event); /* caller's */
2563}
2564EXPORT_SYMBOL(ceph_osdc_cancel_event);
2565
2566
2567static void do_event_work(struct work_struct *work)
2568{
2569 struct ceph_osd_event_work *event_work =
2570 container_of(work, struct ceph_osd_event_work, work);
2571 struct ceph_osd_event *event = event_work->event;
2572 u64 ver = event_work->ver;
2573 u64 notify_id = event_work->notify_id;
2574 u8 opcode = event_work->opcode;
2575
2576 dout("do_event_work completing %p\n", event);
2577 event->cb(ver, notify_id, opcode, event->data);
2578 dout("do_event_work completed %p\n", event);
2579 ceph_osdc_put_event(event);
2580 kfree(event_work);
2581}
2582
2583
2584/*
2585 * Process osd watch notifications 3023 * Process osd watch notifications
2586 */ 3024 */
2587static void handle_watch_notify(struct ceph_osd_client *osdc, 3025static void handle_watch_notify(struct ceph_osd_client *osdc,
2588 struct ceph_msg *msg) 3026 struct ceph_msg *msg)
2589{ 3027{
2590 void *p, *end; 3028 void *p = msg->front.iov_base;
2591 u8 proto_ver; 3029 void *const end = p + msg->front.iov_len;
2592 u64 cookie, ver, notify_id; 3030 struct ceph_osd_linger_request *lreq;
2593 u8 opcode; 3031 struct linger_work *lwork;
2594 struct ceph_osd_event *event; 3032 u8 proto_ver, opcode;
2595 struct ceph_osd_event_work *event_work; 3033 u64 cookie, notify_id;
2596 3034 u64 notifier_id = 0;
2597 p = msg->front.iov_base; 3035 void *payload = NULL;
2598 end = p + msg->front.iov_len; 3036 u32 payload_len = 0;
2599 3037
2600 ceph_decode_8_safe(&p, end, proto_ver, bad); 3038 ceph_decode_8_safe(&p, end, proto_ver, bad);
2601 ceph_decode_8_safe(&p, end, opcode, bad); 3039 ceph_decode_8_safe(&p, end, opcode, bad);
2602 ceph_decode_64_safe(&p, end, cookie, bad); 3040 ceph_decode_64_safe(&p, end, cookie, bad);
2603 ceph_decode_64_safe(&p, end, ver, bad); 3041 p += 8; /* skip ver */
2604 ceph_decode_64_safe(&p, end, notify_id, bad); 3042 ceph_decode_64_safe(&p, end, notify_id, bad);
2605 3043
2606 spin_lock(&osdc->event_lock); 3044 if (proto_ver >= 1) {
2607 event = __find_event(osdc, cookie); 3045 ceph_decode_32_safe(&p, end, payload_len, bad);
2608 if (event) { 3046 ceph_decode_need(&p, end, payload_len, bad);
2609 BUG_ON(event->one_shot); 3047 payload = p;
2610 get_event(event); 3048 p += payload_len;
2611 } 3049 }
2612 spin_unlock(&osdc->event_lock); 3050
2613 dout("handle_watch_notify cookie %lld ver %lld event %p\n", 3051 if (le16_to_cpu(msg->hdr.version) >= 2)
2614 cookie, ver, event); 3052 p += 4; /* skip return_code */
2615 if (event) { 3053
2616 event_work = kmalloc(sizeof(*event_work), GFP_NOIO); 3054 if (le16_to_cpu(msg->hdr.version) >= 3)
2617 if (!event_work) { 3055 ceph_decode_64_safe(&p, end, notifier_id, bad);
2618 pr_err("couldn't allocate event_work\n"); 3056
2619 ceph_osdc_put_event(event); 3057 down_read(&osdc->lock);
2620 return; 3058 lreq = lookup_linger_osdc(&osdc->linger_requests, cookie);
3059 if (!lreq) {
3060 dout("%s opcode %d cookie %llu dne\n", __func__, opcode,
3061 cookie);
3062 goto out_unlock_osdc;
3063 }
3064
3065 mutex_lock(&lreq->lock);
3066 dout("%s opcode %d cookie %llu lreq %p\n", __func__, opcode, cookie,
3067 lreq);
3068 if (opcode == CEPH_WATCH_EVENT_DISCONNECT) {
3069 if (!lreq->last_error) {
3070 lreq->last_error = -ENOTCONN;
3071 queue_watch_error(lreq);
3072 }
3073 } else {
3074 /* CEPH_WATCH_EVENT_NOTIFY */
3075 lwork = lwork_alloc(lreq, do_watch_notify);
3076 if (!lwork) {
3077 pr_err("failed to allocate notify-lwork\n");
3078 goto out_unlock_lreq;
2621 } 3079 }
2622 INIT_WORK(&event_work->work, do_event_work);
2623 event_work->event = event;
2624 event_work->ver = ver;
2625 event_work->notify_id = notify_id;
2626 event_work->opcode = opcode;
2627 3080
2628 queue_work(osdc->notify_wq, &event_work->work); 3081 lwork->notify.notify_id = notify_id;
3082 lwork->notify.notifier_id = notifier_id;
3083 lwork->notify.payload = payload;
3084 lwork->notify.payload_len = payload_len;
3085 lwork->notify.msg = ceph_msg_get(msg);
3086 lwork_queue(lwork);
2629 } 3087 }
2630 3088
3089out_unlock_lreq:
3090 mutex_unlock(&lreq->lock);
3091out_unlock_osdc:
3092 up_read(&osdc->lock);
2631 return; 3093 return;
2632 3094
2633bad: 3095bad:
@@ -2659,8 +3121,6 @@ void ceph_osdc_cancel_request(struct ceph_osd_request *req)
2659 struct ceph_osd_client *osdc = req->r_osdc; 3121 struct ceph_osd_client *osdc = req->r_osdc;
2660 3122
2661 down_write(&osdc->lock); 3123 down_write(&osdc->lock);
2662 if (req->r_linger)
2663 __unregister_linger_request(osdc, req);
2664 if (req->r_osd) 3124 if (req->r_osd)
2665 cancel_request(req); 3125 cancel_request(req);
2666 up_write(&osdc->lock); 3126 up_write(&osdc->lock);
@@ -2743,6 +3203,198 @@ again:
2743} 3203}
2744EXPORT_SYMBOL(ceph_osdc_sync); 3204EXPORT_SYMBOL(ceph_osdc_sync);
2745 3205
3206static struct ceph_osd_request *
3207alloc_linger_request(struct ceph_osd_linger_request *lreq)
3208{
3209 struct ceph_osd_request *req;
3210
3211 req = ceph_osdc_alloc_request(lreq->osdc, NULL, 1, false, GFP_NOIO);
3212 if (!req)
3213 return NULL;
3214
3215 ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
3216 ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
3217
3218 if (ceph_osdc_alloc_messages(req, GFP_NOIO)) {
3219 ceph_osdc_put_request(req);
3220 return NULL;
3221 }
3222
3223 return req;
3224}
3225
3226/*
3227 * Returns a handle, caller owns a ref.
3228 */
3229struct ceph_osd_linger_request *
3230ceph_osdc_watch(struct ceph_osd_client *osdc,
3231 struct ceph_object_id *oid,
3232 struct ceph_object_locator *oloc,
3233 rados_watchcb2_t wcb,
3234 rados_watcherrcb_t errcb,
3235 void *data)
3236{
3237 struct ceph_osd_linger_request *lreq;
3238 int ret;
3239
3240 lreq = linger_alloc(osdc);
3241 if (!lreq)
3242 return ERR_PTR(-ENOMEM);
3243
3244 lreq->wcb = wcb;
3245 lreq->errcb = errcb;
3246 lreq->data = data;
3247
3248 ceph_oid_copy(&lreq->t.base_oid, oid);
3249 ceph_oloc_copy(&lreq->t.base_oloc, oloc);
3250 lreq->t.flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
3251 lreq->mtime = CURRENT_TIME;
3252
3253 lreq->reg_req = alloc_linger_request(lreq);
3254 if (!lreq->reg_req) {
3255 ret = -ENOMEM;
3256 goto err_put_lreq;
3257 }
3258
3259 lreq->ping_req = alloc_linger_request(lreq);
3260 if (!lreq->ping_req) {
3261 ret = -ENOMEM;
3262 goto err_put_lreq;
3263 }
3264
3265 down_write(&osdc->lock);
3266 linger_register(lreq); /* before osd_req_op_* */
3267 osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id,
3268 CEPH_OSD_WATCH_OP_WATCH);
3269 osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id,
3270 CEPH_OSD_WATCH_OP_PING);
3271 linger_submit(lreq);
3272 up_write(&osdc->lock);
3273
3274 ret = linger_reg_commit_wait(lreq);
3275 if (ret) {
3276 linger_cancel(lreq);
3277 goto err_put_lreq;
3278 }
3279
3280 return lreq;
3281
3282err_put_lreq:
3283 linger_put(lreq);
3284 return ERR_PTR(ret);
3285}
3286EXPORT_SYMBOL(ceph_osdc_watch);
3287
3288/*
3289 * Releases a ref.
3290 *
3291 * Times out after mount_timeout to preserve rbd unmap behaviour
3292 * introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap
3293 * with mount_timeout").
3294 */
3295int ceph_osdc_unwatch(struct ceph_osd_client *osdc,
3296 struct ceph_osd_linger_request *lreq)
3297{
3298 struct ceph_options *opts = osdc->client->options;
3299 struct ceph_osd_request *req;
3300 int ret;
3301
3302 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
3303 if (!req)
3304 return -ENOMEM;
3305
3306 ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
3307 ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
3308 req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
3309 req->r_mtime = CURRENT_TIME;
3310 osd_req_op_watch_init(req, 0, lreq->linger_id,
3311 CEPH_OSD_WATCH_OP_UNWATCH);
3312
3313 ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
3314 if (ret)
3315 goto out_put_req;
3316
3317 ceph_osdc_start_request(osdc, req, false);
3318 linger_cancel(lreq);
3319 linger_put(lreq);
3320 ret = wait_request_timeout(req, opts->mount_timeout);
3321
3322out_put_req:
3323 ceph_osdc_put_request(req);
3324 return ret;
3325}
3326EXPORT_SYMBOL(ceph_osdc_unwatch);
3327
3328static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
3329 u64 notify_id, u64 cookie, void *payload,
3330 size_t payload_len)
3331{
3332 struct ceph_osd_req_op *op;
3333 struct ceph_pagelist *pl;
3334 int ret;
3335
3336 op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
3337
3338 pl = kmalloc(sizeof(*pl), GFP_NOIO);
3339 if (!pl)
3340 return -ENOMEM;
3341
3342 ceph_pagelist_init(pl);
3343 ret = ceph_pagelist_encode_64(pl, notify_id);
3344 ret |= ceph_pagelist_encode_64(pl, cookie);
3345 if (payload) {
3346 ret |= ceph_pagelist_encode_32(pl, payload_len);
3347 ret |= ceph_pagelist_append(pl, payload, payload_len);
3348 } else {
3349 ret |= ceph_pagelist_encode_32(pl, 0);
3350 }
3351 if (ret) {
3352 ceph_pagelist_release(pl);
3353 return -ENOMEM;
3354 }
3355
3356 ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl);
3357 op->indata_len = pl->length;
3358 return 0;
3359}
3360
3361int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
3362 struct ceph_object_id *oid,
3363 struct ceph_object_locator *oloc,
3364 u64 notify_id,
3365 u64 cookie,
3366 void *payload,
3367 size_t payload_len)
3368{
3369 struct ceph_osd_request *req;
3370 int ret;
3371
3372 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
3373 if (!req)
3374 return -ENOMEM;
3375
3376 ceph_oid_copy(&req->r_base_oid, oid);
3377 ceph_oloc_copy(&req->r_base_oloc, oloc);
3378 req->r_flags = CEPH_OSD_FLAG_READ;
3379
3380 ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
3381 if (ret)
3382 goto out_put_req;
3383
3384 ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
3385 payload_len);
3386 if (ret)
3387 goto out_put_req;
3388
3389 ceph_osdc_start_request(osdc, req, false);
3390 ret = ceph_osdc_wait_request(osdc, req);
3391
3392out_put_req:
3393 ceph_osdc_put_request(req);
3394 return ret;
3395}
3396EXPORT_SYMBOL(ceph_osdc_notify_ack);
3397
2746/* 3398/*
2747 * Call all pending notify callbacks - for use after a watch is 3399 * Call all pending notify callbacks - for use after a watch is
2748 * unregistered, to make sure no more callbacks for it will be invoked 3400 * unregistered, to make sure no more callbacks for it will be invoked
@@ -2767,15 +3419,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
2767 osdc->osds = RB_ROOT; 3419 osdc->osds = RB_ROOT;
2768 INIT_LIST_HEAD(&osdc->osd_lru); 3420 INIT_LIST_HEAD(&osdc->osd_lru);
2769 spin_lock_init(&osdc->osd_lru_lock); 3421 spin_lock_init(&osdc->osd_lru_lock);
2770 INIT_LIST_HEAD(&osdc->req_linger);
2771 osd_init(&osdc->homeless_osd); 3422 osd_init(&osdc->homeless_osd);
2772 osdc->homeless_osd.o_osdc = osdc; 3423 osdc->homeless_osd.o_osdc = osdc;
2773 osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD; 3424 osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
3425 osdc->linger_requests = RB_ROOT;
2774 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 3426 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
2775 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 3427 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
2776 spin_lock_init(&osdc->event_lock);
2777 osdc->event_tree = RB_ROOT;
2778 osdc->event_count = 0;
2779 3428
2780 err = -ENOMEM; 3429 err = -ENOMEM;
2781 osdc->osdmap = ceph_osdmap_alloc(); 3430 osdc->osdmap = ceph_osdmap_alloc();
@@ -2838,6 +3487,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
2838 osd_cleanup(&osdc->homeless_osd); 3487 osd_cleanup(&osdc->homeless_osd);
2839 3488
2840 WARN_ON(!list_empty(&osdc->osd_lru)); 3489 WARN_ON(!list_empty(&osdc->osd_lru));
3490 WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests));
2841 WARN_ON(atomic_read(&osdc->num_requests)); 3491 WARN_ON(atomic_read(&osdc->num_requests));
2842 WARN_ON(atomic_read(&osdc->num_homeless)); 3492 WARN_ON(atomic_read(&osdc->num_homeless));
2843 3493