path: root/drivers/block/drbd/drbd_worker.c
Diffstat (limited to 'drivers/block/drbd/drbd_worker.c')
-rw-r--r--  drivers/block/drbd/drbd_worker.c | 348
1 file changed, 276 insertions(+), 72 deletions(-)
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index d8f57b6305cd..50776b362828 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -67,13 +67,10 @@ rwlock_t global_state_lock;
  */
 void drbd_md_io_complete(struct bio *bio, int error)
 {
-	struct drbd_md_io *md_io;
 	struct drbd_device *device;
 
-	md_io = (struct drbd_md_io *)bio->bi_private;
-	device = container_of(md_io, struct drbd_device, md_io);
-
-	md_io->error = error;
+	device = bio->bi_private;
+	device->md_io.error = error;
 
 	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
 	 * to timeout on the lower level device, and eventually detach from it.
@@ -87,7 +84,7 @@ void drbd_md_io_complete(struct bio *bio, int error)
 	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
 	 */
 	drbd_md_put_buffer(device);
-	md_io->done = 1;
+	device->md_io.done = 1;
 	wake_up(&device->misc_wait);
 	bio_put(bio);
 	if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
@@ -135,6 +132,7 @@ void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(l
 	i = peer_req->i;
 	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
 	block_id = peer_req->block_id;
+	peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
 
 	spin_lock_irqsave(&device->resource->req_lock, flags);
 	device->writ_cnt += peer_req->i.size >> 9;
@@ -398,9 +396,6 @@ static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector,
 	if (!get_ldev(device))
 		return -EIO;
 
-	if (drbd_rs_should_slow_down(device, sector))
-		goto defer;
-
 	/* GFP_TRY, because if there is no memory available right now, this may
 	 * be rescheduled for later. It is "only" background resync, after all. */
 	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
@@ -410,7 +405,7 @@ static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector,
 
 	peer_req->w.cb = w_e_send_csum;
 	spin_lock_irq(&device->resource->req_lock);
-	list_add(&peer_req->w.list, &device->read_ee);
+	list_add_tail(&peer_req->w.list, &device->read_ee);
 	spin_unlock_irq(&device->resource->req_lock);
 
 	atomic_add(size >> 9, &device->rs_sect_ev);
@@ -452,9 +447,9 @@ void resync_timer_fn(unsigned long data)
 {
 	struct drbd_device *device = (struct drbd_device *) data;
 
-	if (list_empty(&device->resync_work.list))
-		drbd_queue_work(&first_peer_device(device)->connection->sender_work,
-				&device->resync_work);
+	drbd_queue_work_if_unqueued(
+		&first_peer_device(device)->connection->sender_work,
+		&device->resync_work);
 }
 
 static void fifo_set(struct fifo_buffer *fb, int value)
@@ -504,9 +499,9 @@ struct fifo_buffer *fifo_alloc(int fifo_size)
 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
 {
 	struct disk_conf *dc;
-	unsigned int want;     /* The number of sectors we want in the proxy */
+	unsigned int want;     /* The number of sectors we want in-flight */
 	int req_sect; /* Number of sectors to request in this turn */
-	int correction; /* Number of sectors more we need in the proxy*/
+	int correction; /* Number of sectors more we need in-flight */
 	int cps; /* correction per invocation of drbd_rs_controller() */
 	int steps; /* Number of time steps to plan ahead */
 	int curr_corr;
@@ -577,20 +572,27 @@ static int drbd_rs_number_requests(struct drbd_device *device)
 	 * potentially causing a distributed deadlock on congestion during
 	 * online-verify or (checksum-based) resync, if max-buffers,
 	 * socket buffer sizes and resync rate settings are mis-configured. */
-	if (mxb - device->rs_in_flight < number)
-		number = mxb - device->rs_in_flight;
+
+	/* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
+	 * mxb (as used here, and in drbd_alloc_pages on the peer) is
+	 * "number of pages" (typically also 4k),
+	 * but "rs_in_flight" is in "sectors" (512 Byte). */
+	if (mxb - device->rs_in_flight/8 < number)
+		number = mxb - device->rs_in_flight/8;
 
 	return number;
 }
 
-static int make_resync_request(struct drbd_device *device, int cancel)
+static int make_resync_request(struct drbd_device *const device, int cancel)
 {
+	struct drbd_peer_device *const peer_device = first_peer_device(device);
+	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
 	unsigned long bit;
 	sector_t sector;
 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
 	int max_bio_size;
 	int number, rollback_i, size;
-	int align, queued, sndbuf;
+	int align, requeue = 0;
 	int i = 0;
 
 	if (unlikely(cancel))
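
Side note on the unit conversion the hunk above relies on: "number" and mxb count 4 KiB blocks/pages, while rs_in_flight counts 512-byte sectors, so the in-flight value is divided by 8 before the comparison. A minimal standalone sketch of that clamp follows; the function and variable names are illustrative only, not part of DRBD.

#include <stdio.h>

/* Illustrative only: clamp the number of new 4 KiB resync requests so that
 * requests already in flight (tracked in 512-byte sectors) plus new ones
 * stay within the peer's max-buffers budget (counted in 4 KiB pages). */
static int clamp_resync_requests(int number, int mxb, int rs_in_flight_sectors)
{
	int in_flight_4k = rs_in_flight_sectors / 8;	/* 8 sectors = 4 KiB */

	if (mxb - in_flight_4k < number)
		number = mxb - in_flight_4k;
	return number;
}

int main(void)
{
	/* e.g. max-buffers of 2048 pages, 4096 sectors (512 x 4 KiB) in flight */
	printf("%d\n", clamp_resync_requests(2000, 2048, 4096));	/* prints 1536 */
	return 0;
}

With 2048 pages of budget and 512 four-KiB blocks already in flight, at most 1536 new requests fit, which is what the corrected comparison in the patch computes.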
@@ -617,17 +619,22 @@ static int make_resync_request(struct drbd_device *device, int cancel)
 		goto requeue;
 
 	for (i = 0; i < number; i++) {
-		/* Stop generating RS requests, when half of the send buffer is filled */
-		mutex_lock(&first_peer_device(device)->connection->data.mutex);
-		if (first_peer_device(device)->connection->data.socket) {
-			queued = first_peer_device(device)->connection->data.socket->sk->sk_wmem_queued;
-			sndbuf = first_peer_device(device)->connection->data.socket->sk->sk_sndbuf;
-		} else {
-			queued = 1;
-			sndbuf = 0;
-		}
-		mutex_unlock(&first_peer_device(device)->connection->data.mutex);
-		if (queued > sndbuf / 2)
+		/* Stop generating RS requests when half of the send buffer is filled,
+		 * but notify TCP that we'd like to have more space. */
+		mutex_lock(&connection->data.mutex);
+		if (connection->data.socket) {
+			struct sock *sk = connection->data.socket->sk;
+			int queued = sk->sk_wmem_queued;
+			int sndbuf = sk->sk_sndbuf;
+			if (queued > sndbuf / 2) {
+				requeue = 1;
+				if (sk->sk_socket)
+					set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
+			}
+		} else
+			requeue = 1;
+		mutex_unlock(&connection->data.mutex);
+		if (requeue)
 			goto requeue;
 
 next_sector:
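
The rewritten throttle above backs off once more than half of the send buffer is queued, and additionally sets SOCK_NOSPACE so TCP will wake the sender when space frees up. For intuition only, here is a hedged userspace analogue of the "half-full send queue" check (Linux-specific SIOCOUTQ; not part of the patch):

#include <stdio.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <linux/sockios.h>	/* SIOCOUTQ */

/* Illustrative only: return 1 if more than half of the socket's send
 * buffer is still queued, i.e. the caller should requeue and back off. */
static int send_queue_half_full(int fd)
{
	int queued = 0;
	int sndbuf = 0;
	socklen_t len = sizeof(sndbuf);

	if (ioctl(fd, SIOCOUTQ, &queued) < 0)
		return 1;	/* be conservative on error */
	if (getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, &len) < 0)
		return 1;
	return queued > sndbuf / 2;
}

int main(void)
{
	int fd = socket(AF_INET, SOCK_STREAM, 0);

	if (fd >= 0)
		printf("back off: %d\n", send_queue_half_full(fd));
	return 0;
}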
@@ -642,8 +649,7 @@ next_sector:
 
 		sector = BM_BIT_TO_SECT(bit);
 
-		if (drbd_rs_should_slow_down(device, sector) ||
-		    drbd_try_rs_begin_io(device, sector)) {
+		if (drbd_try_rs_begin_io(device, sector)) {
 			device->bm_resync_fo = bit;
 			goto requeue;
 		}
@@ -696,9 +702,9 @@ next_sector:
 		/* adjust very last sectors, in case we are oddly sized */
 		if (sector + (size>>9) > capacity)
 			size = (capacity-sector)<<9;
-		if (first_peer_device(device)->connection->agreed_pro_version >= 89 &&
-		    first_peer_device(device)->connection->csums_tfm) {
-			switch (read_for_csum(first_peer_device(device), sector, size)) {
+
+		if (device->use_csums) {
+			switch (read_for_csum(peer_device, sector, size)) {
 			case -EIO: /* Disk failure */
 				put_ldev(device);
 				return -EIO;
@@ -717,7 +723,7 @@ next_sector:
 			int err;
 
 			inc_rs_pending(device);
-			err = drbd_send_drequest(first_peer_device(device), P_RS_DATA_REQUEST,
+			err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
 						 sector, size, ID_SYNCER);
 			if (err) {
 				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
@@ -774,8 +780,7 @@ static int make_ov_request(struct drbd_device *device, int cancel)
 
 		size = BM_BLOCK_SIZE;
 
-		if (drbd_rs_should_slow_down(device, sector) ||
-		    drbd_try_rs_begin_io(device, sector)) {
+		if (drbd_try_rs_begin_io(device, sector)) {
 			device->ov_position = sector;
 			goto requeue;
 		}
@@ -911,7 +916,7 @@ int drbd_resync_finished(struct drbd_device *device)
 	if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
 		khelper_cmd = "after-resync-target";
 
-	if (first_peer_device(device)->connection->csums_tfm && device->rs_total) {
+	if (device->use_csums && device->rs_total) {
 		const unsigned long s = device->rs_same_csum;
 		const unsigned long t = device->rs_total;
 		const int ratio =
@@ -1351,13 +1356,15 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel)
 {
 	struct drbd_request *req = container_of(w, struct drbd_request, w);
 	struct drbd_device *device = req->device;
-	struct drbd_connection *connection = first_peer_device(device)->connection;
+	struct drbd_peer_device *const peer_device = first_peer_device(device);
+	struct drbd_connection *const connection = peer_device->connection;
 	int err;
 
 	if (unlikely(cancel)) {
 		req_mod(req, SEND_CANCELED);
 		return 0;
 	}
+	req->pre_send_jif = jiffies;
 
 	/* this time, no connection->send.current_epoch_writes++;
 	 * If it was sent, it was the closing barrier for the last
@@ -1365,7 +1372,7 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel)
 	 * No more barriers will be sent, until we leave AHEAD mode again. */
 	maybe_send_barrier(connection, req->epoch);
 
-	err = drbd_send_out_of_sync(first_peer_device(device), req);
+	err = drbd_send_out_of_sync(peer_device, req);
 	req_mod(req, OOS_HANDED_TO_NETWORK);
 
 	return err;
@@ -1380,19 +1387,21 @@ int w_send_dblock(struct drbd_work *w, int cancel)
 {
 	struct drbd_request *req = container_of(w, struct drbd_request, w);
 	struct drbd_device *device = req->device;
-	struct drbd_connection *connection = first_peer_device(device)->connection;
+	struct drbd_peer_device *const peer_device = first_peer_device(device);
+	struct drbd_connection *connection = peer_device->connection;
 	int err;
 
 	if (unlikely(cancel)) {
 		req_mod(req, SEND_CANCELED);
 		return 0;
 	}
+	req->pre_send_jif = jiffies;
 
 	re_init_if_first_write(connection, req->epoch);
 	maybe_send_barrier(connection, req->epoch);
 	connection->send.current_epoch_writes++;
 
-	err = drbd_send_dblock(first_peer_device(device), req);
+	err = drbd_send_dblock(peer_device, req);
 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
 
 	return err;
@@ -1407,19 +1416,21 @@ int w_send_read_req(struct drbd_work *w, int cancel)
 {
 	struct drbd_request *req = container_of(w, struct drbd_request, w);
 	struct drbd_device *device = req->device;
-	struct drbd_connection *connection = first_peer_device(device)->connection;
+	struct drbd_peer_device *const peer_device = first_peer_device(device);
+	struct drbd_connection *connection = peer_device->connection;
 	int err;
 
 	if (unlikely(cancel)) {
 		req_mod(req, SEND_CANCELED);
 		return 0;
 	}
+	req->pre_send_jif = jiffies;
 
 	/* Even read requests may close a write epoch,
 	 * if there was any yet. */
 	maybe_send_barrier(connection, req->epoch);
 
-	err = drbd_send_drequest(first_peer_device(device), P_DATA_REQUEST, req->i.sector, req->i.size,
+	err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
 				 (unsigned long)req);
 
 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
@@ -1433,7 +1444,7 @@ int w_restart_disk_io(struct drbd_work *w, int cancel)
 	struct drbd_device *device = req->device;
 
 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
-		drbd_al_begin_io(device, &req->i, false);
+		drbd_al_begin_io(device, &req->i);
 
 	drbd_req_make_private_bio(req, req->master_bio);
 	req->private_bio->bi_bdev = device->ldev->backing_bdev;
@@ -1601,26 +1612,32 @@ void drbd_rs_controller_reset(struct drbd_device *device)
 void start_resync_timer_fn(unsigned long data)
 {
 	struct drbd_device *device = (struct drbd_device *) data;
-
-	drbd_queue_work(&first_peer_device(device)->connection->sender_work,
-			&device->start_resync_work);
+	drbd_device_post_work(device, RS_START);
 }
 
-int w_start_resync(struct drbd_work *w, int cancel)
+static void do_start_resync(struct drbd_device *device)
 {
-	struct drbd_device *device =
-		container_of(w, struct drbd_device, start_resync_work);
-
 	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
-		drbd_warn(device, "w_start_resync later...\n");
+		drbd_warn(device, "postponing start_resync ...\n");
 		device->start_resync_timer.expires = jiffies + HZ/10;
 		add_timer(&device->start_resync_timer);
-		return 0;
+		return;
 	}
 
 	drbd_start_resync(device, C_SYNC_SOURCE);
 	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
-	return 0;
+}
+
+static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
+{
+	bool csums_after_crash_only;
+	rcu_read_lock();
+	csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
+	rcu_read_unlock();
+	return connection->agreed_pro_version >= 89 &&		/* supported? */
+		connection->csums_tfm &&			/* configured? */
+		(csums_after_crash_only == 0			/* use for each resync? */
+		 || test_bit(CRASHED_PRIMARY, &device->flags));	/* or only after Primary crash? */
 }
 
 /**
@@ -1633,6 +1650,8 @@ int w_start_resync(struct drbd_work *w, int cancel)
  */
 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
 {
+	struct drbd_peer_device *peer_device = first_peer_device(device);
+	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
 	union drbd_state ns;
 	int r;
 
@@ -1651,7 +1670,7 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
 		if (r > 0) {
 			drbd_info(device, "before-resync-target handler returned %d, "
 				 "dropping connection.\n", r);
-			conn_request_state(first_peer_device(device)->connection, NS(conn, C_DISCONNECTING), CS_HARD);
+			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
 			return;
 		}
 	} else /* C_SYNC_SOURCE */ {
@@ -1664,7 +1683,7 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
 		} else {
 			drbd_info(device, "before-resync-source handler returned %d, "
 				 "dropping connection.\n", r);
-			conn_request_state(first_peer_device(device)->connection,
+			conn_request_state(connection,
 					   NS(conn, C_DISCONNECTING), CS_HARD);
 			return;
 		}
@@ -1672,7 +1691,7 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
 		}
 	}
 
-	if (current == first_peer_device(device)->connection->worker.task) {
+	if (current == connection->worker.task) {
 		/* The worker should not sleep waiting for state_mutex,
 		   that can take long */
 		if (!mutex_trylock(device->state_mutex)) {
@@ -1733,11 +1752,20 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
 			device->rs_mark_time[i] = now;
 		}
 		_drbd_pause_after(device);
+		/* Forget potentially stale cached per resync extent bit-counts.
+		 * Open coded drbd_rs_cancel_all(device), we already have IRQs
+		 * disabled, and know the disk state is ok. */
+		spin_lock(&device->al_lock);
+		lc_reset(device->resync);
+		device->resync_locked = 0;
+		device->resync_wenr = LC_FREE;
+		spin_unlock(&device->al_lock);
 	}
 	write_unlock(&global_state_lock);
 	spin_unlock_irq(&device->resource->req_lock);
 
 	if (r == SS_SUCCESS) {
+		wake_up(&device->al_wait); /* for lc_reset() above */
 		/* reset rs_last_bcast when a resync or verify is started,
 		 * to deal with potential jiffies wrap. */
 		device->rs_last_bcast = jiffies - HZ;
@@ -1746,8 +1774,12 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
 		     drbd_conn_str(ns.conn),
 		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
 		     (unsigned long) device->rs_total);
-		if (side == C_SYNC_TARGET)
+		if (side == C_SYNC_TARGET) {
 			device->bm_resync_fo = 0;
+			device->use_csums = use_checksum_based_resync(connection, device);
+		} else {
+			device->use_csums = 0;
+		}
 
 		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
 		 * with w_send_oos, or the sync target will get confused as to
@@ -1756,12 +1788,10 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
 	 * drbd_resync_finished from here in that case.
 	 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
 	 * and from after_state_ch otherwise. */
-	if (side == C_SYNC_SOURCE &&
-	    first_peer_device(device)->connection->agreed_pro_version < 96)
-		drbd_gen_and_send_sync_uuid(first_peer_device(device));
+	if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
+		drbd_gen_and_send_sync_uuid(peer_device);
 
-	if (first_peer_device(device)->connection->agreed_pro_version < 95 &&
-	    device->rs_total == 0) {
+	if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
 		/* This still has a race (about when exactly the peers
 		 * detect connection loss) that can lead to a full sync
 		 * on next handshake. In 8.3.9 we fixed this with explicit
@@ -1777,7 +1807,7 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
 			int timeo;
 
 			rcu_read_lock();
-			nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
+			nc = rcu_dereference(connection->net_conf);
 			timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
 			rcu_read_unlock();
 			schedule_timeout_interruptible(timeo);
@@ -1799,10 +1829,165 @@ void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
 	mutex_unlock(device->state_mutex);
 }
 
+static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
+{
+	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
+	device->rs_last_bcast = jiffies;
+
+	if (!get_ldev(device))
+		return;
+
+	drbd_bm_write_lazy(device, 0);
+	if (resync_done && is_sync_state(device->state.conn))
+		drbd_resync_finished(device);
+
+	drbd_bcast_event(device, &sib);
+	/* update timestamp, in case it took a while to write out stuff */
+	device->rs_last_bcast = jiffies;
+	put_ldev(device);
+}
+
+static void drbd_ldev_destroy(struct drbd_device *device)
+{
+	lc_destroy(device->resync);
+	device->resync = NULL;
+	lc_destroy(device->act_log);
+	device->act_log = NULL;
+	__no_warn(local,
+		drbd_free_ldev(device->ldev);
+		device->ldev = NULL;);
+	clear_bit(GOING_DISKLESS, &device->flags);
+	wake_up(&device->misc_wait);
+}
+
+static void go_diskless(struct drbd_device *device)
+{
+	D_ASSERT(device, device->state.disk == D_FAILED);
+	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
+	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
+	 * the protected members anymore, though, so once put_ldev reaches zero
+	 * again, it will be safe to free them. */
+
+	/* Try to write changed bitmap pages, read errors may have just
+	 * set some bits outside the area covered by the activity log.
+	 *
+	 * If we have an IO error during the bitmap writeout,
+	 * we will want a full sync next time, just in case.
+	 * (Do we want a specific meta data flag for this?)
+	 *
+	 * If that does not make it to stable storage either,
+	 * we cannot do anything about that anymore.
+	 *
+	 * We still need to check if both bitmap and ldev are present, we may
+	 * end up here after a failed attach, before ldev was even assigned.
+	 */
+	if (device->bitmap && device->ldev) {
+		/* An interrupted resync or similar is allowed to recounts bits
+		 * while we detach.
+		 * Any modifications would not be expected anymore, though.
+		 */
+		if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
+					"detach", BM_LOCKED_TEST_ALLOWED)) {
+			if (test_bit(WAS_READ_ERROR, &device->flags)) {
+				drbd_md_set_flag(device, MDF_FULL_SYNC);
+				drbd_md_sync(device);
+			}
+		}
+	}
+
+	drbd_force_state(device, NS(disk, D_DISKLESS));
+}
+
+static int do_md_sync(struct drbd_device *device)
+{
+	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
+	drbd_md_sync(device);
+	return 0;
+}
+
+/* only called from drbd_worker thread, no locking */
+void __update_timing_details(
+		struct drbd_thread_timing_details *tdp,
+		unsigned int *cb_nr,
+		void *cb,
+		const char *fn, const unsigned int line)
+{
+	unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
+	struct drbd_thread_timing_details *td = tdp + i;
+
+	td->start_jif = jiffies;
+	td->cb_addr = cb;
+	td->caller_fn = fn;
+	td->line = line;
+	td->cb_nr = *cb_nr;
+
+	i = (i+1) % DRBD_THREAD_DETAILS_HIST;
+	td = tdp + i;
+	memset(td, 0, sizeof(*td));
+
+	++(*cb_nr);
+}
+
+#define WORK_PENDING(work_bit, todo)	(todo & (1UL << work_bit))
+static void do_device_work(struct drbd_device *device, const unsigned long todo)
+{
+	if (WORK_PENDING(MD_SYNC, todo))
+		do_md_sync(device);
+	if (WORK_PENDING(RS_DONE, todo) ||
+	    WORK_PENDING(RS_PROGRESS, todo))
+		update_on_disk_bitmap(device, WORK_PENDING(RS_DONE, todo));
+	if (WORK_PENDING(GO_DISKLESS, todo))
+		go_diskless(device);
+	if (WORK_PENDING(DESTROY_DISK, todo))
+		drbd_ldev_destroy(device);
+	if (WORK_PENDING(RS_START, todo))
+		do_start_resync(device);
+}
+
+#define DRBD_DEVICE_WORK_MASK	\
+	((1UL << GO_DISKLESS)	\
+	|(1UL << DESTROY_DISK)	\
+	|(1UL << MD_SYNC)	\
+	|(1UL << RS_START)	\
+	|(1UL << RS_PROGRESS)	\
+	|(1UL << RS_DONE)	\
+	)
+
+static unsigned long get_work_bits(unsigned long *flags)
+{
+	unsigned long old, new;
+	do {
+		old = *flags;
+		new = old & ~DRBD_DEVICE_WORK_MASK;
+	} while (cmpxchg(flags, old, new) != old);
+	return old & DRBD_DEVICE_WORK_MASK;
+}
+
+static void do_unqueued_work(struct drbd_connection *connection)
+{
+	struct drbd_peer_device *peer_device;
+	int vnr;
+
+	rcu_read_lock();
+	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
+		struct drbd_device *device = peer_device->device;
+		unsigned long todo = get_work_bits(&device->flags);
+		if (!todo)
+			continue;
+
+		kref_get(&device->kref);
+		rcu_read_unlock();
+		do_device_work(device, todo);
+		kref_put(&device->kref, drbd_destroy_device);
+		rcu_read_lock();
+	}
+	rcu_read_unlock();
+}
+
 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
 {
 	spin_lock_irq(&queue->q_lock);
-	list_splice_init(&queue->q, work_list);
+	list_splice_tail_init(&queue->q, work_list);
 	spin_unlock_irq(&queue->q_lock);
 	return !list_empty(work_list);
 }
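
The get_work_bits() helper added above is a lock-free fetch-and-clear: it atomically claims every pending device-work bit covered by DRBD_DEVICE_WORK_MASK in one cmpxchg loop, so each request is handled exactly once even while other contexts keep setting bits, and unrelated flag bits stay untouched. A minimal userspace sketch of the same pattern with C11 atomics follows; all names are illustrative, not DRBD code.

#include <stdatomic.h>
#include <stdio.h>

#define WORK_A		(1UL << 0)
#define WORK_B		(1UL << 1)
#define WORK_MASK	(WORK_A | WORK_B)

/* Atomically claim (and clear) all pending work bits covered by WORK_MASK,
 * leaving unrelated flag bits alone. Mirrors the cmpxchg loop above. */
static unsigned long claim_work_bits(_Atomic unsigned long *flags)
{
	unsigned long old = atomic_load(flags);
	unsigned long new;

	do {
		new = old & ~WORK_MASK;
	} while (!atomic_compare_exchange_weak(flags, &old, new));
	return old & WORK_MASK;
}

int main(void)
{
	_Atomic unsigned long flags = WORK_A | (1UL << 7);	/* bit 7 is unrelated */
	unsigned long todo = claim_work_bits(&flags);

	printf("todo=%#lx remaining=%#lx\n", todo, (unsigned long)atomic_load(&flags));
	/* prints: todo=0x1 remaining=0x80 */
	return 0;
}

This is the same idea the worker uses in do_unqueued_work(): grab the whole batch of pending bits once, then dispatch each one, instead of testing and clearing bits individually under a lock.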
@@ -1851,7 +2036,7 @@ static void wait_for_work(struct drbd_connection *connection, struct list_head *
 		/* dequeue single item only,
 		 * we still use drbd_queue_work_front() in some places */
 		if (!list_empty(&connection->sender_work.q))
-			list_move(connection->sender_work.q.next, work_list);
+			list_splice_tail_init(&connection->sender_work.q, work_list);
 		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
 		if (!list_empty(work_list) || signal_pending(current)) {
 			spin_unlock_irq(&connection->resource->req_lock);
@@ -1873,6 +2058,14 @@ static void wait_for_work(struct drbd_connection *connection, struct list_head *
 		if (send_barrier)
 			maybe_send_barrier(connection,
 					connection->send.current_epoch_nr + 1);
+
+		if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
+			break;
+
+		/* drbd_send() may have called flush_signals() */
+		if (get_t_state(&connection->worker) != RUNNING)
+			break;
+
 		schedule();
 		/* may be woken up for other things but new work, too,
 		 * e.g. if the current epoch got closed.
@@ -1906,10 +2099,15 @@ int drbd_worker(struct drbd_thread *thi)
 	while (get_t_state(thi) == RUNNING) {
 		drbd_thread_current_set_cpu(thi);
 
-		/* as long as we use drbd_queue_work_front(),
-		 * we may only dequeue single work items here, not batches. */
-		if (list_empty(&work_list))
-			wait_for_work(connection, &work_list);
+		if (list_empty(&work_list)) {
+			update_worker_timing_details(connection, wait_for_work);
+			wait_for_work(connection, &work_list);
+		}
+
+		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
+			update_worker_timing_details(connection, do_unqueued_work);
+			do_unqueued_work(connection);
+		}
 
 		if (signal_pending(current)) {
 			flush_signals(current);
@@ -1926,6 +2124,7 @@ int drbd_worker(struct drbd_thread *thi)
 		while (!list_empty(&work_list)) {
 			w = list_first_entry(&work_list, struct drbd_work, list);
 			list_del_init(&w->list);
+			update_worker_timing_details(connection, w->cb);
 			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
 				continue;
 			if (connection->cstate >= C_WF_REPORT_PARAMS)
@@ -1934,13 +2133,18 @@ int drbd_worker(struct drbd_thread *thi)
 	}
 
 	do {
+		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
+			update_worker_timing_details(connection, do_unqueued_work);
+			do_unqueued_work(connection);
+		}
 		while (!list_empty(&work_list)) {
 			w = list_first_entry(&work_list, struct drbd_work, list);
 			list_del_init(&w->list);
+			update_worker_timing_details(connection, w->cb);
 			w->cb(w, 1);
 		}
 		dequeue_work_batch(&connection->sender_work, &work_list);
-	} while (!list_empty(&work_list));
+	} while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
 
 	rcu_read_lock();
 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {