aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLars Ellenberg <lars.ellenberg@linbit.com>2010-05-14 11:10:48 -0400
committerPhilipp Reisner <philipp.reisner@linbit.com>2010-05-17 20:01:23 -0400
commit45bb912bd5ea4d2b3a270a93cbdf767a0e2df6f5 (patch)
treed95d27ea8e945fcda3427c50a5bc062c804c6eff
parent708d740ed8242b84eefc63df144313a7308c7de5 (diff)
drbd: Allow drbd_epoch_entries to use multiple bios.
This should allow for better performance if the lower level IO stack of the peers differs in limits exposed either via the queue, or via some merge_bvec_fn. Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com> Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
-rw-r--r--drivers/block/drbd/drbd_int.h90
-rw-r--r--drivers/block/drbd/drbd_main.c19
-rw-r--r--drivers/block/drbd/drbd_nl.c18
-rw-r--r--drivers/block/drbd/drbd_receiver.c483
-rw-r--r--drivers/block/drbd/drbd_worker.c178
-rw-r--r--drivers/block/drbd/drbd_wrappers.h16
6 files changed, 480 insertions, 324 deletions
diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h
index 1bc86ddac38b..4b97f30bb7c6 100644
--- a/drivers/block/drbd/drbd_int.h
+++ b/drivers/block/drbd/drbd_int.h
@@ -740,18 +740,6 @@ enum epoch_event {
740 EV_CLEANUP = 32, /* used as flag */ 740 EV_CLEANUP = 32, /* used as flag */
741}; 741};
742 742
743struct drbd_epoch_entry {
744 struct drbd_work w;
745 struct drbd_conf *mdev;
746 struct bio *private_bio;
747 struct hlist_node colision;
748 sector_t sector;
749 unsigned int size;
750 unsigned int flags;
751 struct drbd_epoch *epoch;
752 u64 block_id;
753};
754
755struct drbd_wq_barrier { 743struct drbd_wq_barrier {
756 struct drbd_work w; 744 struct drbd_work w;
757 struct completion done; 745 struct completion done;
@@ -762,17 +750,49 @@ struct digest_info {
762 void *digest; 750 void *digest;
763}; 751};
764 752
765/* ee flag bits */ 753struct drbd_epoch_entry {
754 struct drbd_work w;
755 struct hlist_node colision;
756 struct drbd_epoch *epoch;
757 struct drbd_conf *mdev;
758 struct page *pages;
759 atomic_t pending_bios;
760 unsigned int size;
761 /* see comments on ee flag bits below */
762 unsigned long flags;
763 sector_t sector;
764 u64 block_id;
765};
766
767/* ee flag bits.
768 * While corresponding bios are in flight, the only modification will be
769 * set_bit WAS_ERROR, which has to be atomic.
770 * If no bios are in flight yet, or all have been completed,
771 * non-atomic modification to ee->flags is ok.
772 */
766enum { 773enum {
767 __EE_CALL_AL_COMPLETE_IO, 774 __EE_CALL_AL_COMPLETE_IO,
768 __EE_CONFLICT_PENDING,
769 __EE_MAY_SET_IN_SYNC, 775 __EE_MAY_SET_IN_SYNC,
776
777 /* This epoch entry closes an epoch using a barrier.
778 * On sucessful completion, the epoch is released,
779 * and the P_BARRIER_ACK send. */
770 __EE_IS_BARRIER, 780 __EE_IS_BARRIER,
781
782 /* In case a barrier failed,
783 * we need to resubmit without the barrier flag. */
784 __EE_RESUBMITTED,
785
786 /* we may have several bios per epoch entry.
787 * if any of those fail, we set this flag atomically
788 * from the endio callback */
789 __EE_WAS_ERROR,
771}; 790};
772#define EE_CALL_AL_COMPLETE_IO (1<<__EE_CALL_AL_COMPLETE_IO) 791#define EE_CALL_AL_COMPLETE_IO (1<<__EE_CALL_AL_COMPLETE_IO)
773#define EE_CONFLICT_PENDING (1<<__EE_CONFLICT_PENDING)
774#define EE_MAY_SET_IN_SYNC (1<<__EE_MAY_SET_IN_SYNC) 792#define EE_MAY_SET_IN_SYNC (1<<__EE_MAY_SET_IN_SYNC)
775#define EE_IS_BARRIER (1<<__EE_IS_BARRIER) 793#define EE_IS_BARRIER (1<<__EE_IS_BARRIER)
794#define EE_RESUBMITTED (1<<__EE_RESUBMITTED)
795#define EE_WAS_ERROR (1<<__EE_WAS_ERROR)
776 796
777/* global flag bits */ 797/* global flag bits */
778enum { 798enum {
@@ -1441,7 +1461,8 @@ static inline void ov_oos_print(struct drbd_conf *mdev)
1441} 1461}
1442 1462
1443 1463
1444extern void drbd_csum(struct drbd_conf *, struct crypto_hash *, struct bio *, void *); 1464extern void drbd_csum_bio(struct drbd_conf *, struct crypto_hash *, struct bio *, void *);
1465extern void drbd_csum_ee(struct drbd_conf *, struct crypto_hash *, struct drbd_epoch_entry *, void *);
1445/* worker callbacks */ 1466/* worker callbacks */
1446extern int w_req_cancel_conflict(struct drbd_conf *, struct drbd_work *, int); 1467extern int w_req_cancel_conflict(struct drbd_conf *, struct drbd_work *, int);
1447extern int w_read_retry_remote(struct drbd_conf *, struct drbd_work *, int); 1468extern int w_read_retry_remote(struct drbd_conf *, struct drbd_work *, int);
@@ -1465,6 +1486,8 @@ extern int w_e_reissue(struct drbd_conf *, struct drbd_work *, int);
1465extern void resync_timer_fn(unsigned long data); 1486extern void resync_timer_fn(unsigned long data);
1466 1487
1467/* drbd_receiver.c */ 1488/* drbd_receiver.c */
1489extern int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1490 const unsigned rw, const int fault_type);
1468extern int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list); 1491extern int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list);
1469extern struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev, 1492extern struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
1470 u64 id, 1493 u64 id,
@@ -1620,6 +1643,41 @@ void drbd_bcast_ee(struct drbd_conf *mdev,
1620 * inline helper functions 1643 * inline helper functions
1621 *************************/ 1644 *************************/
1622 1645
1646/* see also page_chain_add and friends in drbd_receiver.c */
1647static inline struct page *page_chain_next(struct page *page)
1648{
1649 return (struct page *)page_private(page);
1650}
1651#define page_chain_for_each(page) \
1652 for (; page && ({ prefetch(page_chain_next(page)); 1; }); \
1653 page = page_chain_next(page))
1654#define page_chain_for_each_safe(page, n) \
1655 for (; page && ({ n = page_chain_next(page); 1; }); page = n)
1656
1657static inline int drbd_bio_has_active_page(struct bio *bio)
1658{
1659 struct bio_vec *bvec;
1660 int i;
1661
1662 __bio_for_each_segment(bvec, bio, i, 0) {
1663 if (page_count(bvec->bv_page) > 1)
1664 return 1;
1665 }
1666
1667 return 0;
1668}
1669
1670static inline int drbd_ee_has_active_page(struct drbd_epoch_entry *e)
1671{
1672 struct page *page = e->pages;
1673 page_chain_for_each(page) {
1674 if (page_count(page) > 1)
1675 return 1;
1676 }
1677 return 0;
1678}
1679
1680
1623static inline void drbd_state_lock(struct drbd_conf *mdev) 1681static inline void drbd_state_lock(struct drbd_conf *mdev)
1624{ 1682{
1625 wait_event(mdev->misc_wait, 1683 wait_event(mdev->misc_wait,
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index 3aa0add1c230..d0fabace1452 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -2354,6 +2354,19 @@ static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2354 return 1; 2354 return 1;
2355} 2355}
2356 2356
2357static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
2358{
2359 struct page *page = e->pages;
2360 unsigned len = e->size;
2361 page_chain_for_each(page) {
2362 unsigned l = min_t(unsigned, len, PAGE_SIZE);
2363 if (!_drbd_send_page(mdev, page, 0, l))
2364 return 0;
2365 len -= l;
2366 }
2367 return 1;
2368}
2369
2357static void consider_delay_probes(struct drbd_conf *mdev) 2370static void consider_delay_probes(struct drbd_conf *mdev)
2358{ 2371{
2359 if (mdev->state.conn != C_SYNC_SOURCE || mdev->agreed_pro_version < 93) 2372 if (mdev->state.conn != C_SYNC_SOURCE || mdev->agreed_pro_version < 93)
@@ -2430,7 +2443,7 @@ int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2430 drbd_send(mdev, mdev->data.socket, &p, sizeof(p), MSG_MORE)); 2443 drbd_send(mdev, mdev->data.socket, &p, sizeof(p), MSG_MORE));
2431 if (ok && dgs) { 2444 if (ok && dgs) {
2432 dgb = mdev->int_dig_out; 2445 dgb = mdev->int_dig_out;
2433 drbd_csum(mdev, mdev->integrity_w_tfm, req->master_bio, dgb); 2446 drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2434 ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE); 2447 ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE);
2435 } 2448 }
2436 if (ok) { 2449 if (ok) {
@@ -2483,11 +2496,11 @@ int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2483 sizeof(p), MSG_MORE); 2496 sizeof(p), MSG_MORE);
2484 if (ok && dgs) { 2497 if (ok && dgs) {
2485 dgb = mdev->int_dig_out; 2498 dgb = mdev->int_dig_out;
2486 drbd_csum(mdev, mdev->integrity_w_tfm, e->private_bio, dgb); 2499 drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
2487 ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE); 2500 ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE);
2488 } 2501 }
2489 if (ok) 2502 if (ok)
2490 ok = _drbd_send_zc_bio(mdev, e->private_bio); 2503 ok = _drbd_send_zc_ee(mdev, e);
2491 2504
2492 drbd_put_data_sock(mdev); 2505 drbd_put_data_sock(mdev);
2493 2506
diff --git a/drivers/block/drbd/drbd_nl.c b/drivers/block/drbd/drbd_nl.c
index 93d150661f4b..28ef76bd5230 100644
--- a/drivers/block/drbd/drbd_nl.c
+++ b/drivers/block/drbd/drbd_nl.c
@@ -2215,9 +2215,9 @@ void drbd_bcast_ee(struct drbd_conf *mdev,
2215{ 2215{
2216 struct cn_msg *cn_reply; 2216 struct cn_msg *cn_reply;
2217 struct drbd_nl_cfg_reply *reply; 2217 struct drbd_nl_cfg_reply *reply;
2218 struct bio_vec *bvec;
2219 unsigned short *tl; 2218 unsigned short *tl;
2220 int i; 2219 struct page *page;
2220 unsigned len;
2221 2221
2222 if (!e) 2222 if (!e)
2223 return; 2223 return;
@@ -2255,11 +2255,15 @@ void drbd_bcast_ee(struct drbd_conf *mdev,
2255 put_unaligned(T_ee_data, tl++); 2255 put_unaligned(T_ee_data, tl++);
2256 put_unaligned(e->size, tl++); 2256 put_unaligned(e->size, tl++);
2257 2257
2258 __bio_for_each_segment(bvec, e->private_bio, i, 0) { 2258 len = e->size;
2259 void *d = kmap(bvec->bv_page); 2259 page = e->pages;
2260 memcpy(tl, d + bvec->bv_offset, bvec->bv_len); 2260 page_chain_for_each(page) {
2261 kunmap(bvec->bv_page); 2261 void *d = kmap_atomic(page, KM_USER0);
2262 tl=(unsigned short*)((char*)tl + bvec->bv_len); 2262 unsigned l = min_t(unsigned, len, PAGE_SIZE);
2263 memcpy(tl, d, l);
2264 kunmap_atomic(d, KM_USER0);
2265 tl = (unsigned short*)((char*)tl + l);
2266 len -= l;
2263 } 2267 }
2264 put_unaligned(TT_END, tl++); /* Close the tag list */ 2268 put_unaligned(TT_END, tl++); /* Close the tag list */
2265 2269
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c
index fee0d249adf7..388a3e8bb0d0 100644
--- a/drivers/block/drbd/drbd_receiver.c
+++ b/drivers/block/drbd/drbd_receiver.c
@@ -80,30 +80,124 @@ static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epo
80 80
81#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) 81#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
82 82
83static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev) 83/*
84 * some helper functions to deal with single linked page lists,
85 * page->private being our "next" pointer.
86 */
87
88/* If at least n pages are linked at head, get n pages off.
89 * Otherwise, don't modify head, and return NULL.
90 * Locking is the responsibility of the caller.
91 */
92static struct page *page_chain_del(struct page **head, int n)
93{
94 struct page *page;
95 struct page *tmp;
96
97 BUG_ON(!n);
98 BUG_ON(!head);
99
100 page = *head;
101 while (page) {
102 tmp = page_chain_next(page);
103 if (--n == 0)
104 break; /* found sufficient pages */
105 if (tmp == NULL)
106 /* insufficient pages, don't use any of them. */
107 return NULL;
108 page = tmp;
109 }
110
111 /* add end of list marker for the returned list */
112 set_page_private(page, 0);
113 /* actual return value, and adjustment of head */
114 page = *head;
115 *head = tmp;
116 return page;
117}
118
119/* may be used outside of locks to find the tail of a (usually short)
120 * "private" page chain, before adding it back to a global chain head
121 * with page_chain_add() under a spinlock. */
122static struct page *page_chain_tail(struct page *page, int *len)
123{
124 struct page *tmp;
125 int i = 1;
126 while ((tmp = page_chain_next(page)))
127 ++i, page = tmp;
128 if (len)
129 *len = i;
130 return page;
131}
132
133static int page_chain_free(struct page *page)
134{
135 struct page *tmp;
136 int i = 0;
137 page_chain_for_each_safe(page, tmp) {
138 put_page(page);
139 ++i;
140 }
141 return i;
142}
143
144static void page_chain_add(struct page **head,
145 struct page *chain_first, struct page *chain_last)
146{
147#if 1
148 struct page *tmp;
149 tmp = page_chain_tail(chain_first, NULL);
150 BUG_ON(tmp != chain_last);
151#endif
152
153 /* add chain to head */
154 set_page_private(chain_last, (unsigned long)*head);
155 *head = chain_first;
156}
157
158static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
84{ 159{
85 struct page *page = NULL; 160 struct page *page = NULL;
161 struct page *tmp = NULL;
162 int i = 0;
86 163
87 /* Yes, testing drbd_pp_vacant outside the lock is racy. 164 /* Yes, testing drbd_pp_vacant outside the lock is racy.
88 * So what. It saves a spin_lock. */ 165 * So what. It saves a spin_lock. */
89 if (drbd_pp_vacant > 0) { 166 if (drbd_pp_vacant >= number) {
90 spin_lock(&drbd_pp_lock); 167 spin_lock(&drbd_pp_lock);
91 page = drbd_pp_pool; 168 page = page_chain_del(&drbd_pp_pool, number);
92 if (page) { 169 if (page)
93 drbd_pp_pool = (struct page *)page_private(page); 170 drbd_pp_vacant -= number;
94 set_page_private(page, 0); /* just to be polite */
95 drbd_pp_vacant--;
96 }
97 spin_unlock(&drbd_pp_lock); 171 spin_unlock(&drbd_pp_lock);
172 if (page)
173 return page;
98 } 174 }
175
99 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD 176 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
100 * "criss-cross" setup, that might cause write-out on some other DRBD, 177 * "criss-cross" setup, that might cause write-out on some other DRBD,
101 * which in turn might block on the other node at this very place. */ 178 * which in turn might block on the other node at this very place. */
102 if (!page) 179 for (i = 0; i < number; i++) {
103 page = alloc_page(GFP_TRY); 180 tmp = alloc_page(GFP_TRY);
104 if (page) 181 if (!tmp)
105 atomic_inc(&mdev->pp_in_use); 182 break;
106 return page; 183 set_page_private(tmp, (unsigned long)page);
184 page = tmp;
185 }
186
187 if (i == number)
188 return page;
189
190 /* Not enough pages immediately available this time.
191 * No need to jump around here, drbd_pp_alloc will retry this
192 * function "soon". */
193 if (page) {
194 tmp = page_chain_tail(page, NULL);
195 spin_lock(&drbd_pp_lock);
196 page_chain_add(&drbd_pp_pool, page, tmp);
197 drbd_pp_vacant += i;
198 spin_unlock(&drbd_pp_lock);
199 }
200 return NULL;
107} 201}
108 202
109/* kick lower level device, if we have more than (arbitrary number) 203/* kick lower level device, if we have more than (arbitrary number)
@@ -127,7 +221,7 @@ static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed
127 221
128 list_for_each_safe(le, tle, &mdev->net_ee) { 222 list_for_each_safe(le, tle, &mdev->net_ee) {
129 e = list_entry(le, struct drbd_epoch_entry, w.list); 223 e = list_entry(le, struct drbd_epoch_entry, w.list);
130 if (drbd_bio_has_active_page(e->private_bio)) 224 if (drbd_ee_has_active_page(e))
131 break; 225 break;
132 list_move(le, to_be_freed); 226 list_move(le, to_be_freed);
133 } 227 }
@@ -148,32 +242,34 @@ static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
148} 242}
149 243
150/** 244/**
151 * drbd_pp_alloc() - Returns a page, fails only if a signal comes in 245 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
152 * @mdev: DRBD device. 246 * @mdev: DRBD device.
153 * @retry: whether or not to retry allocation forever (or until signalled) 247 * @number: number of pages requested
248 * @retry: whether to retry, if not enough pages are available right now
249 *
250 * Tries to allocate number pages, first from our own page pool, then from
251 * the kernel, unless this allocation would exceed the max_buffers setting.
252 * Possibly retry until DRBD frees sufficient pages somewhere else.
154 * 253 *
155 * Tries to allocate a page, first from our own page pool, then from the 254 * Returns a page chain linked via page->private.
156 * kernel, unless this allocation would exceed the max_buffers setting.
157 * If @retry is non-zero, retry until DRBD frees a page somewhere else.
158 */ 255 */
159static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry) 256static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
160{ 257{
161 struct page *page = NULL; 258 struct page *page = NULL;
162 DEFINE_WAIT(wait); 259 DEFINE_WAIT(wait);
163 260
164 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) { 261 /* Yes, we may run up to @number over max_buffers. If we
165 page = drbd_pp_first_page_or_try_alloc(mdev); 262 * follow it strictly, the admin will get it wrong anyways. */
166 if (page) 263 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
167 return page; 264 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
168 }
169 265
170 for (;;) { 266 while (page == NULL) {
171 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE); 267 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
172 268
173 drbd_kick_lo_and_reclaim_net(mdev); 269 drbd_kick_lo_and_reclaim_net(mdev);
174 270
175 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) { 271 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
176 page = drbd_pp_first_page_or_try_alloc(mdev); 272 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
177 if (page) 273 if (page)
178 break; 274 break;
179 } 275 }
@@ -190,62 +286,32 @@ static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
190 } 286 }
191 finish_wait(&drbd_pp_wait, &wait); 287 finish_wait(&drbd_pp_wait, &wait);
192 288
289 if (page)
290 atomic_add(number, &mdev->pp_in_use);
193 return page; 291 return page;
194} 292}
195 293
196/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc. 294/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
197 * Is also used from inside an other spin_lock_irq(&mdev->req_lock) */ 295 * Is also used from inside an other spin_lock_irq(&mdev->req_lock);
296 * Either links the page chain back to the global pool,
297 * or returns all pages to the system. */
198static void drbd_pp_free(struct drbd_conf *mdev, struct page *page) 298static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
199{ 299{
200 int free_it;
201
202 spin_lock(&drbd_pp_lock);
203 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
204 free_it = 1;
205 } else {
206 set_page_private(page, (unsigned long)drbd_pp_pool);
207 drbd_pp_pool = page;
208 drbd_pp_vacant++;
209 free_it = 0;
210 }
211 spin_unlock(&drbd_pp_lock);
212
213 atomic_dec(&mdev->pp_in_use);
214
215 if (free_it)
216 __free_page(page);
217
218 wake_up(&drbd_pp_wait);
219}
220
221static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio)
222{
223 struct page *p_to_be_freed = NULL;
224 struct page *page;
225 struct bio_vec *bvec;
226 int i; 300 int i;
227 301 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
228 spin_lock(&drbd_pp_lock); 302 i = page_chain_free(page);
229 __bio_for_each_segment(bvec, bio, i, 0) { 303 else {
230 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) { 304 struct page *tmp;
231 set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed); 305 tmp = page_chain_tail(page, &i);
232 p_to_be_freed = bvec->bv_page; 306 spin_lock(&drbd_pp_lock);
233 } else { 307 page_chain_add(&drbd_pp_pool, page, tmp);
234 set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool); 308 drbd_pp_vacant += i;
235 drbd_pp_pool = bvec->bv_page; 309 spin_unlock(&drbd_pp_lock);
236 drbd_pp_vacant++;
237 }
238 }
239 spin_unlock(&drbd_pp_lock);
240 atomic_sub(bio->bi_vcnt, &mdev->pp_in_use);
241
242 while (p_to_be_freed) {
243 page = p_to_be_freed;
244 p_to_be_freed = (struct page *)page_private(page);
245 set_page_private(page, 0); /* just to be polite */
246 put_page(page);
247 } 310 }
248 311 atomic_sub(i, &mdev->pp_in_use);
312 i = atomic_read(&mdev->pp_in_use);
313 if (i < 0)
314 dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
249 wake_up(&drbd_pp_wait); 315 wake_up(&drbd_pp_wait);
250} 316}
251 317
@@ -270,11 +336,9 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
270 unsigned int data_size, 336 unsigned int data_size,
271 gfp_t gfp_mask) __must_hold(local) 337 gfp_t gfp_mask) __must_hold(local)
272{ 338{
273 struct request_queue *q;
274 struct drbd_epoch_entry *e; 339 struct drbd_epoch_entry *e;
275 struct page *page; 340 struct page *page;
276 struct bio *bio; 341 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
277 unsigned int ds;
278 342
279 if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE)) 343 if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
280 return NULL; 344 return NULL;
@@ -286,84 +350,32 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
286 return NULL; 350 return NULL;
287 } 351 }
288 352
289 bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE)); 353 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
290 if (!bio) { 354 if (!page)
291 if (!(gfp_mask & __GFP_NOWARN)) 355 goto fail;
292 dev_err(DEV, "alloc_ee: Allocation of a bio failed\n");
293 goto fail1;
294 }
295
296 bio->bi_bdev = mdev->ldev->backing_bdev;
297 bio->bi_sector = sector;
298
299 ds = data_size;
300 while (ds) {
301 page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT));
302 if (!page) {
303 if (!(gfp_mask & __GFP_NOWARN))
304 dev_err(DEV, "alloc_ee: Allocation of a page failed\n");
305 goto fail2;
306 }
307 if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) {
308 drbd_pp_free(mdev, page);
309 dev_err(DEV, "alloc_ee: bio_add_page(s=%llu,"
310 "data_size=%u,ds=%u) failed\n",
311 (unsigned long long)sector, data_size, ds);
312
313 q = bdev_get_queue(bio->bi_bdev);
314 if (q->merge_bvec_fn) {
315 struct bvec_merge_data bvm = {
316 .bi_bdev = bio->bi_bdev,
317 .bi_sector = bio->bi_sector,
318 .bi_size = bio->bi_size,
319 .bi_rw = bio->bi_rw,
320 };
321 int l = q->merge_bvec_fn(q, &bvm,
322 &bio->bi_io_vec[bio->bi_vcnt]);
323 dev_err(DEV, "merge_bvec_fn() = %d\n", l);
324 }
325
326 /* dump more of the bio. */
327 dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs);
328 dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt);
329 dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size);
330 dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments);
331
332 goto fail2;
333 break;
334 }
335 ds -= min_t(int, ds, PAGE_SIZE);
336 }
337
338 D_ASSERT(data_size == bio->bi_size);
339
340 bio->bi_private = e;
341 e->mdev = mdev;
342 e->sector = sector;
343 e->size = bio->bi_size;
344 356
345 e->private_bio = bio;
346 e->block_id = id;
347 INIT_HLIST_NODE(&e->colision); 357 INIT_HLIST_NODE(&e->colision);
348 e->epoch = NULL; 358 e->epoch = NULL;
359 e->mdev = mdev;
360 e->pages = page;
361 atomic_set(&e->pending_bios, 0);
362 e->size = data_size;
349 e->flags = 0; 363 e->flags = 0;
364 e->sector = sector;
365 e->sector = sector;
366 e->block_id = id;
350 367
351 return e; 368 return e;
352 369
353 fail2: 370 fail:
354 drbd_pp_free_bio_pages(mdev, bio);
355 bio_put(bio);
356 fail1:
357 mempool_free(e, drbd_ee_mempool); 371 mempool_free(e, drbd_ee_mempool);
358
359 return NULL; 372 return NULL;
360} 373}
361 374
362void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e) 375void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
363{ 376{
364 struct bio *bio = e->private_bio; 377 drbd_pp_free(mdev, e->pages);
365 drbd_pp_free_bio_pages(mdev, bio); 378 D_ASSERT(atomic_read(&e->pending_bios) == 0);
366 bio_put(bio);
367 D_ASSERT(hlist_unhashed(&e->colision)); 379 D_ASSERT(hlist_unhashed(&e->colision));
368 mempool_free(e, drbd_ee_mempool); 380 mempool_free(e, drbd_ee_mempool);
369} 381}
@@ -1121,6 +1133,90 @@ void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo)
1121} 1133}
1122 1134
1123/** 1135/**
1136 * drbd_submit_ee()
1137 * @mdev: DRBD device.
1138 * @e: epoch entry
1139 * @rw: flag field, see bio->bi_rw
1140 */
1141/* TODO allocate from our own bio_set. */
1142int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1143 const unsigned rw, const int fault_type)
1144{
1145 struct bio *bios = NULL;
1146 struct bio *bio;
1147 struct page *page = e->pages;
1148 sector_t sector = e->sector;
1149 unsigned ds = e->size;
1150 unsigned n_bios = 0;
1151 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1152
1153 /* In most cases, we will only need one bio. But in case the lower
1154 * level restrictions happen to be different at this offset on this
1155 * side than those of the sending peer, we may need to submit the
1156 * request in more than one bio. */
1157next_bio:
1158 bio = bio_alloc(GFP_NOIO, nr_pages);
1159 if (!bio) {
1160 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1161 goto fail;
1162 }
1163 /* > e->sector, unless this is the first bio */
1164 bio->bi_sector = sector;
1165 bio->bi_bdev = mdev->ldev->backing_bdev;
1166 /* we special case some flags in the multi-bio case, see below
1167 * (BIO_RW_UNPLUG, BIO_RW_BARRIER) */
1168 bio->bi_rw = rw;
1169 bio->bi_private = e;
1170 bio->bi_end_io = drbd_endio_sec;
1171
1172 bio->bi_next = bios;
1173 bios = bio;
1174 ++n_bios;
1175
1176 page_chain_for_each(page) {
1177 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1178 if (!bio_add_page(bio, page, len, 0)) {
1179 /* a single page must always be possible! */
1180 BUG_ON(bio->bi_vcnt == 0);
1181 goto next_bio;
1182 }
1183 ds -= len;
1184 sector += len >> 9;
1185 --nr_pages;
1186 }
1187 D_ASSERT(page == NULL);
1188 D_ASSERT(ds == 0);
1189
1190 atomic_set(&e->pending_bios, n_bios);
1191 do {
1192 bio = bios;
1193 bios = bios->bi_next;
1194 bio->bi_next = NULL;
1195
1196 /* strip off BIO_RW_UNPLUG unless it is the last bio */
1197 if (bios)
1198 bio->bi_rw &= ~(1<<BIO_RW_UNPLUG);
1199
1200 drbd_generic_make_request(mdev, fault_type, bio);
1201
1202 /* strip off BIO_RW_BARRIER,
1203 * unless it is the first or last bio */
1204 if (bios && bios->bi_next)
1205 bios->bi_rw &= ~(1<<BIO_RW_BARRIER);
1206 } while (bios);
1207 maybe_kick_lo(mdev);
1208 return 0;
1209
1210fail:
1211 while (bios) {
1212 bio = bios;
1213 bios = bios->bi_next;
1214 bio_put(bio);
1215 }
1216 return -ENOMEM;
1217}
1218
1219/**
1124 * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set 1220 * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
1125 * @mdev: DRBD device. 1221 * @mdev: DRBD device.
1126 * @w: work object. 1222 * @w: work object.
@@ -1129,8 +1225,6 @@ void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo)
1129int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local) 1225int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1130{ 1226{
1131 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w; 1227 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1132 struct bio *bio = e->private_bio;
1133
1134 /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place, 1228 /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1135 (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch) 1229 (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1136 so that we can finish that epoch in drbd_may_finish_epoch(). 1230 so that we can finish that epoch in drbd_may_finish_epoch().
@@ -1144,33 +1238,17 @@ int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __relea
1144 if (previous_epoch(mdev, e->epoch)) 1238 if (previous_epoch(mdev, e->epoch))
1145 dev_warn(DEV, "Write ordering was not enforced (one time event)\n"); 1239 dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1146 1240
1147 /* prepare bio for re-submit,
1148 * re-init volatile members */
1149 /* we still have a local reference, 1241 /* we still have a local reference,
1150 * get_ldev was done in receive_Data. */ 1242 * get_ldev was done in receive_Data. */
1151 bio->bi_bdev = mdev->ldev->backing_bdev;
1152 bio->bi_sector = e->sector;
1153 bio->bi_size = e->size;
1154 bio->bi_idx = 0;
1155
1156 bio->bi_flags &= ~(BIO_POOL_MASK - 1);
1157 bio->bi_flags |= 1 << BIO_UPTODATE;
1158
1159 /* don't know whether this is necessary: */
1160 bio->bi_phys_segments = 0;
1161 bio->bi_next = NULL;
1162
1163 /* these should be unchanged: */
1164 /* bio->bi_end_io = drbd_endio_write_sec; */
1165 /* bio->bi_vcnt = whatever; */
1166 1243
1167 e->w.cb = e_end_block; 1244 e->w.cb = e_end_block;
1168 1245 if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
1169 /* This is no longer a barrier request. */ 1246 /* drbd_submit_ee fails for one reason only:
1170 bio->bi_rw &= ~(1UL << BIO_RW_BARRIER); 1247 * if was not able to allocate sufficient bios.
1171 1248 * requeue, try again later. */
1172 drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio); 1249 e->w.cb = w_e_reissue;
1173 1250 drbd_queue_work(&mdev->data.work, &e->w);
1251 }
1174 return 1; 1252 return 1;
1175} 1253}
1176 1254
@@ -1264,10 +1342,8 @@ read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __
1264{ 1342{
1265 const sector_t capacity = drbd_get_capacity(mdev->this_bdev); 1343 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1266 struct drbd_epoch_entry *e; 1344 struct drbd_epoch_entry *e;
1267 struct bio_vec *bvec;
1268 struct page *page; 1345 struct page *page;
1269 struct bio *bio; 1346 int dgs, ds, rr;
1270 int dgs, ds, i, rr;
1271 void *dig_in = mdev->int_dig_in; 1347 void *dig_in = mdev->int_dig_in;
1272 void *dig_vv = mdev->int_dig_vv; 1348 void *dig_vv = mdev->int_dig_vv;
1273 unsigned long *data; 1349 unsigned long *data;
@@ -1304,28 +1380,29 @@ read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __
1304 e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO); 1380 e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1305 if (!e) 1381 if (!e)
1306 return NULL; 1382 return NULL;
1307 bio = e->private_bio; 1383
1308 ds = data_size; 1384 ds = data_size;
1309 bio_for_each_segment(bvec, bio, i) { 1385 page = e->pages;
1310 page = bvec->bv_page; 1386 page_chain_for_each(page) {
1387 unsigned len = min_t(int, ds, PAGE_SIZE);
1311 data = kmap(page); 1388 data = kmap(page);
1312 rr = drbd_recv(mdev, data, min_t(int, ds, PAGE_SIZE)); 1389 rr = drbd_recv(mdev, data, len);
1313 if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) { 1390 if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
1314 dev_err(DEV, "Fault injection: Corrupting data on receive\n"); 1391 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1315 data[0] = data[0] ^ (unsigned long)-1; 1392 data[0] = data[0] ^ (unsigned long)-1;
1316 } 1393 }
1317 kunmap(page); 1394 kunmap(page);
1318 if (rr != min_t(int, ds, PAGE_SIZE)) { 1395 if (rr != len) {
1319 drbd_free_ee(mdev, e); 1396 drbd_free_ee(mdev, e);
1320 dev_warn(DEV, "short read receiving data: read %d expected %d\n", 1397 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1321 rr, min_t(int, ds, PAGE_SIZE)); 1398 rr, len);
1322 return NULL; 1399 return NULL;
1323 } 1400 }
1324 ds -= rr; 1401 ds -= rr;
1325 } 1402 }
1326 1403
1327 if (dgs) { 1404 if (dgs) {
1328 drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv); 1405 drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1329 if (memcmp(dig_in, dig_vv, dgs)) { 1406 if (memcmp(dig_in, dig_vv, dgs)) {
1330 dev_err(DEV, "Digest integrity check FAILED.\n"); 1407 dev_err(DEV, "Digest integrity check FAILED.\n");
1331 drbd_bcast_ee(mdev, "digest failed", 1408 drbd_bcast_ee(mdev, "digest failed",
@@ -1350,7 +1427,7 @@ static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1350 if (!data_size) 1427 if (!data_size)
1351 return TRUE; 1428 return TRUE;
1352 1429
1353 page = drbd_pp_alloc(mdev, 1); 1430 page = drbd_pp_alloc(mdev, 1, 1);
1354 1431
1355 data = kmap(page); 1432 data = kmap(page);
1356 while (data_size) { 1433 while (data_size) {
@@ -1414,7 +1491,7 @@ static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1414 } 1491 }
1415 1492
1416 if (dgs) { 1493 if (dgs) {
1417 drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv); 1494 drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1418 if (memcmp(dig_in, dig_vv, dgs)) { 1495 if (memcmp(dig_in, dig_vv, dgs)) {
1419 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n"); 1496 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1420 return 0; 1497 return 0;
@@ -1435,7 +1512,7 @@ static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int u
1435 1512
1436 D_ASSERT(hlist_unhashed(&e->colision)); 1513 D_ASSERT(hlist_unhashed(&e->colision));
1437 1514
1438 if (likely(drbd_bio_uptodate(e->private_bio))) { 1515 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1439 drbd_set_in_sync(mdev, sector, e->size); 1516 drbd_set_in_sync(mdev, sector, e->size);
1440 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e); 1517 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1441 } else { 1518 } else {
@@ -1454,30 +1531,28 @@ static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_si
1454 struct drbd_epoch_entry *e; 1531 struct drbd_epoch_entry *e;
1455 1532
1456 e = read_in_block(mdev, ID_SYNCER, sector, data_size); 1533 e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1457 if (!e) { 1534 if (!e)
1458 put_ldev(mdev); 1535 goto fail;
1459 return FALSE;
1460 }
1461 1536
1462 dec_rs_pending(mdev); 1537 dec_rs_pending(mdev);
1463 1538
1464 e->private_bio->bi_end_io = drbd_endio_write_sec;
1465 e->private_bio->bi_rw = WRITE;
1466 e->w.cb = e_end_resync_block;
1467
1468 inc_unacked(mdev); 1539 inc_unacked(mdev);
1469 /* corresponding dec_unacked() in e_end_resync_block() 1540 /* corresponding dec_unacked() in e_end_resync_block()
1470 * respective _drbd_clear_done_ee */ 1541 * respective _drbd_clear_done_ee */
1471 1542
1543 e->w.cb = e_end_resync_block;
1544
1472 spin_lock_irq(&mdev->req_lock); 1545 spin_lock_irq(&mdev->req_lock);
1473 list_add(&e->w.list, &mdev->sync_ee); 1546 list_add(&e->w.list, &mdev->sync_ee);
1474 spin_unlock_irq(&mdev->req_lock); 1547 spin_unlock_irq(&mdev->req_lock);
1475 1548
1476 drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio); 1549 if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1477 /* accounting done in endio */ 1550 return TRUE;
1478 1551
1479 maybe_kick_lo(mdev); 1552 drbd_free_ee(mdev, e);
1480 return TRUE; 1553fail:
1554 put_ldev(mdev);
1555 return FALSE;
1481} 1556}
1482 1557
1483static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h) 1558static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
@@ -1572,7 +1647,7 @@ static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1572 } 1647 }
1573 1648
1574 if (mdev->net_conf->wire_protocol == DRBD_PROT_C) { 1649 if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1575 if (likely(drbd_bio_uptodate(e->private_bio))) { 1650 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1576 pcmd = (mdev->state.conn >= C_SYNC_SOURCE && 1651 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1577 mdev->state.conn <= C_PAUSED_SYNC_T && 1652 mdev->state.conn <= C_PAUSED_SYNC_T &&
1578 e->flags & EE_MAY_SET_IN_SYNC) ? 1653 e->flags & EE_MAY_SET_IN_SYNC) ?
@@ -1718,7 +1793,6 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1718 return FALSE; 1793 return FALSE;
1719 } 1794 }
1720 1795
1721 e->private_bio->bi_end_io = drbd_endio_write_sec;
1722 e->w.cb = e_end_block; 1796 e->w.cb = e_end_block;
1723 1797
1724 spin_lock(&mdev->epoch_lock); 1798 spin_lock(&mdev->epoch_lock);
@@ -1914,12 +1988,8 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1914 drbd_al_begin_io(mdev, e->sector); 1988 drbd_al_begin_io(mdev, e->sector);
1915 } 1989 }
1916 1990
1917 e->private_bio->bi_rw = rw; 1991 if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
1918 drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio); 1992 return TRUE;
1919 /* accounting done in endio */
1920
1921 maybe_kick_lo(mdev);
1922 return TRUE;
1923 1993
1924out_interrupted: 1994out_interrupted:
1925 /* yes, the epoch_size now is imbalanced. 1995 /* yes, the epoch_size now is imbalanced.
@@ -1977,9 +2047,6 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
1977 return FALSE; 2047 return FALSE;
1978 } 2048 }
1979 2049
1980 e->private_bio->bi_rw = READ;
1981 e->private_bio->bi_end_io = drbd_endio_read_sec;
1982
1983 switch (h->command) { 2050 switch (h->command) {
1984 case P_DATA_REQUEST: 2051 case P_DATA_REQUEST:
1985 e->w.cb = w_e_end_data_req; 2052 e->w.cb = w_e_end_data_req;
@@ -2073,10 +2140,8 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
2073 2140
2074 inc_unacked(mdev); 2141 inc_unacked(mdev);
2075 2142
2076 drbd_generic_make_request(mdev, fault_type, e->private_bio); 2143 if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2077 maybe_kick_lo(mdev); 2144 return TRUE;
2078
2079 return TRUE;
2080 2145
2081out_free_e: 2146out_free_e:
2082 kfree(di); 2147 kfree(di);
@@ -3837,7 +3902,7 @@ static void drbd_disconnect(struct drbd_conf *mdev)
3837 dev_info(DEV, "net_ee not empty, killed %u entries\n", i); 3902 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3838 i = atomic_read(&mdev->pp_in_use); 3903 i = atomic_read(&mdev->pp_in_use);
3839 if (i) 3904 if (i)
3840 dev_info(DEV, "pp_in_use = %u, expected 0\n", i); 3905 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3841 3906
3842 D_ASSERT(list_empty(&mdev->read_ee)); 3907 D_ASSERT(list_empty(&mdev->read_ee));
3843 D_ASSERT(list_empty(&mdev->active_ee)); 3908 D_ASSERT(list_empty(&mdev->active_ee));
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index 0bbecf45b485..d771b1e0424b 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -47,8 +47,7 @@ static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int ca
47 47
48/* defined here: 48/* defined here:
49 drbd_md_io_complete 49 drbd_md_io_complete
50 drbd_endio_write_sec 50 drbd_endio_sec
51 drbd_endio_read_sec
52 drbd_endio_pri 51 drbd_endio_pri
53 52
54 * more endio handlers: 53 * more endio handlers:
@@ -85,27 +84,10 @@ void drbd_md_io_complete(struct bio *bio, int error)
85/* reads on behalf of the partner, 84/* reads on behalf of the partner,
86 * "submitted" by the receiver 85 * "submitted" by the receiver
87 */ 86 */
88void drbd_endio_read_sec(struct bio *bio, int error) __releases(local) 87void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
89{ 88{
90 unsigned long flags = 0; 89 unsigned long flags = 0;
91 struct drbd_epoch_entry *e = NULL; 90 struct drbd_conf *mdev = e->mdev;
92 struct drbd_conf *mdev;
93 int uptodate = bio_flagged(bio, BIO_UPTODATE);
94
95 e = bio->bi_private;
96 mdev = e->mdev;
97
98 if (error)
99 dev_warn(DEV, "read: error=%d s=%llus\n", error,
100 (unsigned long long)e->sector);
101 if (!error && !uptodate) {
102 dev_warn(DEV, "read: setting error to -EIO s=%llus\n",
103 (unsigned long long)e->sector);
104 /* strange behavior of some lower level drivers...
105 * fail the request by clearing the uptodate flag,
106 * but do not return any error?! */
107 error = -EIO;
108 }
109 91
110 D_ASSERT(e->block_id != ID_VACANT); 92 D_ASSERT(e->block_id != ID_VACANT);
111 93
@@ -114,49 +96,38 @@ void drbd_endio_read_sec(struct bio *bio, int error) __releases(local)
114 list_del(&e->w.list); 96 list_del(&e->w.list);
115 if (list_empty(&mdev->read_ee)) 97 if (list_empty(&mdev->read_ee))
116 wake_up(&mdev->ee_wait); 98 wake_up(&mdev->ee_wait);
99 if (test_bit(__EE_WAS_ERROR, &e->flags))
100 __drbd_chk_io_error(mdev, FALSE);
117 spin_unlock_irqrestore(&mdev->req_lock, flags); 101 spin_unlock_irqrestore(&mdev->req_lock, flags);
118 102
119 drbd_chk_io_error(mdev, error, FALSE);
120 drbd_queue_work(&mdev->data.work, &e->w); 103 drbd_queue_work(&mdev->data.work, &e->w);
121 put_ldev(mdev); 104 put_ldev(mdev);
122} 105}
123 106
107static int is_failed_barrier(int ee_flags)
108{
109 return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
110 == (EE_IS_BARRIER|EE_WAS_ERROR);
111}
112
124/* writes on behalf of the partner, or resync writes, 113/* writes on behalf of the partner, or resync writes,
125 * "submitted" by the receiver. 114 * "submitted" by the receiver, final stage. */
126 */ 115static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
127void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
128{ 116{
129 unsigned long flags = 0; 117 unsigned long flags = 0;
130 struct drbd_epoch_entry *e = NULL; 118 struct drbd_conf *mdev = e->mdev;
131 struct drbd_conf *mdev;
132 sector_t e_sector; 119 sector_t e_sector;
133 int do_wake; 120 int do_wake;
134 int is_syncer_req; 121 int is_syncer_req;
135 int do_al_complete_io; 122 int do_al_complete_io;
136 int uptodate = bio_flagged(bio, BIO_UPTODATE);
137 int is_barrier = bio_rw_flagged(bio, BIO_RW_BARRIER);
138 123
139 e = bio->bi_private; 124 /* if this is a failed barrier request, disable use of barriers,
140 mdev = e->mdev; 125 * and schedule for resubmission */
141 126 if (is_failed_barrier(e->flags)) {
142 if (error)
143 dev_warn(DEV, "write: error=%d s=%llus\n", error,
144 (unsigned long long)e->sector);
145 if (!error && !uptodate) {
146 dev_warn(DEV, "write: setting error to -EIO s=%llus\n",
147 (unsigned long long)e->sector);
148 /* strange behavior of some lower level drivers...
149 * fail the request by clearing the uptodate flag,
150 * but do not return any error?! */
151 error = -EIO;
152 }
153
154 /* error == -ENOTSUPP would be a better test,
155 * alas it is not reliable */
156 if (error && is_barrier && e->flags & EE_IS_BARRIER) {
157 drbd_bump_write_ordering(mdev, WO_bdev_flush); 127 drbd_bump_write_ordering(mdev, WO_bdev_flush);
158 spin_lock_irqsave(&mdev->req_lock, flags); 128 spin_lock_irqsave(&mdev->req_lock, flags);
159 list_del(&e->w.list); 129 list_del(&e->w.list);
130 e->flags |= EE_RESUBMITTED;
160 e->w.cb = w_e_reissue; 131 e->w.cb = w_e_reissue;
161 /* put_ldev actually happens below, once we come here again. */ 132 /* put_ldev actually happens below, once we come here again. */
162 __release(local); 133 __release(local);
@@ -167,17 +138,16 @@ void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
167 138
168 D_ASSERT(e->block_id != ID_VACANT); 139 D_ASSERT(e->block_id != ID_VACANT);
169 140
170 spin_lock_irqsave(&mdev->req_lock, flags);
171 mdev->writ_cnt += e->size >> 9;
172 is_syncer_req = is_syncer_block_id(e->block_id);
173
174 /* after we moved e to done_ee, 141 /* after we moved e to done_ee,
175 * we may no longer access it, 142 * we may no longer access it,
176 * it may be freed/reused already! 143 * it may be freed/reused already!
177 * (as soon as we release the req_lock) */ 144 * (as soon as we release the req_lock) */
178 e_sector = e->sector; 145 e_sector = e->sector;
179 do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO; 146 do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
147 is_syncer_req = is_syncer_block_id(e->block_id);
180 148
149 spin_lock_irqsave(&mdev->req_lock, flags);
150 mdev->writ_cnt += e->size >> 9;
181 list_del(&e->w.list); /* has been on active_ee or sync_ee */ 151 list_del(&e->w.list); /* has been on active_ee or sync_ee */
182 list_add_tail(&e->w.list, &mdev->done_ee); 152 list_add_tail(&e->w.list, &mdev->done_ee);
183 153
@@ -190,7 +160,7 @@ void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
190 ? list_empty(&mdev->sync_ee) 160 ? list_empty(&mdev->sync_ee)
191 : list_empty(&mdev->active_ee); 161 : list_empty(&mdev->active_ee);
192 162
193 if (error) 163 if (test_bit(__EE_WAS_ERROR, &e->flags))
194 __drbd_chk_io_error(mdev, FALSE); 164 __drbd_chk_io_error(mdev, FALSE);
195 spin_unlock_irqrestore(&mdev->req_lock, flags); 165 spin_unlock_irqrestore(&mdev->req_lock, flags);
196 166
@@ -205,7 +175,42 @@ void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
205 175
206 wake_asender(mdev); 176 wake_asender(mdev);
207 put_ldev(mdev); 177 put_ldev(mdev);
178}
208 179
180/* writes on behalf of the partner, or resync writes,
181 * "submitted" by the receiver.
182 */
183void drbd_endio_sec(struct bio *bio, int error)
184{
185 struct drbd_epoch_entry *e = bio->bi_private;
186 struct drbd_conf *mdev = e->mdev;
187 int uptodate = bio_flagged(bio, BIO_UPTODATE);
188 int is_write = bio_data_dir(bio) == WRITE;
189
190 if (error)
191 dev_warn(DEV, "%s: error=%d s=%llus\n",
192 is_write ? "write" : "read", error,
193 (unsigned long long)e->sector);
194 if (!error && !uptodate) {
195 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
196 is_write ? "write" : "read",
197 (unsigned long long)e->sector);
198 /* strange behavior of some lower level drivers...
199 * fail the request by clearing the uptodate flag,
200 * but do not return any error?! */
201 error = -EIO;
202 }
203
204 if (error)
205 set_bit(__EE_WAS_ERROR, &e->flags);
206
207 bio_put(bio); /* no need for the bio anymore */
208 if (atomic_dec_and_test(&e->pending_bios)) {
209 if (is_write)
210 drbd_endio_write_sec_final(e);
211 else
212 drbd_endio_read_sec_final(e);
213 }
209} 214}
210 215
211/* read, readA or write requests on R_PRIMARY coming from drbd_make_request 216/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
@@ -295,7 +300,34 @@ int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
295 return 1; /* Simply ignore this! */ 300 return 1; /* Simply ignore this! */
296} 301}
297 302
298void drbd_csum(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest) 303void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
304{
305 struct hash_desc desc;
306 struct scatterlist sg;
307 struct page *page = e->pages;
308 struct page *tmp;
309 unsigned len;
310
311 desc.tfm = tfm;
312 desc.flags = 0;
313
314 sg_init_table(&sg, 1);
315 crypto_hash_init(&desc);
316
317 while ((tmp = page_chain_next(page))) {
318 /* all but the last page will be fully used */
319 sg_set_page(&sg, page, PAGE_SIZE, 0);
320 crypto_hash_update(&desc, &sg, sg.length);
321 page = tmp;
322 }
323 /* and now the last, possibly only partially used page */
324 len = e->size & (PAGE_SIZE - 1);
325 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
326 crypto_hash_update(&desc, &sg, sg.length);
327 crypto_hash_final(&desc, digest);
328}
329
330void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
299{ 331{
300 struct hash_desc desc; 332 struct hash_desc desc;
301 struct scatterlist sg; 333 struct scatterlist sg;
@@ -329,11 +361,11 @@ static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel
329 return 1; 361 return 1;
330 } 362 }
331 363
332 if (likely(drbd_bio_uptodate(e->private_bio))) { 364 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
333 digest_size = crypto_hash_digestsize(mdev->csums_tfm); 365 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
334 digest = kmalloc(digest_size, GFP_NOIO); 366 digest = kmalloc(digest_size, GFP_NOIO);
335 if (digest) { 367 if (digest) {
336 drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest); 368 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
337 369
338 inc_rs_pending(mdev); 370 inc_rs_pending(mdev);
339 ok = drbd_send_drequest_csum(mdev, 371 ok = drbd_send_drequest_csum(mdev,
@@ -369,23 +401,21 @@ static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
369 /* GFP_TRY, because if there is no memory available right now, this may 401 /* GFP_TRY, because if there is no memory available right now, this may
370 * be rescheduled for later. It is "only" background resync, after all. */ 402 * be rescheduled for later. It is "only" background resync, after all. */
371 e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY); 403 e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
372 if (!e) { 404 if (!e)
373 put_ldev(mdev); 405 goto fail;
374 return 2;
375 }
376 406
377 spin_lock_irq(&mdev->req_lock); 407 spin_lock_irq(&mdev->req_lock);
378 list_add(&e->w.list, &mdev->read_ee); 408 list_add(&e->w.list, &mdev->read_ee);
379 spin_unlock_irq(&mdev->req_lock); 409 spin_unlock_irq(&mdev->req_lock);
380 410
381 e->private_bio->bi_end_io = drbd_endio_read_sec;
382 e->private_bio->bi_rw = READ;
383 e->w.cb = w_e_send_csum; 411 e->w.cb = w_e_send_csum;
412 if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
413 return 1;
384 414
385 mdev->read_cnt += size >> 9; 415 drbd_free_ee(mdev, e);
386 drbd_generic_make_request(mdev, DRBD_FAULT_RS_RD, e->private_bio); 416fail:
387 417 put_ldev(mdev);
388 return 1; 418 return 2;
389} 419}
390 420
391void resync_timer_fn(unsigned long data) 421void resync_timer_fn(unsigned long data)
@@ -819,7 +849,7 @@ out:
819/* helper */ 849/* helper */
820static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e) 850static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
821{ 851{
822 if (drbd_bio_has_active_page(e->private_bio)) { 852 if (drbd_ee_has_active_page(e)) {
823 /* This might happen if sendpage() has not finished */ 853 /* This might happen if sendpage() has not finished */
824 spin_lock_irq(&mdev->req_lock); 854 spin_lock_irq(&mdev->req_lock);
825 list_add_tail(&e->w.list, &mdev->net_ee); 855 list_add_tail(&e->w.list, &mdev->net_ee);
@@ -845,7 +875,7 @@ int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
845 return 1; 875 return 1;
846 } 876 }
847 877
848 if (likely(drbd_bio_uptodate(e->private_bio))) { 878 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
849 ok = drbd_send_block(mdev, P_DATA_REPLY, e); 879 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
850 } else { 880 } else {
851 if (__ratelimit(&drbd_ratelimit_state)) 881 if (__ratelimit(&drbd_ratelimit_state))
@@ -886,7 +916,7 @@ int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
886 put_ldev(mdev); 916 put_ldev(mdev);
887 } 917 }
888 918
889 if (likely(drbd_bio_uptodate(e->private_bio))) { 919 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
890 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) { 920 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
891 inc_rs_pending(mdev); 921 inc_rs_pending(mdev);
892 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e); 922 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
@@ -934,7 +964,7 @@ int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
934 964
935 di = (struct digest_info *)(unsigned long)e->block_id; 965 di = (struct digest_info *)(unsigned long)e->block_id;
936 966
937 if (likely(drbd_bio_uptodate(e->private_bio))) { 967 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
938 /* quick hack to try to avoid a race against reconfiguration. 968 /* quick hack to try to avoid a race against reconfiguration.
939 * a real fix would be much more involved, 969 * a real fix would be much more involved,
940 * introducing more locking mechanisms */ 970 * introducing more locking mechanisms */
@@ -944,7 +974,7 @@ int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
944 digest = kmalloc(digest_size, GFP_NOIO); 974 digest = kmalloc(digest_size, GFP_NOIO);
945 } 975 }
946 if (digest) { 976 if (digest) {
947 drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest); 977 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
948 eq = !memcmp(digest, di->digest, digest_size); 978 eq = !memcmp(digest, di->digest, digest_size);
949 kfree(digest); 979 kfree(digest);
950 } 980 }
@@ -986,14 +1016,14 @@ int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
986 if (unlikely(cancel)) 1016 if (unlikely(cancel))
987 goto out; 1017 goto out;
988 1018
989 if (unlikely(!drbd_bio_uptodate(e->private_bio))) 1019 if (unlikely((e->flags & EE_WAS_ERROR) != 0))
990 goto out; 1020 goto out;
991 1021
992 digest_size = crypto_hash_digestsize(mdev->verify_tfm); 1022 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
993 /* FIXME if this allocation fails, online verify will not terminate! */ 1023 /* FIXME if this allocation fails, online verify will not terminate! */
994 digest = kmalloc(digest_size, GFP_NOIO); 1024 digest = kmalloc(digest_size, GFP_NOIO);
995 if (digest) { 1025 if (digest) {
996 drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest); 1026 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
997 inc_rs_pending(mdev); 1027 inc_rs_pending(mdev);
998 ok = drbd_send_drequest_csum(mdev, e->sector, e->size, 1028 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
999 digest, digest_size, P_OV_REPLY); 1029 digest, digest_size, P_OV_REPLY);
@@ -1042,11 +1072,11 @@ int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1042 1072
1043 di = (struct digest_info *)(unsigned long)e->block_id; 1073 di = (struct digest_info *)(unsigned long)e->block_id;
1044 1074
1045 if (likely(drbd_bio_uptodate(e->private_bio))) { 1075 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1046 digest_size = crypto_hash_digestsize(mdev->verify_tfm); 1076 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1047 digest = kmalloc(digest_size, GFP_NOIO); 1077 digest = kmalloc(digest_size, GFP_NOIO);
1048 if (digest) { 1078 if (digest) {
1049 drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest); 1079 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1050 1080
1051 D_ASSERT(digest_size == di->digest_size); 1081 D_ASSERT(digest_size == di->digest_size);
1052 eq = !memcmp(digest, di->digest, digest_size); 1082 eq = !memcmp(digest, di->digest, digest_size);
diff --git a/drivers/block/drbd/drbd_wrappers.h b/drivers/block/drbd/drbd_wrappers.h
index f93fa111ce50..defdb5013ea3 100644
--- a/drivers/block/drbd/drbd_wrappers.h
+++ b/drivers/block/drbd/drbd_wrappers.h
@@ -18,23 +18,9 @@ static inline void drbd_set_my_capacity(struct drbd_conf *mdev,
18 18
19#define drbd_bio_uptodate(bio) bio_flagged(bio, BIO_UPTODATE) 19#define drbd_bio_uptodate(bio) bio_flagged(bio, BIO_UPTODATE)
20 20
21static inline int drbd_bio_has_active_page(struct bio *bio)
22{
23 struct bio_vec *bvec;
24 int i;
25
26 __bio_for_each_segment(bvec, bio, i, 0) {
27 if (page_count(bvec->bv_page) > 1)
28 return 1;
29 }
30
31 return 0;
32}
33
34/* bi_end_io handlers */ 21/* bi_end_io handlers */
35extern void drbd_md_io_complete(struct bio *bio, int error); 22extern void drbd_md_io_complete(struct bio *bio, int error);
36extern void drbd_endio_read_sec(struct bio *bio, int error); 23extern void drbd_endio_sec(struct bio *bio, int error);
37extern void drbd_endio_write_sec(struct bio *bio, int error);
38extern void drbd_endio_pri(struct bio *bio, int error); 24extern void drbd_endio_pri(struct bio *bio, int error);
39 25
40/* 26/*