author     Lars Ellenberg <lars.ellenberg@linbit.com>      2011-11-28 09:04:49 -0500
committer  Philipp Reisner <philipp.reisner@linbit.com>    2012-11-08 10:58:35 -0500
commit     b6dd1a89767bc33e9c98b3195f8925b46c5c95f3 (patch)
tree       e82371062171f5cade79cb0c4a6cd22486b5f082 /drivers/block
parent     d5b27b01f17ef1f0badc45f9eea521be3457c9cb (diff)
drbd: remove struct drbd_tl_epoch objects (barrier works)
Cherry-picked and adapted from the drbd 9 devel branch.

DRBD requests (struct drbd_request) are already on the per-resource transfer log list, and carry their epoch number. We do not need to additionally link them on other ring lists in other structs.

The drbd sender thread can recognize by itself when to send a P_BARRIER, by tracking the currently processed epoch and how many writes have been processed for that epoch. If the epoch of the request to be processed does not match the currently processed epoch, and any writes have been processed in it, a P_BARRIER for this last processed epoch is sent out first. The new epoch then becomes the currently processed epoch.

To not get stuck in drbd_al_begin_io() waiting for P_BARRIER_ACK, the sender thread also needs to handle the case when the current epoch was closed already, but no new requests are queued yet, and send out the P_BARRIER as soon as possible. This is done by comparing the per-resource "current transfer log epoch" (tconn->current_tle_nr) with the per-connection "currently processed epoch number" (tconn->send.current_epoch_nr), while waiting for new requests to be processed in wait_for_work().

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
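To make the sender-side bookkeeping concrete, the following standalone C sketch models the per-request decision that the patched w_send_dblock()/w_send_read_req() paths make, and the idle-sender check performed by need_to_send_barrier() from wait_for_work(). The struct mirrors the new tconn->send fields; the helper names (close_previous_epoch, epoch_already_closed) and the demo in main() are illustrative only and do not appear in this patch.

#include <stdbool.h>
#include <stdio.h>

/* Simplified model of the per-connection sender state this patch adds
 * (the "send" member of struct drbd_tconn in the real code). */
struct send_state {
	bool seen_any_write_yet;       /* has this sender processed any write yet?        */
	int current_epoch_nr;          /* barrier number to send with the next P_BARRIER  */
	unsigned current_epoch_writes; /* writes sent with req->epoch == current_epoch_nr */
};

/* Per-request decision modeled on the checks added to w_send_dblock()
 * and w_send_read_req(): if the request belongs to a newer epoch and the
 * currently processed epoch saw writes, the closing P_BARRIER goes out first. */
static bool close_previous_epoch(struct send_state *s, int req_epoch, bool is_write)
{
	bool send_barrier = false;

	if (!s->seen_any_write_yet) {
		s->seen_any_write_yet = true;
		s->current_epoch_nr = req_epoch;
	}
	if (s->current_epoch_nr != req_epoch) {
		send_barrier = s->current_epoch_writes != 0;
		s->current_epoch_nr = req_epoch;
		s->current_epoch_writes = 0; /* in the patch, drbd_send_barrier() resets this */
	}
	if (is_write)
		s->current_epoch_writes++;
	return send_barrier;
}

/* Idle-sender check modeled on need_to_send_barrier(): the resource has
 * already opened a new transfer-log epoch (current_tle_nr advanced), but
 * the sender has not seen a request from it yet, so the barrier closing
 * the previous epoch must be sent from wait_for_work(). */
static bool epoch_already_closed(const struct send_state *s, int resource_tle_nr)
{
	if (!s->seen_any_write_yet || s->current_epoch_writes == 0)
		return false;
	return resource_tle_nr == s->current_epoch_nr + 1;
}

int main(void)
{
	struct send_state s = { false, 0, 0 };

	/* two writes in epoch 7, then a write in epoch 8:
	 * the epoch change triggers exactly one P_BARRIER for epoch 7 */
	printf("%d\n", close_previous_epoch(&s, 7, true)); /* 0 */
	printf("%d\n", close_previous_epoch(&s, 7, true)); /* 0 */
	printf("%d\n", close_previous_epoch(&s, 8, true)); /* 1 -> P_BARRIER(7) */

	/* sender idle, resource already moved on to epoch 9:
	 * wait_for_work() must send the barrier closing epoch 8 */
	printf("%d\n", epoch_already_closed(&s, 9));       /* 1 -> P_BARRIER(8) */
	return 0;
}

Note this models only the decision; in the kernel, the per-resource side (current_tle_nr, current_tle_writes) is updated under tconn->req_lock on the queuing path, while the send.* fields are private to the sender thread.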
Diffstat (limited to 'drivers/block')
-rw-r--r--  drivers/block/drbd/drbd_int.h        45
-rw-r--r--  drivers/block/drbd/drbd_main.c      316
-rw-r--r--  drivers/block/drbd/drbd_nl.c          8
-rw-r--r--  drivers/block/drbd/drbd_receiver.c    1
-rw-r--r--  drivers/block/drbd/drbd_req.c       157
-rw-r--r--  drivers/block/drbd/drbd_worker.c    194
6 files changed, 291 insertions, 430 deletions
diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h
index c0d0de54ae57..309c121557ae 100644
--- a/drivers/block/drbd/drbd_int.h
+++ b/drivers/block/drbd/drbd_int.h
@@ -562,12 +562,16 @@ struct drbd_request {
562 struct bio *private_bio; 562 struct bio *private_bio;
563 563
564 struct drbd_interval i; 564 struct drbd_interval i;
565 unsigned int epoch; /* barrier_nr */
566 565
567 /* barrier_nr: used to check on "completion" whether this req was in 566 /* epoch: used to check on "completion" whether this req was in
568 * the current epoch, and we therefore have to close it, 567 * the current epoch, and we therefore have to close it,
569 * starting a new epoch... 568 * causing a p_barrier packet to be send, starting a new epoch.
569 *
570 * This corresponds to "barrier" in struct p_barrier[_ack],
571 * and to "barrier_nr" in struct drbd_epoch (and various
572 * comments/function parameters/local variable names).
570 */ 573 */
574 unsigned int epoch;
571 575
572 struct list_head tl_requests; /* ring list in the transfer log */ 576 struct list_head tl_requests; /* ring list in the transfer log */
573 struct bio *master_bio; /* master bio pointer */ 577 struct bio *master_bio; /* master bio pointer */
@@ -575,14 +579,6 @@ struct drbd_request {
575 unsigned long start_time; 579 unsigned long start_time;
576}; 580};
577 581
578struct drbd_tl_epoch {
579 struct drbd_work w;
580 struct list_head requests; /* requests before */
581 struct drbd_tl_epoch *next; /* pointer to the next barrier */
582 unsigned int br_number; /* the barriers identifier. */
583 int n_writes; /* number of requests attached before this barrier */
584};
585
586struct drbd_epoch { 582struct drbd_epoch {
587 struct drbd_tconn *tconn; 583 struct drbd_tconn *tconn;
588 struct list_head list; 584 struct list_head list;
@@ -845,11 +841,8 @@ struct drbd_tconn { /* is a resource from the config file */
845 unsigned int ko_count; 841 unsigned int ko_count;
846 842
847 spinlock_t req_lock; 843 spinlock_t req_lock;
848 struct drbd_tl_epoch *unused_spare_tle; /* for pre-allocation */ 844
849 struct drbd_tl_epoch *newest_tle; 845 struct list_head transfer_log; /* all requests not yet fully processed */
850 struct drbd_tl_epoch *oldest_tle;
851 struct list_head out_of_sequence_requests;
852 struct list_head barrier_acked_requests;
853 846
854 struct crypto_hash *cram_hmac_tfm; 847 struct crypto_hash *cram_hmac_tfm;
855 struct crypto_hash *integrity_tfm; /* checksums we compute, updates protected by tconn->data->mutex */ 848 struct crypto_hash *integrity_tfm; /* checksums we compute, updates protected by tconn->data->mutex */
@@ -859,18 +852,36 @@ struct drbd_tconn { /* is a resource from the config file */
859 void *int_dig_in; 852 void *int_dig_in;
860 void *int_dig_vv; 853 void *int_dig_vv;
861 854
855 /* receiver side */
862 struct drbd_epoch *current_epoch; 856 struct drbd_epoch *current_epoch;
863 spinlock_t epoch_lock; 857 spinlock_t epoch_lock;
864 unsigned int epochs; 858 unsigned int epochs;
865 enum write_ordering_e write_ordering; 859 enum write_ordering_e write_ordering;
866 atomic_t current_tle_nr; /* transfer log epoch number */ 860 atomic_t current_tle_nr; /* transfer log epoch number */
861 unsigned current_tle_writes; /* writes seen within this tl epoch */
867 862
868 unsigned long last_reconnect_jif; 863 unsigned long last_reconnect_jif;
869 struct drbd_thread receiver; 864 struct drbd_thread receiver;
870 struct drbd_thread worker; 865 struct drbd_thread worker;
871 struct drbd_thread asender; 866 struct drbd_thread asender;
872 cpumask_var_t cpu_mask; 867 cpumask_var_t cpu_mask;
868
869 /* sender side */
873 struct drbd_work_queue sender_work; 870 struct drbd_work_queue sender_work;
871
872 struct {
873 /* whether this sender thread
874 * has processed a single write yet. */
875 bool seen_any_write_yet;
876
877 /* Which barrier number to send with the next P_BARRIER */
878 int current_epoch_nr;
879
880 /* how many write requests have been sent
881 * with req->epoch == current_epoch_nr.
882 * If none, no P_BARRIER will be sent. */
883 unsigned current_epoch_writes;
884 } send;
874}; 885};
875 886
876struct drbd_conf { 887struct drbd_conf {
@@ -1054,7 +1065,6 @@ extern void drbd_calc_cpu_mask(struct drbd_tconn *tconn);
1054extern void tl_release(struct drbd_tconn *, unsigned int barrier_nr, 1065extern void tl_release(struct drbd_tconn *, unsigned int barrier_nr,
1055 unsigned int set_size); 1066 unsigned int set_size);
1056extern void tl_clear(struct drbd_tconn *); 1067extern void tl_clear(struct drbd_tconn *);
1057extern void _tl_add_barrier(struct drbd_tconn *, struct drbd_tl_epoch *);
1058extern void drbd_free_sock(struct drbd_tconn *tconn); 1068extern void drbd_free_sock(struct drbd_tconn *tconn);
1059extern int drbd_send(struct drbd_tconn *tconn, struct socket *sock, 1069extern int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
1060 void *buf, size_t size, unsigned msg_flags); 1070 void *buf, size_t size, unsigned msg_flags);
@@ -1460,7 +1470,6 @@ extern int w_resync_timer(struct drbd_work *, int);
1460extern int w_send_write_hint(struct drbd_work *, int); 1470extern int w_send_write_hint(struct drbd_work *, int);
1461extern int w_make_resync_request(struct drbd_work *, int); 1471extern int w_make_resync_request(struct drbd_work *, int);
1462extern int w_send_dblock(struct drbd_work *, int); 1472extern int w_send_dblock(struct drbd_work *, int);
1463extern int w_send_barrier(struct drbd_work *, int);
1464extern int w_send_read_req(struct drbd_work *, int); 1473extern int w_send_read_req(struct drbd_work *, int);
1465extern int w_prev_work_done(struct drbd_work *, int); 1474extern int w_prev_work_done(struct drbd_work *, int);
1466extern int w_e_reissue(struct drbd_work *, int); 1475extern int w_e_reissue(struct drbd_work *, int);
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index 7e37149684e4..8c6c48e363cd 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -188,147 +188,75 @@ int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
188#endif 188#endif
189 189
190/** 190/**
191 * DOC: The transfer log 191 * tl_release() - mark as BARRIER_ACKED all requests in the corresponding transfer log epoch
192 * 192 * @tconn: DRBD connection.
193 * The transfer log is a single linked list of &struct drbd_tl_epoch objects.
194 * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
195 * of the list. There is always at least one &struct drbd_tl_epoch object.
196 *
197 * Each &struct drbd_tl_epoch has a circular double linked list of requests
198 * attached.
199 */
200static int tl_init(struct drbd_tconn *tconn)
201{
202 struct drbd_tl_epoch *b;
203
204 /* during device minor initialization, we may well use GFP_KERNEL */
205 b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
206 if (!b)
207 return 0;
208 INIT_LIST_HEAD(&b->requests);
209 INIT_LIST_HEAD(&b->w.list);
210 b->next = NULL;
211 b->br_number = atomic_inc_return(&tconn->current_tle_nr);
212 b->n_writes = 0;
213 b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
214
215 tconn->oldest_tle = b;
216 tconn->newest_tle = b;
217 INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
218 INIT_LIST_HEAD(&tconn->barrier_acked_requests);
219
220 return 1;
221}
222
223static void tl_cleanup(struct drbd_tconn *tconn)
224{
225 if (tconn->oldest_tle != tconn->newest_tle)
226 conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
227 if (!list_empty(&tconn->out_of_sequence_requests))
228 conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
229 kfree(tconn->oldest_tle);
230 tconn->oldest_tle = NULL;
231 kfree(tconn->unused_spare_tle);
232 tconn->unused_spare_tle = NULL;
233}
234
235/**
236 * _tl_add_barrier() - Adds a barrier to the transfer log
237 * @mdev: DRBD device.
238 * @new: Barrier to be added before the current head of the TL.
239 *
240 * The caller must hold the req_lock.
241 */
242void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
243{
244 INIT_LIST_HEAD(&new->requests);
245 INIT_LIST_HEAD(&new->w.list);
246 new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
247 new->next = NULL;
248 new->n_writes = 0;
249
250 new->br_number = atomic_inc_return(&tconn->current_tle_nr);
251 if (tconn->newest_tle != new) {
252 tconn->newest_tle->next = new;
253 tconn->newest_tle = new;
254 }
255}
256
257/**
258 * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
259 * @mdev: DRBD device.
260 * @barrier_nr: Expected identifier of the DRBD write barrier packet. 193 * @barrier_nr: Expected identifier of the DRBD write barrier packet.
261 * @set_size: Expected number of requests before that barrier. 194 * @set_size: Expected number of requests before that barrier.
262 * 195 *
263 * In case the passed barrier_nr or set_size does not match the oldest 196 * In case the passed barrier_nr or set_size does not match the oldest
264 * &struct drbd_tl_epoch objects this function will cause a termination 197 * epoch of not yet barrier-acked requests, this function will cause a
265 * of the connection. 198 * termination of the connection.
266 */ 199 */
267void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr, 200void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
268 unsigned int set_size) 201 unsigned int set_size)
269{ 202{
270 struct drbd_conf *mdev;
271 struct drbd_tl_epoch *b, *nob; /* next old barrier */
272 struct list_head *le, *tle;
273 struct drbd_request *r; 203 struct drbd_request *r;
204 struct drbd_request *req = NULL;
205 int expect_epoch = 0;
206 int expect_size = 0;
274 207
275 spin_lock_irq(&tconn->req_lock); 208 spin_lock_irq(&tconn->req_lock);
276 209
277 b = tconn->oldest_tle; 210 /* find latest not yet barrier-acked write request,
211 * count writes in its epoch. */
212 list_for_each_entry(r, &tconn->transfer_log, tl_requests) {
213 const unsigned long s = r->rq_state;
214 if (!req) {
215 if (!(s & RQ_WRITE))
216 continue;
217 if (!(s & RQ_NET_MASK))
218 continue;
219 if (s & RQ_NET_DONE)
220 continue;
221 req = r;
222 expect_epoch = req->epoch;
223 expect_size ++;
224 } else {
225 if (r->epoch != expect_epoch)
226 break;
227 if (!(s & RQ_WRITE))
228 continue;
229 /* if (s & RQ_DONE): not expected */
230 /* if (!(s & RQ_NET_MASK)): not expected */
231 expect_size++;
232 }
233 }
278 234
279 /* first some paranoia code */ 235 /* first some paranoia code */
280 if (b == NULL) { 236 if (req == NULL) {
281 conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n", 237 conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
282 barrier_nr); 238 barrier_nr);
283 goto bail; 239 goto bail;
284 } 240 }
285 if (b->br_number != barrier_nr) { 241 if (expect_epoch != barrier_nr) {
286 conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n", 242 conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
287 barrier_nr, b->br_number); 243 barrier_nr, expect_epoch);
288 goto bail; 244 goto bail;
289 } 245 }
290 if (b->n_writes != set_size) { 246
247 if (expect_size != set_size) {
291 conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n", 248 conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
292 barrier_nr, set_size, b->n_writes); 249 barrier_nr, set_size, expect_size);
293 goto bail; 250 goto bail;
294 } 251 }
295 252
296 /* Clean up list of requests processed during current epoch */ 253 /* Clean up list of requests processed during current epoch */
297 list_for_each_safe(le, tle, &b->requests) { 254 list_for_each_entry_safe(req, r, &tconn->transfer_log, tl_requests) {
298 r = list_entry(le, struct drbd_request, tl_requests); 255 if (req->epoch != expect_epoch)
299 _req_mod(r, BARRIER_ACKED); 256 break;
300 } 257 _req_mod(req, BARRIER_ACKED);
301 /* There could be requests on the list waiting for completion
302 of the write to the local disk. To avoid corruptions of
303 slab's data structures we have to remove the lists head.
304
305 Also there could have been a barrier ack out of sequence, overtaking
306 the write acks - which would be a bug and violating write ordering.
307 To not deadlock in case we lose connection while such requests are
308 still pending, we need some way to find them for the
309 _req_mode(CONNECTION_LOST_WHILE_PENDING).
310
311 These have been list_move'd to the out_of_sequence_requests list in
312 _req_mod(, BARRIER_ACKED) above.
313 */
314 list_splice_init(&b->requests, &tconn->barrier_acked_requests);
315 mdev = b->w.mdev;
316
317 nob = b->next;
318 if (test_and_clear_bit(CREATE_BARRIER, &tconn->flags)) {
319 _tl_add_barrier(tconn, b);
320 if (nob)
321 tconn->oldest_tle = nob;
322 /* if nob == NULL b was the only barrier, and becomes the new
323 barrier. Therefore tconn->oldest_tle points already to b */
324 } else {
325 D_ASSERT(nob != NULL);
326 tconn->oldest_tle = nob;
327 kfree(b);
328 } 258 }
329
330 spin_unlock_irq(&tconn->req_lock); 259 spin_unlock_irq(&tconn->req_lock);
331 dec_ap_pending(mdev);
332 260
333 return; 261 return;
334 262
@@ -346,91 +274,20 @@ bail:
346 * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO, 274 * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
347 * RESTART_FROZEN_DISK_IO. 275 * RESTART_FROZEN_DISK_IO.
348 */ 276 */
277/* must hold resource->req_lock */
349void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what) 278void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
350{ 279{
351 struct drbd_tl_epoch *b, *tmp, **pn; 280 struct drbd_request *req, *r;
352 struct list_head *le, *tle, carry_reads; 281
353 struct drbd_request *req; 282 list_for_each_entry_safe(req, r, &tconn->transfer_log, tl_requests)
354 int rv, n_writes, n_reads; 283 _req_mod(req, what);
355 284}
356 b = tconn->oldest_tle; 285
357 pn = &tconn->oldest_tle; 286void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
358 while (b) { 287{
359 n_writes = 0; 288 spin_lock_irq(&tconn->req_lock);
360 n_reads = 0; 289 _tl_restart(tconn, what);
361 INIT_LIST_HEAD(&carry_reads); 290 spin_unlock_irq(&tconn->req_lock);
362 list_for_each_safe(le, tle, &b->requests) {
363 req = list_entry(le, struct drbd_request, tl_requests);
364 rv = _req_mod(req, what);
365
366 if (rv & MR_WRITE)
367 n_writes++;
368 if (rv & MR_READ)
369 n_reads++;
370 }
371 tmp = b->next;
372
373 if (n_writes) {
374 if (what == RESEND) {
375 b->n_writes = n_writes;
376 if (b->w.cb == NULL) {
377 b->w.cb = w_send_barrier;
378 inc_ap_pending(b->w.mdev);
379 set_bit(CREATE_BARRIER, &tconn->flags);
380 }
381
382 drbd_queue_work(&tconn->sender_work, &b->w);
383 }
384 pn = &b->next;
385 } else {
386 if (n_reads)
387 list_add(&carry_reads, &b->requests);
388 /* there could still be requests on that ring list,
389 * in case local io is still pending */
390 list_del(&b->requests);
391
392 /* dec_ap_pending corresponding to queue_barrier.
393 * the newest barrier may not have been queued yet,
394 * in which case w.cb is still NULL. */
395 if (b->w.cb != NULL)
396 dec_ap_pending(b->w.mdev);
397
398 if (b == tconn->newest_tle) {
399 /* recycle, but reinit! */
400 if (tmp != NULL)
401 conn_err(tconn, "ASSERT FAILED tmp == NULL");
402 INIT_LIST_HEAD(&b->requests);
403 list_splice(&carry_reads, &b->requests);
404 INIT_LIST_HEAD(&b->w.list);
405 b->w.cb = NULL;
406 b->br_number = atomic_inc_return(&tconn->current_tle_nr);
407 b->n_writes = 0;
408
409 *pn = b;
410 break;
411 }
412 *pn = tmp;
413 kfree(b);
414 }
415 b = tmp;
416 list_splice(&carry_reads, &b->requests);
417 }
418
419 /* Actions operating on the disk state, also want to work on
420 requests that got barrier acked. */
421 switch (what) {
422 case FAIL_FROZEN_DISK_IO:
423 case RESTART_FROZEN_DISK_IO:
424 list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
425 req = list_entry(le, struct drbd_request, tl_requests);
426 _req_mod(req, what);
427 }
428 case CONNECTION_LOST_WHILE_PENDING:
429 case RESEND:
430 break;
431 default:
432 conn_err(tconn, "what = %d in _tl_restart()\n", what);
433 }
434} 291}
435 292
436/** 293/**
@@ -443,36 +300,7 @@ void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
443 */ 300 */
444void tl_clear(struct drbd_tconn *tconn) 301void tl_clear(struct drbd_tconn *tconn)
445{ 302{
446 struct list_head *le, *tle; 303 tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
447 struct drbd_request *r;
448
449 spin_lock_irq(&tconn->req_lock);
450
451 _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
452
453 /* we expect this list to be empty. */
454 if (!list_empty(&tconn->out_of_sequence_requests))
455 conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
456
457 /* but just in case, clean it up anyways! */
458 list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
459 r = list_entry(le, struct drbd_request, tl_requests);
460 /* It would be nice to complete outside of spinlock.
461 * But this is easier for now. */
462 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
463 }
464
465 /* ensure bit indicating barrier is required is clear */
466 clear_bit(CREATE_BARRIER, &tconn->flags);
467
468 spin_unlock_irq(&tconn->req_lock);
469}
470
471void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
472{
473 spin_lock_irq(&tconn->req_lock);
474 _tl_restart(tconn, what);
475 spin_unlock_irq(&tconn->req_lock);
476} 304}
477 305
478/** 306/**
@@ -482,31 +310,16 @@ void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
482void tl_abort_disk_io(struct drbd_conf *mdev) 310void tl_abort_disk_io(struct drbd_conf *mdev)
483{ 311{
484 struct drbd_tconn *tconn = mdev->tconn; 312 struct drbd_tconn *tconn = mdev->tconn;
485 struct drbd_tl_epoch *b; 313 struct drbd_request *req, *r;
486 struct list_head *le, *tle;
487 struct drbd_request *req;
488 314
489 spin_lock_irq(&tconn->req_lock); 315 spin_lock_irq(&tconn->req_lock);
490 b = tconn->oldest_tle; 316 list_for_each_entry_safe(req, r, &tconn->transfer_log, tl_requests) {
491 while (b) {
492 list_for_each_safe(le, tle, &b->requests) {
493 req = list_entry(le, struct drbd_request, tl_requests);
494 if (!(req->rq_state & RQ_LOCAL_PENDING))
495 continue;
496 if (req->w.mdev == mdev)
497 _req_mod(req, ABORT_DISK_IO);
498 }
499 b = b->next;
500 }
501
502 list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
503 req = list_entry(le, struct drbd_request, tl_requests);
504 if (!(req->rq_state & RQ_LOCAL_PENDING)) 317 if (!(req->rq_state & RQ_LOCAL_PENDING))
505 continue; 318 continue;
506 if (req->w.mdev == mdev) 319 if (req->w.mdev != mdev)
507 _req_mod(req, ABORT_DISK_IO); 320 continue;
321 _req_mod(req, ABORT_DISK_IO);
508 } 322 }
509
510 spin_unlock_irq(&tconn->req_lock); 323 spin_unlock_irq(&tconn->req_lock);
511} 324}
512 325
@@ -2680,17 +2493,21 @@ struct drbd_tconn *conn_create(const char *name, struct res_opts *res_opts)
2680 if (set_resource_options(tconn, res_opts)) 2493 if (set_resource_options(tconn, res_opts))
2681 goto fail; 2494 goto fail;
2682 2495
2683 if (!tl_init(tconn))
2684 goto fail;
2685
2686 tconn->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL); 2496 tconn->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2687 if (!tconn->current_epoch) 2497 if (!tconn->current_epoch)
2688 goto fail; 2498 goto fail;
2499
2500 INIT_LIST_HEAD(&tconn->transfer_log);
2501
2689 INIT_LIST_HEAD(&tconn->current_epoch->list); 2502 INIT_LIST_HEAD(&tconn->current_epoch->list);
2690 tconn->epochs = 1; 2503 tconn->epochs = 1;
2691 spin_lock_init(&tconn->epoch_lock); 2504 spin_lock_init(&tconn->epoch_lock);
2692 tconn->write_ordering = WO_bdev_flush; 2505 tconn->write_ordering = WO_bdev_flush;
2693 2506
2507 tconn->send.seen_any_write_yet = false;
2508 tconn->send.current_epoch_nr = 0;
2509 tconn->send.current_epoch_writes = 0;
2510
2694 tconn->cstate = C_STANDALONE; 2511 tconn->cstate = C_STANDALONE;
2695 mutex_init(&tconn->cstate_mutex); 2512 mutex_init(&tconn->cstate_mutex);
2696 spin_lock_init(&tconn->req_lock); 2513 spin_lock_init(&tconn->req_lock);
@@ -2713,7 +2530,6 @@ struct drbd_tconn *conn_create(const char *name, struct res_opts *res_opts)
2713 2530
2714fail: 2531fail:
2715 kfree(tconn->current_epoch); 2532 kfree(tconn->current_epoch);
2716 tl_cleanup(tconn);
2717 free_cpumask_var(tconn->cpu_mask); 2533 free_cpumask_var(tconn->cpu_mask);
2718 drbd_free_socket(&tconn->meta); 2534 drbd_free_socket(&tconn->meta);
2719 drbd_free_socket(&tconn->data); 2535 drbd_free_socket(&tconn->data);
diff --git a/drivers/block/drbd/drbd_nl.c b/drivers/block/drbd/drbd_nl.c
index c5d4fac1a111..bbc5c2f4a9b4 100644
--- a/drivers/block/drbd/drbd_nl.c
+++ b/drivers/block/drbd/drbd_nl.c
@@ -622,6 +622,8 @@ drbd_set_role(struct drbd_conf *mdev, enum drbd_role new_role, int force)
622 /* Wait until nothing is on the fly :) */ 622 /* Wait until nothing is on the fly :) */
623 wait_event(mdev->misc_wait, atomic_read(&mdev->ap_pending_cnt) == 0); 623 wait_event(mdev->misc_wait, atomic_read(&mdev->ap_pending_cnt) == 0);
624 624
625 /* FIXME also wait for all pending P_BARRIER_ACK? */
626
625 if (new_role == R_SECONDARY) { 627 if (new_role == R_SECONDARY) {
626 set_disk_ro(mdev->vdisk, true); 628 set_disk_ro(mdev->vdisk, true);
627 if (get_ldev(mdev)) { 629 if (get_ldev(mdev)) {
@@ -1436,6 +1438,12 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
1436 1438
1437 drbd_suspend_io(mdev); 1439 drbd_suspend_io(mdev);
1438 /* also wait for the last barrier ack. */ 1440 /* also wait for the last barrier ack. */
1441 /* FIXME see also https://daiquiri.linbit/cgi-bin/bugzilla/show_bug.cgi?id=171
1442 * We need a way to either ignore barrier acks for barriers sent before a device
1443 * was attached, or a way to wait for all pending barrier acks to come in.
1444 * As barriers are counted per resource,
1445 * we'd need to suspend io on all devices of a resource.
1446 */
1439 wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_pending_cnt) || drbd_suspended(mdev)); 1447 wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_pending_cnt) || drbd_suspended(mdev));
1440 /* and for any other previously queued work */ 1448 /* and for any other previously queued work */
1441 drbd_flush_workqueue(mdev); 1449 drbd_flush_workqueue(mdev);
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c
index 34fc33b5eb45..7fe6b01618d4 100644
--- a/drivers/block/drbd/drbd_receiver.c
+++ b/drivers/block/drbd/drbd_receiver.c
@@ -4451,6 +4451,7 @@ static void conn_disconnect(struct drbd_tconn *tconn)
4451 conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n"); 4451 conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
4452 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */ 4452 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4453 atomic_set(&tconn->current_epoch->epoch_size, 0); 4453 atomic_set(&tconn->current_epoch->epoch_size, 0);
4454 tconn->send.seen_any_write_yet = false;
4454 4455
4455 conn_info(tconn, "Connection closed\n"); 4456 conn_info(tconn, "Connection closed\n");
4456 4457
diff --git a/drivers/block/drbd/drbd_req.c b/drivers/block/drbd/drbd_req.c
index e609557a9425..ca28b56b7a2f 100644
--- a/drivers/block/drbd/drbd_req.c
+++ b/drivers/block/drbd/drbd_req.c
@@ -149,46 +149,16 @@ static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const
149 drbd_req_free(req); 149 drbd_req_free(req);
150} 150}
151 151
152static void queue_barrier(struct drbd_conf *mdev) 152static void wake_all_senders(struct drbd_tconn *tconn) {
153{ 153 wake_up(&tconn->sender_work.q_wait);
154 struct drbd_tl_epoch *b;
155 struct drbd_tconn *tconn = mdev->tconn;
156
157 /* We are within the req_lock. Once we queued the barrier for sending,
158 * we set the CREATE_BARRIER bit. It is cleared as soon as a new
159 * barrier/epoch object is added. This is the only place this bit is
160 * set. It indicates that the barrier for this epoch is already queued,
161 * and no new epoch has been created yet. */
162 if (test_bit(CREATE_BARRIER, &tconn->flags))
163 return;
164
165 b = tconn->newest_tle;
166 b->w.cb = w_send_barrier;
167 b->w.mdev = mdev;
168 /* inc_ap_pending done here, so we won't
169 * get imbalanced on connection loss.
170 * dec_ap_pending will be done in got_BarrierAck
171 * or (on connection loss) in tl_clear. */
172 inc_ap_pending(mdev);
173 drbd_queue_work(&tconn->sender_work, &b->w);
174 set_bit(CREATE_BARRIER, &tconn->flags);
175} 154}
176 155
177static void _about_to_complete_local_write(struct drbd_conf *mdev, 156/* must hold resource->req_lock */
178 struct drbd_request *req) 157static void start_new_tl_epoch(struct drbd_tconn *tconn)
179{ 158{
180 const unsigned long s = req->rq_state; 159 tconn->current_tle_writes = 0;
181 160 atomic_inc(&tconn->current_tle_nr);
182 /* Before we can signal completion to the upper layers, 161 wake_all_senders(tconn);
183 * we may need to close the current epoch.
184 * We can skip this, if this request has not even been sent, because we
185 * did not have a fully established connection yet/anymore, during
186 * bitmap exchange, or while we are C_AHEAD due to congestion policy.
187 */
188 if (mdev->state.conn >= C_CONNECTED &&
189 (s & RQ_NET_SENT) != 0 &&
190 req->epoch == atomic_read(&mdev->tconn->current_tle_nr))
191 queue_barrier(mdev);
192} 162}
193 163
194void complete_master_bio(struct drbd_conf *mdev, 164void complete_master_bio(struct drbd_conf *mdev,
@@ -320,9 +290,16 @@ void req_may_be_completed(struct drbd_request *req, struct bio_and_error *m)
320 } else if (!(s & RQ_POSTPONED)) 290 } else if (!(s & RQ_POSTPONED))
321 D_ASSERT((s & (RQ_NET_MASK & ~RQ_NET_DONE)) == 0); 291 D_ASSERT((s & (RQ_NET_MASK & ~RQ_NET_DONE)) == 0);
322 292
323 /* for writes we need to do some extra housekeeping */ 293 /* Before we can signal completion to the upper layers,
324 if (rw == WRITE) 294 * we may need to close the current transfer log epoch.
325 _about_to_complete_local_write(mdev, req); 295 * We are within the request lock, so we can simply compare
296 * the request epoch number with the current transfer log
297 * epoch number. If they match, increase the current_tle_nr,
298 * and reset the transfer log epoch write_cnt.
299 */
300 if (rw == WRITE &&
301 req->epoch == atomic_read(&mdev->tconn->current_tle_nr))
302 start_new_tl_epoch(mdev->tconn);
326 303
327 /* Update disk stats */ 304 /* Update disk stats */
328 _drbd_end_io_acct(mdev, req); 305 _drbd_end_io_acct(mdev, req);
@@ -514,15 +491,6 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
514 * hurting performance. */ 491 * hurting performance. */
515 set_bit(UNPLUG_REMOTE, &mdev->flags); 492 set_bit(UNPLUG_REMOTE, &mdev->flags);
516 493
517 /* see __drbd_make_request,
518 * just after it grabs the req_lock */
519 D_ASSERT(test_bit(CREATE_BARRIER, &mdev->tconn->flags) == 0);
520
521 req->epoch = atomic_read(&mdev->tconn->current_tle_nr);
522
523 /* increment size of current epoch */
524 mdev->tconn->newest_tle->n_writes++;
525
526 /* queue work item to send data */ 494 /* queue work item to send data */
527 D_ASSERT(req->rq_state & RQ_NET_PENDING); 495 D_ASSERT(req->rq_state & RQ_NET_PENDING);
528 req->rq_state |= RQ_NET_QUEUED; 496 req->rq_state |= RQ_NET_QUEUED;
@@ -534,8 +502,8 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
534 nc = rcu_dereference(mdev->tconn->net_conf); 502 nc = rcu_dereference(mdev->tconn->net_conf);
535 p = nc->max_epoch_size; 503 p = nc->max_epoch_size;
536 rcu_read_unlock(); 504 rcu_read_unlock();
537 if (mdev->tconn->newest_tle->n_writes >= p) 505 if (mdev->tconn->current_tle_writes >= p)
538 queue_barrier(mdev); 506 start_new_tl_epoch(mdev->tconn);
539 507
540 break; 508 break;
541 509
@@ -692,6 +660,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
692 During connection handshake, we ensure that the peer was not rebooted. */ 660 During connection handshake, we ensure that the peer was not rebooted. */
693 if (!(req->rq_state & RQ_NET_OK)) { 661 if (!(req->rq_state & RQ_NET_OK)) {
694 if (req->w.cb) { 662 if (req->w.cb) {
663 /* w.cb expected to be w_send_dblock, or w_send_read_req */
695 drbd_queue_work(&mdev->tconn->sender_work, &req->w); 664 drbd_queue_work(&mdev->tconn->sender_work, &req->w);
696 rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ; 665 rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
697 } 666 }
@@ -708,7 +677,6 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
708 * this is bad, because if the connection is lost now, 677 * this is bad, because if the connection is lost now,
709 * we won't be able to clean them up... */ 678 * we won't be able to clean them up... */
710 dev_err(DEV, "FIXME (BARRIER_ACKED but pending)\n"); 679 dev_err(DEV, "FIXME (BARRIER_ACKED but pending)\n");
711 list_move(&req->tl_requests, &mdev->tconn->out_of_sequence_requests);
712 } 680 }
713 if ((req->rq_state & RQ_NET_MASK) != 0) { 681 if ((req->rq_state & RQ_NET_MASK) != 0) {
714 req->rq_state |= RQ_NET_DONE; 682 req->rq_state |= RQ_NET_DONE;
@@ -835,7 +803,6 @@ int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long s
835 const int rw = bio_rw(bio); 803 const int rw = bio_rw(bio);
836 const int size = bio->bi_size; 804 const int size = bio->bi_size;
837 const sector_t sector = bio->bi_sector; 805 const sector_t sector = bio->bi_sector;
838 struct drbd_tl_epoch *b = NULL;
839 struct drbd_request *req; 806 struct drbd_request *req;
840 struct net_conf *nc; 807 struct net_conf *nc;
841 int local, remote, send_oos = 0; 808 int local, remote, send_oos = 0;
@@ -916,24 +883,6 @@ int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long s
916 goto fail_free_complete; 883 goto fail_free_complete;
917 } 884 }
918 885
919 /* For WRITE request, we have to make sure that we have an
920 * unused_spare_tle, in case we need to start a new epoch.
921 * I try to be smart and avoid to pre-allocate always "just in case",
922 * but there is a race between testing the bit and pointer outside the
923 * spinlock, and grabbing the spinlock.
924 * if we lost that race, we retry. */
925 if (rw == WRITE && (remote || send_oos) &&
926 mdev->tconn->unused_spare_tle == NULL &&
927 test_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
928allocate_barrier:
929 b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_NOIO);
930 if (!b) {
931 dev_err(DEV, "Failed to alloc barrier.\n");
932 err = -ENOMEM;
933 goto fail_free_complete;
934 }
935 }
936
937 /* GOOD, everything prepared, grab the spin_lock */ 886 /* GOOD, everything prepared, grab the spin_lock */
938 spin_lock_irq(&mdev->tconn->req_lock); 887 spin_lock_irq(&mdev->tconn->req_lock);
939 888
@@ -969,42 +918,9 @@ allocate_barrier:
969 } 918 }
970 } 919 }
971 920
972 if (b && mdev->tconn->unused_spare_tle == NULL) {
973 mdev->tconn->unused_spare_tle = b;
974 b = NULL;
975 }
976 if (rw == WRITE && (remote || send_oos) &&
977 mdev->tconn->unused_spare_tle == NULL &&
978 test_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
979 /* someone closed the current epoch
980 * while we were grabbing the spinlock */
981 spin_unlock_irq(&mdev->tconn->req_lock);
982 goto allocate_barrier;
983 }
984
985
986 /* Update disk stats */ 921 /* Update disk stats */
987 _drbd_start_io_acct(mdev, req, bio); 922 _drbd_start_io_acct(mdev, req, bio);
988 923
989 /* _maybe_start_new_epoch(mdev);
990 * If we need to generate a write barrier packet, we have to add the
991 * new epoch (barrier) object, and queue the barrier packet for sending,
992 * and queue the req's data after it _within the same lock_, otherwise
993 * we have race conditions were the reorder domains could be mixed up.
994 *
995 * Even read requests may start a new epoch and queue the corresponding
996 * barrier packet. To get the write ordering right, we only have to
997 * make sure that, if this is a write request and it triggered a
998 * barrier packet, this request is queued within the same spinlock. */
999 if ((remote || send_oos) && mdev->tconn->unused_spare_tle &&
1000 test_and_clear_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
1001 _tl_add_barrier(mdev->tconn, mdev->tconn->unused_spare_tle);
1002 mdev->tconn->unused_spare_tle = NULL;
1003 } else {
1004 D_ASSERT(!(remote && rw == WRITE &&
1005 test_bit(CREATE_BARRIER, &mdev->tconn->flags)));
1006 }
1007
1008 /* NOTE 924 /* NOTE
1009 * Actually, 'local' may be wrong here already, since we may have failed 925 * Actually, 'local' may be wrong here already, since we may have failed
1010 * to write to the meta data, and may become wrong anytime because of 926 * to write to the meta data, and may become wrong anytime because of
@@ -1025,7 +941,12 @@ allocate_barrier:
1025 if (local) 941 if (local)
1026 _req_mod(req, TO_BE_SUBMITTED); 942 _req_mod(req, TO_BE_SUBMITTED);
1027 943
1028 list_add_tail(&req->tl_requests, &mdev->tconn->newest_tle->requests); 944 /* which transfer log epoch does this belong to? */
945 req->epoch = atomic_read(&mdev->tconn->current_tle_nr);
946 if (rw == WRITE)
947 mdev->tconn->current_tle_writes++;
948
949 list_add_tail(&req->tl_requests, &mdev->tconn->transfer_log);
1029 950
1030 /* NOTE remote first: to get the concurrent write detection right, 951 /* NOTE remote first: to get the concurrent write detection right,
1031 * we must register the request before start of local IO. */ 952 * we must register the request before start of local IO. */
@@ -1059,7 +980,9 @@ allocate_barrier:
1059 } 980 }
1060 981
1061 if (congested) { 982 if (congested) {
1062 queue_barrier(mdev); /* last barrier, after mirrored writes */ 983 if (mdev->tconn->current_tle_writes)
984 /* start a new epoch for non-mirrored writes */
985 start_new_tl_epoch(mdev->tconn);
1063 986
1064 if (nc->on_congestion == OC_PULL_AHEAD) 987 if (nc->on_congestion == OC_PULL_AHEAD)
1065 _drbd_set_state(_NS(mdev, conn, C_AHEAD), 0, NULL); 988 _drbd_set_state(_NS(mdev, conn, C_AHEAD), 0, NULL);
@@ -1070,7 +993,6 @@ allocate_barrier:
1070 rcu_read_unlock(); 993 rcu_read_unlock();
1071 994
1072 spin_unlock_irq(&mdev->tconn->req_lock); 995 spin_unlock_irq(&mdev->tconn->req_lock);
1073 kfree(b); /* if someone else has beaten us to it... */
1074 996
1075 if (local) { 997 if (local) {
1076 req->private_bio->bi_bdev = mdev->ldev->backing_bdev; 998 req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
@@ -1108,7 +1030,6 @@ fail_and_free_req:
1108 1030
1109 drbd_req_free(req); 1031 drbd_req_free(req);
1110 dec_ap_bio(mdev); 1032 dec_ap_bio(mdev);
1111 kfree(b);
1112 1033
1113 return ret; 1034 return ret;
1114} 1035}
@@ -1164,12 +1085,23 @@ int drbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bvm, struct
1164 return limit; 1085 return limit;
1165} 1086}
1166 1087
1088struct drbd_request *find_oldest_request(struct drbd_tconn *tconn)
1089{
1090 /* Walk the transfer log,
1091 * and find the oldest not yet completed request */
1092 struct drbd_request *r;
1093 list_for_each_entry(r, &tconn->transfer_log, tl_requests) {
1094 if (r->rq_state & (RQ_NET_PENDING|RQ_LOCAL_PENDING))
1095 return r;
1096 }
1097 return NULL;
1098}
1099
1167void request_timer_fn(unsigned long data) 1100void request_timer_fn(unsigned long data)
1168{ 1101{
1169 struct drbd_conf *mdev = (struct drbd_conf *) data; 1102 struct drbd_conf *mdev = (struct drbd_conf *) data;
1170 struct drbd_tconn *tconn = mdev->tconn; 1103 struct drbd_tconn *tconn = mdev->tconn;
1171 struct drbd_request *req; /* oldest request */ 1104 struct drbd_request *req; /* oldest request */
1172 struct list_head *le;
1173 struct net_conf *nc; 1105 struct net_conf *nc;
1174 unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */ 1106 unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */
1175 unsigned long now; 1107 unsigned long now;
@@ -1193,16 +1125,13 @@ void request_timer_fn(unsigned long data)
1193 now = jiffies; 1125 now = jiffies;
1194 1126
1195 spin_lock_irq(&tconn->req_lock); 1127 spin_lock_irq(&tconn->req_lock);
1196 le = &tconn->oldest_tle->requests; 1128 req = find_oldest_request(tconn);
1197 if (list_empty(le)) { 1129 if (!req) {
1198 spin_unlock_irq(&tconn->req_lock); 1130 spin_unlock_irq(&tconn->req_lock);
1199 mod_timer(&mdev->request_timer, now + et); 1131 mod_timer(&mdev->request_timer, now + et);
1200 return; 1132 return;
1201 } 1133 }
1202 1134
1203 le = le->prev;
1204 req = list_entry(le, struct drbd_request, tl_requests);
1205
1206 /* The request is considered timed out, if 1135 /* The request is considered timed out, if
1207 * - we have some effective timeout from the configuration, 1136 * - we have some effective timeout from the configuration,
1208 * with above state restrictions applied, 1137 * with above state restrictions applied,
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index 39ece3a2f53a..66be3910e8d2 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -1210,34 +1210,25 @@ int w_prev_work_done(struct drbd_work *w, int cancel)
1210 return 0; 1210 return 0;
1211} 1211}
1212 1212
1213int w_send_barrier(struct drbd_work *w, int cancel) 1213/* FIXME
1214 * We need to track the number of pending barrier acks,
1215 * and to be able to wait for them.
1216 * See also comment in drbd_adm_attach before drbd_suspend_io.
1217 */
1218int drbd_send_barrier(struct drbd_tconn *tconn)
1214{ 1219{
1215 struct drbd_socket *sock;
1216 struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1217 struct drbd_conf *mdev = w->mdev;
1218 struct p_barrier *p; 1220 struct p_barrier *p;
1221 struct drbd_socket *sock;
1219 1222
1220 /* really avoid racing with tl_clear. w.cb may have been referenced 1223 sock = &tconn->data;
1221 * just before it was reassigned and re-queued, so double check that. 1224 p = conn_prepare_command(tconn, sock);
1222 * actually, this race was harmless, since we only try to send the
1223 * barrier packet here, and otherwise do nothing with the object.
1224 * but compare with the head of w_clear_epoch */
1225 spin_lock_irq(&mdev->tconn->req_lock);
1226 if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1227 cancel = 1;
1228 spin_unlock_irq(&mdev->tconn->req_lock);
1229 if (cancel)
1230 return 0;
1231
1232 sock = &mdev->tconn->data;
1233 p = drbd_prepare_command(mdev, sock);
1234 if (!p) 1225 if (!p)
1235 return -EIO; 1226 return -EIO;
1236 p->barrier = b->br_number; 1227 p->barrier = tconn->send.current_epoch_nr;
1237 /* inc_ap_pending was done where this was queued. 1228 p->pad = 0;
1238 * dec_ap_pending will be done in got_BarrierAck 1229 tconn->send.current_epoch_writes = 0;
1239 * or (on connection loss) in w_clear_epoch. */ 1230
1240 return drbd_send_command(mdev, sock, P_BARRIER, sizeof(*p), NULL, 0); 1231 return conn_send_command(tconn, sock, P_BARRIER, sizeof(*p), NULL, 0);
1241} 1232}
1242 1233
1243int w_send_write_hint(struct drbd_work *w, int cancel) 1234int w_send_write_hint(struct drbd_work *w, int cancel)
@@ -1257,6 +1248,7 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel)
1257{ 1248{
1258 struct drbd_request *req = container_of(w, struct drbd_request, w); 1249 struct drbd_request *req = container_of(w, struct drbd_request, w);
1259 struct drbd_conf *mdev = w->mdev; 1250 struct drbd_conf *mdev = w->mdev;
1251 struct drbd_tconn *tconn = mdev->tconn;
1260 int err; 1252 int err;
1261 1253
1262 if (unlikely(cancel)) { 1254 if (unlikely(cancel)) {
@@ -1264,6 +1256,20 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel)
1264 return 0; 1256 return 0;
1265 } 1257 }
1266 1258
1259 if (!tconn->send.seen_any_write_yet) {
1260 tconn->send.seen_any_write_yet = true;
1261 tconn->send.current_epoch_nr = req->epoch;
1262 }
1263 if (tconn->send.current_epoch_nr != req->epoch) {
1264 if (tconn->send.current_epoch_writes)
1265 drbd_send_barrier(tconn);
1266 tconn->send.current_epoch_nr = req->epoch;
1267 }
1268 /* this time, no tconn->send.current_epoch_writes++;
1269 * If it was sent, it was the closing barrier for the last
1270 * replicated epoch, before we went into AHEAD mode.
1271 * No more barriers will be sent, until we leave AHEAD mode again. */
1272
1267 err = drbd_send_out_of_sync(mdev, req); 1273 err = drbd_send_out_of_sync(mdev, req);
1268 req_mod(req, OOS_HANDED_TO_NETWORK); 1274 req_mod(req, OOS_HANDED_TO_NETWORK);
1269 1275
@@ -1280,6 +1286,7 @@ int w_send_dblock(struct drbd_work *w, int cancel)
1280{ 1286{
1281 struct drbd_request *req = container_of(w, struct drbd_request, w); 1287 struct drbd_request *req = container_of(w, struct drbd_request, w);
1282 struct drbd_conf *mdev = w->mdev; 1288 struct drbd_conf *mdev = w->mdev;
1289 struct drbd_tconn *tconn = mdev->tconn;
1283 int err; 1290 int err;
1284 1291
1285 if (unlikely(cancel)) { 1292 if (unlikely(cancel)) {
@@ -1287,6 +1294,17 @@ int w_send_dblock(struct drbd_work *w, int cancel)
1287 return 0; 1294 return 0;
1288 } 1295 }
1289 1296
1297 if (!tconn->send.seen_any_write_yet) {
1298 tconn->send.seen_any_write_yet = true;
1299 tconn->send.current_epoch_nr = req->epoch;
1300 }
1301 if (tconn->send.current_epoch_nr != req->epoch) {
1302 if (tconn->send.current_epoch_writes)
1303 drbd_send_barrier(tconn);
1304 tconn->send.current_epoch_nr = req->epoch;
1305 }
1306 tconn->send.current_epoch_writes++;
1307
1290 err = drbd_send_dblock(mdev, req); 1308 err = drbd_send_dblock(mdev, req);
1291 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK); 1309 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1292 1310
@@ -1303,6 +1321,7 @@ int w_send_read_req(struct drbd_work *w, int cancel)
1303{ 1321{
1304 struct drbd_request *req = container_of(w, struct drbd_request, w); 1322 struct drbd_request *req = container_of(w, struct drbd_request, w);
1305 struct drbd_conf *mdev = w->mdev; 1323 struct drbd_conf *mdev = w->mdev;
1324 struct drbd_tconn *tconn = mdev->tconn;
1306 int err; 1325 int err;
1307 1326
1308 if (unlikely(cancel)) { 1327 if (unlikely(cancel)) {
@@ -1310,6 +1329,15 @@ int w_send_read_req(struct drbd_work *w, int cancel)
1310 return 0; 1329 return 0;
1311 } 1330 }
1312 1331
1332 /* Even read requests may close a write epoch,
1333 * if there was any yet. */
1334 if (tconn->send.seen_any_write_yet &&
1335 tconn->send.current_epoch_nr != req->epoch) {
1336 if (tconn->send.current_epoch_writes)
1337 drbd_send_barrier(tconn);
1338 tconn->send.current_epoch_nr = req->epoch;
1339 }
1340
1313 err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size, 1341 err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1314 (unsigned long)req); 1342 (unsigned long)req);
1315 1343
@@ -1673,6 +1701,34 @@ void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1673 mutex_unlock(mdev->state_mutex); 1701 mutex_unlock(mdev->state_mutex);
1674} 1702}
1675 1703
1704/* If the resource already closed the current epoch, but we did not
1705 * (because we have not yet seen new requests), we should send the
1706 * corresponding barrier now. Must be checked within the same spinlock
1707 * that is used to check for new requests. */
1708bool need_to_send_barrier(struct drbd_tconn *connection)
1709{
1710 if (!connection->send.seen_any_write_yet)
1711 return false;
1712
1713 /* Skip barriers that do not contain any writes.
1714 * This may happen during AHEAD mode. */
1715 if (!connection->send.current_epoch_writes)
1716 return false;
1717
1718 /* ->req_lock is held when requests are queued on
1719 * connection->sender_work, and put into ->transfer_log.
1720 * It is also held when ->current_tle_nr is increased.
1721 * So either there are already new requests queued,
 1722 * and corresponding barriers will be sent there.
1723 * Or nothing new is queued yet, so the difference will be 1.
1724 */
1725 if (atomic_read(&connection->current_tle_nr) !=
1726 connection->send.current_epoch_nr + 1)
1727 return false;
1728
1729 return true;
1730}
1731
1676bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list) 1732bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1677{ 1733{
1678 spin_lock_irq(&queue->q_lock); 1734 spin_lock_irq(&queue->q_lock);
@@ -1690,15 +1746,79 @@ bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_lis
1690 return !list_empty(work_list); 1746 return !list_empty(work_list);
1691} 1747}
1692 1748
1749void wait_for_work(struct drbd_tconn *connection, struct list_head *work_list)
1750{
1751 DEFINE_WAIT(wait);
1752 struct net_conf *nc;
1753 int uncork, cork;
1754
1755 dequeue_work_item(&connection->sender_work, work_list);
1756 if (!list_empty(work_list))
1757 return;
1758
1759 /* Still nothing to do?
1760 * Maybe we still need to close the current epoch,
1761 * even if no new requests are queued yet.
1762 *
1763 * Also, poke TCP, just in case.
1764 * Then wait for new work (or signal). */
1765 rcu_read_lock();
1766 nc = rcu_dereference(connection->net_conf);
1767 uncork = nc ? nc->tcp_cork : 0;
1768 rcu_read_unlock();
1769 if (uncork) {
1770 mutex_lock(&connection->data.mutex);
1771 if (connection->data.socket)
1772 drbd_tcp_uncork(connection->data.socket);
1773 mutex_unlock(&connection->data.mutex);
1774 }
1775
1776 for (;;) {
1777 int send_barrier;
1778 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
1779 spin_lock_irq(&connection->req_lock);
1780 spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
1781 list_splice_init(&connection->sender_work.q, work_list);
1782 spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
1783 if (!list_empty(work_list) || signal_pending(current)) {
1784 spin_unlock_irq(&connection->req_lock);
1785 break;
1786 }
1787 send_barrier = need_to_send_barrier(connection);
1788 spin_unlock_irq(&connection->req_lock);
1789 if (send_barrier) {
1790 drbd_send_barrier(connection);
1791 connection->send.current_epoch_nr++;
1792 }
1793 schedule();
1794 /* may be woken up for other things but new work, too,
1795 * e.g. if the current epoch got closed.
1796 * In which case we send the barrier above. */
1797 }
1798 finish_wait(&connection->sender_work.q_wait, &wait);
1799
1800 /* someone may have changed the config while we have been waiting above. */
1801 rcu_read_lock();
1802 nc = rcu_dereference(connection->net_conf);
1803 cork = nc ? nc->tcp_cork : 0;
1804 rcu_read_unlock();
1805 mutex_lock(&connection->data.mutex);
1806 if (connection->data.socket) {
1807 if (cork)
1808 drbd_tcp_cork(connection->data.socket);
1809 else if (!uncork)
1810 drbd_tcp_uncork(connection->data.socket);
1811 }
1812 mutex_unlock(&connection->data.mutex);
1813}
1814
1693int drbd_worker(struct drbd_thread *thi) 1815int drbd_worker(struct drbd_thread *thi)
1694{ 1816{
1695 struct drbd_tconn *tconn = thi->tconn; 1817 struct drbd_tconn *tconn = thi->tconn;
1696 struct drbd_work *w = NULL; 1818 struct drbd_work *w = NULL;
1697 struct drbd_conf *mdev; 1819 struct drbd_conf *mdev;
1698 struct net_conf *nc;
1699 LIST_HEAD(work_list); 1820 LIST_HEAD(work_list);
1700 int vnr; 1821 int vnr;
1701 int cork;
1702 1822
1703 while (get_t_state(thi) == RUNNING) { 1823 while (get_t_state(thi) == RUNNING) {
1704 drbd_thread_current_set_cpu(thi); 1824 drbd_thread_current_set_cpu(thi);
@@ -1706,29 +1826,7 @@ int drbd_worker(struct drbd_thread *thi)
1706 /* as long as we use drbd_queue_work_front(), 1826 /* as long as we use drbd_queue_work_front(),
1707 * we may only dequeue single work items here, not batches. */ 1827 * we may only dequeue single work items here, not batches. */
1708 if (list_empty(&work_list)) 1828 if (list_empty(&work_list))
1709 dequeue_work_item(&tconn->sender_work, &work_list); 1829 wait_for_work(tconn, &work_list);
1710
1711 /* Still nothing to do? Poke TCP, just in case,
1712 * then wait for new work (or signal). */
1713 if (list_empty(&work_list)) {
1714 mutex_lock(&tconn->data.mutex);
1715 rcu_read_lock();
1716 nc = rcu_dereference(tconn->net_conf);
1717 cork = nc ? nc->tcp_cork : 0;
1718 rcu_read_unlock();
1719
1720 if (tconn->data.socket && cork)
1721 drbd_tcp_uncork(tconn->data.socket);
1722 mutex_unlock(&tconn->data.mutex);
1723
1724 wait_event_interruptible(tconn->sender_work.q_wait,
1725 dequeue_work_item(&tconn->sender_work, &work_list));
1726
1727 mutex_lock(&tconn->data.mutex);
1728 if (tconn->data.socket && cork)
1729 drbd_tcp_cork(tconn->data.socket);
1730 mutex_unlock(&tconn->data.mutex);
1731 }
1732 1830
1733 if (signal_pending(current)) { 1831 if (signal_pending(current)) {
1734 flush_signals(current); 1832 flush_signals(current);