aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--drivers/block/drbd/drbd_int.h90
-rw-r--r--drivers/block/drbd/drbd_main.c19
-rw-r--r--drivers/block/drbd/drbd_nl.c18
-rw-r--r--drivers/block/drbd/drbd_receiver.c483
-rw-r--r--drivers/block/drbd/drbd_worker.c178
-rw-r--r--drivers/block/drbd/drbd_wrappers.h16
6 files changed, 480 insertions, 324 deletions
diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h
index 1bc86ddac38b..4b97f30bb7c6 100644
--- a/drivers/block/drbd/drbd_int.h
+++ b/drivers/block/drbd/drbd_int.h
@@ -740,18 +740,6 @@ enum epoch_event {
740 EV_CLEANUP = 32, /* used as flag */ 740 EV_CLEANUP = 32, /* used as flag */
741}; 741};
742 742
743struct drbd_epoch_entry {
744 struct drbd_work w;
745 struct drbd_conf *mdev;
746 struct bio *private_bio;
747 struct hlist_node colision;
748 sector_t sector;
749 unsigned int size;
750 unsigned int flags;
751 struct drbd_epoch *epoch;
752 u64 block_id;
753};
754
755struct drbd_wq_barrier { 743struct drbd_wq_barrier {
756 struct drbd_work w; 744 struct drbd_work w;
757 struct completion done; 745 struct completion done;
@@ -762,17 +750,49 @@ struct digest_info {
762 void *digest; 750 void *digest;
763}; 751};
764 752
765/* ee flag bits */ 753struct drbd_epoch_entry {
754 struct drbd_work w;
755 struct hlist_node colision;
756 struct drbd_epoch *epoch;
757 struct drbd_conf *mdev;
758 struct page *pages;
759 atomic_t pending_bios;
760 unsigned int size;
761 /* see comments on ee flag bits below */
762 unsigned long flags;
763 sector_t sector;
764 u64 block_id;
765};
766
767/* ee flag bits.
768 * While corresponding bios are in flight, the only modification will be
769 * set_bit WAS_ERROR, which has to be atomic.
770 * If no bios are in flight yet, or all have been completed,
771 * non-atomic modification to ee->flags is ok.
772 */
766enum { 773enum {
767 __EE_CALL_AL_COMPLETE_IO, 774 __EE_CALL_AL_COMPLETE_IO,
768 __EE_CONFLICT_PENDING,
769 __EE_MAY_SET_IN_SYNC, 775 __EE_MAY_SET_IN_SYNC,
776
777 /* This epoch entry closes an epoch using a barrier.
778 * On successful completion, the epoch is released,
779 * and the P_BARRIER_ACK sent. */
770 __EE_IS_BARRIER, 780 __EE_IS_BARRIER,
781
782 /* In case a barrier failed,
783 * we need to resubmit without the barrier flag. */
784 __EE_RESUBMITTED,
785
786 /* we may have several bios per epoch entry.
787 * if any of those fail, we set this flag atomically
788 * from the endio callback */
789 __EE_WAS_ERROR,
771}; 790};
772#define EE_CALL_AL_COMPLETE_IO (1<<__EE_CALL_AL_COMPLETE_IO) 791#define EE_CALL_AL_COMPLETE_IO (1<<__EE_CALL_AL_COMPLETE_IO)
773#define EE_CONFLICT_PENDING (1<<__EE_CONFLICT_PENDING)
774#define EE_MAY_SET_IN_SYNC (1<<__EE_MAY_SET_IN_SYNC) 792#define EE_MAY_SET_IN_SYNC (1<<__EE_MAY_SET_IN_SYNC)
775#define EE_IS_BARRIER (1<<__EE_IS_BARRIER) 793#define EE_IS_BARRIER (1<<__EE_IS_BARRIER)
794#define EE_RESUBMITTED (1<<__EE_RESUBMITTED)
795#define EE_WAS_ERROR (1<<__EE_WAS_ERROR)
776 796
777/* global flag bits */ 797/* global flag bits */
778enum { 798enum {
@@ -1441,7 +1461,8 @@ static inline void ov_oos_print(struct drbd_conf *mdev)
1441} 1461}
1442 1462
1443 1463
1444extern void drbd_csum(struct drbd_conf *, struct crypto_hash *, struct bio *, void *); 1464extern void drbd_csum_bio(struct drbd_conf *, struct crypto_hash *, struct bio *, void *);
1465extern void drbd_csum_ee(struct drbd_conf *, struct crypto_hash *, struct drbd_epoch_entry *, void *);
1445/* worker callbacks */ 1466/* worker callbacks */
1446extern int w_req_cancel_conflict(struct drbd_conf *, struct drbd_work *, int); 1467extern int w_req_cancel_conflict(struct drbd_conf *, struct drbd_work *, int);
1447extern int w_read_retry_remote(struct drbd_conf *, struct drbd_work *, int); 1468extern int w_read_retry_remote(struct drbd_conf *, struct drbd_work *, int);
@@ -1465,6 +1486,8 @@ extern int w_e_reissue(struct drbd_conf *, struct drbd_work *, int);
1465extern void resync_timer_fn(unsigned long data); 1486extern void resync_timer_fn(unsigned long data);
1466 1487
1467/* drbd_receiver.c */ 1488/* drbd_receiver.c */
1489extern int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1490 const unsigned rw, const int fault_type);
1468extern int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list); 1491extern int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list);
1469extern struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev, 1492extern struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
1470 u64 id, 1493 u64 id,
@@ -1620,6 +1643,41 @@ void drbd_bcast_ee(struct drbd_conf *mdev,
1620 * inline helper functions 1643 * inline helper functions
1621 *************************/ 1644 *************************/
1622 1645
1646/* see also page_chain_add and friends in drbd_receiver.c */
1647static inline struct page *page_chain_next(struct page *page)
1648{
1649 return (struct page *)page_private(page);
1650}
1651#define page_chain_for_each(page) \
1652 for (; page && ({ prefetch(page_chain_next(page)); 1; }); \
1653 page = page_chain_next(page))
1654#define page_chain_for_each_safe(page, n) \
1655 for (; page && ({ n = page_chain_next(page); 1; }); page = n)
1656
1657static inline int drbd_bio_has_active_page(struct bio *bio)
1658{
1659 struct bio_vec *bvec;
1660 int i;
1661
1662 __bio_for_each_segment(bvec, bio, i, 0) {
1663 if (page_count(bvec->bv_page) > 1)
1664 return 1;
1665 }
1666
1667 return 0;
1668}
1669
1670static inline int drbd_ee_has_active_page(struct drbd_epoch_entry *e)
1671{
1672 struct page *page = e->pages;
1673 page_chain_for_each(page) {
1674 if (page_count(page) > 1)
1675 return 1;
1676 }
1677 return 0;
1678}
1679
1680
1623static inline void drbd_state_lock(struct drbd_conf *mdev) 1681static inline void drbd_state_lock(struct drbd_conf *mdev)
1624{ 1682{
1625 wait_event(mdev->misc_wait, 1683 wait_event(mdev->misc_wait,
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index 3aa0add1c230..d0fabace1452 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -2354,6 +2354,19 @@ static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2354 return 1; 2354 return 1;
2355} 2355}
2356 2356
2357static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
2358{
2359 struct page *page = e->pages;
2360 unsigned len = e->size;
2361 page_chain_for_each(page) {
2362 unsigned l = min_t(unsigned, len, PAGE_SIZE);
2363 if (!_drbd_send_page(mdev, page, 0, l))
2364 return 0;
2365 len -= l;
2366 }
2367 return 1;
2368}
2369
2357static void consider_delay_probes(struct drbd_conf *mdev) 2370static void consider_delay_probes(struct drbd_conf *mdev)
2358{ 2371{
2359 if (mdev->state.conn != C_SYNC_SOURCE || mdev->agreed_pro_version < 93) 2372 if (mdev->state.conn != C_SYNC_SOURCE || mdev->agreed_pro_version < 93)
@@ -2430,7 +2443,7 @@ int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2430 drbd_send(mdev, mdev->data.socket, &p, sizeof(p), MSG_MORE)); 2443 drbd_send(mdev, mdev->data.socket, &p, sizeof(p), MSG_MORE));
2431 if (ok && dgs) { 2444 if (ok && dgs) {
2432 dgb = mdev->int_dig_out; 2445 dgb = mdev->int_dig_out;
2433 drbd_csum(mdev, mdev->integrity_w_tfm, req->master_bio, dgb); 2446 drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2434 ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE); 2447 ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE);
2435 } 2448 }
2436 if (ok) { 2449 if (ok) {
@@ -2483,11 +2496,11 @@ int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2483 sizeof(p), MSG_MORE); 2496 sizeof(p), MSG_MORE);
2484 if (ok && dgs) { 2497 if (ok && dgs) {
2485 dgb = mdev->int_dig_out; 2498 dgb = mdev->int_dig_out;
2486 drbd_csum(mdev, mdev->integrity_w_tfm, e->private_bio, dgb); 2499 drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
2487 ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE); 2500 ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE);
2488 } 2501 }
2489 if (ok) 2502 if (ok)
2490 ok = _drbd_send_zc_bio(mdev, e->private_bio); 2503 ok = _drbd_send_zc_ee(mdev, e);
2491 2504
2492 drbd_put_data_sock(mdev); 2505 drbd_put_data_sock(mdev);
2493 2506
diff --git a/drivers/block/drbd/drbd_nl.c b/drivers/block/drbd/drbd_nl.c
index 93d150661f4b..28ef76bd5230 100644
--- a/drivers/block/drbd/drbd_nl.c
+++ b/drivers/block/drbd/drbd_nl.c
@@ -2215,9 +2215,9 @@ void drbd_bcast_ee(struct drbd_conf *mdev,
2215{ 2215{
2216 struct cn_msg *cn_reply; 2216 struct cn_msg *cn_reply;
2217 struct drbd_nl_cfg_reply *reply; 2217 struct drbd_nl_cfg_reply *reply;
2218 struct bio_vec *bvec;
2219 unsigned short *tl; 2218 unsigned short *tl;
2220 int i; 2219 struct page *page;
2220 unsigned len;
2221 2221
2222 if (!e) 2222 if (!e)
2223 return; 2223 return;
@@ -2255,11 +2255,15 @@ void drbd_bcast_ee(struct drbd_conf *mdev,
2255 put_unaligned(T_ee_data, tl++); 2255 put_unaligned(T_ee_data, tl++);
2256 put_unaligned(e->size, tl++); 2256 put_unaligned(e->size, tl++);
2257 2257
2258 __bio_for_each_segment(bvec, e->private_bio, i, 0) { 2258 len = e->size;
2259 void *d = kmap(bvec->bv_page); 2259 page = e->pages;
2260 memcpy(tl, d + bvec->bv_offset, bvec->bv_len); 2260 page_chain_for_each(page) {
2261 kunmap(bvec->bv_page); 2261 void *d = kmap_atomic(page, KM_USER0);
2262 tl=(unsigned short*)((char*)tl + bvec->bv_len); 2262 unsigned l = min_t(unsigned, len, PAGE_SIZE);
2263 memcpy(tl, d, l);
2264 kunmap_atomic(d, KM_USER0);
2265 tl = (unsigned short*)((char*)tl + l);
2266 len -= l;
2263 } 2267 }
2264 put_unaligned(TT_END, tl++); /* Close the tag list */ 2268 put_unaligned(TT_END, tl++); /* Close the tag list */
2265 2269
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c
index fee0d249adf7..388a3e8bb0d0 100644
--- a/drivers/block/drbd/drbd_receiver.c
+++ b/drivers/block/drbd/drbd_receiver.c
@@ -80,30 +80,124 @@ static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epo
80 80
81#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) 81#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
82 82
83static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev) 83/*
84 * some helper functions to deal with single linked page lists,
85 * page->private being our "next" pointer.
86 */
87
88/* If at least n pages are linked at head, get n pages off.
89 * Otherwise, don't modify head, and return NULL.
90 * Locking is the responsibility of the caller.
91 */
92static struct page *page_chain_del(struct page **head, int n)
93{
94 struct page *page;
95 struct page *tmp;
96
97 BUG_ON(!n);
98 BUG_ON(!head);
99
100 page = *head;
101 while (page) {
102 tmp = page_chain_next(page);
103 if (--n == 0)
104 break; /* found sufficient pages */
105 if (tmp == NULL)
106 /* insufficient pages, don't use any of them. */
107 return NULL;
108 page = tmp;
109 }
110
111 /* add end of list marker for the returned list */
112 set_page_private(page, 0);
113 /* actual return value, and adjustment of head */
114 page = *head;
115 *head = tmp;
116 return page;
117}
118
119/* may be used outside of locks to find the tail of a (usually short)
120 * "private" page chain, before adding it back to a global chain head
121 * with page_chain_add() under a spinlock. */
122static struct page *page_chain_tail(struct page *page, int *len)
123{
124 struct page *tmp;
125 int i = 1;
126 while ((tmp = page_chain_next(page)))
127 ++i, page = tmp;
128 if (len)
129 *len = i;
130 return page;
131}
132
133static int page_chain_free(struct page *page)
134{
135 struct page *tmp;
136 int i = 0;
137 page_chain_for_each_safe(page, tmp) {
138 put_page(page);
139 ++i;
140 }
141 return i;
142}
143
144static void page_chain_add(struct page **head,
145 struct page *chain_first, struct page *chain_last)
146{
147#if 1
148 struct page *tmp;
149 tmp = page_chain_tail(chain_first, NULL);
150 BUG_ON(tmp != chain_last);
151#endif
152
153 /* add chain to head */
154 set_page_private(chain_last, (unsigned long)*head);
155 *head = chain_first;
156}
157
158static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
84{ 159{
85 struct page *page = NULL; 160 struct page *page = NULL;
161 struct page *tmp = NULL;
162 int i = 0;
86 163
87 /* Yes, testing drbd_pp_vacant outside the lock is racy. 164 /* Yes, testing drbd_pp_vacant outside the lock is racy.
88 * So what. It saves a spin_lock. */ 165 * So what. It saves a spin_lock. */
89 if (drbd_pp_vacant > 0) { 166 if (drbd_pp_vacant >= number) {
90 spin_lock(&drbd_pp_lock); 167 spin_lock(&drbd_pp_lock);
91 page = drbd_pp_pool; 168 page = page_chain_del(&drbd_pp_pool, number);
92 if (page) { 169 if (page)
93 drbd_pp_pool = (struct page *)page_private(page); 170 drbd_pp_vacant -= number;
94 set_page_private(page, 0); /* just to be polite */
95 drbd_pp_vacant--;
96 }
97 spin_unlock(&drbd_pp_lock); 171 spin_unlock(&drbd_pp_lock);
172 if (page)
173 return page;
98 } 174 }
175
99 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD 176 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
100 * "criss-cross" setup, that might cause write-out on some other DRBD, 177 * "criss-cross" setup, that might cause write-out on some other DRBD,
101 * which in turn might block on the other node at this very place. */ 178 * which in turn might block on the other node at this very place. */
102 if (!page) 179 for (i = 0; i < number; i++) {
103 page = alloc_page(GFP_TRY); 180 tmp = alloc_page(GFP_TRY);
104 if (page) 181 if (!tmp)
105 atomic_inc(&mdev->pp_in_use); 182 break;
106 return page; 183 set_page_private(tmp, (unsigned long)page);
184 page = tmp;
185 }
186
187 if (i == number)
188 return page;
189
190 /* Not enough pages immediately available this time.
191 * No need to jump around here, drbd_pp_alloc will retry this
192 * function "soon". */
193 if (page) {
194 tmp = page_chain_tail(page, NULL);
195 spin_lock(&drbd_pp_lock);
196 page_chain_add(&drbd_pp_pool, page, tmp);
197 drbd_pp_vacant += i;
198 spin_unlock(&drbd_pp_lock);
199 }
200 return NULL;
107} 201}
108 202
109/* kick lower level device, if we have more than (arbitrary number) 203/* kick lower level device, if we have more than (arbitrary number)
@@ -127,7 +221,7 @@ static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed
127 221
128 list_for_each_safe(le, tle, &mdev->net_ee) { 222 list_for_each_safe(le, tle, &mdev->net_ee) {
129 e = list_entry(le, struct drbd_epoch_entry, w.list); 223 e = list_entry(le, struct drbd_epoch_entry, w.list);
130 if (drbd_bio_has_active_page(e->private_bio)) 224 if (drbd_ee_has_active_page(e))
131 break; 225 break;
132 list_move(le, to_be_freed); 226 list_move(le, to_be_freed);
133 } 227 }
@@ -148,32 +242,34 @@ static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
148} 242}
149 243
150/** 244/**
151 * drbd_pp_alloc() - Returns a page, fails only if a signal comes in 245 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
152 * @mdev: DRBD device. 246 * @mdev: DRBD device.
153 * @retry: whether or not to retry allocation forever (or until signalled) 247 * @number: number of pages requested
248 * @retry: whether to retry, if not enough pages are available right now
249 *
250 * Tries to allocate number pages, first from our own page pool, then from
251 * the kernel, unless this allocation would exceed the max_buffers setting.
252 * Possibly retry until DRBD frees sufficient pages somewhere else.
154 * 253 *
155 * Tries to allocate a page, first from our own page pool, then from the 254 * Returns a page chain linked via page->private.
156 * kernel, unless this allocation would exceed the max_buffers setting.
157 * If @retry is non-zero, retry until DRBD frees a page somewhere else.
158 */ 255 */
159static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry) 256static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
160{ 257{
161 struct page *page = NULL; 258 struct page *page = NULL;
162 DEFINE_WAIT(wait); 259 DEFINE_WAIT(wait);
163 260
164 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) { 261 /* Yes, we may run up to @number over max_buffers. If we
165 page = drbd_pp_first_page_or_try_alloc(mdev); 262 * follow it strictly, the admin will get it wrong anyways. */
166 if (page) 263 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
167 return page; 264 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
168 }
169 265
170 for (;;) { 266 while (page == NULL) {
171 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE); 267 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
172 268
173 drbd_kick_lo_and_reclaim_net(mdev); 269 drbd_kick_lo_and_reclaim_net(mdev);
174 270
175 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) { 271 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
176 page = drbd_pp_first_page_or_try_alloc(mdev); 272 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
177 if (page) 273 if (page)
178 break; 274 break;
179 } 275 }
@@ -190,62 +286,32 @@ static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
190 } 286 }
191 finish_wait(&drbd_pp_wait, &wait); 287 finish_wait(&drbd_pp_wait, &wait);
192 288
289 if (page)
290 atomic_add(number, &mdev->pp_in_use);
193 return page; 291 return page;
194} 292}
195 293
196/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc. 294/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
197 * Is also used from inside an other spin_lock_irq(&mdev->req_lock) */ 295 * Is also used from inside an other spin_lock_irq(&mdev->req_lock);
296 * Either links the page chain back to the global pool,
297 * or returns all pages to the system. */
198static void drbd_pp_free(struct drbd_conf *mdev, struct page *page) 298static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
199{ 299{
200 int free_it;
201
202 spin_lock(&drbd_pp_lock);
203 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
204 free_it = 1;
205 } else {
206 set_page_private(page, (unsigned long)drbd_pp_pool);
207 drbd_pp_pool = page;
208 drbd_pp_vacant++;
209 free_it = 0;
210 }
211 spin_unlock(&drbd_pp_lock);
212
213 atomic_dec(&mdev->pp_in_use);
214
215 if (free_it)
216 __free_page(page);
217
218 wake_up(&drbd_pp_wait);
219}
220
221static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio)
222{
223 struct page *p_to_be_freed = NULL;
224 struct page *page;
225 struct bio_vec *bvec;
226 int i; 300 int i;
227 301 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
228 spin_lock(&drbd_pp_lock); 302 i = page_chain_free(page);
229 __bio_for_each_segment(bvec, bio, i, 0) { 303 else {
230 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) { 304 struct page *tmp;
231 set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed); 305 tmp = page_chain_tail(page, &i);
232 p_to_be_freed = bvec->bv_page; 306 spin_lock(&drbd_pp_lock);
233 } else { 307 page_chain_add(&drbd_pp_pool, page, tmp);
234 set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool); 308 drbd_pp_vacant += i;
235 drbd_pp_pool = bvec->bv_page; 309 spin_unlock(&drbd_pp_lock);
236 drbd_pp_vacant++;
237 }
238 }
239 spin_unlock(&drbd_pp_lock);
240 atomic_sub(bio->bi_vcnt, &mdev->pp_in_use);
241
242 while (p_to_be_freed) {
243 page = p_to_be_freed;
244 p_to_be_freed = (struct page *)page_private(page);
245 set_page_private(page, 0); /* just to be polite */
246 put_page(page);
247 } 310 }
248 311 atomic_sub(i, &mdev->pp_in_use);
312 i = atomic_read(&mdev->pp_in_use);
313 if (i < 0)
314 dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
249 wake_up(&drbd_pp_wait); 315 wake_up(&drbd_pp_wait);
250} 316}
251 317
@@ -270,11 +336,9 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
270 unsigned int data_size, 336 unsigned int data_size,
271 gfp_t gfp_mask) __must_hold(local) 337 gfp_t gfp_mask) __must_hold(local)
272{ 338{
273 struct request_queue *q;
274 struct drbd_epoch_entry *e; 339 struct drbd_epoch_entry *e;
275 struct page *page; 340 struct page *page;
276 struct bio *bio; 341 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
277 unsigned int ds;
278 342
279 if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE)) 343 if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
280 return NULL; 344 return NULL;
@@ -286,84 +350,32 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
286 return NULL; 350 return NULL;
287 } 351 }
288 352
289 bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE)); 353 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
290 if (!bio) { 354 if (!page)
291 if (!(gfp_mask & __GFP_NOWARN)) 355 goto fail;
292 dev_err(DEV, "alloc_ee: Allocation of a bio failed\n");
293 goto fail1;
294 }
295
296 bio->bi_bdev = mdev->ldev->backing_bdev;
297 bio->bi_sector = sector;
298
299 ds = data_size;
300 while (ds) {
301 page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT));
302 if (!page) {
303 if (!(gfp_mask & __GFP_NOWARN))
304 dev_err(DEV, "alloc_ee: Allocation of a page failed\n");
305 goto fail2;
306 }
307 if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) {
308 drbd_pp_free(mdev, page);
309 dev_err(DEV, "alloc_ee: bio_add_page(s=%llu,"
310 "data_size=%u,ds=%u) failed\n",
311 (unsigned long long)sector, data_size, ds);
312
313 q = bdev_get_queue(bio->bi_bdev);
314 if (q->merge_bvec_fn) {
315 struct bvec_merge_data bvm = {
316 .bi_bdev = bio->bi_bdev,
317 .bi_sector = bio->bi_sector,
318 .bi_size = bio->bi_size,
319 .bi_rw = bio->bi_rw,
320 };
321 int l = q->merge_bvec_fn(q, &bvm,
322 &bio->bi_io_vec[bio->bi_vcnt]);
323 dev_err(DEV, "merge_bvec_fn() = %d\n", l);
324 }
325
326 /* dump more of the bio. */
327 dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs);
328 dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt);
329 dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size);
330 dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments);
331
332 goto fail2;
333 break;
334 }
335 ds -= min_t(int, ds, PAGE_SIZE);
336 }
337
338 D_ASSERT(data_size == bio->bi_size);
339
340 bio->bi_private = e;
341 e->mdev = mdev;
342 e->sector = sector;
343 e->size = bio->bi_size;
344 356
345 e->private_bio = bio;
346 e->block_id = id;
347 INIT_HLIST_NODE(&e->colision); 357 INIT_HLIST_NODE(&e->colision);
348 e->epoch = NULL; 358 e->epoch = NULL;
359 e->mdev = mdev;
360 e->pages = page;
361 atomic_set(&e->pending_bios, 0);
362 e->size = data_size;
349 e->flags = 0; 363 e->flags = 0;
364 e->sector = sector;
365 e->sector = sector;
366 e->block_id = id;
350 367
351 return e; 368 return e;
352 369
353 fail2: 370 fail:
354 drbd_pp_free_bio_pages(mdev, bio);
355 bio_put(bio);
356 fail1:
357 mempool_free(e, drbd_ee_mempool); 371 mempool_free(e, drbd_ee_mempool);
358
359 return NULL; 372 return NULL;
360} 373}
361 374
362void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e) 375void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
363{ 376{
364 struct bio *bio = e->private_bio; 377 drbd_pp_free(mdev, e->pages);
365 drbd_pp_free_bio_pages(mdev, bio); 378 D_ASSERT(atomic_read(&e->pending_bios) == 0);
366 bio_put(bio);
367 D_ASSERT(hlist_unhashed(&e->colision)); 379 D_ASSERT(hlist_unhashed(&e->colision));
368 mempool_free(e, drbd_ee_mempool); 380 mempool_free(e, drbd_ee_mempool);
369} 381}
@@ -1121,6 +1133,90 @@ void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo)
1121} 1133}
1122 1134
1123/** 1135/**
1136 * drbd_submit_ee()
1137 * @mdev: DRBD device.
1138 * @e: epoch entry
1139 * @rw: flag field, see bio->bi_rw
1140 */
1141/* TODO allocate from our own bio_set. */
1142int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1143 const unsigned rw, const int fault_type)
1144{
1145 struct bio *bios = NULL;
1146 struct bio *bio;
1147 struct page *page = e->pages;
1148 sector_t sector = e->sector;
1149 unsigned ds = e->size;
1150 unsigned n_bios = 0;
1151 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1152
1153 /* In most cases, we will only need one bio. But in case the lower
1154 * level restrictions happen to be different at this offset on this
1155 * side than those of the sending peer, we may need to submit the
1156 * request in more than one bio. */
1157next_bio:
1158 bio = bio_alloc(GFP_NOIO, nr_pages);
1159 if (!bio) {
1160 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1161 goto fail;
1162 }
1163 /* > e->sector, unless this is the first bio */
1164 bio->bi_sector = sector;
1165 bio->bi_bdev = mdev->ldev->backing_bdev;
1166 /* we special case some flags in the multi-bio case, see below
1167 * (BIO_RW_UNPLUG, BIO_RW_BARRIER) */
1168 bio->bi_rw = rw;
1169 bio->bi_private = e;
1170 bio->bi_end_io = drbd_endio_sec;
1171
1172 bio->bi_next = bios;
1173 bios = bio;
1174 ++n_bios;
1175
1176 page_chain_for_each(page) {
1177 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1178 if (!bio_add_page(bio, page, len, 0)) {
1179 /* a single page must always be possible! */
1180 BUG_ON(bio->bi_vcnt == 0);
1181 goto next_bio;
1182 }
1183 ds -= len;
1184 sector += len >> 9;
1185 --nr_pages;
1186 }
1187 D_ASSERT(page == NULL);
1188 D_ASSERT(ds == 0);
1189
1190 atomic_set(&e->pending_bios, n_bios);
1191 do {
1192 bio = bios;
1193 bios = bios->bi_next;
1194 bio->bi_next = NULL;
1195
1196 /* strip off BIO_RW_UNPLUG unless it is the last bio */
1197 if (bios)
1198 bio->bi_rw &= ~(1<<BIO_RW_UNPLUG);
1199
1200 drbd_generic_make_request(mdev, fault_type, bio);
1201
1202 /* strip off BIO_RW_BARRIER,
1203 * unless it is the first or last bio */
1204 if (bios && bios->bi_next)
1205 bios->bi_rw &= ~(1<<BIO_RW_BARRIER);
1206 } while (bios);
1207 maybe_kick_lo(mdev);
1208 return 0;
1209
1210fail:
1211 while (bios) {
1212 bio = bios;
1213 bios = bios->bi_next;
1214 bio_put(bio);
1215 }
1216 return -ENOMEM;
1217}
1218
1219/**
1124 * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set 1220 * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
1125 * @mdev: DRBD device. 1221 * @mdev: DRBD device.
1126 * @w: work object. 1222 * @w: work object.
@@ -1129,8 +1225,6 @@ void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo)
1129int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local) 1225int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1130{ 1226{
1131 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w; 1227 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1132 struct bio *bio = e->private_bio;
1133
1134 /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place, 1228 /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1135 (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch) 1229 (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1136 so that we can finish that epoch in drbd_may_finish_epoch(). 1230 so that we can finish that epoch in drbd_may_finish_epoch().
@@ -1144,33 +1238,17 @@ int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __relea
1144 if (previous_epoch(mdev, e->epoch)) 1238 if (previous_epoch(mdev, e->epoch))
1145 dev_warn(DEV, "Write ordering was not enforced (one time event)\n"); 1239 dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1146 1240
1147 /* prepare bio for re-submit,
1148 * re-init volatile members */
1149 /* we still have a local reference, 1241 /* we still have a local reference,
1150 * get_ldev was done in receive_Data. */ 1242 * get_ldev was done in receive_Data. */
1151 bio->bi_bdev = mdev->ldev->backing_bdev;
1152 bio->bi_sector = e->sector;
1153 bio->bi_size = e->size;
1154 bio->bi_idx = 0;
1155
1156 bio->bi_flags &= ~(BIO_POOL_MASK - 1);
1157 bio->bi_flags |= 1 << BIO_UPTODATE;
1158
1159 /* don't know whether this is necessary: */
1160 bio->bi_phys_segments = 0;
1161 bio->bi_next = NULL;
1162
1163 /* these should be unchanged: */
1164 /* bio->bi_end_io = drbd_endio_write_sec; */
1165 /* bio->bi_vcnt = whatever; */
1166 1243
1167 e->w.cb = e_end_block; 1244 e->w.cb = e_end_block;
1168 1245 if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
1169 /* This is no longer a barrier request. */ 1246 /* drbd_submit_ee fails for one reason only:
1170 bio->bi_rw &= ~(1UL << BIO_RW_BARRIER); 1247 * if was not able to allocate sufficient bios.
1171 1248 * requeue, try again later. */
1172 drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio); 1249 e->w.cb = w_e_reissue;
1173 1250 drbd_queue_work(&mdev->data.work, &e->w);
1251 }
1174 return 1; 1252 return 1;
1175} 1253}
1176 1254
@@ -1264,10 +1342,8 @@ read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __
1264{ 1342{
1265 const sector_t capacity = drbd_get_capacity(mdev->this_bdev); 1343 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1266 struct drbd_epoch_entry *e; 1344 struct drbd_epoch_entry *e;
1267 struct bio_vec *bvec;
1268 struct page *page; 1345 struct page *page;
1269 struct bio *bio; 1346 int dgs, ds, rr;
1270 int dgs, ds, i, rr;
1271 void *dig_in = mdev->int_dig_in; 1347 void *dig_in = mdev->int_dig_in;
1272 void *dig_vv = mdev->int_dig_vv; 1348 void *dig_vv = mdev->int_dig_vv;
1273 unsigned long *data; 1349 unsigned long *data;
@@ -1304,28 +1380,29 @@ read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __
1304 e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO); 1380 e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1305 if (!e) 1381 if (!e)
1306 return NULL; 1382 return NULL;
1307 bio = e->private_bio; 1383
1308 ds = data_size; 1384 ds = data_size;
1309 bio_for_each_segment(bvec, bio, i) { 1385 page = e->pages;
1310 page = bvec->bv_page; 1386 page_chain_for_each(page) {
1387 unsigned len = min_t(int, ds, PAGE_SIZE);
1311 data = kmap(page); 1388 data = kmap(page);
1312 rr = drbd_recv(mdev, data, min_t(int, ds, PAGE_SIZE)); 1389 rr = drbd_recv(mdev, data, len);
1313 if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) { 1390 if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
1314 dev_err(DEV, "Fault injection: Corrupting data on receive\n"); 1391 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1315 data[0] = data[0] ^ (unsigned long)-1; 1392 data[0] = data[0] ^ (unsigned long)-1;
1316 } 1393 }
1317 kunmap(page); 1394 kunmap(page);
1318 if (rr != min_t(int, ds, PAGE_SIZE)) { 1395 if (rr != len) {
1319 drbd_free_ee(mdev, e); 1396 drbd_free_ee(mdev, e);
1320 dev_warn(DEV, "short read receiving data: read %d expected %d\n", 1397 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1321 rr, min_t(int, ds, PAGE_SIZE)); 1398 rr, len);
1322 return NULL; 1399 return NULL;
1323 } 1400 }
1324 ds -= rr; 1401 ds -= rr;
1325 } 1402 }
1326 1403
1327 if (dgs) { 1404 if (dgs) {
1328 drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv); 1405 drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1329 if (memcmp(dig_in, dig_vv, dgs)) { 1406 if (memcmp(dig_in, dig_vv, dgs)) {
1330 dev_err(DEV, "Digest integrity check FAILED.\n"); 1407 dev_err(DEV, "Digest integrity check FAILED.\n");
1331 drbd_bcast_ee(mdev, "digest failed", 1408 drbd_bcast_ee(mdev, "digest failed",
@@ -1350,7 +1427,7 @@ static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1350 if (!data_size) 1427 if (!data_size)
1351 return TRUE; 1428 return TRUE;
1352 1429
1353 page = drbd_pp_alloc(mdev, 1); 1430 page = drbd_pp_alloc(mdev, 1, 1);
1354 1431
1355 data = kmap(page); 1432 data = kmap(page);
1356 while (data_size) { 1433 while (data_size) {
@@ -1414,7 +1491,7 @@ static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1414 } 1491 }
1415 1492
1416 if (dgs) { 1493 if (dgs) {
1417 drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv); 1494 drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1418 if (memcmp(dig_in, dig_vv, dgs)) { 1495 if (memcmp(dig_in, dig_vv, dgs)) {
1419 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n"); 1496 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1420 return 0; 1497 return 0;
@@ -1435,7 +1512,7 @@ static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int u
1435 1512
1436 D_ASSERT(hlist_unhashed(&e->colision)); 1513 D_ASSERT(hlist_unhashed(&e->colision));
1437 1514
1438 if (likely(drbd_bio_uptodate(e->private_bio))) { 1515 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1439 drbd_set_in_sync(mdev, sector, e->size); 1516 drbd_set_in_sync(mdev, sector, e->size);
1440 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e); 1517 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1441 } else { 1518 } else {
@@ -1454,30 +1531,28 @@ static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_si
1454 struct drbd_epoch_entry *e; 1531 struct drbd_epoch_entry *e;
1455 1532
1456 e = read_in_block(mdev, ID_SYNCER, sector, data_size); 1533 e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1457 if (!e) { 1534 if (!e)
1458 put_ldev(mdev); 1535 goto fail;
1459 return FALSE;
1460 }
1461 1536
1462 dec_rs_pending(mdev); 1537 dec_rs_pending(mdev);
1463 1538
1464 e->private_bio->bi_end_io = drbd_endio_write_sec;
1465 e->private_bio->bi_rw = WRITE;
1466 e->w.cb = e_end_resync_block;
1467
1468 inc_unacked(mdev); 1539 inc_unacked(mdev);
1469 /* corresponding dec_unacked() in e_end_resync_block() 1540 /* corresponding dec_unacked() in e_end_resync_block()
1470 * respective _drbd_clear_done_ee */ 1541 * respective _drbd_clear_done_ee */
1471 1542
1543 e->w.cb = e_end_resync_block;
1544
1472 spin_lock_irq(&mdev->req_lock); 1545 spin_lock_irq(&mdev->req_lock);
1473 list_add(&e->w.list, &mdev->sync_ee); 1546 list_add(&e->w.list, &mdev->sync_ee);
1474 spin_unlock_irq(&mdev->req_lock); 1547 spin_unlock_irq(&mdev->req_lock);
1475 1548
1476 drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio); 1549 if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1477 /* accounting done in endio */ 1550 return TRUE;
1478 1551
1479 maybe_kick_lo(mdev); 1552 drbd_free_ee(mdev, e);
1480 return TRUE; 1553fail:
1554 put_ldev(mdev);
1555 return FALSE;
1481} 1556}
1482 1557
1483static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h) 1558static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
@@ -1572,7 +1647,7 @@ static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1572 } 1647 }
1573 1648
1574 if (mdev->net_conf->wire_protocol == DRBD_PROT_C) { 1649 if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1575 if (likely(drbd_bio_uptodate(e->private_bio))) { 1650 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1576 pcmd = (mdev->state.conn >= C_SYNC_SOURCE && 1651 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1577 mdev->state.conn <= C_PAUSED_SYNC_T && 1652 mdev->state.conn <= C_PAUSED_SYNC_T &&
1578 e->flags & EE_MAY_SET_IN_SYNC) ? 1653 e->flags & EE_MAY_SET_IN_SYNC) ?
@@ -1718,7 +1793,6 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1718 return FALSE; 1793 return FALSE;
1719 } 1794 }
1720 1795
1721 e->private_bio->bi_end_io = drbd_endio_write_sec;
1722 e->w.cb = e_end_block; 1796 e->w.cb = e_end_block;
1723 1797
1724 spin_lock(&mdev->epoch_lock); 1798 spin_lock(&mdev->epoch_lock);
@@ -1914,12 +1988,8 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1914 drbd_al_begin_io(mdev, e->sector); 1988 drbd_al_begin_io(mdev, e->sector);
1915 } 1989 }
1916 1990
1917 e->private_bio->bi_rw = rw; 1991 if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
1918 drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio); 1992 return TRUE;
1919 /* accounting done in endio */
1920
1921 maybe_kick_lo(mdev);
1922 return TRUE;
1923 1993
1924out_interrupted: 1994out_interrupted:
1925 /* yes, the epoch_size now is imbalanced. 1995 /* yes, the epoch_size now is imbalanced.
@@ -1977,9 +2047,6 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
1977 return FALSE; 2047 return FALSE;
1978 } 2048 }
1979 2049
1980 e->private_bio->bi_rw = READ;
1981 e->private_bio->bi_end_io = drbd_endio_read_sec;
1982
1983 switch (h->command) { 2050 switch (h->command) {
1984 case P_DATA_REQUEST: 2051 case P_DATA_REQUEST:
1985 e->w.cb = w_e_end_data_req; 2052 e->w.cb = w_e_end_data_req;
@@ -2073,10 +2140,8 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
2073 2140
2074 inc_unacked(mdev); 2141 inc_unacked(mdev);
2075 2142
2076 drbd_generic_make_request(mdev, fault_type, e->private_bio); 2143 if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2077 maybe_kick_lo(mdev); 2144 return TRUE;
2078
2079 return TRUE;
2080 2145
2081out_free_e: 2146out_free_e:
2082 kfree(di); 2147 kfree(di);
@@ -3837,7 +3902,7 @@ static void drbd_disconnect(struct drbd_conf *mdev)
3837 dev_info(DEV, "net_ee not empty, killed %u entries\n", i); 3902 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3838 i = atomic_read(&mdev->pp_in_use); 3903 i = atomic_read(&mdev->pp_in_use);
3839 if (i) 3904 if (i)
3840 dev_info(DEV, "pp_in_use = %u, expected 0\n", i); 3905 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3841 3906
3842 D_ASSERT(list_empty(&mdev->read_ee)); 3907 D_ASSERT(list_empty(&mdev->read_ee));
3843 D_ASSERT(list_empty(&mdev->active_ee)); 3908 D_ASSERT(list_empty(&mdev->active_ee));
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index 0bbecf45b485..d771b1e0424b 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -47,8 +47,7 @@ static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int ca
47 47
48/* defined here: 48/* defined here:
49 drbd_md_io_complete 49 drbd_md_io_complete
50 drbd_endio_write_sec 50 drbd_endio_sec
51 drbd_endio_read_sec
52 drbd_endio_pri 51 drbd_endio_pri
53 52
54 * more endio handlers: 53 * more endio handlers:
@@ -85,27 +84,10 @@ void drbd_md_io_complete(struct bio *bio, int error)
85/* reads on behalf of the partner, 84/* reads on behalf of the partner,
86 * "submitted" by the receiver 85 * "submitted" by the receiver
87 */ 86 */
88void drbd_endio_read_sec(struct bio *bio, int error) __releases(local) 87void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
89{ 88{
90 unsigned long flags = 0; 89 unsigned long flags = 0;
91 struct drbd_epoch_entry *e = NULL; 90 struct drbd_conf *mdev = e->mdev;
92 struct drbd_conf *mdev;
93 int uptodate = bio_flagged(bio, BIO_UPTODATE);
94
95 e = bio->bi_private;
96 mdev = e->mdev;
97
98 if (error)
99 dev_warn(DEV, "read: error=%d s=%llus\n", error,
100 (unsigned long long)e->sector);
101 if (!error && !uptodate) {
102 dev_warn(DEV, "read: setting error to -EIO s=%llus\n",
103 (unsigned long long)e->sector);
104 /* strange behavior of some lower level drivers...
105 * fail the request by clearing the uptodate flag,
106 * but do not return any error?! */
107 error = -EIO;
108 }
109 91
110 D_ASSERT(e->block_id != ID_VACANT); 92 D_ASSERT(e->block_id != ID_VACANT);
111 93
@@ -114,49 +96,38 @@ void drbd_endio_read_sec(struct bio *bio, int error) __releases(local)
114 list_del(&e->w.list); 96 list_del(&e->w.list);
115 if (list_empty(&mdev->read_ee)) 97 if (list_empty(&mdev->read_ee))
116 wake_up(&mdev->ee_wait); 98 wake_up(&mdev->ee_wait);
99 if (test_bit(__EE_WAS_ERROR, &e->flags))
100 __drbd_chk_io_error(mdev, FALSE);
117 spin_unlock_irqrestore(&mdev->req_lock, flags); 101 spin_unlock_irqrestore(&mdev->req_lock, flags);
118 102
119 drbd_chk_io_error(mdev, error, FALSE);
120 drbd_queue_work(&mdev->data.work, &e->w); 103 drbd_queue_work(&mdev->data.work, &e->w);
121 put_ldev(mdev); 104 put_ldev(mdev);
122} 105}
123 106
107static int is_failed_barrier(int ee_flags)
108{
109 return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
110 == (EE_IS_BARRIER|EE_WAS_ERROR);
111}
112
124/* writes on behalf of the partner, or resync writes, 113/* writes on behalf of the partner, or resync writes,
125 * "submitted" by the receiver. 114 * "submitted" by the receiver, final stage. */
126 */ 115static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
127void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
128{ 116{
129 unsigned long flags = 0; 117 unsigned long flags = 0;
130 struct drbd_epoch_entry *e = NULL; 118 struct drbd_conf *mdev = e->mdev;
131 struct drbd_conf *mdev;
132 sector_t e_sector; 119 sector_t e_sector;
133 int do_wake; 120 int do_wake;
134 int is_syncer_req; 121 int is_syncer_req;
135 int do_al_complete_io; 122 int do_al_complete_io;
136 int uptodate = bio_flagged(bio, BIO_UPTODATE);
137 int is_barrier = bio_rw_flagged(bio, BIO_RW_BARRIER);
138 123
139 e = bio->bi_private; 124 /* if this is a failed barrier request, disable use of barriers,
140 mdev = e->mdev; 125 * and schedule for resubmission */
141 126 if (is_failed_barrier(e->flags)) {
142 if (error)
143 dev_warn(DEV, "write: error=%d s=%llus\n", error,
144 (unsigned long long)e->sector);
145 if (!error && !uptodate) {
146 dev_warn(DEV, "write: setting error to -EIO s=%llus\n",
147 (unsigned long long)e->sector);
148 /* strange behavior of some lower level drivers...
149 * fail the request by clearing the uptodate flag,
150 * but do not return any error?! */
151 error = -EIO;
152 }
153
154 /* error == -ENOTSUPP would be a better test,
155 * alas it is not reliable */
156 if (error && is_barrier && e->flags & EE_IS_BARRIER) {
157 drbd_bump_write_ordering(mdev, WO_bdev_flush); 127 drbd_bump_write_ordering(mdev, WO_bdev_flush);
158 spin_lock_irqsave(&mdev->req_lock, flags); 128 spin_lock_irqsave(&mdev->req_lock, flags);
159 list_del(&e->w.list); 129 list_del(&e->w.list);
130 e->flags |= EE_RESUBMITTED;
160 e->w.cb = w_e_reissue; 131 e->w.cb = w_e_reissue;
161 /* put_ldev actually happens below, once we come here again. */ 132 /* put_ldev actually happens below, once we come here again. */
162 __release(local); 133 __release(local);
@@ -167,17 +138,16 @@ void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
167 138
168 D_ASSERT(e->block_id != ID_VACANT); 139 D_ASSERT(e->block_id != ID_VACANT);
169 140
170 spin_lock_irqsave(&mdev->req_lock, flags);
171 mdev->writ_cnt += e->size >> 9;
172 is_syncer_req = is_syncer_block_id(e->block_id);
173
174 /* after we moved e to done_ee, 141 /* after we moved e to done_ee,
175 * we may no longer access it, 142 * we may no longer access it,
176 * it may be freed/reused already! 143 * it may be freed/reused already!
177 * (as soon as we release the req_lock) */ 144 * (as soon as we release the req_lock) */
178 e_sector = e->sector; 145 e_sector = e->sector;
179 do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO; 146 do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
147 is_syncer_req = is_syncer_block_id(e->block_id);
180 148
149 spin_lock_irqsave(&mdev->req_lock, flags);
150 mdev->writ_cnt += e->size >> 9;
181 list_del(&e->w.list); /* has been on active_ee or sync_ee */ 151 list_del(&e->w.list); /* has been on active_ee or sync_ee */
182 list_add_tail(&e->w.list, &mdev->done_ee); 152 list_add_tail(&e->w.list, &mdev->done_ee);
183 153
@@ -190,7 +160,7 @@ void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
190 ? list_empty(&mdev->sync_ee) 160 ? list_empty(&mdev->sync_ee)
191 : list_empty(&mdev->active_ee); 161 : list_empty(&mdev->active_ee);
192 162
193 if (error) 163 if (test_bit(__EE_WAS_ERROR, &e->flags))
194 __drbd_chk_io_error(mdev, FALSE); 164 __drbd_chk_io_error(mdev, FALSE);
195 spin_unlock_irqrestore(&mdev->req_lock, flags); 165 spin_unlock_irqrestore(&mdev->req_lock, flags);
196 166
@@ -205,7 +175,42 @@ void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
205 175
206 wake_asender(mdev); 176 wake_asender(mdev);
207 put_ldev(mdev); 177 put_ldev(mdev);
178}
208 179
180/* writes on behalf of the partner, or resync writes,
181 * "submitted" by the receiver.
182 */
183void drbd_endio_sec(struct bio *bio, int error)
184{
185 struct drbd_epoch_entry *e = bio->bi_private;
186 struct drbd_conf *mdev = e->mdev;
187 int uptodate = bio_flagged(bio, BIO_UPTODATE);
188 int is_write = bio_data_dir(bio) == WRITE;
189
190 if (error)
191 dev_warn(DEV, "%s: error=%d s=%llus\n",
192 is_write ? "write" : "read", error,
193 (unsigned long long)e->sector);
194 if (!error && !uptodate) {
195 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
196 is_write ? "write" : "read",
197 (unsigned long long)e->sector);
198 /* strange behavior of some lower level drivers...
199 * fail the request by clearing the uptodate flag,
200 * but do not return any error?! */
201 error = -EIO;
202 }
203
204 if (error)
205 set_bit(__EE_WAS_ERROR, &e->flags);
206
207 bio_put(bio); /* no need for the bio anymore */
208 if (atomic_dec_and_test(&e->pending_bios)) {
209 if (is_write)
210 drbd_endio_write_sec_final(e);
211 else
212 drbd_endio_read_sec_final(e);
213 }
209} 214}
210 215
211/* read, readA or write requests on R_PRIMARY coming from drbd_make_request 216/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
@@ -295,7 +300,34 @@ int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
295 return 1; /* Simply ignore this! */ 300 return 1; /* Simply ignore this! */
296} 301}
297 302
298void drbd_csum(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest) 303void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
304{
305 struct hash_desc desc;
306 struct scatterlist sg;
307 struct page *page = e->pages;
308 struct page *tmp;
309 unsigned len;
310
311 desc.tfm = tfm;
312 desc.flags = 0;
313
314 sg_init_table(&sg, 1);
315 crypto_hash_init(&desc);
316
317 while ((tmp = page_chain_next(page))) {
318 /* all but the last page will be fully used */
319 sg_set_page(&sg, page, PAGE_SIZE, 0);
320 crypto_hash_update(&desc, &sg, sg.length);
321 page = tmp;
322 }
323 /* and now the last, possibly only partially used page */
324 len = e->size & (PAGE_SIZE - 1);
325 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
326 crypto_hash_update(&desc, &sg, sg.length);
327 crypto_hash_final(&desc, digest);
328}
329
330void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
299{ 331{
300 struct hash_desc desc; 332 struct hash_desc desc;
301 struct scatterlist sg; 333 struct scatterlist sg;
@@ -329,11 +361,11 @@ static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel
329 return 1; 361 return 1;
330 } 362 }
331 363
332 if (likely(drbd_bio_uptodate(e->private_bio))) { 364 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
333 digest_size = crypto_hash_digestsize(mdev->csums_tfm); 365 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
334 digest = kmalloc(digest_size, GFP_NOIO); 366 digest = kmalloc(digest_size, GFP_NOIO);
335 if (digest) { 367 if (digest) {
336 drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest); 368 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
337 369
338 inc_rs_pending(mdev); 370 inc_rs_pending(mdev);
339 ok = drbd_send_drequest_csum(mdev, 371 ok = drbd_send_drequest_csum(mdev,
@@ -369,23 +401,21 @@ static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
369 /* GFP_TRY, because if there is no memory available right now, this may 401 /* GFP_TRY, because if there is no memory available right now, this may
370 * be rescheduled for later. It is "only" background resync, after all. */ 402 * be rescheduled for later. It is "only" background resync, after all. */
371 e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY); 403 e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
372 if (!e) { 404 if (!e)
373 put_ldev(mdev); 405 goto fail;
374 return 2;
375 }
376 406
377 spin_lock_irq(&mdev->req_lock); 407 spin_lock_irq(&mdev->req_lock);
378 list_add(&e->w.list, &mdev->read_ee); 408 list_add(&e->w.list, &mdev->read_ee);
379 spin_unlock_irq(&mdev->req_lock); 409 spin_unlock_irq(&mdev->req_lock);
380 410
381 e->private_bio->bi_end_io = drbd_endio_read_sec;
382 e->private_bio->bi_rw = READ;
383 e->w.cb = w_e_send_csum; 411 e->w.cb = w_e_send_csum;
412 if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
413 return 1;
384 414
385 mdev->read_cnt += size >> 9; 415 drbd_free_ee(mdev, e);
386 drbd_generic_make_request(mdev, DRBD_FAULT_RS_RD, e->private_bio); 416fail:
387 417 put_ldev(mdev);
388 return 1; 418 return 2;
389} 419}
390 420
391void resync_timer_fn(unsigned long data) 421void resync_timer_fn(unsigned long data)
@@ -819,7 +849,7 @@ out:
819/* helper */ 849/* helper */
820static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e) 850static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
821{ 851{
822 if (drbd_bio_has_active_page(e->private_bio)) { 852 if (drbd_ee_has_active_page(e)) {
823 /* This might happen if sendpage() has not finished */ 853 /* This might happen if sendpage() has not finished */
824 spin_lock_irq(&mdev->req_lock); 854 spin_lock_irq(&mdev->req_lock);
825 list_add_tail(&e->w.list, &mdev->net_ee); 855 list_add_tail(&e->w.list, &mdev->net_ee);
@@ -845,7 +875,7 @@ int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
845 return 1; 875 return 1;
846 } 876 }
847 877
848 if (likely(drbd_bio_uptodate(e->private_bio))) { 878 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
849 ok = drbd_send_block(mdev, P_DATA_REPLY, e); 879 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
850 } else { 880 } else {
851 if (__ratelimit(&drbd_ratelimit_state)) 881 if (__ratelimit(&drbd_ratelimit_state))
@@ -886,7 +916,7 @@ int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
886 put_ldev(mdev); 916 put_ldev(mdev);
887 } 917 }
888 918
889 if (likely(drbd_bio_uptodate(e->private_bio))) { 919 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
890 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) { 920 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
891 inc_rs_pending(mdev); 921 inc_rs_pending(mdev);
892 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e); 922 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
@@ -934,7 +964,7 @@ int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
934 964
935 di = (struct digest_info *)(unsigned long)e->block_id; 965 di = (struct digest_info *)(unsigned long)e->block_id;
936 966
937 if (likely(drbd_bio_uptodate(e->private_bio))) { 967 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
938 /* quick hack to try to avoid a race against reconfiguration. 968 /* quick hack to try to avoid a race against reconfiguration.
939 * a real fix would be much more involved, 969 * a real fix would be much more involved,
940 * introducing more locking mechanisms */ 970 * introducing more locking mechanisms */
@@ -944,7 +974,7 @@ int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
944 digest = kmalloc(digest_size, GFP_NOIO); 974 digest = kmalloc(digest_size, GFP_NOIO);
945 } 975 }
946 if (digest) { 976 if (digest) {
947 drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest); 977 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
948 eq = !memcmp(digest, di->digest, digest_size); 978 eq = !memcmp(digest, di->digest, digest_size);
949 kfree(digest); 979 kfree(digest);
950 } 980 }
@@ -986,14 +1016,14 @@ int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
986 if (unlikely(cancel)) 1016 if (unlikely(cancel))
987 goto out; 1017 goto out;
988 1018
989 if (unlikely(!drbd_bio_uptodate(e->private_bio))) 1019 if (unlikely((e->flags & EE_WAS_ERROR) != 0))
990 goto out; 1020 goto out;
991 1021
992 digest_size = crypto_hash_digestsize(mdev->verify_tfm); 1022 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
993 /* FIXME if this allocation fails, online verify will not terminate! */ 1023 /* FIXME if this allocation fails, online verify will not terminate! */
994 digest = kmalloc(digest_size, GFP_NOIO); 1024 digest = kmalloc(digest_size, GFP_NOIO);
995 if (digest) { 1025 if (digest) {
996 drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest); 1026 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
997 inc_rs_pending(mdev); 1027 inc_rs_pending(mdev);
998 ok = drbd_send_drequest_csum(mdev, e->sector, e->size, 1028 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
999 digest, digest_size, P_OV_REPLY); 1029 digest, digest_size, P_OV_REPLY);
@@ -1042,11 +1072,11 @@ int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1042 1072
1043 di = (struct digest_info *)(unsigned long)e->block_id; 1073 di = (struct digest_info *)(unsigned long)e->block_id;
1044 1074
1045 if (likely(drbd_bio_uptodate(e->private_bio))) { 1075 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1046 digest_size = crypto_hash_digestsize(mdev->verify_tfm); 1076 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1047 digest = kmalloc(digest_size, GFP_NOIO); 1077 digest = kmalloc(digest_size, GFP_NOIO);
1048 if (digest) { 1078 if (digest) {
1049 drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest); 1079 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1050 1080
1051 D_ASSERT(digest_size == di->digest_size); 1081 D_ASSERT(digest_size == di->digest_size);
1052 eq = !memcmp(digest, di->digest, digest_size); 1082 eq = !memcmp(digest, di->digest, digest_size);
diff --git a/drivers/block/drbd/drbd_wrappers.h b/drivers/block/drbd/drbd_wrappers.h
index f93fa111ce50..defdb5013ea3 100644
--- a/drivers/block/drbd/drbd_wrappers.h
+++ b/drivers/block/drbd/drbd_wrappers.h
@@ -18,23 +18,9 @@ static inline void drbd_set_my_capacity(struct drbd_conf *mdev,
18 18
19#define drbd_bio_uptodate(bio) bio_flagged(bio, BIO_UPTODATE) 19#define drbd_bio_uptodate(bio) bio_flagged(bio, BIO_UPTODATE)
20 20
21static inline int drbd_bio_has_active_page(struct bio *bio)
22{
23 struct bio_vec *bvec;
24 int i;
25
26 __bio_for_each_segment(bvec, bio, i, 0) {
27 if (page_count(bvec->bv_page) > 1)
28 return 1;
29 }
30
31 return 0;
32}
33
34/* bi_end_io handlers */ 21/* bi_end_io handlers */
35extern void drbd_md_io_complete(struct bio *bio, int error); 22extern void drbd_md_io_complete(struct bio *bio, int error);
36extern void drbd_endio_read_sec(struct bio *bio, int error); 23extern void drbd_endio_sec(struct bio *bio, int error);
37extern void drbd_endio_write_sec(struct bio *bio, int error);
38extern void drbd_endio_pri(struct bio *bio, int error); 24extern void drbd_endio_pri(struct bio *bio, int error);
39 25
40/* 26/*