aboutsummaryrefslogtreecommitdiffstats
path: root/drivers/block
diff options
context:
space:
mode:
author: Lars Ellenberg <lars.ellenberg@linbit.com> 2011-03-08 11:11:40 -0500
committer: Philipp Reisner <philipp.reisner@linbit.com> 2011-05-24 04:02:41 -0400
commit: 53ea433145d9a56c7ad5e69f21f5662053e00e84 (patch)
tree: fcd581287611f2ad2e169e50c34678192fb085b3 /drivers/block
parent: 600942e0fdb7ed1565d056d7305c46c7c0544a3e (diff)
drbd: fix potential distributed deadlock
We limit ourselves to a configurable maximum number of pages used as temporary bio pages.

If the configured "max_buffers" is not big enough to match the bandwidth of the respective deployment, a distributed deadlock could be triggered by e.g. fast online verify and heavy application IO.

TCP connections would block on congestion, because both receivers would wait on pages to become available.

Fortunately the respective senders in this case would be able to give back some pages already. So do that.

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
Diffstat (limited to 'drivers/block')
-rw-r--r-- drivers/block/drbd/drbd_worker.c 94
1 files changed, 59 insertions, 35 deletions
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index f7e6c92f8d03..b5e53695fd7e 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -297,42 +297,48 @@ void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *
297 crypto_hash_final(&desc, digest); 297 crypto_hash_final(&desc, digest);
298} 298}
299 299
300static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 300/* TODO merge common code with w_e_end_ov_req */
301int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
301{ 302{
302 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w); 303 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
303 int digest_size; 304 int digest_size;
304 void *digest; 305 void *digest;
305 int ok; 306 int ok = 1;
306 307
307 D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef); 308 D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
308 309
309 if (unlikely(cancel)) { 310 if (unlikely(cancel))
310 drbd_free_ee(mdev, e); 311 goto out;
311 return 1;
312 }
313 312
314 if (likely((e->flags & EE_WAS_ERROR) == 0)) { 313 if (likely((e->flags & EE_WAS_ERROR) != 0))
315 digest_size = crypto_hash_digestsize(mdev->csums_tfm); 314 goto out;
316 digest = kmalloc(digest_size, GFP_NOIO);
317 if (digest) {
318 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
319 315
320 inc_rs_pending(mdev); 316 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
321 ok = drbd_send_drequest_csum(mdev, 317 digest = kmalloc(digest_size, GFP_NOIO);
322 e->sector, 318 if (digest) {
323 e->size, 319 sector_t sector = e->sector;
324 digest, 320 unsigned int size = e->size;
325 digest_size, 321 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
326 P_CSUM_RS_REQUEST); 322 /* Free e and pages before send.
327 kfree(digest); 323 * In case we block on congestion, we could otherwise run into
328 } else { 324 * some distributed deadlock, if the other side blocks on
329 dev_err(DEV, "kmalloc() of digest failed.\n"); 325 * congestion as well, because our receiver blocks in
330 ok = 0; 326 * drbd_pp_alloc due to pp_in_use > max_buffers. */
331 } 327 drbd_free_ee(mdev, e);
332 } else 328 e = NULL;
333 ok = 1; 329 inc_rs_pending(mdev);
330 ok = drbd_send_drequest_csum(mdev, sector, size,
331 digest, digest_size,
332 P_CSUM_RS_REQUEST);
333 kfree(digest);
334 } else {
335 dev_err(DEV, "kmalloc() of digest failed.\n");
336 ok = 0;
337 }
334 338
335 drbd_free_ee(mdev, e); 339out:
340 if (e)
341 drbd_free_ee(mdev, e);
336 342
337 if (unlikely(!ok)) 343 if (unlikely(!ok))
338 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n"); 344 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
@@ -1071,9 +1077,12 @@ int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1071 return ok; 1077 return ok;
1072} 1078}
1073 1079
1080/* TODO merge common code with w_e_send_csum */
1074int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1081int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1075{ 1082{
1076 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w); 1083 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1084 sector_t sector = e->sector;
1085 unsigned int size = e->size;
1077 int digest_size; 1086 int digest_size;
1078 void *digest; 1087 void *digest;
1079 int ok = 1; 1088 int ok = 1;
@@ -1093,17 +1102,25 @@ int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1093 else 1102 else
1094 memset(digest, 0, digest_size); 1103 memset(digest, 0, digest_size);
1095 1104
1105 /* Free e and pages before send.
1106 * In case we block on congestion, we could otherwise run into
1107 * some distributed deadlock, if the other side blocks on
1108 * congestion as well, because our receiver blocks in
1109 * drbd_pp_alloc due to pp_in_use > max_buffers. */
1110 drbd_free_ee(mdev, e);
1111 e = NULL;
1096 inc_rs_pending(mdev); 1112 inc_rs_pending(mdev);
1097 ok = drbd_send_drequest_csum(mdev, e->sector, e->size, 1113 ok = drbd_send_drequest_csum(mdev, sector, size,
1098 digest, digest_size, P_OV_REPLY); 1114 digest, digest_size,
1115 P_OV_REPLY);
1099 if (!ok) 1116 if (!ok)
1100 dec_rs_pending(mdev); 1117 dec_rs_pending(mdev);
1101 kfree(digest); 1118 kfree(digest);
1102 1119
1103out: 1120out:
1104 drbd_free_ee(mdev, e); 1121 if (e)
1122 drbd_free_ee(mdev, e);
1105 dec_unacked(mdev); 1123 dec_unacked(mdev);
1106
1107 return ok; 1124 return ok;
1108} 1125}
1109 1126
@@ -1122,8 +1139,10 @@ int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1122{ 1139{
1123 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w); 1140 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1124 struct digest_info *di; 1141 struct digest_info *di;
1125 int digest_size;
1126 void *digest; 1142 void *digest;
1143 sector_t sector = e->sector;
1144 unsigned int size = e->size;
1145 int digest_size;
1127 int ok, eq = 0; 1146 int ok, eq = 0;
1128 1147
1129 if (unlikely(cancel)) { 1148 if (unlikely(cancel)) {
@@ -1153,16 +1172,21 @@ int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1153 } 1172 }
1154 } 1173 }
1155 1174
1156 dec_unacked(mdev); 1175 /* Free e and pages before send.
1176 * In case we block on congestion, we could otherwise run into
1177 * some distributed deadlock, if the other side blocks on
1178 * congestion as well, because our receiver blocks in
1179 * drbd_pp_alloc due to pp_in_use > max_buffers. */
1180 drbd_free_ee(mdev, e);
1157 if (!eq) 1181 if (!eq)
1158 drbd_ov_oos_found(mdev, e->sector, e->size); 1182 drbd_ov_oos_found(mdev, sector, size);
1159 else 1183 else
1160 ov_oos_print(mdev); 1184 ov_oos_print(mdev);
1161 1185
1162 ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size, 1186 ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1163 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC); 1187 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1164 1188
1165 drbd_free_ee(mdev, e); 1189 dec_unacked(mdev);
1166 1190
1167 --mdev->ov_left; 1191 --mdev->ov_left;
1168 1192