path: root/drivers/block/drbd/drbd_receiver.c
author     Lars Ellenberg <lars.ellenberg@linbit.com>     2010-05-14 11:10:48 -0400
committer  Philipp Reisner <philipp.reisner@linbit.com>   2010-05-17 20:01:23 -0400
commit     45bb912bd5ea4d2b3a270a93cbdf767a0e2df6f5 (patch)
tree       d95d27ea8e945fcda3427c50a5bc062c804c6eff /drivers/block/drbd/drbd_receiver.c
parent     708d740ed8242b84eefc63df144313a7308c7de5 (diff)
drbd: Allow drbd_epoch_entries to use multiple bios.
This should allow for better performance if the lower level IO stack
of the peers differs in limits exposed either via the queue,
or via some merge_bvec_fn.

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
Diffstat (limited to 'drivers/block/drbd/drbd_receiver.c')
-rw-r--r--	drivers/block/drbd/drbd_receiver.c	483
1 file changed, 274 insertions(+), 209 deletions(-)
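The heart of the patch: an epoch entry no longer owns a single preallocated bio; its payload is carried as a singly linked chain of pages, with page->private acting as the "next" pointer, and drbd_submit_ee() later maps that chain onto as many bios as the local queue limits require. The chain-walking helpers used throughout this diff (page_chain_next, page_chain_for_each, page_chain_for_each_safe) are added to drbd_int.h by the same commit and are not shown on this page; as a rough sketch of what they presumably look like:

/* sketch only -- the real helpers live in drbd_int.h, not in this file's diff */
static inline struct page *page_chain_next(struct page *page)
{
	return (struct page *)page_private(page);
}

#define page_chain_for_each(page) \
	for (; page; page = page_chain_next(page))

#define page_chain_for_each_safe(page, n) \
	for (; page && ({ n = page_chain_next(page); 1; }); page = n)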
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c
index fee0d249adf7..388a3e8bb0d0 100644
--- a/drivers/block/drbd/drbd_receiver.c
+++ b/drivers/block/drbd/drbd_receiver.c
@@ -80,30 +80,124 @@ static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epo
 
 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
 
-static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev)
+/*
+ * some helper functions to deal with single linked page lists,
+ * page->private being our "next" pointer.
+ */
+
+/* If at least n pages are linked at head, get n pages off.
+ * Otherwise, don't modify head, and return NULL.
+ * Locking is the responsibility of the caller.
+ */
+static struct page *page_chain_del(struct page **head, int n)
+{
+	struct page *page;
+	struct page *tmp;
+
+	BUG_ON(!n);
+	BUG_ON(!head);
+
+	page = *head;
+	while (page) {
+		tmp = page_chain_next(page);
+		if (--n == 0)
+			break; /* found sufficient pages */
+		if (tmp == NULL)
+			/* insufficient pages, don't use any of them. */
+			return NULL;
+		page = tmp;
+	}
+
+	/* add end of list marker for the returned list */
+	set_page_private(page, 0);
+	/* actual return value, and adjustment of head */
+	page = *head;
+	*head = tmp;
+	return page;
+}
+
+/* may be used outside of locks to find the tail of a (usually short)
+ * "private" page chain, before adding it back to a global chain head
+ * with page_chain_add() under a spinlock. */
+static struct page *page_chain_tail(struct page *page, int *len)
+{
+	struct page *tmp;
+	int i = 1;
+	while ((tmp = page_chain_next(page)))
+		++i, page = tmp;
+	if (len)
+		*len = i;
+	return page;
+}
+
+static int page_chain_free(struct page *page)
+{
+	struct page *tmp;
+	int i = 0;
+	page_chain_for_each_safe(page, tmp) {
+		put_page(page);
+		++i;
+	}
+	return i;
+}
+
+static void page_chain_add(struct page **head,
+		struct page *chain_first, struct page *chain_last)
+{
+#if 1
+	struct page *tmp;
+	tmp = page_chain_tail(chain_first, NULL);
+	BUG_ON(tmp != chain_last);
+#endif
+
+	/* add chain to head */
+	set_page_private(chain_last, (unsigned long)*head);
+	*head = chain_first;
+}
+
+static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
 {
 	struct page *page = NULL;
+	struct page *tmp = NULL;
+	int i = 0;
 
 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
 	 * So what. It saves a spin_lock. */
-	if (drbd_pp_vacant > 0) {
+	if (drbd_pp_vacant >= number) {
 		spin_lock(&drbd_pp_lock);
-		page = drbd_pp_pool;
-		if (page) {
-			drbd_pp_pool = (struct page *)page_private(page);
-			set_page_private(page, 0); /* just to be polite */
-			drbd_pp_vacant--;
-		}
+		page = page_chain_del(&drbd_pp_pool, number);
+		if (page)
+			drbd_pp_vacant -= number;
 		spin_unlock(&drbd_pp_lock);
+		if (page)
+			return page;
 	}
+
 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
 	 * which in turn might block on the other node at this very place. */
-	if (!page)
-		page = alloc_page(GFP_TRY);
-	if (page)
-		atomic_inc(&mdev->pp_in_use);
-	return page;
+	for (i = 0; i < number; i++) {
+		tmp = alloc_page(GFP_TRY);
+		if (!tmp)
+			break;
+		set_page_private(tmp, (unsigned long)page);
+		page = tmp;
+	}
+
+	if (i == number)
+		return page;
+
+	/* Not enough pages immediately available this time.
+	 * No need to jump around here, drbd_pp_alloc will retry this
+	 * function "soon". */
+	if (page) {
+		tmp = page_chain_tail(page, NULL);
+		spin_lock(&drbd_pp_lock);
+		page_chain_add(&drbd_pp_pool, page, tmp);
+		drbd_pp_vacant += i;
+		spin_unlock(&drbd_pp_lock);
+	}
+	return NULL;
 }
 
 /* kick lower level device, if we have more than (arbitrary number)
@@ -127,7 +221,7 @@ static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed
 
 	list_for_each_safe(le, tle, &mdev->net_ee) {
 		e = list_entry(le, struct drbd_epoch_entry, w.list);
-		if (drbd_bio_has_active_page(e->private_bio))
+		if (drbd_ee_has_active_page(e))
 			break;
 		list_move(le, to_be_freed);
 	}
@@ -148,32 +242,34 @@ static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
 }
 
 /**
- * drbd_pp_alloc() - Returns a page, fails only if a signal comes in
+ * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
  * @mdev:	DRBD device.
- * @retry:	whether or not to retry allocation forever (or until signalled)
+ * @number:	number of pages requested
+ * @retry:	whether to retry, if not enough pages are available right now
+ *
+ * Tries to allocate number pages, first from our own page pool, then from
+ * the kernel, unless this allocation would exceed the max_buffers setting.
+ * Possibly retry until DRBD frees sufficient pages somewhere else.
  *
- * Tries to allocate a page, first from our own page pool, then from the
- * kernel, unless this allocation would exceed the max_buffers setting.
- * If @retry is non-zero, retry until DRBD frees a page somewhere else.
+ * Returns a page chain linked via page->private.
  */
-static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
+static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
 {
 	struct page *page = NULL;
 	DEFINE_WAIT(wait);
 
-	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
-		page = drbd_pp_first_page_or_try_alloc(mdev);
-		if (page)
-			return page;
-	}
+	/* Yes, we may run up to @number over max_buffers. If we
+	 * follow it strictly, the admin will get it wrong anyways. */
+	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
+		page = drbd_pp_first_pages_or_try_alloc(mdev, number);
 
-	for (;;) {
+	while (page == NULL) {
 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
 
 		drbd_kick_lo_and_reclaim_net(mdev);
 
 		if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
-			page = drbd_pp_first_page_or_try_alloc(mdev);
+			page = drbd_pp_first_pages_or_try_alloc(mdev, number);
 			if (page)
 				break;
 		}
@@ -190,62 +286,32 @@ static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
 	}
 	finish_wait(&drbd_pp_wait, &wait);
 
+	if (page)
+		atomic_add(number, &mdev->pp_in_use);
 	return page;
 }
 
 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
- * Is also used from inside an other spin_lock_irq(&mdev->req_lock) */
+ * Is also used from inside an other spin_lock_irq(&mdev->req_lock);
+ * Either links the page chain back to the global pool,
+ * or returns all pages to the system. */
 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
 {
-	int free_it;
-
-	spin_lock(&drbd_pp_lock);
-	if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
-		free_it = 1;
-	} else {
-		set_page_private(page, (unsigned long)drbd_pp_pool);
-		drbd_pp_pool = page;
-		drbd_pp_vacant++;
-		free_it = 0;
-	}
-	spin_unlock(&drbd_pp_lock);
-
-	atomic_dec(&mdev->pp_in_use);
-
-	if (free_it)
-		__free_page(page);
-
-	wake_up(&drbd_pp_wait);
-}
-
-static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio)
-{
-	struct page *p_to_be_freed = NULL;
-	struct page *page;
-	struct bio_vec *bvec;
 	int i;
-
-	spin_lock(&drbd_pp_lock);
-	__bio_for_each_segment(bvec, bio, i, 0) {
-		if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
-			set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed);
-			p_to_be_freed = bvec->bv_page;
-		} else {
-			set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool);
-			drbd_pp_pool = bvec->bv_page;
-			drbd_pp_vacant++;
-		}
-	}
-	spin_unlock(&drbd_pp_lock);
-	atomic_sub(bio->bi_vcnt, &mdev->pp_in_use);
-
-	while (p_to_be_freed) {
-		page = p_to_be_freed;
-		p_to_be_freed = (struct page *)page_private(page);
-		set_page_private(page, 0); /* just to be polite */
-		put_page(page);
+	if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
+		i = page_chain_free(page);
+	else {
+		struct page *tmp;
+		tmp = page_chain_tail(page, &i);
+		spin_lock(&drbd_pp_lock);
+		page_chain_add(&drbd_pp_pool, page, tmp);
+		drbd_pp_vacant += i;
+		spin_unlock(&drbd_pp_lock);
 	}
-
+	atomic_sub(i, &mdev->pp_in_use);
+	i = atomic_read(&mdev->pp_in_use);
+	if (i < 0)
+		dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
 	wake_up(&drbd_pp_wait);
 }
 
@@ -270,11 +336,9 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
 			unsigned int data_size,
 			gfp_t gfp_mask) __must_hold(local)
 {
-	struct request_queue *q;
 	struct drbd_epoch_entry *e;
 	struct page *page;
-	struct bio *bio;
-	unsigned int ds;
+	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
 
 	if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
 		return NULL;
@@ -286,84 +350,32 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
 		return NULL;
 	}
 
-	bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE));
-	if (!bio) {
-		if (!(gfp_mask & __GFP_NOWARN))
-			dev_err(DEV, "alloc_ee: Allocation of a bio failed\n");
-		goto fail1;
-	}
-
-	bio->bi_bdev = mdev->ldev->backing_bdev;
-	bio->bi_sector = sector;
-
-	ds = data_size;
-	while (ds) {
-		page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT));
-		if (!page) {
-			if (!(gfp_mask & __GFP_NOWARN))
-				dev_err(DEV, "alloc_ee: Allocation of a page failed\n");
-			goto fail2;
-		}
-		if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) {
-			drbd_pp_free(mdev, page);
-			dev_err(DEV, "alloc_ee: bio_add_page(s=%llu,"
-				"data_size=%u,ds=%u) failed\n",
-				(unsigned long long)sector, data_size, ds);
-
-			q = bdev_get_queue(bio->bi_bdev);
-			if (q->merge_bvec_fn) {
-				struct bvec_merge_data bvm = {
-					.bi_bdev = bio->bi_bdev,
-					.bi_sector = bio->bi_sector,
-					.bi_size = bio->bi_size,
-					.bi_rw = bio->bi_rw,
-				};
-				int l = q->merge_bvec_fn(q, &bvm,
-						&bio->bi_io_vec[bio->bi_vcnt]);
-				dev_err(DEV, "merge_bvec_fn() = %d\n", l);
-			}
-
-			/* dump more of the bio. */
-			dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs);
-			dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt);
-			dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size);
-			dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments);
-
-			goto fail2;
-			break;
-		}
-		ds -= min_t(int, ds, PAGE_SIZE);
-	}
-
-	D_ASSERT(data_size == bio->bi_size);
-
-	bio->bi_private = e;
-	e->mdev = mdev;
-	e->sector = sector;
-	e->size = bio->bi_size;
+	page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
+	if (!page)
+		goto fail;
 
-	e->private_bio = bio;
-	e->block_id = id;
 	INIT_HLIST_NODE(&e->colision);
 	e->epoch = NULL;
+	e->mdev = mdev;
+	e->pages = page;
+	atomic_set(&e->pending_bios, 0);
+	e->size = data_size;
 	e->flags = 0;
+	e->sector = sector;
+	e->sector = sector;
+	e->block_id = id;
 
 	return e;
 
- fail2:
-	drbd_pp_free_bio_pages(mdev, bio);
-	bio_put(bio);
- fail1:
+ fail:
 	mempool_free(e, drbd_ee_mempool);
-
 	return NULL;
 }
 
 void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
 {
-	struct bio *bio = e->private_bio;
-	drbd_pp_free_bio_pages(mdev, bio);
-	bio_put(bio);
+	drbd_pp_free(mdev, e->pages);
+	D_ASSERT(atomic_read(&e->pending_bios) == 0);
 	D_ASSERT(hlist_unhashed(&e->colision));
 	mempool_free(e, drbd_ee_mempool);
 }
@@ -1121,6 +1133,90 @@ void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo)
 }
 
 /**
+ * drbd_submit_ee()
+ * @mdev:	DRBD device.
+ * @e:		epoch entry
+ * @rw:	flag field, see bio->bi_rw
+ */
+/* TODO allocate from our own bio_set. */
+int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
+		const unsigned rw, const int fault_type)
+{
+	struct bio *bios = NULL;
+	struct bio *bio;
+	struct page *page = e->pages;
+	sector_t sector = e->sector;
+	unsigned ds = e->size;
+	unsigned n_bios = 0;
+	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
+
+	/* In most cases, we will only need one bio. But in case the lower
+	 * level restrictions happen to be different at this offset on this
+	 * side than those of the sending peer, we may need to submit the
+	 * request in more than one bio. */
+next_bio:
+	bio = bio_alloc(GFP_NOIO, nr_pages);
+	if (!bio) {
+		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
+		goto fail;
+	}
+	/* > e->sector, unless this is the first bio */
+	bio->bi_sector = sector;
+	bio->bi_bdev = mdev->ldev->backing_bdev;
+	/* we special case some flags in the multi-bio case, see below
+	 * (BIO_RW_UNPLUG, BIO_RW_BARRIER) */
+	bio->bi_rw = rw;
+	bio->bi_private = e;
+	bio->bi_end_io = drbd_endio_sec;
+
+	bio->bi_next = bios;
+	bios = bio;
+	++n_bios;
+
+	page_chain_for_each(page) {
+		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
+		if (!bio_add_page(bio, page, len, 0)) {
+			/* a single page must always be possible! */
+			BUG_ON(bio->bi_vcnt == 0);
+			goto next_bio;
+		}
+		ds -= len;
+		sector += len >> 9;
+		--nr_pages;
+	}
+	D_ASSERT(page == NULL);
+	D_ASSERT(ds == 0);
+
+	atomic_set(&e->pending_bios, n_bios);
+	do {
+		bio = bios;
+		bios = bios->bi_next;
+		bio->bi_next = NULL;
+
+		/* strip off BIO_RW_UNPLUG unless it is the last bio */
+		if (bios)
+			bio->bi_rw &= ~(1<<BIO_RW_UNPLUG);
+
+		drbd_generic_make_request(mdev, fault_type, bio);
+
+		/* strip off BIO_RW_BARRIER,
+		 * unless it is the first or last bio */
+		if (bios && bios->bi_next)
+			bios->bi_rw &= ~(1<<BIO_RW_BARRIER);
+	} while (bios);
+	maybe_kick_lo(mdev);
+	return 0;
+
+fail:
+	while (bios) {
+		bio = bios;
+		bios = bios->bi_next;
+		bio_put(bio);
+	}
+	return -ENOMEM;
+}
+
+/**
  * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
  * @mdev:	DRBD device.
  * @w:		work object.
@@ -1129,8 +1225,6 @@ void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo)
 int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
 {
 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
-	struct bio *bio = e->private_bio;
-
 	/* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
 	   (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
 	   so that we can finish that epoch in drbd_may_finish_epoch().
@@ -1144,33 +1238,17 @@ int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __relea
 	if (previous_epoch(mdev, e->epoch))
 		dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
 
-	/* prepare bio for re-submit,
-	 * re-init volatile members */
 	/* we still have a local reference,
 	 * get_ldev was done in receive_Data. */
-	bio->bi_bdev = mdev->ldev->backing_bdev;
-	bio->bi_sector = e->sector;
-	bio->bi_size = e->size;
-	bio->bi_idx = 0;
-
-	bio->bi_flags &= ~(BIO_POOL_MASK - 1);
-	bio->bi_flags |= 1 << BIO_UPTODATE;
-
-	/* don't know whether this is necessary: */
-	bio->bi_phys_segments = 0;
-	bio->bi_next = NULL;
-
-	/* these should be unchanged: */
-	/* bio->bi_end_io = drbd_endio_write_sec; */
-	/* bio->bi_vcnt = whatever; */
 
 	e->w.cb = e_end_block;
-
-	/* This is no longer a barrier request. */
-	bio->bi_rw &= ~(1UL << BIO_RW_BARRIER);
-
-	drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio);
-
+	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
+		/* drbd_submit_ee fails for one reason only:
+		 * if was not able to allocate sufficient bios.
+		 * requeue, try again later. */
+		e->w.cb = w_e_reissue;
+		drbd_queue_work(&mdev->data.work, &e->w);
+	}
 	return 1;
 }
 
@@ -1264,10 +1342,8 @@ read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __
 {
 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
 	struct drbd_epoch_entry *e;
-	struct bio_vec *bvec;
 	struct page *page;
-	struct bio *bio;
-	int dgs, ds, i, rr;
+	int dgs, ds, rr;
 	void *dig_in = mdev->int_dig_in;
 	void *dig_vv = mdev->int_dig_vv;
 	unsigned long *data;
@@ -1304,28 +1380,29 @@ read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __
 	e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
 	if (!e)
 		return NULL;
-	bio = e->private_bio;
+
 	ds = data_size;
-	bio_for_each_segment(bvec, bio, i) {
-		page = bvec->bv_page;
+	page = e->pages;
+	page_chain_for_each(page) {
+		unsigned len = min_t(int, ds, PAGE_SIZE);
 		data = kmap(page);
-		rr = drbd_recv(mdev, data, min_t(int, ds, PAGE_SIZE));
+		rr = drbd_recv(mdev, data, len);
 		if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
 			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
 			data[0] = data[0] ^ (unsigned long)-1;
 		}
 		kunmap(page);
-		if (rr != min_t(int, ds, PAGE_SIZE)) {
+		if (rr != len) {
 			drbd_free_ee(mdev, e);
 			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
-				rr, min_t(int, ds, PAGE_SIZE));
+				rr, len);
 			return NULL;
 		}
 		ds -= rr;
 	}
 
 	if (dgs) {
-		drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
+		drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
 		if (memcmp(dig_in, dig_vv, dgs)) {
 			dev_err(DEV, "Digest integrity check FAILED.\n");
 			drbd_bcast_ee(mdev, "digest failed",
@@ -1350,7 +1427,7 @@ static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
 	if (!data_size)
 		return TRUE;
 
-	page = drbd_pp_alloc(mdev, 1);
+	page = drbd_pp_alloc(mdev, 1, 1);
 
 	data = kmap(page);
 	while (data_size) {
@@ -1414,7 +1491,7 @@ static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
 	}
 
 	if (dgs) {
-		drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
+		drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
 		if (memcmp(dig_in, dig_vv, dgs)) {
 			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
 			return 0;
@@ -1435,7 +1512,7 @@ static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int u
 
 	D_ASSERT(hlist_unhashed(&e->colision));
 
-	if (likely(drbd_bio_uptodate(e->private_bio))) {
+	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
 		drbd_set_in_sync(mdev, sector, e->size);
 		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
 	} else {
@@ -1454,30 +1531,28 @@ static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_si
 	struct drbd_epoch_entry *e;
 
 	e = read_in_block(mdev, ID_SYNCER, sector, data_size);
-	if (!e) {
-		put_ldev(mdev);
-		return FALSE;
-	}
+	if (!e)
+		goto fail;
 
 	dec_rs_pending(mdev);
 
-	e->private_bio->bi_end_io = drbd_endio_write_sec;
-	e->private_bio->bi_rw = WRITE;
-	e->w.cb = e_end_resync_block;
-
 	inc_unacked(mdev);
 	/* corresponding dec_unacked() in e_end_resync_block()
 	 * respective _drbd_clear_done_ee */
 
+	e->w.cb = e_end_resync_block;
+
 	spin_lock_irq(&mdev->req_lock);
 	list_add(&e->w.list, &mdev->sync_ee);
 	spin_unlock_irq(&mdev->req_lock);
 
-	drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio);
-	/* accounting done in endio */
+	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
+		return TRUE;
 
-	maybe_kick_lo(mdev);
-	return TRUE;
+	drbd_free_ee(mdev, e);
+fail:
+	put_ldev(mdev);
+	return FALSE;
 }
 
 static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
@@ -1572,7 +1647,7 @@ static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 	}
 
 	if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
-		if (likely(drbd_bio_uptodate(e->private_bio))) {
+		if (likely((e->flags & EE_WAS_ERROR) == 0)) {
 			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
 				mdev->state.conn <= C_PAUSED_SYNC_T &&
 				e->flags & EE_MAY_SET_IN_SYNC) ?
@@ -1718,7 +1793,6 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
 		return FALSE;
 	}
 
-	e->private_bio->bi_end_io = drbd_endio_write_sec;
 	e->w.cb = e_end_block;
 
 	spin_lock(&mdev->epoch_lock);
@@ -1914,12 +1988,8 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
 		drbd_al_begin_io(mdev, e->sector);
 	}
 
-	e->private_bio->bi_rw = rw;
-	drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio);
-	/* accounting done in endio */
-
-	maybe_kick_lo(mdev);
-	return TRUE;
+	if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
+		return TRUE;
 
 out_interrupted:
 	/* yes, the epoch_size now is imbalanced.
@@ -1977,9 +2047,6 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
 		return FALSE;
 	}
 
-	e->private_bio->bi_rw = READ;
-	e->private_bio->bi_end_io = drbd_endio_read_sec;
-
 	switch (h->command) {
 	case P_DATA_REQUEST:
 		e->w.cb = w_e_end_data_req;
@@ -2073,10 +2140,8 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
 
 	inc_unacked(mdev);
 
-	drbd_generic_make_request(mdev, fault_type, e->private_bio);
-	maybe_kick_lo(mdev);
-
-	return TRUE;
+	if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
+		return TRUE;
 
 out_free_e:
 	kfree(di);
@@ -3837,7 +3902,7 @@ static void drbd_disconnect(struct drbd_conf *mdev)
 		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
 	i = atomic_read(&mdev->pp_in_use);
 	if (i)
-		dev_info(DEV, "pp_in_use = %u, expected 0\n", i);
+		dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
 
 	D_ASSERT(list_empty(&mdev->read_ee));
 	D_ASSERT(list_empty(&mdev->active_ee));
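The splitting strategy added in drbd_submit_ee() above boils down to: greedily add pages from the chain to the current bio, and whenever bio_add_page() refuses one (because of queue limits or a merge_bvec_fn on the lower device), close that bio and start another. A toy user-space analogue of the same pattern follows; every name in it (chunk, container, container_add) is invented for the illustration and is not a DRBD or block-layer API.

#include <stdio.h>

struct chunk {
	struct chunk *next;	/* plays the role of page->private */
};

struct container {		/* plays the role of a bio */
	int used;
	int capacity;		/* pretend lower-level segment limit */
};

/* Mimics bio_add_page(): refuses once the container is full. */
static int container_add(struct container *c, struct chunk *ch)
{
	(void)ch;
	if (c->used >= c->capacity)
		return 0;
	c->used++;
	return 1;
}

int main(void)
{
	struct chunk chunks[10];
	struct chunk *ch;
	int i, n_containers = 0;

	/* build a chain of 10 chunks, linked like the page chain */
	for (i = 0; i < 9; i++)
		chunks[i].next = &chunks[i + 1];
	chunks[9].next = NULL;

	ch = &chunks[0];
	while (ch) {
		/* like the "next_bio:" label: open a fresh container */
		struct container c = { .used = 0, .capacity = 4 };
		n_containers++;
		/* fill it until it refuses, then loop to open the next one */
		while (ch && container_add(&c, ch))
			ch = ch->next;
	}
	printf("split 10 chunks into %d containers\n", n_containers);
	return 0;
}

With a capacity of 4 this prints "split 10 chunks into 3 containers", which is the multi-bio case the patch describes: the peer wrote the data as one request, but the local limits force it to be resubmitted in several pieces.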