path: root/drivers/block/drbd/drbd_req.c
author		Lars Ellenberg <lars.ellenberg@linbit.com>	2011-11-28 09:04:49 -0500
committer	Philipp Reisner <philipp.reisner@linbit.com>	2012-11-08 10:58:35 -0500
commit		b6dd1a89767bc33e9c98b3195f8925b46c5c95f3 (patch)
tree		e82371062171f5cade79cb0c4a6cd22486b5f082 /drivers/block/drbd/drbd_req.c
parent		d5b27b01f17ef1f0badc45f9eea521be3457c9cb (diff)
drbd: remove struct drbd_tl_epoch objects (barrier works)
Cherry-picked and adapted from the drbd 9 devel branch.

DRBD requests (struct drbd_request) are already on the per-resource transfer log list, and carry their epoch number. We do not need to additionally link them on other ring lists in other structs.

The drbd sender thread can recognize by itself when to send a P_BARRIER, by tracking the currently processed epoch, and how many writes have been processed for that epoch.

If the epoch of the request to be processed does not match the currently processed epoch, and any writes have been processed in it, a P_BARRIER for this last processed epoch is sent out first. The new epoch then becomes the currently processed epoch.

To not get stuck in drbd_al_begin_io() waiting for P_BARRIER_ACK, the sender thread also needs to handle the case when the current epoch was closed already, but no new requests are queued yet, and send out the P_BARRIER as soon as possible. This is done by comparing the per-resource "current transfer log epoch" (tconn->current_tle_nr) with the per-connection "currently processed epoch number" (tconn->send.current_epoch_nr), while waiting for new requests to be processed in wait_for_work().

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
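To make the sender-side bookkeeping described above concrete, here is a minimal, self-contained C sketch. It is illustrative only: struct sender_state, maybe_send_barrier() and the plain integer fields are hypothetical stand-ins for the per-connection send state the message refers to (tconn->send.current_epoch_nr and the count of writes processed in that epoch), not the driver's actual code.

#include <stdio.h>

/* Hypothetical model of the sender's per-connection send state. */
struct sender_state {
	int current_epoch_nr;		/* epoch currently being processed */
	int current_epoch_writes;	/* writes already sent for that epoch */
};

/* Called before sending a request that carries transfer log epoch
 * req_epoch: if the epoch changed and the previous one saw any writes,
 * close the previous epoch with a P_BARRIER first. */
static void maybe_send_barrier(struct sender_state *s, int req_epoch)
{
	if (req_epoch != s->current_epoch_nr) {
		if (s->current_epoch_writes)
			printf("send P_BARRIER for epoch %d\n", s->current_epoch_nr);
		s->current_epoch_nr = req_epoch;
		s->current_epoch_writes = 0;
	}
}

int main(void)
{
	struct sender_state s = { .current_epoch_nr = 1, .current_epoch_writes = 0 };
	const int req_epochs[] = { 1, 1, 2, 2, 3 };	/* epochs of queued writes */

	for (int i = 0; i < 5; i++) {
		maybe_send_barrier(&s, req_epochs[i]);
		s.current_epoch_writes++;
		printf("send write #%d (epoch %d)\n", i, req_epochs[i]);
	}
	return 0;
}

Running it emits one P_BARRIER per closed epoch that contained writes, which is the invariant the patch maintains without the per-epoch drbd_tl_epoch objects.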
Diffstat (limited to 'drivers/block/drbd/drbd_req.c')
-rw-r--r--	drivers/block/drbd/drbd_req.c	157
1 files changed, 43 insertions, 114 deletions
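The last paragraph of the commit message covers the idle case: the current epoch may already be closed while no new request is queued, and the sender must still emit the P_BARRIER promptly so nobody sits in drbd_al_begin_io() waiting for the P_BARRIER_ACK. A rough sketch of the comparison involved, again with hypothetical types that only model tconn->current_tle_nr versus tconn->send.current_epoch_nr:

#include <stdbool.h>

/* Hypothetical stand-ins for the per-resource transfer log counter and the
 * per-connection sender state mentioned in the commit message. */
struct tl_state {
	int current_tle_nr;		/* epoch new application writes are assigned to */
};

struct sender_state {
	int current_epoch_nr;		/* epoch the sender has processed up to */
	int current_epoch_writes;	/* writes already sent in that epoch */
};

/* While waiting for new requests, also wake up when the epoch the sender
 * sits on was already closed by the application side: it will receive no
 * further writes and can be closed with P_BARRIER right away. */
static bool barrier_due(const struct tl_state *tl, const struct sender_state *s)
{
	return s->current_epoch_writes != 0 &&
	       s->current_epoch_nr != tl->current_tle_nr;
}

Per the commit message, the real comparison happens in wait_for_work(), alongside the check for newly queued requests; the sketch only captures the shape of that condition.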
diff --git a/drivers/block/drbd/drbd_req.c b/drivers/block/drbd/drbd_req.c
index e609557a9425..ca28b56b7a2f 100644
--- a/drivers/block/drbd/drbd_req.c
+++ b/drivers/block/drbd/drbd_req.c
@@ -149,46 +149,16 @@ static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const
 	drbd_req_free(req);
 }
 
-static void queue_barrier(struct drbd_conf *mdev)
-{
-	struct drbd_tl_epoch *b;
-	struct drbd_tconn *tconn = mdev->tconn;
-
-	/* We are within the req_lock. Once we queued the barrier for sending,
-	 * we set the CREATE_BARRIER bit. It is cleared as soon as a new
-	 * barrier/epoch object is added. This is the only place this bit is
-	 * set. It indicates that the barrier for this epoch is already queued,
-	 * and no new epoch has been created yet. */
-	if (test_bit(CREATE_BARRIER, &tconn->flags))
-		return;
-
-	b = tconn->newest_tle;
-	b->w.cb = w_send_barrier;
-	b->w.mdev = mdev;
-	/* inc_ap_pending done here, so we won't
-	 * get imbalanced on connection loss.
-	 * dec_ap_pending will be done in got_BarrierAck
-	 * or (on connection loss) in tl_clear. */
-	inc_ap_pending(mdev);
-	drbd_queue_work(&tconn->sender_work, &b->w);
-	set_bit(CREATE_BARRIER, &tconn->flags);
+static void wake_all_senders(struct drbd_tconn *tconn) {
+	wake_up(&tconn->sender_work.q_wait);
 }
 
-static void _about_to_complete_local_write(struct drbd_conf *mdev,
-	struct drbd_request *req)
+/* must hold resource->req_lock */
+static void start_new_tl_epoch(struct drbd_tconn *tconn)
 {
-	const unsigned long s = req->rq_state;
-
-	/* Before we can signal completion to the upper layers,
-	 * we may need to close the current epoch.
-	 * We can skip this, if this request has not even been sent, because we
-	 * did not have a fully established connection yet/anymore, during
-	 * bitmap exchange, or while we are C_AHEAD due to congestion policy.
-	 */
-	if (mdev->state.conn >= C_CONNECTED &&
-	    (s & RQ_NET_SENT) != 0 &&
-	    req->epoch == atomic_read(&mdev->tconn->current_tle_nr))
-		queue_barrier(mdev);
+	tconn->current_tle_writes = 0;
+	atomic_inc(&tconn->current_tle_nr);
+	wake_all_senders(tconn);
 }
 
 void complete_master_bio(struct drbd_conf *mdev,
@@ -320,9 +290,16 @@ void req_may_be_completed(struct drbd_request *req, struct bio_and_error *m)
 	} else if (!(s & RQ_POSTPONED))
 		D_ASSERT((s & (RQ_NET_MASK & ~RQ_NET_DONE)) == 0);
 
-	/* for writes we need to do some extra housekeeping */
-	if (rw == WRITE)
-		_about_to_complete_local_write(mdev, req);
+	/* Before we can signal completion to the upper layers,
+	 * we may need to close the current transfer log epoch.
+	 * We are within the request lock, so we can simply compare
+	 * the request epoch number with the current transfer log
+	 * epoch number. If they match, increase the current_tle_nr,
+	 * and reset the transfer log epoch write_cnt.
+	 */
+	if (rw == WRITE &&
+	    req->epoch == atomic_read(&mdev->tconn->current_tle_nr))
+		start_new_tl_epoch(mdev->tconn);
 
 	/* Update disk stats */
 	_drbd_end_io_acct(mdev, req);
@@ -514,15 +491,6 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
 		 * hurting performance. */
 		set_bit(UNPLUG_REMOTE, &mdev->flags);
 
-		/* see __drbd_make_request,
-		 * just after it grabs the req_lock */
-		D_ASSERT(test_bit(CREATE_BARRIER, &mdev->tconn->flags) == 0);
-
-		req->epoch = atomic_read(&mdev->tconn->current_tle_nr);
-
-		/* increment size of current epoch */
-		mdev->tconn->newest_tle->n_writes++;
-
 		/* queue work item to send data */
 		D_ASSERT(req->rq_state & RQ_NET_PENDING);
 		req->rq_state |= RQ_NET_QUEUED;
@@ -534,8 +502,8 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
 		nc = rcu_dereference(mdev->tconn->net_conf);
 		p = nc->max_epoch_size;
 		rcu_read_unlock();
-		if (mdev->tconn->newest_tle->n_writes >= p)
-			queue_barrier(mdev);
+		if (mdev->tconn->current_tle_writes >= p)
+			start_new_tl_epoch(mdev->tconn);
 
 		break;
 
@@ -692,6 +660,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
 		   During connection handshake, we ensure that the peer was not rebooted. */
 		if (!(req->rq_state & RQ_NET_OK)) {
 			if (req->w.cb) {
+				/* w.cb expected to be w_send_dblock, or w_send_read_req */
 				drbd_queue_work(&mdev->tconn->sender_work, &req->w);
 				rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
 			}
@@ -708,7 +677,6 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
 			 * this is bad, because if the connection is lost now,
 			 * we won't be able to clean them up... */
 			dev_err(DEV, "FIXME (BARRIER_ACKED but pending)\n");
-			list_move(&req->tl_requests, &mdev->tconn->out_of_sequence_requests);
 		}
 		if ((req->rq_state & RQ_NET_MASK) != 0) {
 			req->rq_state |= RQ_NET_DONE;
@@ -835,7 +803,6 @@ int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long s
 	const int rw = bio_rw(bio);
 	const int size = bio->bi_size;
 	const sector_t sector = bio->bi_sector;
-	struct drbd_tl_epoch *b = NULL;
 	struct drbd_request *req;
 	struct net_conf *nc;
 	int local, remote, send_oos = 0;
@@ -916,24 +883,6 @@ int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long s
 		goto fail_free_complete;
 	}
 
-	/* For WRITE request, we have to make sure that we have an
-	 * unused_spare_tle, in case we need to start a new epoch.
-	 * I try to be smart and avoid to pre-allocate always "just in case",
-	 * but there is a race between testing the bit and pointer outside the
-	 * spinlock, and grabbing the spinlock.
-	 * if we lost that race, we retry. */
-	if (rw == WRITE && (remote || send_oos) &&
-	    mdev->tconn->unused_spare_tle == NULL &&
-	    test_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
-allocate_barrier:
-		b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_NOIO);
-		if (!b) {
-			dev_err(DEV, "Failed to alloc barrier.\n");
-			err = -ENOMEM;
-			goto fail_free_complete;
-		}
-	}
-
 	/* GOOD, everything prepared, grab the spin_lock */
 	spin_lock_irq(&mdev->tconn->req_lock);
 
@@ -969,42 +918,9 @@ allocate_barrier:
 		}
 	}
 
-	if (b && mdev->tconn->unused_spare_tle == NULL) {
-		mdev->tconn->unused_spare_tle = b;
-		b = NULL;
-	}
-	if (rw == WRITE && (remote || send_oos) &&
-	    mdev->tconn->unused_spare_tle == NULL &&
-	    test_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
-		/* someone closed the current epoch
-		 * while we were grabbing the spinlock */
-		spin_unlock_irq(&mdev->tconn->req_lock);
-		goto allocate_barrier;
-	}
-
-
 	/* Update disk stats */
 	_drbd_start_io_acct(mdev, req, bio);
 
-	/* _maybe_start_new_epoch(mdev);
-	 * If we need to generate a write barrier packet, we have to add the
-	 * new epoch (barrier) object, and queue the barrier packet for sending,
-	 * and queue the req's data after it _within the same lock_, otherwise
-	 * we have race conditions were the reorder domains could be mixed up.
-	 *
-	 * Even read requests may start a new epoch and queue the corresponding
-	 * barrier packet. To get the write ordering right, we only have to
-	 * make sure that, if this is a write request and it triggered a
-	 * barrier packet, this request is queued within the same spinlock. */
-	if ((remote || send_oos) && mdev->tconn->unused_spare_tle &&
-	    test_and_clear_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
-		_tl_add_barrier(mdev->tconn, mdev->tconn->unused_spare_tle);
-		mdev->tconn->unused_spare_tle = NULL;
-	} else {
-		D_ASSERT(!(remote && rw == WRITE &&
-			   test_bit(CREATE_BARRIER, &mdev->tconn->flags)));
-	}
-
 	/* NOTE
 	 * Actually, 'local' may be wrong here already, since we may have failed
 	 * to write to the meta data, and may become wrong anytime because of
@@ -1025,7 +941,12 @@ allocate_barrier:
 	if (local)
 		_req_mod(req, TO_BE_SUBMITTED);
 
-	list_add_tail(&req->tl_requests, &mdev->tconn->newest_tle->requests);
+	/* which transfer log epoch does this belong to? */
+	req->epoch = atomic_read(&mdev->tconn->current_tle_nr);
+	if (rw == WRITE)
+		mdev->tconn->current_tle_writes++;
+
+	list_add_tail(&req->tl_requests, &mdev->tconn->transfer_log);
 
 	/* NOTE remote first: to get the concurrent write detection right,
 	 * we must register the request before start of local IO. */
@@ -1059,7 +980,9 @@ allocate_barrier:
 		}
 
 		if (congested) {
-			queue_barrier(mdev); /* last barrier, after mirrored writes */
+			if (mdev->tconn->current_tle_writes)
+				/* start a new epoch for non-mirrored writes */
+				start_new_tl_epoch(mdev->tconn);
 
 			if (nc->on_congestion == OC_PULL_AHEAD)
 				_drbd_set_state(_NS(mdev, conn, C_AHEAD), 0, NULL);
@@ -1070,7 +993,6 @@ allocate_barrier:
 	rcu_read_unlock();
 
 	spin_unlock_irq(&mdev->tconn->req_lock);
-	kfree(b); /* if someone else has beaten us to it... */
 
 	if (local) {
 		req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
@@ -1108,7 +1030,6 @@ fail_and_free_req:
 
 	drbd_req_free(req);
 	dec_ap_bio(mdev);
-	kfree(b);
 
 	return ret;
 }
@@ -1164,12 +1085,23 @@ int drbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bvm, struct
 	return limit;
 }
 
+struct drbd_request *find_oldest_request(struct drbd_tconn *tconn)
+{
+	/* Walk the transfer log,
+	 * and find the oldest not yet completed request */
+	struct drbd_request *r;
+	list_for_each_entry(r, &tconn->transfer_log, tl_requests) {
+		if (r->rq_state & (RQ_NET_PENDING|RQ_LOCAL_PENDING))
+			return r;
+	}
+	return NULL;
+}
+
 void request_timer_fn(unsigned long data)
 {
 	struct drbd_conf *mdev = (struct drbd_conf *) data;
 	struct drbd_tconn *tconn = mdev->tconn;
 	struct drbd_request *req; /* oldest request */
-	struct list_head *le;
 	struct net_conf *nc;
 	unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */
 	unsigned long now;
@@ -1193,16 +1125,13 @@ void request_timer_fn(unsigned long data)
 	now = jiffies;
 
 	spin_lock_irq(&tconn->req_lock);
-	le = &tconn->oldest_tle->requests;
-	if (list_empty(le)) {
+	req = find_oldest_request(tconn);
+	if (!req) {
 		spin_unlock_irq(&tconn->req_lock);
 		mod_timer(&mdev->request_timer, now + et);
 		return;
 	}
 
-	le = le->prev;
-	req = list_entry(le, struct drbd_request, tl_requests);
-
 	/* The request is considered timed out, if
 	 * - we have some effective timeout from the configuration,
 	 *   with above state restrictions applied,