author    Philipp Reisner <philipp.reisner@linbit.com>  2012-11-09 08:18:43 -0500
committer Philipp Reisner <philipp.reisner@linbit.com>  2012-11-09 08:20:23 -0500
commit    986836503e49ccf7e84b813715d344964ec93566 (patch)
tree      b3bea7428efde5b77096cef80e5b6bfee494cc12 /drivers/block/drbd/drbd_req.c
parent    ccae7868b0c5697508a541c531cf96b361d62c1c (diff)
parent    328e0f125bf41f4f33f684db22015f92cb44fe56 (diff)
Merge branch 'drbd-8.4_ed6' into for-3.8-drivers-drbd-8.4_ed6
Diffstat (limited to 'drivers/block/drbd/drbd_req.c')
-rw-r--r--  drivers/block/drbd/drbd_req.c  1569
1 file changed, 755 insertions(+), 814 deletions(-)
diff --git a/drivers/block/drbd/drbd_req.c b/drivers/block/drbd/drbd_req.c
index 135ea76ed502..f58a4a4b4dfb 100644
--- a/drivers/block/drbd/drbd_req.c
+++ b/drivers/block/drbd/drbd_req.c
@@ -31,6 +31,8 @@
31#include "drbd_req.h" 31#include "drbd_req.h"
32 32
33 33
34static bool drbd_may_do_local_read(struct drbd_conf *mdev, sector_t sector, int size);
35
34/* Update disk stats at start of I/O request */ 36/* Update disk stats at start of I/O request */
35static void _drbd_start_io_acct(struct drbd_conf *mdev, struct drbd_request *req, struct bio *bio) 37static void _drbd_start_io_acct(struct drbd_conf *mdev, struct drbd_request *req, struct bio *bio)
36{ 38{
@@ -40,6 +42,8 @@ static void _drbd_start_io_acct(struct drbd_conf *mdev, struct drbd_request *req
40 part_round_stats(cpu, &mdev->vdisk->part0); 42 part_round_stats(cpu, &mdev->vdisk->part0);
41 part_stat_inc(cpu, &mdev->vdisk->part0, ios[rw]); 43 part_stat_inc(cpu, &mdev->vdisk->part0, ios[rw]);
42 part_stat_add(cpu, &mdev->vdisk->part0, sectors[rw], bio_sectors(bio)); 44 part_stat_add(cpu, &mdev->vdisk->part0, sectors[rw], bio_sectors(bio));
45 (void) cpu; /* The macro invocations above want the cpu argument, I do not like
46 the compiler warning about cpu only assigned but never used... */
43 part_inc_in_flight(&mdev->vdisk->part0, rw); 47 part_inc_in_flight(&mdev->vdisk->part0, rw);
44 part_stat_unlock(); 48 part_stat_unlock();
45} 49}
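The two lines added to _drbd_start_io_acct above only silence a compiler warning: on configurations where the statistics macros compile away, "cpu" would be set but never used, so it is explicitly cast to void. A minimal userspace illustration of that idiom; the macro below is a stand-in, not the kernel's part_stat_add:

#include <stdio.h>

/* Stands in for a stats macro that may compile away and ignore 'cpu'. */
#define stat_add(cpu, value)  ((void)(value))

static void start_io_acct(int sectors)
{
        int cpu = 0;            /* imagine this came from part_stat_lock() */

        stat_add(cpu, sectors);
        (void) cpu;             /* silence "set but not used" when the macro ignores it */
}

int main(void)
{
        start_io_acct(8);
        puts("accounted");
        return 0;
}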
@@ -57,9 +61,51 @@ static void _drbd_end_io_acct(struct drbd_conf *mdev, struct drbd_request *req)
57 part_stat_unlock(); 61 part_stat_unlock();
58} 62}
59 63
60static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const int rw) 64static struct drbd_request *drbd_req_new(struct drbd_conf *mdev,
65 struct bio *bio_src)
66{
67 struct drbd_request *req;
68
69 req = mempool_alloc(drbd_request_mempool, GFP_NOIO);
70 if (!req)
71 return NULL;
72
73 drbd_req_make_private_bio(req, bio_src);
74 req->rq_state = bio_data_dir(bio_src) == WRITE ? RQ_WRITE : 0;
75 req->w.mdev = mdev;
76 req->master_bio = bio_src;
77 req->epoch = 0;
78
79 drbd_clear_interval(&req->i);
80 req->i.sector = bio_src->bi_sector;
81 req->i.size = bio_src->bi_size;
82 req->i.local = true;
83 req->i.waiting = false;
84
85 INIT_LIST_HEAD(&req->tl_requests);
86 INIT_LIST_HEAD(&req->w.list);
87
88 /* one reference to be put by __drbd_make_request */
89 atomic_set(&req->completion_ref, 1);
90 /* one kref as long as completion_ref > 0 */
91 kref_init(&req->kref);
92 return req;
93}
94
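drbd_req_new above sets up the two reference counts the rewritten request code relies on: completion_ref counts the reasons the master bio cannot be completed yet (it starts at 1 for the submitter), while kref keeps the request object itself alive and is held at least as long as completion_ref is above zero. A userspace sketch of that split, using C11 atomics instead of the kernel's kref; all names below are illustrative, not the driver's API:

#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

struct request {
        atomic_int completion_ref;   /* reasons the master bio cannot be completed yet */
        atomic_int kref;             /* lifetime of the request object itself */
};

static struct request *request_new(void)
{
        struct request *req = calloc(1, sizeof(*req));

        if (!req)
                return NULL;
        atomic_init(&req->completion_ref, 1);  /* put by the submitter when setup is done */
        atomic_init(&req->kref, 1);            /* held as long as completion_ref > 0 */
        return req;
}

static void kref_put_last_frees(struct request *req)
{
        if (atomic_fetch_sub(&req->kref, 1) == 1)
                free(req);                     /* analogous to drbd_req_destroy */
}

static void completion_ref_put(struct request *req)
{
        if (atomic_fetch_sub(&req->completion_ref, 1) == 1) {
                puts("complete the master bio here");
                kref_put_last_frees(req);      /* drop the ref tied to completion_ref > 0 */
        }
}

int main(void)
{
        struct request *req = request_new();

        if (!req)
                return 1;
        atomic_fetch_add(&req->completion_ref, 1); /* e.g. local disk I/O still pending */
        completion_ref_put(req);                   /* local I/O finished */
        completion_ref_put(req);                   /* submitter drops its initial reference */
        return 0;
}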
95void drbd_req_destroy(struct kref *kref)
61{ 96{
62 const unsigned long s = req->rq_state; 97 struct drbd_request *req = container_of(kref, struct drbd_request, kref);
98 struct drbd_conf *mdev = req->w.mdev;
99 const unsigned s = req->rq_state;
100
101 if ((req->master_bio && !(s & RQ_POSTPONED)) ||
102 atomic_read(&req->completion_ref) ||
103 (s & RQ_LOCAL_PENDING) ||
104 ((s & RQ_NET_MASK) && !(s & RQ_NET_DONE))) {
105 dev_err(DEV, "drbd_req_destroy: Logic BUG rq_state = 0x%x, completion_ref = %d\n",
106 s, atomic_read(&req->completion_ref));
107 return;
108 }
63 109
64 /* remove it from the transfer log. 110 /* remove it from the transfer log.
65 * well, only if it had been there in the first 111 * well, only if it had been there in the first
@@ -67,24 +113,33 @@ static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const
67 * and never sent), it should still be "empty" as 113 * and never sent), it should still be "empty" as
68 * initialized in drbd_req_new(), so we can list_del() it 114 * initialized in drbd_req_new(), so we can list_del() it
69 * here unconditionally */ 115 * here unconditionally */
70 list_del(&req->tl_requests); 116 list_del_init(&req->tl_requests);
71 117
72 /* if it was a write, we may have to set the corresponding 118 /* if it was a write, we may have to set the corresponding
73 * bit(s) out-of-sync first. If it had a local part, we need to 119 * bit(s) out-of-sync first. If it had a local part, we need to
74 * release the reference to the activity log. */ 120 * release the reference to the activity log. */
75 if (rw == WRITE) { 121 if (s & RQ_WRITE) {
76 /* Set out-of-sync unless both OK flags are set 122 /* Set out-of-sync unless both OK flags are set
77 * (local only or remote failed). 123 * (local only or remote failed).
78 * Other places where we set out-of-sync: 124 * Other places where we set out-of-sync:
79 * READ with local io-error */ 125 * READ with local io-error */
80 if (!(s & RQ_NET_OK) || !(s & RQ_LOCAL_OK))
81 drbd_set_out_of_sync(mdev, req->sector, req->size);
82 126
83 if ((s & RQ_NET_OK) && (s & RQ_LOCAL_OK) && (s & RQ_NET_SIS)) 127 /* There is a special case:
84 drbd_set_in_sync(mdev, req->sector, req->size); 128 * we may notice late that IO was suspended,
129 * and postpone, or schedule for retry, a write,
130 * before it even was submitted or sent.
131 * In that case we do not want to touch the bitmap at all.
132 */
133 if ((s & (RQ_POSTPONED|RQ_LOCAL_MASK|RQ_NET_MASK)) != RQ_POSTPONED) {
134 if (!(s & RQ_NET_OK) || !(s & RQ_LOCAL_OK))
135 drbd_set_out_of_sync(mdev, req->i.sector, req->i.size);
136
137 if ((s & RQ_NET_OK) && (s & RQ_LOCAL_OK) && (s & RQ_NET_SIS))
138 drbd_set_in_sync(mdev, req->i.sector, req->i.size);
139 }
85 140
86 /* one might be tempted to move the drbd_al_complete_io 141 /* one might be tempted to move the drbd_al_complete_io
87 * to the local io completion callback drbd_endio_pri. 142 * to the local io completion callback drbd_request_endio.
88 * but, if this was a mirror write, we may only 143 * but, if this was a mirror write, we may only
89 * drbd_al_complete_io after this is RQ_NET_DONE, 144 * drbd_al_complete_io after this is RQ_NET_DONE,
90 * otherwise the extent could be dropped from the al 145 * otherwise the extent could be dropped from the al
@@ -93,109 +148,35 @@ static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const
93 * but after the extent has been dropped from the al, 148 * but after the extent has been dropped from the al,
94 * we would forget to resync the corresponding extent. 149 * we would forget to resync the corresponding extent.
95 */ 150 */
96 if (s & RQ_LOCAL_MASK) { 151 if (s & RQ_IN_ACT_LOG) {
97 if (get_ldev_if_state(mdev, D_FAILED)) { 152 if (get_ldev_if_state(mdev, D_FAILED)) {
98 if (s & RQ_IN_ACT_LOG) 153 drbd_al_complete_io(mdev, &req->i);
99 drbd_al_complete_io(mdev, req->sector);
100 put_ldev(mdev); 154 put_ldev(mdev);
101 } else if (__ratelimit(&drbd_ratelimit_state)) { 155 } else if (__ratelimit(&drbd_ratelimit_state)) {
102 dev_warn(DEV, "Should have called drbd_al_complete_io(, %llu), " 156 dev_warn(DEV, "Should have called drbd_al_complete_io(, %llu, %u), "
103 "but my Disk seems to have failed :(\n", 157 "but my Disk seems to have failed :(\n",
104 (unsigned long long) req->sector); 158 (unsigned long long) req->i.sector, req->i.size);
105 } 159 }
106 } 160 }
107 } 161 }
108 162
109 drbd_req_free(req); 163 mempool_free(req, drbd_request_mempool);
110} 164}
111 165
112static void queue_barrier(struct drbd_conf *mdev) 166static void wake_all_senders(struct drbd_tconn *tconn) {
113{ 167 wake_up(&tconn->sender_work.q_wait);
114 struct drbd_tl_epoch *b;
115
116 /* We are within the req_lock. Once we queued the barrier for sending,
117 * we set the CREATE_BARRIER bit. It is cleared as soon as a new
118 * barrier/epoch object is added. This is the only place this bit is
119 * set. It indicates that the barrier for this epoch is already queued,
120 * and no new epoch has been created yet. */
121 if (drbd_test_flag(mdev, CREATE_BARRIER))
122 return;
123
124 b = mdev->newest_tle;
125 b->w.cb = w_send_barrier;
126 /* inc_ap_pending done here, so we won't
127 * get imbalanced on connection loss.
128 * dec_ap_pending will be done in got_BarrierAck
129 * or (on connection loss) in tl_clear. */
130 inc_ap_pending(mdev);
131 drbd_queue_work(&mdev->data.work, &b->w);
132 drbd_set_flag(mdev, CREATE_BARRIER);
133} 168}
134 169
135static void _about_to_complete_local_write(struct drbd_conf *mdev, 170/* must hold resource->req_lock */
136 struct drbd_request *req) 171static void start_new_tl_epoch(struct drbd_tconn *tconn)
137{ 172{
138 const unsigned long s = req->rq_state; 173 /* no point closing an epoch, if it is empty, anyways. */
139 struct drbd_request *i; 174 if (tconn->current_tle_writes == 0)
140 struct drbd_epoch_entry *e; 175 return;
141 struct hlist_node *n;
142 struct hlist_head *slot;
143
144 /* Before we can signal completion to the upper layers,
145 * we may need to close the current epoch.
146 * We can skip this, if this request has not even been sent, because we
147 * did not have a fully established connection yet/anymore, during
148 * bitmap exchange, or while we are C_AHEAD due to congestion policy.
149 */
150 if (mdev->state.conn >= C_CONNECTED &&
151 (s & RQ_NET_SENT) != 0 &&
152 req->epoch == mdev->newest_tle->br_number)
153 queue_barrier(mdev);
154
155 /* we need to do the conflict detection stuff,
156 * if we have the ee_hash (two_primaries) and
157 * this has been on the network */
158 if ((s & RQ_NET_DONE) && mdev->ee_hash != NULL) {
159 const sector_t sector = req->sector;
160 const int size = req->size;
161
162 /* ASSERT:
163 * there must be no conflicting requests, since
164 * they must have been failed on the spot */
165#define OVERLAPS overlaps(sector, size, i->sector, i->size)
166 slot = tl_hash_slot(mdev, sector);
167 hlist_for_each_entry(i, n, slot, collision) {
168 if (OVERLAPS) {
169 dev_alert(DEV, "LOGIC BUG: completed: %p %llus +%u; "
170 "other: %p %llus +%u\n",
171 req, (unsigned long long)sector, size,
172 i, (unsigned long long)i->sector, i->size);
173 }
174 }
175 176
176 /* maybe "wake" those conflicting epoch entries 177 tconn->current_tle_writes = 0;
177 * that wait for this request to finish. 178 atomic_inc(&tconn->current_tle_nr);
178 * 179 wake_all_senders(tconn);
179 * currently, there can be only _one_ such ee
180 * (well, or some more, which would be pending
181 * P_DISCARD_ACK not yet sent by the asender...),
182 * since we block the receiver thread upon the
183 * first conflict detection, which will wait on
184 * misc_wait. maybe we want to assert that?
185 *
186 * anyways, if we found one,
187 * we just have to do a wake_up. */
188#undef OVERLAPS
189#define OVERLAPS overlaps(sector, size, e->sector, e->size)
190 slot = ee_hash_slot(mdev, req->sector);
191 hlist_for_each_entry(e, n, slot, collision) {
192 if (OVERLAPS) {
193 wake_up(&mdev->misc_wait);
194 break;
195 }
196 }
197 }
198#undef OVERLAPS
199} 180}
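start_new_tl_epoch above closes the current transfer-log epoch only if it actually contains writes: it resets current_tle_writes, bumps the epoch number, and wakes the sender so a barrier packet can go out. A rough userspace analogue of that counter handling under a lock, where the mutex and condition variable stand in for req_lock and sender_work.q_wait:

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

static pthread_mutex_t req_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  sender_wait = PTHREAD_COND_INITIALIZER;
static int current_tle_writes;        /* writes queued into the still-open epoch */
static atomic_int current_tle_nr;     /* epoch (barrier) number */

/* must hold req_lock, mirroring the comment in the patch */
static void start_new_epoch(void)
{
        if (current_tle_writes == 0)  /* no point closing an empty epoch */
                return;
        current_tle_writes = 0;
        atomic_fetch_add(&current_tle_nr, 1);
        pthread_cond_broadcast(&sender_wait);  /* "wake_all_senders" */
}

int main(void)
{
        pthread_mutex_lock(&req_lock);
        current_tle_writes = 3;       /* pretend three writes were queued */
        start_new_epoch();
        printf("epoch is now %d\n", atomic_load(&current_tle_nr));
        pthread_mutex_unlock(&req_lock);
        return 0;
}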
200 181
201void complete_master_bio(struct drbd_conf *mdev, 182void complete_master_bio(struct drbd_conf *mdev,
@@ -205,17 +186,33 @@ void complete_master_bio(struct drbd_conf *mdev,
205 dec_ap_bio(mdev); 186 dec_ap_bio(mdev);
206} 187}
207 188
189
190static void drbd_remove_request_interval(struct rb_root *root,
191 struct drbd_request *req)
192{
193 struct drbd_conf *mdev = req->w.mdev;
194 struct drbd_interval *i = &req->i;
195
196 drbd_remove_interval(root, i);
197
198 /* Wake up any processes waiting for this request to complete. */
199 if (i->waiting)
200 wake_up(&mdev->misc_wait);
201}
202
208/* Helper for __req_mod(). 203/* Helper for __req_mod().
209 * Set m->bio to the master bio, if it is fit to be completed, 204 * Set m->bio to the master bio, if it is fit to be completed,
210 * or leave it alone (it is initialized to NULL in __req_mod), 205 * or leave it alone (it is initialized to NULL in __req_mod),
211 * if it has already been completed, or cannot be completed yet. 206 * if it has already been completed, or cannot be completed yet.
212 * If m->bio is set, the error status to be returned is placed in m->error. 207 * If m->bio is set, the error status to be returned is placed in m->error.
213 */ 208 */
214void _req_may_be_done(struct drbd_request *req, struct bio_and_error *m) 209static
210void drbd_req_complete(struct drbd_request *req, struct bio_and_error *m)
215{ 211{
216 const unsigned long s = req->rq_state; 212 const unsigned s = req->rq_state;
217 struct drbd_conf *mdev = req->mdev; 213 struct drbd_conf *mdev = req->w.mdev;
218 int rw = req->rq_state & RQ_WRITE ? WRITE : READ; 214 int rw;
215 int error, ok;
219 216
220 /* we must not complete the master bio, while it is 217 /* we must not complete the master bio, while it is
221 * still being processed by _drbd_send_zc_bio (drbd_send_dblock) 218 * still being processed by _drbd_send_zc_bio (drbd_send_dblock)
@@ -226,178 +223,219 @@ void _req_may_be_done(struct drbd_request *req, struct bio_and_error *m)
226 * the receiver, 223 * the receiver,
227 * the bio_endio completion callbacks. 224 * the bio_endio completion callbacks.
228 */ 225 */
229 if (s & RQ_NET_QUEUED) 226 if ((s & RQ_LOCAL_PENDING && !(s & RQ_LOCAL_ABORTED)) ||
230 return; 227 (s & RQ_NET_QUEUED) || (s & RQ_NET_PENDING) ||
231 if (s & RQ_NET_PENDING) 228 (s & RQ_COMPLETION_SUSP)) {
229 dev_err(DEV, "drbd_req_complete: Logic BUG rq_state = 0x%x\n", s);
232 return; 230 return;
233 if (s & RQ_LOCAL_PENDING && !(s & RQ_LOCAL_ABORTED)) 231 }
232
233 if (!req->master_bio) {
234 dev_err(DEV, "drbd_req_complete: Logic BUG, master_bio == NULL!\n");
234 return; 235 return;
236 }
235 237
236 if (req->master_bio) { 238 rw = bio_rw(req->master_bio);
237 /* this is data_received (remote read)
238 * or protocol C P_WRITE_ACK
239 * or protocol B P_RECV_ACK
240 * or protocol A "handed_over_to_network" (SendAck)
241 * or canceled or failed,
242 * or killed from the transfer log due to connection loss.
243 */
244 239
245 /* 240 /*
246 * figure out whether to report success or failure. 241 * figure out whether to report success or failure.
247 * 242 *
248 * report success when at least one of the operations succeeded. 243 * report success when at least one of the operations succeeded.
249 * or, to put the other way, 244 * or, to put the other way,
250 * only report failure, when both operations failed. 245 * only report failure, when both operations failed.
251 * 246 *
252 * what to do about the failures is handled elsewhere. 247 * what to do about the failures is handled elsewhere.
253 * what we need to do here is just: complete the master_bio. 248 * what we need to do here is just: complete the master_bio.
254 * 249 *
255 * local completion error, if any, has been stored as ERR_PTR 250 * local completion error, if any, has been stored as ERR_PTR
256 * in private_bio within drbd_endio_pri. 251 * in private_bio within drbd_request_endio.
257 */ 252 */
258 int ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK); 253 ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK);
259 int error = PTR_ERR(req->private_bio); 254 error = PTR_ERR(req->private_bio);
260 255
261 /* remove the request from the conflict detection 256 /* remove the request from the conflict detection
262 * respective block_id verification hash */ 257 * respective block_id verification hash */
263 if (!hlist_unhashed(&req->collision)) 258 if (!drbd_interval_empty(&req->i)) {
264 hlist_del(&req->collision); 259 struct rb_root *root;
265 else
266 D_ASSERT((s & (RQ_NET_MASK & ~RQ_NET_DONE)) == 0);
267 260
268 /* for writes we need to do some extra housekeeping */
269 if (rw == WRITE) 261 if (rw == WRITE)
270 _about_to_complete_local_write(mdev, req); 262 root = &mdev->write_requests;
263 else
264 root = &mdev->read_requests;
265 drbd_remove_request_interval(root, req);
266 } else if (!(s & RQ_POSTPONED))
267 D_ASSERT((s & (RQ_NET_MASK & ~RQ_NET_DONE)) == 0);
271 268
272 /* Update disk stats */ 269 /* Before we can signal completion to the upper layers,
273 _drbd_end_io_acct(mdev, req); 270 * we may need to close the current transfer log epoch.
271 * We are within the request lock, so we can simply compare
272 * the request epoch number with the current transfer log
273 * epoch number. If they match, increase the current_tle_nr,
274 * and reset the transfer log epoch write_cnt.
275 */
276 if (rw == WRITE &&
277 req->epoch == atomic_read(&mdev->tconn->current_tle_nr))
278 start_new_tl_epoch(mdev->tconn);
279
280 /* Update disk stats */
281 _drbd_end_io_acct(mdev, req);
282
283 /* If READ failed,
284 * have it be pushed back to the retry work queue,
285 * so it will re-enter __drbd_make_request(),
286 * and be re-assigned to a suitable local or remote path,
287 * or failed if we do not have access to good data anymore.
288 *
289 * Unless it was failed early by __drbd_make_request(),
290 * because no path was available, in which case
291 * it was not even added to the transfer_log.
292 *
293 * READA may fail, and will not be retried.
294 *
295 * WRITE should have used all available paths already.
296 */
297 if (!ok && rw == READ && !list_empty(&req->tl_requests))
298 req->rq_state |= RQ_POSTPONED;
274 299
300 if (!(req->rq_state & RQ_POSTPONED)) {
275 m->error = ok ? 0 : (error ?: -EIO); 301 m->error = ok ? 0 : (error ?: -EIO);
276 m->bio = req->master_bio; 302 m->bio = req->master_bio;
277 req->master_bio = NULL; 303 req->master_bio = NULL;
278 } 304 }
305}
279 306
280 if (s & RQ_LOCAL_PENDING) 307static int drbd_req_put_completion_ref(struct drbd_request *req, struct bio_and_error *m, int put)
281 return; 308{
309 struct drbd_conf *mdev = req->w.mdev;
310 D_ASSERT(m || (req->rq_state & RQ_POSTPONED));
311
312 if (!atomic_sub_and_test(put, &req->completion_ref))
313 return 0;
282 314
283 if ((s & RQ_NET_MASK) == 0 || (s & RQ_NET_DONE)) { 315 drbd_req_complete(req, m);
284 /* this is disconnected (local only) operation, 316
285 * or protocol C P_WRITE_ACK, 317 if (req->rq_state & RQ_POSTPONED) {
286 * or protocol A or B P_BARRIER_ACK, 318 /* don't destroy the req object just yet,
287 * or killed from the transfer log due to connection loss. */ 319 * but queue it for retry */
288 _req_is_done(mdev, req, rw); 320 drbd_restart_request(req);
321 return 0;
289 } 322 }
290 /* else: network part and not DONE yet. that is 323
291 * protocol A or B, barrier ack still pending... */ 324 return 1;
292} 325}
293 326
294static void _req_may_be_done_not_susp(struct drbd_request *req, struct bio_and_error *m) 327/* I'd like this to be the only place that manipulates
328 * req->completion_ref and req->kref. */
329static void mod_rq_state(struct drbd_request *req, struct bio_and_error *m,
330 int clear, int set)
295{ 331{
296 struct drbd_conf *mdev = req->mdev; 332 struct drbd_conf *mdev = req->w.mdev;
333 unsigned s = req->rq_state;
334 int c_put = 0;
335 int k_put = 0;
297 336
298 if (!is_susp(mdev->state)) 337 if (drbd_suspended(mdev) && !((s | clear) & RQ_COMPLETION_SUSP))
299 _req_may_be_done(req, m); 338 set |= RQ_COMPLETION_SUSP;
300}
301 339
302/* 340 /* apply */
303 * checks whether there was an overlapping request
304 * or ee already registered.
305 *
306 * if so, return 1, in which case this request is completed on the spot,
307 * without ever being submitted or send.
308 *
309 * return 0 if it is ok to submit this request.
310 *
311 * NOTE:
312 * paranoia: assume something above us is broken, and issues different write
313 * requests for the same block simultaneously...
314 *
315 * To ensure these won't be reordered differently on both nodes, resulting in
316 * diverging data sets, we discard the later one(s). Not that this is supposed
317 * to happen, but this is the rationale why we also have to check for
318 * conflicting requests with local origin, and why we have to do so regardless
319 * of whether we allowed multiple primaries.
320 *
321 * BTW, in case we only have one primary, the ee_hash is empty anyways, and the
322 * second hlist_for_each_entry becomes a noop. This is even simpler than to
323 * grab a reference on the net_conf, and check for the two_primaries flag...
324 */
325static int _req_conflicts(struct drbd_request *req)
326{
327 struct drbd_conf *mdev = req->mdev;
328 const sector_t sector = req->sector;
329 const int size = req->size;
330 struct drbd_request *i;
331 struct drbd_epoch_entry *e;
332 struct hlist_node *n;
333 struct hlist_head *slot;
334 341
335 D_ASSERT(hlist_unhashed(&req->collision)); 342 req->rq_state &= ~clear;
343 req->rq_state |= set;
336 344
337 if (!get_net_conf(mdev)) 345 /* no change? */
338 return 0; 346 if (req->rq_state == s)
347 return;
339 348
340 /* BUG_ON */ 349 /* intent: get references */
341 ERR_IF (mdev->tl_hash_s == 0) 350
342 goto out_no_conflict; 351 if (!(s & RQ_LOCAL_PENDING) && (set & RQ_LOCAL_PENDING))
343 BUG_ON(mdev->tl_hash == NULL); 352 atomic_inc(&req->completion_ref);
344 353
345#define OVERLAPS overlaps(i->sector, i->size, sector, size) 354 if (!(s & RQ_NET_PENDING) && (set & RQ_NET_PENDING)) {
346 slot = tl_hash_slot(mdev, sector); 355 inc_ap_pending(mdev);
347 hlist_for_each_entry(i, n, slot, collision) { 356 atomic_inc(&req->completion_ref);
348 if (OVERLAPS) {
349 dev_alert(DEV, "%s[%u] Concurrent local write detected! "
350 "[DISCARD L] new: %llus +%u; "
351 "pending: %llus +%u\n",
352 current->comm, current->pid,
353 (unsigned long long)sector, size,
354 (unsigned long long)i->sector, i->size);
355 goto out_conflict;
356 }
357 } 357 }
358 358
359 if (mdev->ee_hash_s) { 359 if (!(s & RQ_NET_QUEUED) && (set & RQ_NET_QUEUED))
360 /* now, check for overlapping requests with remote origin */ 360 atomic_inc(&req->completion_ref);
361 BUG_ON(mdev->ee_hash == NULL); 361
362#undef OVERLAPS 362 if (!(s & RQ_EXP_BARR_ACK) && (set & RQ_EXP_BARR_ACK))
363#define OVERLAPS overlaps(e->sector, e->size, sector, size) 363 kref_get(&req->kref); /* wait for the DONE */
364 slot = ee_hash_slot(mdev, sector); 364
365 hlist_for_each_entry(e, n, slot, collision) { 365 if (!(s & RQ_NET_SENT) && (set & RQ_NET_SENT))
366 if (OVERLAPS) { 366 atomic_add(req->i.size >> 9, &mdev->ap_in_flight);
367 dev_alert(DEV, "%s[%u] Concurrent remote write detected!" 367
368 " [DISCARD L] new: %llus +%u; " 368 if (!(s & RQ_COMPLETION_SUSP) && (set & RQ_COMPLETION_SUSP))
369 "pending: %llus +%u\n", 369 atomic_inc(&req->completion_ref);
370 current->comm, current->pid, 370
371 (unsigned long long)sector, size, 371 /* progress: put references */
372 (unsigned long long)e->sector, e->size); 372
373 goto out_conflict; 373 if ((s & RQ_COMPLETION_SUSP) && (clear & RQ_COMPLETION_SUSP))
374 } 374 ++c_put;
375 } 375
376 if (!(s & RQ_LOCAL_ABORTED) && (set & RQ_LOCAL_ABORTED)) {
377 D_ASSERT(req->rq_state & RQ_LOCAL_PENDING);
378 /* local completion may still come in later,
379 * we need to keep the req object around. */
380 kref_get(&req->kref);
381 ++c_put;
376 } 382 }
377#undef OVERLAPS
378 383
379out_no_conflict: 384 if ((s & RQ_LOCAL_PENDING) && (clear & RQ_LOCAL_PENDING)) {
380 /* this is like it should be, and what we expected. 385 if (req->rq_state & RQ_LOCAL_ABORTED)
381 * our users do behave after all... */ 386 ++k_put;
382 put_net_conf(mdev); 387 else
383 return 0; 388 ++c_put;
389 }
384 390
385out_conflict: 391 if ((s & RQ_NET_PENDING) && (clear & RQ_NET_PENDING)) {
386 put_net_conf(mdev); 392 dec_ap_pending(mdev);
387 return 1; 393 ++c_put;
394 }
395
396 if ((s & RQ_NET_QUEUED) && (clear & RQ_NET_QUEUED))
397 ++c_put;
398
399 if ((s & RQ_EXP_BARR_ACK) && !(s & RQ_NET_DONE) && (set & RQ_NET_DONE)) {
400 if (req->rq_state & RQ_NET_SENT)
401 atomic_sub(req->i.size >> 9, &mdev->ap_in_flight);
402 ++k_put;
403 }
404
405 /* potentially complete and destroy */
406
407 if (k_put || c_put) {
408 /* Completion does it's own kref_put. If we are going to
409 * kref_sub below, we need req to be still around then. */
410 int at_least = k_put + !!c_put;
411 int refcount = atomic_read(&req->kref.refcount);
412 if (refcount < at_least)
413 dev_err(DEV,
414 "mod_rq_state: Logic BUG: %x -> %x: refcount = %d, should be >= %d\n",
415 s, req->rq_state, refcount, at_least);
416 }
417
418 /* If we made progress, retry conflicting peer requests, if any. */
419 if (req->i.waiting)
420 wake_up(&mdev->misc_wait);
421
422 if (c_put)
423 k_put += drbd_req_put_completion_ref(req, m, c_put);
424 if (k_put)
425 kref_sub(&req->kref, k_put, drbd_req_destroy);
388} 426}
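mod_rq_state above is intended as the one place that manipulates completion_ref and kref: references are taken for state bits that go from 0 to 1, dropped for bits that go from 1 to 0, and all drops are applied in a single batch at the end so the request cannot disappear halfway through the transition. A reduced sketch of that pattern with just two pending bits; only the accounting idea is taken from the patch, the helpers are invented:

#include <stdatomic.h>
#include <stdio.h>

#define RQ_LOCAL_PENDING (1u << 0)
#define RQ_NET_PENDING   (1u << 1)

struct request {
        unsigned rq_state;
        atomic_int completion_ref;
};

static void complete_if_done(struct request *req, int put)
{
        if (atomic_fetch_sub(&req->completion_ref, put) == put)
                printf("request complete, state 0x%x\n", req->rq_state);
}

/* Apply 'clear' and 'set' to rq_state; take/drop completion references
 * based on which "pending" bits actually changed. */
static void mod_rq_state(struct request *req, unsigned clear, unsigned set)
{
        unsigned s = req->rq_state;
        int c_put = 0;

        req->rq_state &= ~clear;
        req->rq_state |= set;
        if (req->rq_state == s)
                return;                       /* no change, nothing to account */

        if (!(s & RQ_LOCAL_PENDING) && (set & RQ_LOCAL_PENDING))
                atomic_fetch_add(&req->completion_ref, 1);
        if (!(s & RQ_NET_PENDING) && (set & RQ_NET_PENDING))
                atomic_fetch_add(&req->completion_ref, 1);

        if ((s & RQ_LOCAL_PENDING) && (clear & RQ_LOCAL_PENDING))
                c_put++;
        if ((s & RQ_NET_PENDING) && (clear & RQ_NET_PENDING))
                c_put++;

        if (c_put)
                complete_if_done(req, c_put); /* drop all puts in one batch */
}

int main(void)
{
        struct request req = { .rq_state = 0 };

        atomic_init(&req.completion_ref, 1);
        mod_rq_state(&req, 0, RQ_LOCAL_PENDING | RQ_NET_PENDING);
        mod_rq_state(&req, RQ_LOCAL_PENDING, 0);
        mod_rq_state(&req, RQ_NET_PENDING, 0);
        complete_if_done(&req, 1);            /* submitter's initial reference */
        return 0;
}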
389 427
390static void drbd_report_io_error(struct drbd_conf *mdev, struct drbd_request *req) 428static void drbd_report_io_error(struct drbd_conf *mdev, struct drbd_request *req)
391{ 429{
392 char b[BDEVNAME_SIZE]; 430 char b[BDEVNAME_SIZE];
393 431
394 if (__ratelimit(&drbd_ratelimit_state)) 432 if (!__ratelimit(&drbd_ratelimit_state))
395 return; 433 return;
396 434
397 dev_warn(DEV, "local %s IO error sector %llu+%u on %s\n", 435 dev_warn(DEV, "local %s IO error sector %llu+%u on %s\n",
398 (req->rq_state & RQ_WRITE) ? "WRITE" : "READ", 436 (req->rq_state & RQ_WRITE) ? "WRITE" : "READ",
399 (unsigned long long)req->sector, 437 (unsigned long long)req->i.sector,
400 req->size >> 9, 438 req->i.size >> 9,
401 bdevname(mdev->ldev->backing_bdev, b)); 439 bdevname(mdev->ldev->backing_bdev, b));
402} 440}
403 441
@@ -416,9 +454,12 @@ static void drbd_report_io_error(struct drbd_conf *mdev, struct drbd_request *re
416int __req_mod(struct drbd_request *req, enum drbd_req_event what, 454int __req_mod(struct drbd_request *req, enum drbd_req_event what,
417 struct bio_and_error *m) 455 struct bio_and_error *m)
418{ 456{
419 struct drbd_conf *mdev = req->mdev; 457 struct drbd_conf *mdev = req->w.mdev;
420 int rv = 0; 458 struct net_conf *nc;
421 m->bio = NULL; 459 int p, rv = 0;
460
461 if (m)
462 m->bio = NULL;
422 463
423 switch (what) { 464 switch (what) {
424 default: 465 default:
@@ -427,118 +468,91 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
427 468
428 /* does not happen... 469 /* does not happen...
429 * initialization done in drbd_req_new 470 * initialization done in drbd_req_new
430 case created: 471 case CREATED:
431 break; 472 break;
432 */ 473 */
433 474
434 case to_be_send: /* via network */ 475 case TO_BE_SENT: /* via network */
435 /* reached via drbd_make_request_common 476 /* reached via __drbd_make_request
436 * and from w_read_retry_remote */ 477 * and from w_read_retry_remote */
437 D_ASSERT(!(req->rq_state & RQ_NET_MASK)); 478 D_ASSERT(!(req->rq_state & RQ_NET_MASK));
438 req->rq_state |= RQ_NET_PENDING; 479 rcu_read_lock();
439 inc_ap_pending(mdev); 480 nc = rcu_dereference(mdev->tconn->net_conf);
481 p = nc->wire_protocol;
482 rcu_read_unlock();
483 req->rq_state |=
484 p == DRBD_PROT_C ? RQ_EXP_WRITE_ACK :
485 p == DRBD_PROT_B ? RQ_EXP_RECEIVE_ACK : 0;
486 mod_rq_state(req, m, 0, RQ_NET_PENDING);
440 break; 487 break;
441 488
442 case to_be_submitted: /* locally */ 489 case TO_BE_SUBMITTED: /* locally */
443 /* reached via drbd_make_request_common */ 490 /* reached via __drbd_make_request */
444 D_ASSERT(!(req->rq_state & RQ_LOCAL_MASK)); 491 D_ASSERT(!(req->rq_state & RQ_LOCAL_MASK));
445 req->rq_state |= RQ_LOCAL_PENDING; 492 mod_rq_state(req, m, 0, RQ_LOCAL_PENDING);
446 break; 493 break;
447 494
448 case completed_ok: 495 case COMPLETED_OK:
449 if (req->rq_state & RQ_WRITE) 496 if (req->rq_state & RQ_WRITE)
450 mdev->writ_cnt += req->size>>9; 497 mdev->writ_cnt += req->i.size >> 9;
451 else 498 else
452 mdev->read_cnt += req->size>>9; 499 mdev->read_cnt += req->i.size >> 9;
453 500
454 req->rq_state |= (RQ_LOCAL_COMPLETED|RQ_LOCAL_OK); 501 mod_rq_state(req, m, RQ_LOCAL_PENDING,
455 req->rq_state &= ~RQ_LOCAL_PENDING; 502 RQ_LOCAL_COMPLETED|RQ_LOCAL_OK);
456
457 _req_may_be_done_not_susp(req, m);
458 break; 503 break;
459 504
460 case abort_disk_io: 505 case ABORT_DISK_IO:
461 req->rq_state |= RQ_LOCAL_ABORTED; 506 mod_rq_state(req, m, 0, RQ_LOCAL_ABORTED);
462 if (req->rq_state & RQ_WRITE)
463 _req_may_be_done_not_susp(req, m);
464 else
465 goto goto_queue_for_net_read;
466 break; 507 break;
467 508
468 case write_completed_with_error: 509 case WRITE_COMPLETED_WITH_ERROR:
469 req->rq_state |= RQ_LOCAL_COMPLETED;
470 req->rq_state &= ~RQ_LOCAL_PENDING;
471
472 drbd_report_io_error(mdev, req); 510 drbd_report_io_error(mdev, req);
473 __drbd_chk_io_error(mdev, DRBD_WRITE_ERROR); 511 __drbd_chk_io_error(mdev, DRBD_WRITE_ERROR);
474 _req_may_be_done_not_susp(req, m); 512 mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
475 break; 513 break;
476 514
477 case read_ahead_completed_with_error: 515 case READ_COMPLETED_WITH_ERROR:
478 /* it is legal to fail READA */ 516 drbd_set_out_of_sync(mdev, req->i.sector, req->i.size);
479 req->rq_state |= RQ_LOCAL_COMPLETED;
480 req->rq_state &= ~RQ_LOCAL_PENDING;
481 _req_may_be_done_not_susp(req, m);
482 break;
483
484 case read_completed_with_error:
485 drbd_set_out_of_sync(mdev, req->sector, req->size);
486
487 req->rq_state |= RQ_LOCAL_COMPLETED;
488 req->rq_state &= ~RQ_LOCAL_PENDING;
489
490 if (req->rq_state & RQ_LOCAL_ABORTED) {
491 _req_may_be_done(req, m);
492 break;
493 }
494
495 drbd_report_io_error(mdev, req); 517 drbd_report_io_error(mdev, req);
496 __drbd_chk_io_error(mdev, DRBD_READ_ERROR); 518 __drbd_chk_io_error(mdev, DRBD_READ_ERROR);
519 /* fall through. */
520 case READ_AHEAD_COMPLETED_WITH_ERROR:
521 /* it is legal to fail READA, no __drbd_chk_io_error in that case. */
522 mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
523 break;
497 524
498 goto_queue_for_net_read: 525 case QUEUE_FOR_NET_READ:
499
500 D_ASSERT(!(req->rq_state & RQ_NET_MASK));
501
502 /* no point in retrying if there is no good remote data,
503 * or we have no connection. */
504 if (mdev->state.pdsk != D_UP_TO_DATE) {
505 _req_may_be_done_not_susp(req, m);
506 break;
507 }
508
509 /* _req_mod(req,to_be_send); oops, recursion... */
510 req->rq_state |= RQ_NET_PENDING;
511 inc_ap_pending(mdev);
512 /* fall through: _req_mod(req,queue_for_net_read); */
513
514 case queue_for_net_read:
515 /* READ or READA, and 526 /* READ or READA, and
516 * no local disk, 527 * no local disk,
517 * or target area marked as invalid, 528 * or target area marked as invalid,
518 * or just got an io-error. */ 529 * or just got an io-error. */
519 /* from drbd_make_request_common 530 /* from __drbd_make_request
520 * or from bio_endio during read io-error recovery */ 531 * or from bio_endio during read io-error recovery */
521 532
522 /* so we can verify the handle in the answer packet 533 /* So we can verify the handle in the answer packet.
523 * corresponding hlist_del is in _req_may_be_done() */ 534 * Corresponding drbd_remove_request_interval is in
524 hlist_add_head(&req->collision, ar_hash_slot(mdev, req->sector)); 535 * drbd_req_complete() */
536 D_ASSERT(drbd_interval_empty(&req->i));
537 drbd_insert_interval(&mdev->read_requests, &req->i);
525 538
526 drbd_set_flag(mdev, UNPLUG_REMOTE); 539 set_bit(UNPLUG_REMOTE, &mdev->flags);
527 540
528 D_ASSERT(req->rq_state & RQ_NET_PENDING); 541 D_ASSERT(req->rq_state & RQ_NET_PENDING);
529 req->rq_state |= RQ_NET_QUEUED; 542 D_ASSERT((req->rq_state & RQ_LOCAL_MASK) == 0);
530 req->w.cb = (req->rq_state & RQ_LOCAL_MASK) 543 mod_rq_state(req, m, 0, RQ_NET_QUEUED);
531 ? w_read_retry_remote 544 req->w.cb = w_send_read_req;
532 : w_send_read_req; 545 drbd_queue_work(&mdev->tconn->sender_work, &req->w);
533 drbd_queue_work(&mdev->data.work, &req->w);
534 break; 546 break;
535 547
536 case queue_for_net_write: 548 case QUEUE_FOR_NET_WRITE:
537 /* assert something? */ 549 /* assert something? */
538 /* from drbd_make_request_common only */ 550 /* from __drbd_make_request only */
539 551
540 hlist_add_head(&req->collision, tl_hash_slot(mdev, req->sector)); 552 /* Corresponding drbd_remove_request_interval is in
541 /* corresponding hlist_del is in _req_may_be_done() */ 553 * drbd_req_complete() */
554 D_ASSERT(drbd_interval_empty(&req->i));
555 drbd_insert_interval(&mdev->write_requests, &req->i);
542 556
543 /* NOTE 557 /* NOTE
544 * In case the req ended up on the transfer log before being 558 * In case the req ended up on the transfer log before being
@@ -549,7 +563,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
549 * 563 *
550 * _req_add_to_epoch(req); this has to be after the 564 * _req_add_to_epoch(req); this has to be after the
551 * _maybe_start_new_epoch(req); which happened in 565 * _maybe_start_new_epoch(req); which happened in
552 * drbd_make_request_common, because we now may set the bit 566 * __drbd_make_request, because we now may set the bit
553 * again ourselves to close the current epoch. 567 * again ourselves to close the current epoch.
554 * 568 *
555 * Add req to the (now) current epoch (barrier). */ 569 * Add req to the (now) current epoch (barrier). */
@@ -557,204 +571,189 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
557 /* otherwise we may lose an unplug, which may cause some remote 571 /* otherwise we may lose an unplug, which may cause some remote
558 * io-scheduler timeout to expire, increasing maximum latency, 572 * io-scheduler timeout to expire, increasing maximum latency,
559 * hurting performance. */ 573 * hurting performance. */
560 drbd_set_flag(mdev, UNPLUG_REMOTE); 574 set_bit(UNPLUG_REMOTE, &mdev->flags);
561
562 /* see drbd_make_request_common,
563 * just after it grabs the req_lock */
564 D_ASSERT(drbd_test_flag(mdev, CREATE_BARRIER) == 0);
565
566 req->epoch = mdev->newest_tle->br_number;
567
568 /* increment size of current epoch */
569 mdev->newest_tle->n_writes++;
570 575
571 /* queue work item to send data */ 576 /* queue work item to send data */
572 D_ASSERT(req->rq_state & RQ_NET_PENDING); 577 D_ASSERT(req->rq_state & RQ_NET_PENDING);
573 req->rq_state |= RQ_NET_QUEUED; 578 mod_rq_state(req, m, 0, RQ_NET_QUEUED|RQ_EXP_BARR_ACK);
574 req->w.cb = w_send_dblock; 579 req->w.cb = w_send_dblock;
575 drbd_queue_work(&mdev->data.work, &req->w); 580 drbd_queue_work(&mdev->tconn->sender_work, &req->w);
576 581
577 /* close the epoch, in case it outgrew the limit */ 582 /* close the epoch, in case it outgrew the limit */
578 if (mdev->newest_tle->n_writes >= mdev->net_conf->max_epoch_size) 583 rcu_read_lock();
579 queue_barrier(mdev); 584 nc = rcu_dereference(mdev->tconn->net_conf);
585 p = nc->max_epoch_size;
586 rcu_read_unlock();
587 if (mdev->tconn->current_tle_writes >= p)
588 start_new_tl_epoch(mdev->tconn);
580 589
581 break; 590 break;
582 591
583 case queue_for_send_oos: 592 case QUEUE_FOR_SEND_OOS:
584 req->rq_state |= RQ_NET_QUEUED; 593 mod_rq_state(req, m, 0, RQ_NET_QUEUED);
585 req->w.cb = w_send_oos; 594 req->w.cb = w_send_out_of_sync;
586 drbd_queue_work(&mdev->data.work, &req->w); 595 drbd_queue_work(&mdev->tconn->sender_work, &req->w);
587 break; 596 break;
588 597
589 case read_retry_remote_canceled: 598 case READ_RETRY_REMOTE_CANCELED:
590 case send_canceled: 599 case SEND_CANCELED:
591 case send_failed: 600 case SEND_FAILED:
592 /* real cleanup will be done from tl_clear. just update flags 601 /* real cleanup will be done from tl_clear. just update flags
593 * so it is no longer marked as on the worker queue */ 602 * so it is no longer marked as on the worker queue */
594 req->rq_state &= ~RQ_NET_QUEUED; 603 mod_rq_state(req, m, RQ_NET_QUEUED, 0);
595 /* if we did it right, tl_clear should be scheduled only after
596 * this, so this should not be necessary! */
597 _req_may_be_done_not_susp(req, m);
598 break; 604 break;
599 605
600 case handed_over_to_network: 606 case HANDED_OVER_TO_NETWORK:
601 /* assert something? */ 607 /* assert something? */
602 if (bio_data_dir(req->master_bio) == WRITE)
603 atomic_add(req->size>>9, &mdev->ap_in_flight);
604
605 if (bio_data_dir(req->master_bio) == WRITE && 608 if (bio_data_dir(req->master_bio) == WRITE &&
606 mdev->net_conf->wire_protocol == DRBD_PROT_A) { 609 !(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK))) {
607 /* this is what is dangerous about protocol A: 610 /* this is what is dangerous about protocol A:
608 * pretend it was successfully written on the peer. */ 611 * pretend it was successfully written on the peer. */
609 if (req->rq_state & RQ_NET_PENDING) { 612 if (req->rq_state & RQ_NET_PENDING)
610 dec_ap_pending(mdev); 613 mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK);
611 req->rq_state &= ~RQ_NET_PENDING; 614 /* else: neg-ack was faster... */
612 req->rq_state |= RQ_NET_OK;
613 } /* else: neg-ack was faster... */
614 /* it is still not yet RQ_NET_DONE until the 615 /* it is still not yet RQ_NET_DONE until the
615 * corresponding epoch barrier got acked as well, 616 * corresponding epoch barrier got acked as well,
616 * so we know what to dirty on connection loss */ 617 * so we know what to dirty on connection loss */
617 } 618 }
618 req->rq_state &= ~RQ_NET_QUEUED; 619 mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_SENT);
619 req->rq_state |= RQ_NET_SENT;
620 _req_may_be_done_not_susp(req, m);
621 break; 620 break;
622 621
623 case oos_handed_to_network: 622 case OOS_HANDED_TO_NETWORK:
624 /* Was not set PENDING, no longer QUEUED, so is now DONE 623 /* Was not set PENDING, no longer QUEUED, so is now DONE
625 * as far as this connection is concerned. */ 624 * as far as this connection is concerned. */
626 req->rq_state &= ~RQ_NET_QUEUED; 625 mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_DONE);
627 req->rq_state |= RQ_NET_DONE;
628 _req_may_be_done_not_susp(req, m);
629 break; 626 break;
630 627
631 case connection_lost_while_pending: 628 case CONNECTION_LOST_WHILE_PENDING:
632 /* transfer log cleanup after connection loss */ 629 /* transfer log cleanup after connection loss */
633 /* assert something? */ 630 mod_rq_state(req, m,
634 if (req->rq_state & RQ_NET_PENDING) 631 RQ_NET_OK|RQ_NET_PENDING|RQ_COMPLETION_SUSP,
635 dec_ap_pending(mdev); 632 RQ_NET_DONE);
636 req->rq_state &= ~(RQ_NET_OK|RQ_NET_PENDING);
637 req->rq_state |= RQ_NET_DONE;
638 if (req->rq_state & RQ_NET_SENT && req->rq_state & RQ_WRITE)
639 atomic_sub(req->size>>9, &mdev->ap_in_flight);
640
641 /* if it is still queued, we may not complete it here.
642 * it will be canceled soon. */
643 if (!(req->rq_state & RQ_NET_QUEUED))
644 _req_may_be_done(req, m); /* Allowed while state.susp */
645 break; 633 break;
646 634
647 case conflict_discarded_by_peer: 635 case CONFLICT_RESOLVED:
648 /* for discarded conflicting writes of multiple primaries, 636 /* for superseded conflicting writes of multiple primaries,
649 * there is no need to keep anything in the tl, potential 637 * there is no need to keep anything in the tl, potential
650 * node crashes are covered by the activity log. */ 638 * node crashes are covered by the activity log.
651 if (what == conflict_discarded_by_peer) 639 *
652 dev_alert(DEV, "Got DiscardAck packet %llus +%u!" 640 * If this request had been marked as RQ_POSTPONED before,
653 " DRBD is not a random data generator!\n", 641 * it will actually not be completed, but "restarted",
654 (unsigned long long)req->sector, req->size); 642 * resubmitted from the retry worker context. */
655 req->rq_state |= RQ_NET_DONE; 643 D_ASSERT(req->rq_state & RQ_NET_PENDING);
656 /* fall through */ 644 D_ASSERT(req->rq_state & RQ_EXP_WRITE_ACK);
657 case write_acked_by_peer_and_sis: 645 mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_DONE|RQ_NET_OK);
658 case write_acked_by_peer: 646 break;
659 if (what == write_acked_by_peer_and_sis) 647
660 req->rq_state |= RQ_NET_SIS; 648 case WRITE_ACKED_BY_PEER_AND_SIS:
649 req->rq_state |= RQ_NET_SIS;
650 case WRITE_ACKED_BY_PEER:
651 D_ASSERT(req->rq_state & RQ_EXP_WRITE_ACK);
661 /* protocol C; successfully written on peer. 652 /* protocol C; successfully written on peer.
662 * Nothing more to do here. 653 * Nothing more to do here.
663 * We want to keep the tl in place for all protocols, to cater 654 * We want to keep the tl in place for all protocols, to cater
664 * for volatile write-back caches on lower level devices. */ 655 * for volatile write-back caches on lower level devices. */
665 656
666 case recv_acked_by_peer: 657 goto ack_common;
658 case RECV_ACKED_BY_PEER:
659 D_ASSERT(req->rq_state & RQ_EXP_RECEIVE_ACK);
667 /* protocol B; pretends to be successfully written on peer. 660 /* protocol B; pretends to be successfully written on peer.
668 * see also notes above in handed_over_to_network about 661 * see also notes above in HANDED_OVER_TO_NETWORK about
669 * protocol != C */ 662 * protocol != C */
670 req->rq_state |= RQ_NET_OK; 663 ack_common:
671 D_ASSERT(req->rq_state & RQ_NET_PENDING); 664 D_ASSERT(req->rq_state & RQ_NET_PENDING);
672 dec_ap_pending(mdev); 665 mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK);
673 atomic_sub(req->size>>9, &mdev->ap_in_flight);
674 req->rq_state &= ~RQ_NET_PENDING;
675 _req_may_be_done_not_susp(req, m);
676 break; 666 break;
677 667
678 case neg_acked: 668 case POSTPONE_WRITE:
679 /* assert something? */ 669 D_ASSERT(req->rq_state & RQ_EXP_WRITE_ACK);
680 if (req->rq_state & RQ_NET_PENDING) { 670 /* If this node has already detected the write conflict, the
681 dec_ap_pending(mdev); 671 * worker will be waiting on misc_wait. Wake it up once this
682 atomic_sub(req->size>>9, &mdev->ap_in_flight); 672 * request has completed locally.
683 } 673 */
684 req->rq_state &= ~(RQ_NET_OK|RQ_NET_PENDING); 674 D_ASSERT(req->rq_state & RQ_NET_PENDING);
675 req->rq_state |= RQ_POSTPONED;
676 if (req->i.waiting)
677 wake_up(&mdev->misc_wait);
678 /* Do not clear RQ_NET_PENDING. This request will make further
679 * progress via restart_conflicting_writes() or
680 * fail_postponed_requests(). Hopefully. */
681 break;
685 682
686 req->rq_state |= RQ_NET_DONE; 683 case NEG_ACKED:
687 _req_may_be_done_not_susp(req, m); 684 mod_rq_state(req, m, RQ_NET_OK|RQ_NET_PENDING, 0);
688 /* else: done by handed_over_to_network */
689 break; 685 break;
690 686
691 case fail_frozen_disk_io: 687 case FAIL_FROZEN_DISK_IO:
692 if (!(req->rq_state & RQ_LOCAL_COMPLETED)) 688 if (!(req->rq_state & RQ_LOCAL_COMPLETED))
693 break; 689 break;
694 690 mod_rq_state(req, m, RQ_COMPLETION_SUSP, 0);
695 _req_may_be_done(req, m); /* Allowed while state.susp */
696 break; 691 break;
697 692
698 case restart_frozen_disk_io: 693 case RESTART_FROZEN_DISK_IO:
699 if (!(req->rq_state & RQ_LOCAL_COMPLETED)) 694 if (!(req->rq_state & RQ_LOCAL_COMPLETED))
700 break; 695 break;
701 696
702 req->rq_state &= ~RQ_LOCAL_COMPLETED; 697 mod_rq_state(req, m,
698 RQ_COMPLETION_SUSP|RQ_LOCAL_COMPLETED,
699 RQ_LOCAL_PENDING);
703 700
704 rv = MR_READ; 701 rv = MR_READ;
705 if (bio_data_dir(req->master_bio) == WRITE) 702 if (bio_data_dir(req->master_bio) == WRITE)
706 rv = MR_WRITE; 703 rv = MR_WRITE;
707 704
708 get_ldev(mdev); 705 get_ldev(mdev); /* always succeeds in this call path */
709 req->w.cb = w_restart_disk_io; 706 req->w.cb = w_restart_disk_io;
710 drbd_queue_work(&mdev->data.work, &req->w); 707 drbd_queue_work(&mdev->tconn->sender_work, &req->w);
711 break; 708 break;
712 709
713 case resend: 710 case RESEND:
714 /* Simply complete (local only) READs. */ 711 /* Simply complete (local only) READs. */
715 if (!(req->rq_state & RQ_WRITE) && !req->w.cb) { 712 if (!(req->rq_state & RQ_WRITE) && !req->w.cb) {
716 _req_may_be_done(req, m); 713 mod_rq_state(req, m, RQ_COMPLETION_SUSP, 0);
717 break; 714 break;
718 } 715 }
719 716
720 /* If RQ_NET_OK is already set, we got a P_WRITE_ACK or P_RECV_ACK 717 /* If RQ_NET_OK is already set, we got a P_WRITE_ACK or P_RECV_ACK
721 before the connection loss (B&C only); only P_BARRIER_ACK was missing. 718 before the connection loss (B&C only); only P_BARRIER_ACK
722 Trowing them out of the TL here by pretending we got a BARRIER_ACK 719 (or the local completion?) was missing when we suspended.
723 We ensure that the peer was not rebooted */ 720 Throwing them out of the TL here by pretending we got a BARRIER_ACK.
721 During connection handshake, we ensure that the peer was not rebooted. */
724 if (!(req->rq_state & RQ_NET_OK)) { 722 if (!(req->rq_state & RQ_NET_OK)) {
723 /* FIXME could this possibly be a req->w.cb == w_send_out_of_sync?
724 * in that case we must not set RQ_NET_PENDING. */
725
726 mod_rq_state(req, m, RQ_COMPLETION_SUSP, RQ_NET_QUEUED|RQ_NET_PENDING);
725 if (req->w.cb) { 727 if (req->w.cb) {
726 drbd_queue_work(&mdev->data.work, &req->w); 728 drbd_queue_work(&mdev->tconn->sender_work, &req->w);
727 rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ; 729 rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
728 } 730 } /* else: FIXME can this happen? */
729 break; 731 break;
730 } 732 }
731 /* else, fall through to barrier_acked */ 733 /* else, fall through to BARRIER_ACKED */
732 734
733 case barrier_acked: 735 case BARRIER_ACKED:
736 /* barrier ack for READ requests does not make sense */
734 if (!(req->rq_state & RQ_WRITE)) 737 if (!(req->rq_state & RQ_WRITE))
735 break; 738 break;
736 739
737 if (req->rq_state & RQ_NET_PENDING) { 740 if (req->rq_state & RQ_NET_PENDING) {
738 /* barrier came in before all requests have been acked. 741 /* barrier came in before all requests were acked.
739 * this is bad, because if the connection is lost now, 742 * this is bad, because if the connection is lost now,
740 * we won't be able to clean them up... */ 743 * we won't be able to clean them up... */
741 dev_err(DEV, "FIXME (barrier_acked but pending)\n"); 744 dev_err(DEV, "FIXME (BARRIER_ACKED but pending)\n");
742 list_move(&req->tl_requests, &mdev->out_of_sequence_requests);
743 } 745 }
744 if ((req->rq_state & RQ_NET_MASK) != 0) { 746 /* Allowed to complete requests, even while suspended.
745 req->rq_state |= RQ_NET_DONE; 747 * As this is called for all requests within a matching epoch,
746 if (mdev->net_conf->wire_protocol == DRBD_PROT_A) 748 * we need to filter, and only set RQ_NET_DONE for those that
747 atomic_sub(req->size>>9, &mdev->ap_in_flight); 749 * have actually been on the wire. */
748 } 750 mod_rq_state(req, m, RQ_COMPLETION_SUSP,
749 _req_may_be_done(req, m); /* Allowed while state.susp */ 751 (req->rq_state & RQ_NET_MASK) ? RQ_NET_DONE : 0);
750 break; 752 break;
751 753
752 case data_received: 754 case DATA_RECEIVED:
753 D_ASSERT(req->rq_state & RQ_NET_PENDING); 755 D_ASSERT(req->rq_state & RQ_NET_PENDING);
754 dec_ap_pending(mdev); 756 mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK|RQ_NET_DONE);
755 req->rq_state &= ~RQ_NET_PENDING;
756 req->rq_state |= (RQ_NET_OK|RQ_NET_DONE);
757 _req_may_be_done_not_susp(req, m);
758 break; 757 break;
759 }; 758 };
760 759
@@ -768,75 +767,265 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
768 * since size may be bigger than BM_BLOCK_SIZE, 767 * since size may be bigger than BM_BLOCK_SIZE,
769 * we may need to check several bits. 768 * we may need to check several bits.
770 */ 769 */
771static int drbd_may_do_local_read(struct drbd_conf *mdev, sector_t sector, int size) 770static bool drbd_may_do_local_read(struct drbd_conf *mdev, sector_t sector, int size)
772{ 771{
773 unsigned long sbnr, ebnr; 772 unsigned long sbnr, ebnr;
774 sector_t esector, nr_sectors; 773 sector_t esector, nr_sectors;
775 774
776 if (mdev->state.disk == D_UP_TO_DATE) 775 if (mdev->state.disk == D_UP_TO_DATE)
777 return 1; 776 return true;
778 if (mdev->state.disk >= D_OUTDATED) 777 if (mdev->state.disk != D_INCONSISTENT)
779 return 0; 778 return false;
780 if (mdev->state.disk < D_INCONSISTENT)
781 return 0;
782 /* state.disk == D_INCONSISTENT We will have a look at the BitMap */
783 nr_sectors = drbd_get_capacity(mdev->this_bdev);
784 esector = sector + (size >> 9) - 1; 779 esector = sector + (size >> 9) - 1;
785 780 nr_sectors = drbd_get_capacity(mdev->this_bdev);
786 D_ASSERT(sector < nr_sectors); 781 D_ASSERT(sector < nr_sectors);
787 D_ASSERT(esector < nr_sectors); 782 D_ASSERT(esector < nr_sectors);
788 783
789 sbnr = BM_SECT_TO_BIT(sector); 784 sbnr = BM_SECT_TO_BIT(sector);
790 ebnr = BM_SECT_TO_BIT(esector); 785 ebnr = BM_SECT_TO_BIT(esector);
791 786
792 return 0 == drbd_bm_count_bits(mdev, sbnr, ebnr); 787 return drbd_bm_count_bits(mdev, sbnr, ebnr) == 0;
788}
789
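drbd_may_do_local_read above answers "is the whole [sector, sector+size) range in sync on the local disk?" by converting the first and last sector of the request to bitmap bit numbers and requiring that no bit in that range is set. A standalone sketch of the sector-to-bit arithmetic, assuming the usual DRBD granularity of one bitmap bit per 4 KiB (eight 512-byte sectors); the shift constant is an assumption here, the authoritative value lives in drbd_int.h:

#include <stdio.h>

typedef unsigned long long sector_t;

#define BM_BLOCK_SHIFT   12                      /* assumed: 4 KiB covered per bitmap bit */
#define SECTOR_SHIFT     9                       /* 512-byte sectors */
#define BM_SECT_TO_BIT(s) ((s) >> (BM_BLOCK_SHIFT - SECTOR_SHIFT))

int main(void)
{
        sector_t sector = 1000;                  /* request start, in sectors */
        int size = 32 * 1024;                    /* request size, in bytes */
        sector_t esector = sector + (size >> SECTOR_SHIFT) - 1;

        printf("check bitmap bits %llu..%llu for out-of-sync blocks\n",
               BM_SECT_TO_BIT(sector), BM_SECT_TO_BIT(esector));
        return 0;
}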
790static bool remote_due_to_read_balancing(struct drbd_conf *mdev, sector_t sector,
791 enum drbd_read_balancing rbm)
792{
793 struct backing_dev_info *bdi;
794 int stripe_shift;
795
796 switch (rbm) {
797 case RB_CONGESTED_REMOTE:
798 bdi = &mdev->ldev->backing_bdev->bd_disk->queue->backing_dev_info;
799 return bdi_read_congested(bdi);
800 case RB_LEAST_PENDING:
801 return atomic_read(&mdev->local_cnt) >
802 atomic_read(&mdev->ap_pending_cnt) + atomic_read(&mdev->rs_pending_cnt);
803 case RB_32K_STRIPING: /* stripe_shift = 15 */
804 case RB_64K_STRIPING:
805 case RB_128K_STRIPING:
806 case RB_256K_STRIPING:
807 case RB_512K_STRIPING:
808 case RB_1M_STRIPING: /* stripe_shift = 20 */
809 stripe_shift = (rbm - RB_32K_STRIPING + 15);
810 return (sector >> (stripe_shift - 9)) & 1;
811 case RB_ROUND_ROBIN:
812 return test_and_change_bit(READ_BALANCE_RR, &mdev->flags);
813 case RB_PREFER_REMOTE:
814 return true;
815 case RB_PREFER_LOCAL:
816 default:
817 return false;
818 }
819}
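For the striping policies, remote_due_to_read_balancing above maps the enum value to a stripe size (RB_32K_STRIPING corresponds to a 15-bit stripe, RB_1M_STRIPING to a 20-bit one) and then alternates local/remote depending on which stripe the starting sector falls into. A small sketch of that sector arithmetic; the enum ordering is taken from the patch, everything else is illustrative:

#include <stdbool.h>
#include <stdio.h>

typedef unsigned long long sector_t;

enum rbm { RB_32K_STRIPING, RB_64K_STRIPING, RB_128K_STRIPING,
           RB_256K_STRIPING, RB_512K_STRIPING, RB_1M_STRIPING };

/* Alternate between local (false) and remote (true) per stripe. */
static bool read_remote(sector_t sector, enum rbm rbm)
{
        int stripe_shift = rbm - RB_32K_STRIPING + 15;  /* 15 => 32 KiB ... 20 => 1 MiB */

        return (sector >> (stripe_shift - 9)) & 1;      /* sector is in 512-byte units */
}

int main(void)
{
        /* With 64 KiB stripes, sectors 0..127 read locally, 128..255 remotely, and so on. */
        for (sector_t s = 0; s < 512; s += 128)
                printf("sector %4llu -> %s\n", s,
                       read_remote(s, RB_64K_STRIPING) ? "remote" : "local");
        return 0;
}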
820
821/*
822 * complete_conflicting_writes - wait for any conflicting write requests
823 *
824 * The write_requests tree contains all active write requests which we
825 * currently know about. Wait for any requests to complete which conflict with
826 * the new one.
827 *
828 * Only way out: remove the conflicting intervals from the tree.
829 */
830static void complete_conflicting_writes(struct drbd_request *req)
831{
832 DEFINE_WAIT(wait);
833 struct drbd_conf *mdev = req->w.mdev;
834 struct drbd_interval *i;
835 sector_t sector = req->i.sector;
836 int size = req->i.size;
837
838 i = drbd_find_overlap(&mdev->write_requests, sector, size);
839 if (!i)
840 return;
841
842 for (;;) {
843 prepare_to_wait(&mdev->misc_wait, &wait, TASK_UNINTERRUPTIBLE);
844 i = drbd_find_overlap(&mdev->write_requests, sector, size);
845 if (!i)
846 break;
847 /* Indicate to wake up device->misc_wait on progress. */
848 i->waiting = true;
849 spin_unlock_irq(&mdev->tconn->req_lock);
850 schedule();
851 spin_lock_irq(&mdev->tconn->req_lock);
852 }
853 finish_wait(&mdev->misc_wait, &wait);
793} 854}
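complete_conflicting_writes above waits until no request in the write_requests interval tree overlaps the new one, re-checking after every wakeup because another overlapping write may have been inserted in the meantime. The underlying test is a plain half-open range comparison on [sector, sector + (size>>9)); a sketch of that predicate, with an invented helper name:

#include <stdbool.h>
#include <stdio.h>

typedef unsigned long long sector_t;

/* Two half-open sector ranges overlap iff each one starts before the
 * other ends. Sizes are in bytes, sectors are 512 bytes, matching the
 * i.sector / i.size convention in the patch. */
static bool ranges_overlap(sector_t s1, int size1, sector_t s2, int size2)
{
        sector_t e1 = s1 + (size1 >> 9);
        sector_t e2 = s2 + (size2 >> 9);

        return s1 < e2 && s2 < e1;
}

int main(void)
{
        printf("%d\n", ranges_overlap(0, 4096, 7, 4096));   /* 1: both cover sector 7 */
        printf("%d\n", ranges_overlap(0, 4096, 8, 4096));   /* 0: second starts at sector 8 */
        return 0;
}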
794 855
856/* called within req_lock and rcu_read_lock() */
795static void maybe_pull_ahead(struct drbd_conf *mdev) 857static void maybe_pull_ahead(struct drbd_conf *mdev)
796{ 858{
797 int congested = 0; 859 struct drbd_tconn *tconn = mdev->tconn;
860 struct net_conf *nc;
861 bool congested = false;
862 enum drbd_on_congestion on_congestion;
863
864 nc = rcu_dereference(tconn->net_conf);
865 on_congestion = nc ? nc->on_congestion : OC_BLOCK;
866 if (on_congestion == OC_BLOCK ||
867 tconn->agreed_pro_version < 96)
868 return;
798 869
799 /* If I don't even have good local storage, we can not reasonably try 870 /* If I don't even have good local storage, we can not reasonably try
800 * to pull ahead of the peer. We also need the local reference to make 871 * to pull ahead of the peer. We also need the local reference to make
801 * sure mdev->act_log is there. 872 * sure mdev->act_log is there.
802 * Note: caller has to make sure that net_conf is there.
803 */ 873 */
804 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) 874 if (!get_ldev_if_state(mdev, D_UP_TO_DATE))
805 return; 875 return;
806 876
807 if (mdev->net_conf->cong_fill && 877 if (nc->cong_fill &&
808 atomic_read(&mdev->ap_in_flight) >= mdev->net_conf->cong_fill) { 878 atomic_read(&mdev->ap_in_flight) >= nc->cong_fill) {
809 dev_info(DEV, "Congestion-fill threshold reached\n"); 879 dev_info(DEV, "Congestion-fill threshold reached\n");
810 congested = 1; 880 congested = true;
811 } 881 }
812 882
813 if (mdev->act_log->used >= mdev->net_conf->cong_extents) { 883 if (mdev->act_log->used >= nc->cong_extents) {
814 dev_info(DEV, "Congestion-extents threshold reached\n"); 884 dev_info(DEV, "Congestion-extents threshold reached\n");
815 congested = 1; 885 congested = true;
816 } 886 }
817 887
818 if (congested) { 888 if (congested) {
819 queue_barrier(mdev); /* last barrier, after mirrored writes */ 889 /* start a new epoch for non-mirrored writes */
890 start_new_tl_epoch(mdev->tconn);
820 891
821 if (mdev->net_conf->on_congestion == OC_PULL_AHEAD) 892 if (on_congestion == OC_PULL_AHEAD)
822 _drbd_set_state(_NS(mdev, conn, C_AHEAD), 0, NULL); 893 _drbd_set_state(_NS(mdev, conn, C_AHEAD), 0, NULL);
823 else /*mdev->net_conf->on_congestion == OC_DISCONNECT */ 894 else /*nc->on_congestion == OC_DISCONNECT */
824 _drbd_set_state(_NS(mdev, conn, C_DISCONNECTING), 0, NULL); 895 _drbd_set_state(_NS(mdev, conn, C_DISCONNECTING), 0, NULL);
825 } 896 }
826 put_ldev(mdev); 897 put_ldev(mdev);
827} 898}
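maybe_pull_ahead above compares two configured thresholds against live counters: cong_fill against the amount of write data still in flight to the peer, and cong_extents against the number of activity-log extents in use. If either trips, the node either goes Ahead or disconnects, depending on on_congestion. A plain decision sketch; the numbers and struct below are placeholders, not DRBD's configuration interface:

#include <stdbool.h>
#include <stdio.h>

enum on_congestion { OC_BLOCK, OC_PULL_AHEAD, OC_DISCONNECT };

struct cong_state {
        int cong_fill;      /* threshold: sectors allowed in flight (0 = disabled) */
        int cong_extents;   /* threshold: activity-log extents in use */
        int ap_in_flight;   /* current write data on the wire, in sectors */
        int al_used;        /* current activity-log usage */
        enum on_congestion policy;
};

static const char *congestion_action(const struct cong_state *c)
{
        bool congested = false;

        if (c->policy == OC_BLOCK)
                return "block (default: just keep replicating and wait)";
        if (c->cong_fill && c->ap_in_flight >= c->cong_fill)
                congested = true;                 /* congestion-fill threshold reached */
        if (c->al_used >= c->cong_extents)
                congested = true;                 /* congestion-extents threshold reached */
        if (!congested)
                return "keep replicating";
        return c->policy == OC_PULL_AHEAD ? "go Ahead (stop mirroring, track out-of-sync)"
                                          : "disconnect";
}

int main(void)
{
        struct cong_state c = {
                .cong_fill = 2048, .cong_extents = 127,
                .ap_in_flight = 4096, .al_used = 10,
                .policy = OC_PULL_AHEAD,
        };

        puts(congestion_action(&c));
        return 0;
}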
828 899
829static int drbd_make_request_common(struct drbd_conf *mdev, struct bio *bio, unsigned long start_time) 900/* If this returns false, and req->private_bio is still set,
901 * this should be submitted locally.
902 *
903 * If it returns false, but req->private_bio is not set,
904 * we do not have access to good data :(
905 *
906 * Otherwise, this destroys req->private_bio, if any,
907 * and returns true.
908 */
909static bool do_remote_read(struct drbd_request *req)
910{
911 struct drbd_conf *mdev = req->w.mdev;
912 enum drbd_read_balancing rbm;
913
914 if (req->private_bio) {
915 if (!drbd_may_do_local_read(mdev,
916 req->i.sector, req->i.size)) {
917 bio_put(req->private_bio);
918 req->private_bio = NULL;
919 put_ldev(mdev);
920 }
921 }
922
923 if (mdev->state.pdsk != D_UP_TO_DATE)
924 return false;
925
926 if (req->private_bio == NULL)
927 return true;
928
929 /* TODO: improve read balancing decisions, take into account drbd
930 * protocol, pending requests etc. */
931
932 rcu_read_lock();
933 rbm = rcu_dereference(mdev->ldev->disk_conf)->read_balancing;
934 rcu_read_unlock();
935
936 if (rbm == RB_PREFER_LOCAL && req->private_bio)
937 return false; /* submit locally */
938
939 if (remote_due_to_read_balancing(mdev, req->i.sector, rbm)) {
940 if (req->private_bio) {
941 bio_put(req->private_bio);
942 req->private_bio = NULL;
943 put_ldev(mdev);
944 }
945 return true;
946 }
947
948 return false;
949}
950
951/* returns number of connections (== 1, for drbd 8.4)
952 * expected to actually write this data,
953 * which does NOT include those that we are L_AHEAD for. */
954static int drbd_process_write_request(struct drbd_request *req)
955{
956 struct drbd_conf *mdev = req->w.mdev;
957 int remote, send_oos;
958
959 rcu_read_lock();
960 remote = drbd_should_do_remote(mdev->state);
961 if (remote) {
962 maybe_pull_ahead(mdev);
963 remote = drbd_should_do_remote(mdev->state);
964 }
965 send_oos = drbd_should_send_out_of_sync(mdev->state);
966 rcu_read_unlock();
967
968 /* Need to replicate writes. Unless it is an empty flush,
969 * which is better mapped to a DRBD P_BARRIER packet,
970 * also for drbd wire protocol compatibility reasons.
971 * If this was a flush, just start a new epoch.
972 * Unless the current epoch was empty anyways, or we are not currently
973 * replicating, in which case there is no point. */
974 if (unlikely(req->i.size == 0)) {
975 /* The only size==0 bios we expect are empty flushes. */
976 D_ASSERT(req->master_bio->bi_rw & REQ_FLUSH);
977 if (remote)
978 start_new_tl_epoch(mdev->tconn);
979 return 0;
980 }
981
982 if (!remote && !send_oos)
983 return 0;
984
985 D_ASSERT(!(remote && send_oos));
986
987 if (remote) {
988 _req_mod(req, TO_BE_SENT);
989 _req_mod(req, QUEUE_FOR_NET_WRITE);
990 } else if (drbd_set_out_of_sync(mdev, req->i.sector, req->i.size))
991 _req_mod(req, QUEUE_FOR_SEND_OOS);
992
993 return remote;
994}
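drbd_process_write_request above reduces the write path to three outcomes: an empty flush only closes the current epoch, a replicated write is queued to the peer via TO_BE_SENT and QUEUE_FOR_NET_WRITE, and a write issued while not replicating is merely marked out-of-sync and, if bits were newly set, queued as an out-of-sync notification. A compact sketch of that classification with invented helper names:

#include <stdbool.h>
#include <stdio.h>

enum write_path { WP_EPOCH_ONLY, WP_REPLICATE, WP_SEND_OOS, WP_LOCAL_ONLY };

/* 'remote' = peer gets the data; 'send_oos' = peer only gets an
 * out-of-sync notification (we are Ahead, or it is behind). */
static enum write_path classify_write(int size, bool remote, bool send_oos,
                                      bool oos_bits_newly_set)
{
        if (size == 0)                       /* empty flush: maps to a P_BARRIER */
                return WP_EPOCH_ONLY;
        if (remote)
                return WP_REPLICATE;         /* TO_BE_SENT + QUEUE_FOR_NET_WRITE */
        if (send_oos && oos_bits_newly_set)
                return WP_SEND_OOS;          /* QUEUE_FOR_SEND_OOS */
        return WP_LOCAL_ONLY;
}

int main(void)
{
        printf("%d\n", classify_write(4096, true, false, false));  /* 1: replicate */
        printf("%d\n", classify_write(0, true, false, false));     /* 0: epoch only */
        printf("%d\n", classify_write(4096, false, true, true));   /* 2: send out-of-sync */
        return 0;
}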
995
996static void
997drbd_submit_req_private_bio(struct drbd_request *req)
998{
999 struct drbd_conf *mdev = req->w.mdev;
1000 struct bio *bio = req->private_bio;
1001 const int rw = bio_rw(bio);
1002
1003 bio->bi_bdev = mdev->ldev->backing_bdev;
1004
1005 /* State may have changed since we grabbed our reference on the
1006 * ->ldev member. Double check, and short-circuit to endio.
1007 * In case the last activity log transaction failed to get on
1008 * stable storage, and this is a WRITE, we may not even submit
1009 * this bio. */
1010 if (get_ldev(mdev)) {
1011 if (drbd_insert_fault(mdev,
1012 rw == WRITE ? DRBD_FAULT_DT_WR
1013 : rw == READ ? DRBD_FAULT_DT_RD
1014 : DRBD_FAULT_DT_RA))
1015 bio_endio(bio, -EIO);
1016 else
1017 generic_make_request(bio);
1018 put_ldev(mdev);
1019 } else
1020 bio_endio(bio, -EIO);
1021}
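drbd_submit_req_private_bio above re-takes a disk reference because the state may have changed since the request was prepared, and either fails the bio immediately or hands it to the block layer, with optional fault injection in between for testing. A userspace sketch of the same control flow with stubbed helpers; none of these are the kernel functions:

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

/* Stand-ins for the kernel primitives used in the patch. */
static bool get_local_disk(void)   { return true; }              /* like get_ldev() */
static void put_local_disk(void)   { }
static bool inject_fault(void)     { return rand() % 32 == 0; }  /* like drbd_insert_fault() */

static void endio(int error)       { printf("bio completed, error=%d\n", error); }
static void submit_to_disk(void)   { printf("bio submitted to backing device\n"); }

static void submit_private_bio(void)
{
        /* State may have changed since the reference on the backing device
         * was taken: double check, and short-circuit to endio on failure. */
        if (get_local_disk()) {
                if (inject_fault())
                        endio(-5 /* -EIO */);
                else
                        submit_to_disk();
                put_local_disk();
        } else {
                endio(-5 /* -EIO */);
        }
}

int main(void)
{
        srand(42);
        submit_private_bio();
        return 0;
}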
1022
1023void __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long start_time)
830{ 1024{
831 const int rw = bio_rw(bio); 1025 const int rw = bio_rw(bio);
832 const int size = bio->bi_size; 1026 struct bio_and_error m = { NULL, };
833 const sector_t sector = bio->bi_sector;
834 struct drbd_tl_epoch *b = NULL;
835 struct drbd_request *req; 1027 struct drbd_request *req;
836 int local, remote, send_oos = 0; 1028 bool no_remote = false;
837 int err = -EIO;
838 int ret = 0;
839 union drbd_state s;
840 1029
841 /* allocate outside of all locks; */ 1030 /* allocate outside of all locks; */
842 req = drbd_req_new(mdev, bio); 1031 req = drbd_req_new(mdev, bio);
@@ -846,55 +1035,14 @@ static int drbd_make_request_common(struct drbd_conf *mdev, struct bio *bio, uns
846 * if user cannot handle io errors, that's not our business. */ 1035 * if user cannot handle io errors, that's not our business. */
847 dev_err(DEV, "could not kmalloc() req\n"); 1036 dev_err(DEV, "could not kmalloc() req\n");
848 bio_endio(bio, -ENOMEM); 1037 bio_endio(bio, -ENOMEM);
849 return 0; 1038 return;
850 } 1039 }
851 req->start_time = start_time; 1040 req->start_time = start_time;
852 1041
853 local = get_ldev(mdev); 1042 if (!get_ldev(mdev)) {
854 if (!local) { 1043 bio_put(req->private_bio);
855 bio_put(req->private_bio); /* or we get a bio leak */
856 req->private_bio = NULL; 1044 req->private_bio = NULL;
857 } 1045 }
858 if (rw == WRITE) {
859 /* Need to replicate writes. Unless it is an empty flush,
860 * which is better mapped to a DRBD P_BARRIER packet,
861 * also for drbd wire protocol compatibility reasons. */
862 if (unlikely(size == 0)) {
863 /* The only size==0 bios we expect are empty flushes. */
864 D_ASSERT(bio->bi_rw & REQ_FLUSH);
865 remote = 0;
866 } else
867 remote = 1;
868 } else {
869 /* READ || READA */
870 if (local) {
871 if (!drbd_may_do_local_read(mdev, sector, size)) {
872 /* we could kick the syncer to
873 * sync this extent asap, wait for
874 * it, then continue locally.
875 * Or just issue the request remotely.
876 */
877 local = 0;
878 bio_put(req->private_bio);
879 req->private_bio = NULL;
880 put_ldev(mdev);
881 }
882 }
883 remote = !local && mdev->state.pdsk >= D_UP_TO_DATE;
884 }
885
886 /* If we have a disk, but a READA request is mapped to remote,
887 * we are R_PRIMARY, D_INCONSISTENT, SyncTarget.
888 * Just fail that READA request right here.
889 *
890 * THINK: maybe fail all READA when not local?
891 * or make this configurable...
892 * if network is slow, READA won't do any good.
893 */
894 if (rw == READA && mdev->state.disk >= D_INCONSISTENT && !local) {
895 err = -EWOULDBLOCK;
896 goto fail_and_free_req;
897 }
898 1046
899 /* For WRITES going to the local disk, grab a reference on the target 1047 /* For WRITES going to the local disk, grab a reference on the target
900 * extent. This waits for any resync activity in the corresponding 1048 * extent. This waits for any resync activity in the corresponding
@@ -903,349 +1051,131 @@ static int drbd_make_request_common(struct drbd_conf *mdev, struct bio *bio, uns
903 * of transactional on-disk meta data updates. 1051 * of transactional on-disk meta data updates.
904 * Empty flushes don't need to go into the activity log, they can only 1052 * Empty flushes don't need to go into the activity log, they can only
905 * flush data for pending writes which are already in there. */ 1053 * flush data for pending writes which are already in there. */
906 if (rw == WRITE && local && size 1054 if (rw == WRITE && req->private_bio && req->i.size
907 && !drbd_test_flag(mdev, AL_SUSPENDED)) { 1055 && !test_bit(AL_SUSPENDED, &mdev->flags)) {
908 req->rq_state |= RQ_IN_ACT_LOG; 1056 req->rq_state |= RQ_IN_ACT_LOG;
909 drbd_al_begin_io(mdev, sector); 1057 drbd_al_begin_io(mdev, &req->i);
910 } 1058 }
911 1059
912 s = mdev->state; 1060 spin_lock_irq(&mdev->tconn->req_lock);
913 remote = remote && drbd_should_do_remote(s); 1061 if (rw == WRITE) {
914 send_oos = rw == WRITE && drbd_should_send_oos(s); 1062 /* This may temporarily give up the req_lock,
915 	D_ASSERT(!(remote && send_oos)); 1063 	 * but will re-acquire it before it returns here.
916 1064 * Needs to be before the check on drbd_suspended() */
917 if (!(local || remote) && !is_susp(mdev->state)) { 1065 complete_conflicting_writes(req);
918 if (__ratelimit(&drbd_ratelimit_state))
919 dev_err(DEV, "IO ERROR: neither local nor remote data, sector %llu+%u\n",
920 (unsigned long long)req->sector, req->size >> 9);
921 goto fail_free_complete;
922 }
923
924 /* For WRITE request, we have to make sure that we have an
925 * unused_spare_tle, in case we need to start a new epoch.
926 	 * I try to be smart and avoid always pre-allocating "just in case",
927 * but there is a race between testing the bit and pointer outside the
928 * spinlock, and grabbing the spinlock.
929 * if we lost that race, we retry. */
930 if (rw == WRITE && (remote || send_oos) &&
931 mdev->unused_spare_tle == NULL &&
932 drbd_test_flag(mdev, CREATE_BARRIER)) {
933allocate_barrier:
934 b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_NOIO);
935 if (!b) {
936 dev_err(DEV, "Failed to alloc barrier.\n");
937 err = -ENOMEM;
938 goto fail_free_complete;
939 }
940 } 1066 }
941 1067
942 /* GOOD, everything prepared, grab the spin_lock */ 1068 /* no more giving up req_lock from now on! */
943 spin_lock_irq(&mdev->req_lock);
944
945 if (is_susp(mdev->state)) {
946 /* If we got suspended, use the retry mechanism of
947 drbd_make_request() to restart processing of this
948 bio. In the next call to drbd_make_request
949 we sleep in inc_ap_bio() */
950 ret = 1;
951 spin_unlock_irq(&mdev->req_lock);
952 goto fail_free_complete;
953 }
954 1069
955 if (remote || send_oos) { 1070 if (drbd_suspended(mdev)) {
956 remote = drbd_should_do_remote(mdev->state); 1071 /* push back and retry: */
957 send_oos = rw == WRITE && drbd_should_send_oos(mdev->state); 1072 req->rq_state |= RQ_POSTPONED;
958 D_ASSERT(!(remote && send_oos)); 1073 if (req->private_bio) {
959 1074 bio_put(req->private_bio);
960 if (!(remote || send_oos)) 1075 req->private_bio = NULL;
961 dev_warn(DEV, "lost connection while grabbing the req_lock!\n"); 1076 put_ldev(mdev);
962 if (!(local || remote)) {
963 dev_err(DEV, "IO ERROR: neither local nor remote disk\n");
964 spin_unlock_irq(&mdev->req_lock);
965 goto fail_free_complete;
966 } 1077 }
1078 goto out;
967 } 1079 }
968 1080
969 if (b && mdev->unused_spare_tle == NULL) {
970 mdev->unused_spare_tle = b;
971 b = NULL;
972 }
973 if (rw == WRITE && (remote || send_oos) &&
974 mdev->unused_spare_tle == NULL &&
975 drbd_test_flag(mdev, CREATE_BARRIER)) {
976 /* someone closed the current epoch
977 * while we were grabbing the spinlock */
978 spin_unlock_irq(&mdev->req_lock);
979 goto allocate_barrier;
980 }
981
982
983 /* Update disk stats */ 1081 /* Update disk stats */
984 _drbd_start_io_acct(mdev, req, bio); 1082 _drbd_start_io_acct(mdev, req, bio);
985 1083
986 	/* _maybe_start_new_epoch(mdev); 1084 	/* We fail READ/READA early if we cannot serve it.
987 * If we need to generate a write barrier packet, we have to add the 1085 * We must do this before req is registered on any lists.
988 * new epoch (barrier) object, and queue the barrier packet for sending, 1086 * Otherwise, drbd_req_complete() will queue failed READ for retry. */
989 * and queue the req's data after it _within the same lock_, otherwise 1087 if (rw != WRITE) {
990 	 * we have race conditions where the reorder domains could be mixed up. 1088 		if (!do_remote_read(req) && !req->private_bio)
991 * 1089 goto nodata;
992 * Even read requests may start a new epoch and queue the corresponding
993 * barrier packet. To get the write ordering right, we only have to
994 * make sure that, if this is a write request and it triggered a
995 * barrier packet, this request is queued within the same spinlock. */
996 if ((remote || send_oos) && mdev->unused_spare_tle &&
997 drbd_test_and_clear_flag(mdev, CREATE_BARRIER)) {
998 _tl_add_barrier(mdev, mdev->unused_spare_tle);
999 mdev->unused_spare_tle = NULL;
1000 } else {
1001 D_ASSERT(!(remote && rw == WRITE &&
1002 drbd_test_flag(mdev, CREATE_BARRIER)));
1003 } 1090 }
1004 1091
1005 /* NOTE 1092 /* which transfer log epoch does this belong to? */
1006 * Actually, 'local' may be wrong here already, since we may have failed 1093 req->epoch = atomic_read(&mdev->tconn->current_tle_nr);
1007 * to write to the meta data, and may become wrong anytime because of
1008 * local io-error for some other request, which would lead to us
1009 * "detaching" the local disk.
1010 *
1011 * 'remote' may become wrong any time because the network could fail.
1012 *
1013 * This is a harmless race condition, though, since it is handled
1014 * correctly at the appropriate places; so it just defers the failure
1015 * of the respective operation.
1016 */
1017
1018 /* mark them early for readability.
1019 * this just sets some state flags. */
1020 if (remote)
1021 _req_mod(req, to_be_send);
1022 if (local)
1023 _req_mod(req, to_be_submitted);
1024
1025 /* check this request on the collision detection hash tables.
1026 * if we have a conflict, just complete it here.
1027 * THINK do we want to check reads, too? (I don't think so...) */
1028 if (rw == WRITE && _req_conflicts(req))
1029 goto fail_conflicting;
1030 1094
1031 /* no point in adding empty flushes to the transfer log, 1095 /* no point in adding empty flushes to the transfer log,
1032 * they are mapped to drbd barriers already. */ 1096 * they are mapped to drbd barriers already. */
1033 if (likely(size!=0)) 1097 if (likely(req->i.size!=0)) {
1034 list_add_tail(&req->tl_requests, &mdev->newest_tle->requests); 1098 if (rw == WRITE)
1099 mdev->tconn->current_tle_writes++;
1035 1100
1036 /* NOTE remote first: to get the concurrent write detection right, 1101 list_add_tail(&req->tl_requests, &mdev->tconn->transfer_log);
1037 * we must register the request before start of local IO. */
1038 if (remote) {
1039 /* either WRITE and C_CONNECTED,
1040 * or READ, and no local disk,
1041 * or READ, but not in sync.
1042 */
1043 _req_mod(req, (rw == WRITE)
1044 ? queue_for_net_write
1045 : queue_for_net_read);
1046 } 1102 }
1047 if (send_oos && drbd_set_out_of_sync(mdev, sector, size))
1048 _req_mod(req, queue_for_send_oos);
1049
1050 if (remote &&
1051 mdev->net_conf->on_congestion != OC_BLOCK && mdev->agreed_pro_version >= 96)
1052 maybe_pull_ahead(mdev);
1053 1103
1054 /* If this was a flush, queue a drbd barrier/start a new epoch. 1104 if (rw == WRITE) {
1055 	 * Unless the current epoch was empty anyway, or we are not currently 1105 		if (!drbd_process_write_request(req))
1056 * replicating, in which case there is no point. */ 1106 no_remote = true;
1057 if (unlikely(bio->bi_rw & REQ_FLUSH) 1107 } else {
1058 && mdev->newest_tle->n_writes 1108 /* We either have a private_bio, or we can read from remote.
1059 && drbd_should_do_remote(mdev->state)) 1109 * Otherwise we had done the goto nodata above. */
1060 queue_barrier(mdev); 1110 if (req->private_bio == NULL) {
1061 1111 _req_mod(req, TO_BE_SENT);
1062 spin_unlock_irq(&mdev->req_lock); 1112 _req_mod(req, QUEUE_FOR_NET_READ);
1063 kfree(b); /* if someone else has beaten us to it... */
1064
1065 if (local) {
1066 req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1067
1068 /* State may have changed since we grabbed our reference on the
1069 * mdev->ldev member. Double check, and short-circuit to endio.
1070 * In case the last activity log transaction failed to get on
1071 * stable storage, and this is a WRITE, we may not even submit
1072 * this bio. */
1073 if (get_ldev(mdev)) {
1074 if (drbd_insert_fault(mdev, rw == WRITE ? DRBD_FAULT_DT_WR
1075 : rw == READ ? DRBD_FAULT_DT_RD
1076 : DRBD_FAULT_DT_RA))
1077 bio_endio(req->private_bio, -EIO);
1078 else
1079 generic_make_request(req->private_bio);
1080 put_ldev(mdev);
1081 } else 1113 } else
1082 bio_endio(req->private_bio, -EIO); 1114 no_remote = true;
1083 } 1115 }
1084 1116
1085 return 0; 1117 if (req->private_bio) {
1086 1118 /* needs to be marked within the same spinlock */
1087fail_conflicting: 1119 _req_mod(req, TO_BE_SUBMITTED);
1088 /* this is a conflicting request. 1120 /* but we need to give up the spinlock to submit */
1089 * even though it may have been only _partially_ 1121 spin_unlock_irq(&mdev->tconn->req_lock);
1090 * overlapping with one of the currently pending requests, 1122 drbd_submit_req_private_bio(req);
1091 * without even submitting or sending it, we will 1123 spin_lock_irq(&mdev->tconn->req_lock);
1092 * pretend that it was successfully served right now. 1124 } else if (no_remote) {
1093 */ 1125nodata:
1094 _drbd_end_io_acct(mdev, req); 1126 if (__ratelimit(&drbd_ratelimit_state))
1095 spin_unlock_irq(&mdev->req_lock); 1127 dev_err(DEV, "IO ERROR: neither local nor remote data, sector %llu+%u\n",
1096 if (remote) 1128 (unsigned long long)req->i.sector, req->i.size >> 9);
1097 dec_ap_pending(mdev); 1129 /* A write may have been queued for send_oos, however.
1098 	/* THINK: do we want to fail it (-EIO), or pretend success? 1130 	 * So we cannot simply free it; we must go through drbd_req_put_completion_ref() */
1099 * this pretends success. */
1100 err = 0;
1101
1102fail_free_complete:
1103 if (req->rq_state & RQ_IN_ACT_LOG)
1104 drbd_al_complete_io(mdev, sector);
1105fail_and_free_req:
1106 if (local) {
1107 bio_put(req->private_bio);
1108 req->private_bio = NULL;
1109 put_ldev(mdev);
1110 } 1131 }
1111 if (!ret)
1112 bio_endio(bio, err);
1113
1114 drbd_req_free(req);
1115 dec_ap_bio(mdev);
1116 kfree(b);
1117
1118 return ret;
1119}
1120 1132
1121/* helper function for drbd_make_request 1133out:
1122 * if we can determine just by the mdev (state) that this request will fail, 1134 if (drbd_req_put_completion_ref(req, &m, 1))
1123 * return 1 1135 kref_put(&req->kref, drbd_req_destroy);
1124 * otherwise return 0 1136 spin_unlock_irq(&mdev->tconn->req_lock);
1125 */
1126static int drbd_fail_request_early(struct drbd_conf *mdev, int is_write)
1127{
1128 if (mdev->state.role != R_PRIMARY &&
1129 (!allow_oos || is_write)) {
1130 if (__ratelimit(&drbd_ratelimit_state)) {
1131 dev_err(DEV, "Process %s[%u] tried to %s; "
1132 "since we are not in Primary state, "
1133 "we cannot allow this\n",
1134 current->comm, current->pid,
1135 is_write ? "WRITE" : "READ");
1136 }
1137 return 1;
1138 }
1139 1137
1140 return 0; 1138 if (m.bio)
1139 complete_master_bio(mdev, &m);
1140 return;
1141} 1141}
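
The out: label drops one completion reference and, when that was the last one, also drops the kref. A minimal sketch of the two-counter idea, using illustrative example_* names; completion_ref gates completing the upper-layer bio, kref gates freeing the request object:

#include <stdio.h>
#include <stdlib.h>

struct example_req {
	int completion_ref;	/* pending submissions: local disk, network, ... */
	int kref;		/* object lifetime */
	int master_done;	/* has the upper-layer bio been completed? */
};

static void example_kref_put(struct example_req *r)
{
	if (--r->kref == 0) {
		printf("request freed\n");
		free(r);
	}
}

/* returns nonzero when the caller should drop its kref as well, mirroring the
 * "if (drbd_req_put_completion_ref(...)) kref_put(...)" idiom above */
static int example_put_completion_ref(struct example_req *r, int put)
{
	r->completion_ref -= put;
	if (r->completion_ref > 0)
		return 0;
	if (!r->master_done) {
		printf("complete master bio\n");
		r->master_done = 1;
	}
	return 1;
}

int main(void)
{
	struct example_req *r = calloc(1, sizeof(*r));

	if (!r)
		return 1;
	r->completion_ref = 1;	/* reference held while the request is being set up */
	r->kref = 1;
	if (example_put_completion_ref(r, 1))
		example_kref_put(r);
	return 0;
}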
1142 1142
1143void drbd_make_request(struct request_queue *q, struct bio *bio) 1143void drbd_make_request(struct request_queue *q, struct bio *bio)
1144{ 1144{
1145 unsigned int s_enr, e_enr;
1146 struct drbd_conf *mdev = (struct drbd_conf *) q->queuedata; 1145 struct drbd_conf *mdev = (struct drbd_conf *) q->queuedata;
1147 unsigned long start_time; 1146 unsigned long start_time;
1148 1147
1149 if (drbd_fail_request_early(mdev, bio_data_dir(bio) & WRITE)) {
1150 bio_endio(bio, -EPERM);
1151 return;
1152 }
1153
1154 start_time = jiffies; 1148 start_time = jiffies;
1155 1149
1156 /* 1150 /*
1157 * what we "blindly" assume: 1151 * what we "blindly" assume:
1158 */ 1152 */
1159 D_ASSERT((bio->bi_size & 0x1ff) == 0); 1153 D_ASSERT(IS_ALIGNED(bio->bi_size, 512));
1160
1161 /* to make some things easier, force alignment of requests within the
1162 * granularity of our hash tables */
1163 s_enr = bio->bi_sector >> HT_SHIFT;
1164 e_enr = bio->bi_size ? (bio->bi_sector+(bio->bi_size>>9)-1) >> HT_SHIFT : s_enr;
1165
1166 if (likely(s_enr == e_enr)) {
1167 do {
1168 inc_ap_bio(mdev, 1);
1169 } while (drbd_make_request_common(mdev, bio, start_time));
1170 return;
1171 }
1172
1173 /* can this bio be split generically?
1174 * Maybe add our own split-arbitrary-bios function. */
1175 if (bio->bi_vcnt != 1 || bio->bi_idx != 0 || bio->bi_size > DRBD_MAX_BIO_SIZE) {
1176 /* rather error out here than BUG in bio_split */
1177 dev_err(DEV, "bio would need to, but cannot, be split: "
1178 "(vcnt=%u,idx=%u,size=%u,sector=%llu)\n",
1179 bio->bi_vcnt, bio->bi_idx, bio->bi_size,
1180 (unsigned long long)bio->bi_sector);
1181 bio_endio(bio, -EINVAL);
1182 } else {
1183 /* This bio crosses some boundary, so we have to split it. */
1184 struct bio_pair *bp;
1185 /* works for the "do not cross hash slot boundaries" case
1186 * e.g. sector 262269, size 4096
1187 * s_enr = 262269 >> 6 = 4097
1188 * e_enr = (262269+8-1) >> 6 = 4098
1189 * HT_SHIFT = 6
1190 * sps = 64, mask = 63
1191 * first_sectors = 64 - (262269 & 63) = 3
1192 */
1193 const sector_t sect = bio->bi_sector;
1194 const int sps = 1 << HT_SHIFT; /* sectors per slot */
1195 const int mask = sps - 1;
1196 const sector_t first_sectors = sps - (sect & mask);
1197 bp = bio_split(bio, first_sectors);
1198 1154
1199 /* we need to get a "reference count" (ap_bio_cnt) 1155 inc_ap_bio(mdev);
1200 * to avoid races with the disconnect/reconnect/suspend code. 1156 __drbd_make_request(mdev, bio, start_time);
1201 * In case we need to split the bio here, we need to get three references
1202 * atomically, otherwise we might deadlock when trying to submit the
1203 * second one! */
1204 inc_ap_bio(mdev, 3);
1205
1206 D_ASSERT(e_enr == s_enr + 1);
1207
1208 while (drbd_make_request_common(mdev, &bp->bio1, start_time))
1209 inc_ap_bio(mdev, 1);
1210
1211 while (drbd_make_request_common(mdev, &bp->bio2, start_time))
1212 inc_ap_bio(mdev, 1);
1213
1214 dec_ap_bio(mdev);
1215
1216 bio_pair_release(bp);
1217 }
1218} 1157}
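
The worked example in the removed split-path comment (sector 262269, HT_SHIFT = 6, i.e. 64 sectors per hash slot) can be checked with a few lines of standalone arithmetic:

#include <stdio.h>

int main(void)
{
	const unsigned long long sect = 262269ULL;
	const int sps = 1 << 6;		/* sectors per hash slot */
	const int mask = sps - 1;
	const unsigned long long first_sectors = sps - (sect & mask);

	printf("first_sectors = %llu\n", first_sectors);	/* prints 3 */
	return 0;
}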
1219 1158
1220/* This is called by bio_add_page(). With this function we reduce 1159/* This is called by bio_add_page().
1221 * the number of BIOs that span over multiple DRBD_MAX_BIO_SIZEs 1160 *
1222 * units (was AL_EXTENTs). 1161 * q->max_hw_sectors and other global limits are already enforced there.
1223 * 1162 *
1224 * we do the calculation within the lower 32bit of the byte offsets, 1163 * We need to call down to our lower level device,
1225 * since we don't care for actual offset, but only check whether it 1164 * in case it has special restrictions.
1226 * would cross "activity log extent" boundaries. 1165 *
1166 * We also may need to enforce configured max-bio-bvecs limits.
1227 * 1167 *
1228 * As long as the BIO is empty we have to allow at least one bvec, 1168 * As long as the BIO is empty we have to allow at least one bvec,
1229 * regardless of size and offset. so the resulting bio may still 1169 * regardless of size and offset, so no need to ask lower levels.
1230 * cross extent boundaries. those are dealt with (bio_split) in
1231 * drbd_make_request.
1232 */ 1170 */
1233int drbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bvm, struct bio_vec *bvec) 1171int drbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bvm, struct bio_vec *bvec)
1234{ 1172{
1235 struct drbd_conf *mdev = (struct drbd_conf *) q->queuedata; 1173 struct drbd_conf *mdev = (struct drbd_conf *) q->queuedata;
1236 unsigned int bio_offset =
1237 (unsigned int)bvm->bi_sector << 9; /* 32 bit */
1238 unsigned int bio_size = bvm->bi_size; 1174 unsigned int bio_size = bvm->bi_size;
1239 int limit, backing_limit; 1175 int limit = DRBD_MAX_BIO_SIZE;
1240 1176 int backing_limit;
1241 limit = DRBD_MAX_BIO_SIZE 1177
1242 - ((bio_offset & (DRBD_MAX_BIO_SIZE-1)) + bio_size); 1178 if (bio_size && get_ldev(mdev)) {
1243 if (limit < 0)
1244 limit = 0;
1245 if (bio_size == 0) {
1246 if (limit <= bvec->bv_len)
1247 limit = bvec->bv_len;
1248 } else if (limit && get_ldev(mdev)) {
1249 struct request_queue * const b = 1179 struct request_queue * const b =
1250 mdev->ldev->backing_bdev->bd_disk->queue; 1180 mdev->ldev->backing_bdev->bd_disk->queue;
1251 if (b->merge_bvec_fn) { 1181 if (b->merge_bvec_fn) {
@@ -1257,24 +1187,38 @@ int drbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bvm, struct
1257 return limit; 1187 return limit;
1258} 1188}
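
For the limit computation that this hunk removes, which kept a bio from crossing a DRBD_MAX_BIO_SIZE-aligned boundary, a minimal sketch of the arithmetic; the 1 MiB window size is an assumption for the example:

#include <stdio.h>

#define EXAMPLE_MAX_BIO_SIZE (1U << 20)	/* assumed 1 MiB window */

/* bytes left before the bio would cross the next window boundary */
static int example_remaining(unsigned int bio_offset, unsigned int bio_size)
{
	int limit = EXAMPLE_MAX_BIO_SIZE
		- ((bio_offset & (EXAMPLE_MAX_BIO_SIZE - 1)) + bio_size);
	return limit < 0 ? 0 : limit;
}

int main(void)
{
	/* a bio 8 KiB long, starting 4 KiB into the window */
	printf("remaining = %d bytes\n", example_remaining(4096, 8192));
	return 0;
}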
1259 1189
1190struct drbd_request *find_oldest_request(struct drbd_tconn *tconn)
1191{
1192 /* Walk the transfer log,
1193 * and find the oldest not yet completed request */
1194 struct drbd_request *r;
1195 list_for_each_entry(r, &tconn->transfer_log, tl_requests) {
1196 if (atomic_read(&r->completion_ref))
1197 return r;
1198 }
1199 return NULL;
1200}
1201
1260void request_timer_fn(unsigned long data) 1202void request_timer_fn(unsigned long data)
1261{ 1203{
1262 struct drbd_conf *mdev = (struct drbd_conf *) data; 1204 struct drbd_conf *mdev = (struct drbd_conf *) data;
1205 struct drbd_tconn *tconn = mdev->tconn;
1263 struct drbd_request *req; /* oldest request */ 1206 struct drbd_request *req; /* oldest request */
1264 struct list_head *le; 1207 struct net_conf *nc;
1265 unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */ 1208 unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */
1266 unsigned long now; 1209 unsigned long now;
1267 1210
1268 if (get_net_conf(mdev)) { 1211 rcu_read_lock();
1269 if (mdev->state.conn >= C_WF_REPORT_PARAMS) 1212 nc = rcu_dereference(tconn->net_conf);
1270 ent = mdev->net_conf->timeout*HZ/10 1213 if (nc && mdev->state.conn >= C_WF_REPORT_PARAMS)
1271 * mdev->net_conf->ko_count; 1214 ent = nc->timeout * HZ/10 * nc->ko_count;
1272 put_net_conf(mdev); 1215
1273 }
1274 if (get_ldev(mdev)) { /* implicit state.disk >= D_INCONSISTENT */ 1216 if (get_ldev(mdev)) { /* implicit state.disk >= D_INCONSISTENT */
1275 dt = mdev->ldev->dc.disk_timeout * HZ / 10; 1217 dt = rcu_dereference(mdev->ldev->disk_conf)->disk_timeout * HZ / 10;
1276 put_ldev(mdev); 1218 put_ldev(mdev);
1277 } 1219 }
1220 rcu_read_unlock();
1221
1278 et = min_not_zero(dt, ent); 1222 et = min_not_zero(dt, ent);
1279 1223
1280 if (!et) 1224 if (!et)
@@ -1282,17 +1226,14 @@ void request_timer_fn(unsigned long data)
1282 1226
1283 now = jiffies; 1227 now = jiffies;
1284 1228
1285 spin_lock_irq(&mdev->req_lock); 1229 spin_lock_irq(&tconn->req_lock);
1286 le = &mdev->oldest_tle->requests; 1230 req = find_oldest_request(tconn);
1287 if (list_empty(le)) { 1231 if (!req) {
1288 spin_unlock_irq(&mdev->req_lock); 1232 spin_unlock_irq(&tconn->req_lock);
1289 mod_timer(&mdev->request_timer, now + et); 1233 mod_timer(&mdev->request_timer, now + et);
1290 return; 1234 return;
1291 } 1235 }
1292 1236
1293 le = le->prev;
1294 req = list_entry(le, struct drbd_request, tl_requests);
1295
1296 /* The request is considered timed out, if 1237 /* The request is considered timed out, if
1297 * - we have some effective timeout from the configuration, 1238 * - we have some effective timeout from the configuration,
1298 * with above state restrictions applied, 1239 * with above state restrictions applied,
@@ -1311,17 +1252,17 @@ void request_timer_fn(unsigned long data)
1311 */ 1252 */
1312 if (ent && req->rq_state & RQ_NET_PENDING && 1253 if (ent && req->rq_state & RQ_NET_PENDING &&
1313 time_after(now, req->start_time + ent) && 1254 time_after(now, req->start_time + ent) &&
1314 !time_in_range(now, mdev->last_reconnect_jif, mdev->last_reconnect_jif + ent)) { 1255 !time_in_range(now, tconn->last_reconnect_jif, tconn->last_reconnect_jif + ent)) {
1315 dev_warn(DEV, "Remote failed to finish a request within ko-count * timeout\n"); 1256 dev_warn(DEV, "Remote failed to finish a request within ko-count * timeout\n");
1316 _drbd_set_state(_NS(mdev, conn, C_TIMEOUT), CS_VERBOSE | CS_HARD, NULL); 1257 _drbd_set_state(_NS(mdev, conn, C_TIMEOUT), CS_VERBOSE | CS_HARD, NULL);
1317 } 1258 }
1318 if (dt && req->rq_state & RQ_LOCAL_PENDING && 1259 if (dt && req->rq_state & RQ_LOCAL_PENDING && req->w.mdev == mdev &&
1319 time_after(now, req->start_time + dt) && 1260 time_after(now, req->start_time + dt) &&
1320 !time_in_range(now, mdev->last_reattach_jif, mdev->last_reattach_jif + dt)) { 1261 !time_in_range(now, mdev->last_reattach_jif, mdev->last_reattach_jif + dt)) {
1321 dev_warn(DEV, "Local backing device failed to meet the disk-timeout\n"); 1262 dev_warn(DEV, "Local backing device failed to meet the disk-timeout\n");
1322 __drbd_chk_io_error(mdev, DRBD_FORCE_DETACH); 1263 __drbd_chk_io_error(mdev, DRBD_FORCE_DETACH);
1323 } 1264 }
1324 nt = (time_after(now, req->start_time + et) ? now : req->start_time) + et; 1265 nt = (time_after(now, req->start_time + et) ? now : req->start_time) + et;
1325 spin_unlock_irq(&mdev->req_lock); 1266 spin_unlock_irq(&tconn->req_lock);
1326 mod_timer(&mdev->request_timer, nt); 1267 mod_timer(&mdev->request_timer, nt);
1327} 1268}
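
A minimal standalone sketch of the effective-timeout arithmetic above: the configured values are in tenths of a second (hence the HZ/10 conversion to jiffies), the network timeout is additionally scaled by ko-count, and the smaller non-zero of the two becomes the effective timeout. HZ and the configuration values below are assumptions for the example:

#include <stdio.h>

#define EXAMPLE_HZ 1000UL

static unsigned long min_not_zero_ul(unsigned long a, unsigned long b)
{
	if (!a)
		return b;
	if (!b)
		return a;
	return a < b ? a : b;
}

int main(void)
{
	unsigned long timeout = 60;	/* 6.0 s, in tenths of a second */
	unsigned long ko_count = 7;
	unsigned long disk_timeout = 0;	/* 0 = disabled */

	unsigned long ent = timeout * EXAMPLE_HZ / 10 * ko_count;
	unsigned long dt  = disk_timeout * EXAMPLE_HZ / 10;
	unsigned long et  = min_not_zero_ul(dt, ent);

	printf("effective timeout = %lu jiffies (%.1f s)\n",
	       et, (double)et / EXAMPLE_HZ);
	return 0;
}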