Diffstat (limited to 'drivers/block/drbd/drbd_receiver.c')
-rw-r--r--  drivers/block/drbd/drbd_receiver.c  483
1 file changed, 274 insertions(+), 209 deletions(-)
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c
index fee0d249adf7..388a3e8bb0d0 100644
--- a/drivers/block/drbd/drbd_receiver.c
+++ b/drivers/block/drbd/drbd_receiver.c
@@ -80,30 +80,124 @@ static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epo
 
 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
 
-static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev)
+/*
+ * some helper functions to deal with single linked page lists,
+ * page->private being our "next" pointer.
+ */
+
+/* If at least n pages are linked at head, get n pages off.
+ * Otherwise, don't modify head, and return NULL.
+ * Locking is the responsibility of the caller.
+ */
+static struct page *page_chain_del(struct page **head, int n)
+{
+	struct page *page;
+	struct page *tmp;
+
+	BUG_ON(!n);
+	BUG_ON(!head);
+
+	page = *head;
+	while (page) {
+		tmp = page_chain_next(page);
+		if (--n == 0)
+			break; /* found sufficient pages */
+		if (tmp == NULL)
+			/* insufficient pages, don't use any of them. */
+			return NULL;
+		page = tmp;
+	}
+
+	/* add end of list marker for the returned list */
+	set_page_private(page, 0);
+	/* actual return value, and adjustment of head */
+	page = *head;
+	*head = tmp;
+	return page;
+}
+
+/* may be used outside of locks to find the tail of a (usually short)
+ * "private" page chain, before adding it back to a global chain head
+ * with page_chain_add() under a spinlock. */
+static struct page *page_chain_tail(struct page *page, int *len)
+{
+	struct page *tmp;
+	int i = 1;
+	while ((tmp = page_chain_next(page)))
+		++i, page = tmp;
+	if (len)
+		*len = i;
+	return page;
+}
+
+static int page_chain_free(struct page *page)
+{
+	struct page *tmp;
+	int i = 0;
+	page_chain_for_each_safe(page, tmp) {
+		put_page(page);
+		++i;
+	}
+	return i;
+}
+
+static void page_chain_add(struct page **head,
+		struct page *chain_first, struct page *chain_last)
+{
+#if 1
+	struct page *tmp;
+	tmp = page_chain_tail(chain_first, NULL);
+	BUG_ON(tmp != chain_last);
+#endif
+
+	/* add chain to head */
+	set_page_private(chain_last, (unsigned long)*head);
+	*head = chain_first;
+}
+
+static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
 {
 	struct page *page = NULL;
+	struct page *tmp = NULL;
+	int i = 0;
 
 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
 	 * So what. It saves a spin_lock. */
-	if (drbd_pp_vacant > 0) {
+	if (drbd_pp_vacant >= number) {
 		spin_lock(&drbd_pp_lock);
-		page = drbd_pp_pool;
-		if (page) {
-			drbd_pp_pool = (struct page *)page_private(page);
-			set_page_private(page, 0); /* just to be polite */
-			drbd_pp_vacant--;
-		}
+		page = page_chain_del(&drbd_pp_pool, number);
+		if (page)
+			drbd_pp_vacant -= number;
 		spin_unlock(&drbd_pp_lock);
+		if (page)
+			return page;
 	}
 
 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
 	 * which in turn might block on the other node at this very place. */
-	if (!page)
-		page = alloc_page(GFP_TRY);
-	if (page)
-		atomic_inc(&mdev->pp_in_use);
-	return page;
+	for (i = 0; i < number; i++) {
+		tmp = alloc_page(GFP_TRY);
+		if (!tmp)
+			break;
+		set_page_private(tmp, (unsigned long)page);
+		page = tmp;
+	}
+
+	if (i == number)
+		return page;
+
+	/* Not enough pages immediately available this time.
+	 * No need to jump around here, drbd_pp_alloc will retry this
+	 * function "soon". */
+	if (page) {
+		tmp = page_chain_tail(page, NULL);
+		spin_lock(&drbd_pp_lock);
+		page_chain_add(&drbd_pp_pool, page, tmp);
+		drbd_pp_vacant += i;
+		spin_unlock(&drbd_pp_lock);
+	}
+	return NULL;
 }
 
 /* kick lower level device, if we have more than (arbitrary number)
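
The page_chain helpers above lean on page_chain_next() and the page_chain_for_each*() iterators, which this patch introduces in drbd_int.h rather than in this file. A minimal sketch of compatible definitions, assuming page->private is used exclusively as the "next" pointer while a page sits on a chain (the in-tree versions may differ, e.g. by prefetching):

    /* sketch only -- the real macros live in drbd_int.h */
    #define page_chain_next(page) \
        ((struct page *)page_private(page))

    /* walk a chain; the cursor variable ends up NULL */
    #define page_chain_for_each(page) \
        for (; page; page = page_chain_next(page))

    /* safe variant: "n" caches the next pointer so the body may free "page" */
    #define page_chain_for_each_safe(page, n) \
        for (; page && ((n = page_chain_next(page)), 1); page = n)
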
@@ -127,7 +221,7 @@ static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed
 
 	list_for_each_safe(le, tle, &mdev->net_ee) {
 		e = list_entry(le, struct drbd_epoch_entry, w.list);
-		if (drbd_bio_has_active_page(e->private_bio))
+		if (drbd_ee_has_active_page(e))
 			break;
 		list_move(le, to_be_freed);
 	}
@@ -148,32 +242,34 @@ static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
 }
 
 /**
- * drbd_pp_alloc() - Returns a page, fails only if a signal comes in
+ * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
  * @mdev:	DRBD device.
- * @retry:	whether or not to retry allocation forever (or until signalled)
+ * @number:	number of pages requested
+ * @retry:	whether to retry, if not enough pages are available right now
+ *
+ * Tries to allocate number pages, first from our own page pool, then from
+ * the kernel, unless this allocation would exceed the max_buffers setting.
+ * Possibly retry until DRBD frees sufficient pages somewhere else.
  *
- * Tries to allocate a page, first from our own page pool, then from the
- * kernel, unless this allocation would exceed the max_buffers setting.
- * If @retry is non-zero, retry until DRBD frees a page somewhere else.
+ * Returns a page chain linked via page->private.
  */
-static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
+static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
 {
 	struct page *page = NULL;
 	DEFINE_WAIT(wait);
 
-	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
-		page = drbd_pp_first_page_or_try_alloc(mdev);
-		if (page)
-			return page;
-	}
+	/* Yes, we may run up to @number over max_buffers. If we
+	 * follow it strictly, the admin will get it wrong anyways. */
+	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
+		page = drbd_pp_first_pages_or_try_alloc(mdev, number);
 
-	for (;;) {
+	while (page == NULL) {
 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
 
 		drbd_kick_lo_and_reclaim_net(mdev);
 
 		if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
-			page = drbd_pp_first_page_or_try_alloc(mdev);
+			page = drbd_pp_first_pages_or_try_alloc(mdev, number);
 			if (page)
 				break;
 		}
@@ -190,62 +286,32 @@ static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
 	}
 	finish_wait(&drbd_pp_wait, &wait);
 
+	if (page)
+		atomic_add(number, &mdev->pp_in_use);
 	return page;
 }
 
 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
- * Is also used from inside an other spin_lock_irq(&mdev->req_lock) */
+ * Is also used from inside an other spin_lock_irq(&mdev->req_lock);
+ * Either links the page chain back to the global pool,
+ * or returns all pages to the system. */
 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
 {
-	int free_it;
-
-	spin_lock(&drbd_pp_lock);
-	if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
-		free_it = 1;
-	} else {
-		set_page_private(page, (unsigned long)drbd_pp_pool);
-		drbd_pp_pool = page;
-		drbd_pp_vacant++;
-		free_it = 0;
-	}
-	spin_unlock(&drbd_pp_lock);
-
-	atomic_dec(&mdev->pp_in_use);
-
-	if (free_it)
-		__free_page(page);
-
-	wake_up(&drbd_pp_wait);
-}
-
-static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio)
-{
-	struct page *p_to_be_freed = NULL;
-	struct page *page;
-	struct bio_vec *bvec;
 	int i;
-
-	spin_lock(&drbd_pp_lock);
-	__bio_for_each_segment(bvec, bio, i, 0) {
-		if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
-			set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed);
-			p_to_be_freed = bvec->bv_page;
-		} else {
-			set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool);
-			drbd_pp_pool = bvec->bv_page;
-			drbd_pp_vacant++;
-		}
-	}
-	spin_unlock(&drbd_pp_lock);
-	atomic_sub(bio->bi_vcnt, &mdev->pp_in_use);
-
-	while (p_to_be_freed) {
-		page = p_to_be_freed;
-		p_to_be_freed = (struct page *)page_private(page);
-		set_page_private(page, 0); /* just to be polite */
-		put_page(page);
+	if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
+		i = page_chain_free(page);
+	else {
+		struct page *tmp;
+		tmp = page_chain_tail(page, &i);
+		spin_lock(&drbd_pp_lock);
+		page_chain_add(&drbd_pp_pool, page, tmp);
+		drbd_pp_vacant += i;
+		spin_unlock(&drbd_pp_lock);
 	}
-
+	atomic_sub(i, &mdev->pp_in_use);
+	i = atomic_read(&mdev->pp_in_use);
+	if (i < 0)
+		dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
 	wake_up(&drbd_pp_wait);
 }
 
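
Taken together, drbd_pp_alloc() and drbd_pp_free() now move whole chains in and out of the pool, so a request of any size costs one pool-lock round trip on each side. A hedged usage sketch (the function name is illustrative, not part of the patch):

    /* illustrative only: allocate a chain, touch each page, give it back */
    static void pp_roundtrip_example(struct drbd_conf *mdev, unsigned nr_pages)
    {
        struct page *chain, *page;

        /* retry = true: sleep until enough pages are returned elsewhere */
        chain = drbd_pp_alloc(mdev, nr_pages, true);
        if (!chain)
            return; /* interrupted by a signal */

        page = chain;
        page_chain_for_each(page) {
            void *data = kmap(page);
            memset(data, 0, PAGE_SIZE); /* real code: drbd_recv() */
            kunmap(page);
        }

        /* hands the whole chain back and drops pp_in_use by its length */
        drbd_pp_free(mdev, chain);
    }
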
@@ -270,11 +336,9 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
 		unsigned int data_size,
 		gfp_t gfp_mask) __must_hold(local)
 {
-	struct request_queue *q;
 	struct drbd_epoch_entry *e;
 	struct page *page;
-	struct bio *bio;
-	unsigned int ds;
+	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
 
 	if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
 		return NULL;
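
The nr_pages expression rounds the payload up to whole pages; a quick check with the common 4 KiB PAGE_SIZE (PAGE_SHIFT = 12):

    /* (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT, PAGE_SIZE = 4096:
     *   data_size =  512  ->  (512  + 4095) >> 12 = 1 page
     *   data_size = 4096  ->  (4096 + 4095) >> 12 = 1 page
     *   data_size = 4097  ->  (4097 + 4095) >> 12 = 2 pages
     */
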
@@ -286,84 +350,32 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
 		return NULL;
 	}
 
-	bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE));
-	if (!bio) {
-		if (!(gfp_mask & __GFP_NOWARN))
-			dev_err(DEV, "alloc_ee: Allocation of a bio failed\n");
-		goto fail1;
-	}
-
-	bio->bi_bdev = mdev->ldev->backing_bdev;
-	bio->bi_sector = sector;
-
-	ds = data_size;
-	while (ds) {
-		page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT));
-		if (!page) {
-			if (!(gfp_mask & __GFP_NOWARN))
-				dev_err(DEV, "alloc_ee: Allocation of a page failed\n");
-			goto fail2;
-		}
-		if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) {
-			drbd_pp_free(mdev, page);
-			dev_err(DEV, "alloc_ee: bio_add_page(s=%llu,"
-				"data_size=%u,ds=%u) failed\n",
-				(unsigned long long)sector, data_size, ds);
-
-			q = bdev_get_queue(bio->bi_bdev);
-			if (q->merge_bvec_fn) {
-				struct bvec_merge_data bvm = {
-					.bi_bdev = bio->bi_bdev,
-					.bi_sector = bio->bi_sector,
-					.bi_size = bio->bi_size,
-					.bi_rw = bio->bi_rw,
-				};
-				int l = q->merge_bvec_fn(q, &bvm,
-						&bio->bi_io_vec[bio->bi_vcnt]);
-				dev_err(DEV, "merge_bvec_fn() = %d\n", l);
-			}
-
-			/* dump more of the bio. */
-			dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs);
-			dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt);
-			dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size);
-			dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments);
-
-			goto fail2;
-			break;
-		}
-		ds -= min_t(int, ds, PAGE_SIZE);
-	}
-
-	D_ASSERT(data_size == bio->bi_size);
-
-	bio->bi_private = e;
-	e->mdev = mdev;
-	e->sector = sector;
-	e->size = bio->bi_size;
+	page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
+	if (!page)
+		goto fail;
 
-	e->private_bio = bio;
-	e->block_id = id;
 	INIT_HLIST_NODE(&e->colision);
 	e->epoch = NULL;
+	e->mdev = mdev;
+	e->pages = page;
+	atomic_set(&e->pending_bios, 0);
+	e->size = data_size;
 	e->flags = 0;
+	e->sector = sector;
+	e->sector = sector;
+	e->block_id = id;
 
 	return e;
 
- fail2:
-	drbd_pp_free_bio_pages(mdev, bio);
-	bio_put(bio);
- fail1:
+ fail:
 	mempool_free(e, drbd_ee_mempool);
-
 	return NULL;
 }
 
 void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
 {
-	struct bio *bio = e->private_bio;
-	drbd_pp_free_bio_pages(mdev, bio);
-	bio_put(bio);
+	drbd_pp_free(mdev, e->pages);
+	D_ASSERT(atomic_read(&e->pending_bios) == 0);
 	D_ASSERT(hlist_unhashed(&e->colision));
 	mempool_free(e, drbd_ee_mempool);
 }
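
An epoch entry thus owns a bare page chain instead of a pre-built bio; bios are only assembled at submission time by drbd_submit_ee(), added further below. A sketch of the resulting lifecycle, with error handling shortened:

    /* illustrative shape of an epoch entry's life after this patch */
    e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
    if (!e)
        return FALSE;
    /* ... receive into e->pages, set e->w.cb, queue on an ee list ... */
    if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
        drbd_free_ee(mdev, e); /* returns the page chain to the pool */
        return FALSE;
    }
    /* completion: drbd_endio_sec() eventually invokes e->w.cb */
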
@@ -1121,6 +1133,90 @@ void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo)
 }
 
 /**
+ * drbd_submit_ee()
+ * @mdev:	DRBD device.
+ * @e:		epoch entry
+ * @rw:	flag field, see bio->bi_rw
+ */
+/* TODO allocate from our own bio_set. */
+int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
+		const unsigned rw, const int fault_type)
+{
+	struct bio *bios = NULL;
+	struct bio *bio;
+	struct page *page = e->pages;
+	sector_t sector = e->sector;
+	unsigned ds = e->size;
+	unsigned n_bios = 0;
+	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
+
+	/* In most cases, we will only need one bio.  But in case the lower
+	 * level restrictions happen to be different at this offset on this
+	 * side than those of the sending peer, we may need to submit the
+	 * request in more than one bio. */
+next_bio:
+	bio = bio_alloc(GFP_NOIO, nr_pages);
+	if (!bio) {
+		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
+		goto fail;
+	}
+	/* > e->sector, unless this is the first bio */
+	bio->bi_sector = sector;
+	bio->bi_bdev = mdev->ldev->backing_bdev;
+	/* we special case some flags in the multi-bio case, see below
+	 * (BIO_RW_UNPLUG, BIO_RW_BARRIER) */
+	bio->bi_rw = rw;
+	bio->bi_private = e;
+	bio->bi_end_io = drbd_endio_sec;
+
+	bio->bi_next = bios;
+	bios = bio;
+	++n_bios;
+
+	page_chain_for_each(page) {
+		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
+		if (!bio_add_page(bio, page, len, 0)) {
+			/* a single page must always be possible! */
+			BUG_ON(bio->bi_vcnt == 0);
+			goto next_bio;
+		}
+		ds -= len;
+		sector += len >> 9;
+		--nr_pages;
+	}
+	D_ASSERT(page == NULL);
+	D_ASSERT(ds == 0);
+
+	atomic_set(&e->pending_bios, n_bios);
+	do {
+		bio = bios;
+		bios = bios->bi_next;
+		bio->bi_next = NULL;
+
+		/* strip off BIO_RW_UNPLUG unless it is the last bio */
+		if (bios)
+			bio->bi_rw &= ~(1<<BIO_RW_UNPLUG);
+
+		drbd_generic_make_request(mdev, fault_type, bio);
+
+		/* strip off BIO_RW_BARRIER,
+		 * unless it is the first or last bio */
+		if (bios && bios->bi_next)
+			bios->bi_rw &= ~(1<<BIO_RW_BARRIER);
+	} while (bios);
+	maybe_kick_lo(mdev);
+	return 0;
+
+fail:
+	while (bios) {
+		bio = bios;
+		bios = bios->bi_next;
+		bio_put(bio);
+	}
+	return -ENOMEM;
+}
+
+/**
  * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
  * @mdev:	DRBD device.
  * @w:		work object.
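
Why the next_bio loop: bio_add_page() may refuse a page once the lower queue's per-bio limits are reached; the partially filled bio then stays on the bios list and a fresh bio continues at the advanced sector. A worked example with assumed numbers (4 KiB pages, 512-byte sectors):

    /* e->size = 32 KiB, lower device accepts at most 16 KiB per bio:
     *
     *   next_bio:  bio1 at sector s       takes pages 0..3, page 4 refused
     *   next_bio:  bio2 at sector s + 32  takes pages 4..7
     *
     * n_bios == 2 and e->pending_bios is set to 2, so the completion
     * callback fires only when both bios have finished.  Note the list is
     * built LIFO: bio2 sits at the head and is submitted first. */
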
@@ -1129,8 +1225,6 @@ void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo)
 int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
 {
 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
-	struct bio *bio = e->private_bio;
-
 	/* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
 	   (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
 	   so that we can finish that epoch in drbd_may_finish_epoch().
@@ -1144,33 +1238,17 @@ int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __relea
 	if (previous_epoch(mdev, e->epoch))
 		dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
 
-	/* prepare bio for re-submit,
-	 * re-init volatile members */
 	/* we still have a local reference,
 	 * get_ldev was done in receive_Data. */
-	bio->bi_bdev = mdev->ldev->backing_bdev;
-	bio->bi_sector = e->sector;
-	bio->bi_size = e->size;
-	bio->bi_idx = 0;
-
-	bio->bi_flags &= ~(BIO_POOL_MASK - 1);
-	bio->bi_flags |= 1 << BIO_UPTODATE;
-
-	/* don't know whether this is necessary: */
-	bio->bi_phys_segments = 0;
-	bio->bi_next = NULL;
-
-	/* these should be unchanged: */
-	/* bio->bi_end_io = drbd_endio_write_sec; */
-	/* bio->bi_vcnt = whatever; */
 
 	e->w.cb = e_end_block;
-
-	/* This is no longer a barrier request. */
-	bio->bi_rw &= ~(1UL << BIO_RW_BARRIER);
-
-	drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio);
-
+	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
+		/* drbd_submit_ee fails for one reason only:
+		 * it was not able to allocate sufficient bios.
+		 * requeue, try again later. */
+		e->w.cb = w_e_reissue;
+		drbd_queue_work(&mdev->data.work, &e->w);
+	}
 	return 1;
 }
 
@@ -1264,10 +1342,8 @@ read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __
 {
 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
 	struct drbd_epoch_entry *e;
-	struct bio_vec *bvec;
 	struct page *page;
-	struct bio *bio;
-	int dgs, ds, i, rr;
+	int dgs, ds, rr;
 	void *dig_in = mdev->int_dig_in;
 	void *dig_vv = mdev->int_dig_vv;
 	unsigned long *data;
@@ -1304,28 +1380,29 @@ read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __
 	e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
 	if (!e)
 		return NULL;
-	bio = e->private_bio;
+
 	ds = data_size;
-	bio_for_each_segment(bvec, bio, i) {
-		page = bvec->bv_page;
+	page = e->pages;
+	page_chain_for_each(page) {
+		unsigned len = min_t(int, ds, PAGE_SIZE);
 		data = kmap(page);
-		rr = drbd_recv(mdev, data, min_t(int, ds, PAGE_SIZE));
+		rr = drbd_recv(mdev, data, len);
 		if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
 			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
 			data[0] = data[0] ^ (unsigned long)-1;
 		}
 		kunmap(page);
-		if (rr != min_t(int, ds, PAGE_SIZE)) {
+		if (rr != len) {
 			drbd_free_ee(mdev, e);
 			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
-				rr, min_t(int, ds, PAGE_SIZE));
+				rr, len);
 			return NULL;
 		}
 		ds -= rr;
 	}
 
 	if (dgs) {
-		drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
+		drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
 		if (memcmp(dig_in, dig_vv, dgs)) {
 			dev_err(DEV, "Digest integrity check FAILED.\n");
 			drbd_bcast_ee(mdev, "digest failed",
@@ -1350,7 +1427,7 @@ static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
 	if (!data_size)
 		return TRUE;
 
-	page = drbd_pp_alloc(mdev, 1);
+	page = drbd_pp_alloc(mdev, 1, 1);
 
 	data = kmap(page);
 	while (data_size) {
@@ -1414,7 +1491,7 @@ static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
 	}
 
 	if (dgs) {
-		drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
+		drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
 		if (memcmp(dig_in, dig_vv, dgs)) {
 			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
 			return 0;
@@ -1435,7 +1512,7 @@ static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int u
 
 	D_ASSERT(hlist_unhashed(&e->colision));
 
-	if (likely(drbd_bio_uptodate(e->private_bio))) {
+	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
 		drbd_set_in_sync(mdev, sector, e->size);
 		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
 	} else {
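
With possibly several bios per epoch entry, there is no single private_bio whose uptodate flag could be tested; failures are instead aggregated into e->flags by the shared completion handler. A sketch of how such aggregation can look (the __EE_WAS_ERROR bit name and the handler body are assumptions; only EE_WAS_ERROR itself appears in this patch):

    /* sketch: per-bio completion aggregating errors into the entry */
    static void endio_sec_sketch(struct bio *bio, int error)
    {
        struct drbd_epoch_entry *e = bio->bi_private;

        if (error)
            set_bit(__EE_WAS_ERROR, &e->flags); /* bit name assumed */

        bio_put(bio);
        /* only the last completing bio hands the entry to the worker */
        if (atomic_dec_and_test(&e->pending_bios))
            drbd_queue_work(&e->mdev->data.work, &e->w);
    }
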
@@ -1454,30 +1531,28 @@ static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_si
 	struct drbd_epoch_entry *e;
 
 	e = read_in_block(mdev, ID_SYNCER, sector, data_size);
-	if (!e) {
-		put_ldev(mdev);
-		return FALSE;
-	}
+	if (!e)
+		goto fail;
 
 	dec_rs_pending(mdev);
 
-	e->private_bio->bi_end_io = drbd_endio_write_sec;
-	e->private_bio->bi_rw = WRITE;
-	e->w.cb = e_end_resync_block;
-
 	inc_unacked(mdev);
 	/* corresponding dec_unacked() in e_end_resync_block()
 	 * respective _drbd_clear_done_ee */
 
+	e->w.cb = e_end_resync_block;
+
 	spin_lock_irq(&mdev->req_lock);
 	list_add(&e->w.list, &mdev->sync_ee);
 	spin_unlock_irq(&mdev->req_lock);
 
-	drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio);
-	/* accounting done in endio */
-
-	maybe_kick_lo(mdev);
-	return TRUE;
+	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
+		return TRUE;
+
+	drbd_free_ee(mdev, e);
+fail:
+	put_ldev(mdev);
+	return FALSE;
 }
 
 static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
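
recv_resync_read() establishes the error contract that receive_Data() and receive_DataRequest() below follow as well: drbd_submit_ee() fails only when it cannot allocate bios, and the caller then unwinds whatever references it took. The shared shape, sketched:

    /* common pattern at all three submit sites in this patch */
    if (drbd_submit_ee(mdev, e, rw, fault_type) == 0)
        return TRUE; /* accounting continues in the endio handler */
    /* -ENOMEM: undo this function's work, then fail */
    drbd_free_ee(mdev, e);
    put_ldev(mdev);
    return FALSE;
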
@@ -1572,7 +1647,7 @@ static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 	}
 
 	if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
-		if (likely(drbd_bio_uptodate(e->private_bio))) {
+		if (likely((e->flags & EE_WAS_ERROR) == 0)) {
 			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
 				mdev->state.conn <= C_PAUSED_SYNC_T &&
 				e->flags & EE_MAY_SET_IN_SYNC) ?
@@ -1718,7 +1793,6 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
 		return FALSE;
 	}
 
-	e->private_bio->bi_end_io = drbd_endio_write_sec;
 	e->w.cb = e_end_block;
 
 	spin_lock(&mdev->epoch_lock);
@@ -1914,12 +1988,8 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
 		drbd_al_begin_io(mdev, e->sector);
 	}
 
-	e->private_bio->bi_rw = rw;
-	drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio);
-	/* accounting done in endio */
-
-	maybe_kick_lo(mdev);
-	return TRUE;
+	if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
+		return TRUE;
 
 out_interrupted:
 	/* yes, the epoch_size now is imbalanced.
@@ -1977,9 +2047,6 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
 		return FALSE;
 	}
 
-	e->private_bio->bi_rw = READ;
-	e->private_bio->bi_end_io = drbd_endio_read_sec;
-
 	switch (h->command) {
 	case P_DATA_REQUEST:
 		e->w.cb = w_e_end_data_req;
@@ -2073,10 +2140,8 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
 
 	inc_unacked(mdev);
 
-	drbd_generic_make_request(mdev, fault_type, e->private_bio);
-	maybe_kick_lo(mdev);
-
-	return TRUE;
+	if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
+		return TRUE;
 
 out_free_e:
 	kfree(di);
@@ -3837,7 +3902,7 @@ static void drbd_disconnect(struct drbd_conf *mdev)
 		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
 	i = atomic_read(&mdev->pp_in_use);
 	if (i)
-		dev_info(DEV, "pp_in_use = %u, expected 0\n", i);
+		dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
 
 	D_ASSERT(list_empty(&mdev->read_ee));
 	D_ASSERT(list_empty(&mdev->active_ee));