aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fs/ceph/file.c52
-rw-r--r--include/linux/ceph/osd_client.h4
-rw-r--r--net/ceph/osd_client.c12
3 files changed, 40 insertions, 28 deletions
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index ae23e31a8f38..a65acf355384 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -446,19 +446,35 @@ done:
446} 446}
447 447
448/* 448/*
449 * Write commit callback, called if we requested both an ACK and 449 * Write commit request unsafe callback, called to tell us when a
450 * ONDISK commit reply from the OSD. 450 * request is unsafe (that is, in flight--has been handed to the
451 * messenger to send to its target osd). It is called again when
452 * we've received a response message indicating the request is
453 * "safe" (its CEPH_OSD_FLAG_ONDISK flag is set), or when a request
454 * is completed early (and unsuccessfully) due to a timeout or
455 * interrupt.
456 *
457 * This is used if we requested both an ACK and ONDISK commit reply
458 * from the OSD.
451 */ 459 */
452static void sync_write_commit(struct ceph_osd_request *req, 460static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
453 struct ceph_msg *msg)
454{ 461{
455 struct ceph_inode_info *ci = ceph_inode(req->r_inode); 462 struct ceph_inode_info *ci = ceph_inode(req->r_inode);
456 463
457 dout("sync_write_commit %p tid %llu\n", req, req->r_tid); 464 dout("%s %p tid %llu %ssafe\n", __func__, req, req->r_tid,
458 spin_lock(&ci->i_unsafe_lock); 465 unsafe ? "un" : "");
459 list_del_init(&req->r_unsafe_item); 466 if (unsafe) {
460 spin_unlock(&ci->i_unsafe_lock); 467 ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR);
461 ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR); 468 spin_lock(&ci->i_unsafe_lock);
469 list_add_tail(&req->r_unsafe_item,
470 &ci->i_unsafe_writes);
471 spin_unlock(&ci->i_unsafe_lock);
472 } else {
473 spin_lock(&ci->i_unsafe_lock);
474 list_del_init(&req->r_unsafe_item);
475 spin_unlock(&ci->i_unsafe_lock);
476 ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR);
477 }
462} 478}
463 479
464/* 480/*
@@ -570,7 +586,8 @@ more:
570 586
571 if ((file->f_flags & O_SYNC) == 0) { 587 if ((file->f_flags & O_SYNC) == 0) {
572 /* get a second commit callback */ 588 /* get a second commit callback */
573 req->r_safe_callback = sync_write_commit; 589 req->r_unsafe_callback = ceph_sync_write_unsafe;
590 req->r_inode = inode;
574 own_pages = true; 591 own_pages = true;
575 } 592 }
576 } 593 }
@@ -581,21 +598,8 @@ more:
581 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); 598 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
582 599
583 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); 600 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
584 if (!ret) { 601 if (!ret)
585 if (req->r_safe_callback) {
586 /*
587 * Add to inode unsafe list only after we
588 * start_request so that a tid has been assigned.
589 */
590 spin_lock(&ci->i_unsafe_lock);
591 list_add_tail(&req->r_unsafe_item,
592 &ci->i_unsafe_writes);
593 spin_unlock(&ci->i_unsafe_lock);
594 ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR);
595 }
596
597 ret = ceph_osdc_wait_request(&fsc->client->osdc, req); 602 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
598 }
599 603
600 if (file->f_flags & O_DIRECT) 604 if (file->f_flags & O_DIRECT)
601 ceph_put_page_vector(pages, num_pages, false); 605 ceph_put_page_vector(pages, num_pages, false);
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 2a68a7465c18..0d3358ef5285 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -29,6 +29,7 @@ struct ceph_authorizer;
29 */ 29 */
30typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *, 30typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *,
31 struct ceph_msg *); 31 struct ceph_msg *);
32typedef void (*ceph_osdc_unsafe_callback_t)(struct ceph_osd_request *, bool);
32 33
33/* a given osd we're communicating with */ 34/* a given osd we're communicating with */
34struct ceph_osd { 35struct ceph_osd {
@@ -149,7 +150,8 @@ struct ceph_osd_request {
149 struct kref r_kref; 150 struct kref r_kref;
150 bool r_mempool; 151 bool r_mempool;
151 struct completion r_completion, r_safe_completion; 152 struct completion r_completion, r_safe_completion;
152 ceph_osdc_callback_t r_callback, r_safe_callback; 153 ceph_osdc_callback_t r_callback;
154 ceph_osdc_unsafe_callback_t r_unsafe_callback;
153 struct ceph_eversion r_reassert_version; 155 struct ceph_eversion r_reassert_version;
154 struct list_head r_unsafe_item; 156 struct list_head r_unsafe_item;
155 157
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 939be67199ca..0c5bf2fb5075 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -1314,8 +1314,14 @@ static void __send_request(struct ceph_osd_client *osdc,
1314 list_move_tail(&req->r_req_lru_item, &osdc->req_lru); 1314 list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
1315 1315
1316 ceph_msg_get(req->r_request); /* send consumes a ref */ 1316 ceph_msg_get(req->r_request); /* send consumes a ref */
1317 ceph_con_send(&req->r_osd->o_con, req->r_request); 1317
1318 /* Mark the request unsafe if this is the first timet's being sent. */
1319
1320 if (!req->r_sent && req->r_unsafe_callback)
1321 req->r_unsafe_callback(req, true);
1318 req->r_sent = req->r_osd->o_incarnation; 1322 req->r_sent = req->r_osd->o_incarnation;
1323
1324 ceph_con_send(&req->r_osd->o_con, req->r_request);
1319} 1325}
1320 1326
1321/* 1327/*
@@ -1403,8 +1409,8 @@ static void handle_osds_timeout(struct work_struct *work)
1403 1409
1404static void complete_request(struct ceph_osd_request *req) 1410static void complete_request(struct ceph_osd_request *req)
1405{ 1411{
1406 if (req->r_safe_callback) 1412 if (req->r_unsafe_callback)
1407 req->r_safe_callback(req, NULL); 1413 req->r_unsafe_callback(req, false);
1408 complete_all(&req->r_safe_completion); /* fsync waiter */ 1414 complete_all(&req->r_safe_completion); /* fsync waiter */
1409} 1415}
1410 1416