aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMikulas Patocka <mpatocka@redhat.com>2008-04-24 16:43:44 -0400
committerAlasdair G Kergon <agk@redhat.com>2008-04-25 08:26:49 -0400
commit8c0cbc2f79bb222d21b466422fde71fcc9bd37e3 (patch)
tree392d54c0fb5cc92300ef2b93237fb5accca9ead4
parent2a23aa1ddb1f0c9eef2c929c89565c387f6bf68b (diff)
dm kcopyd: per device
Make one kcopyd thread per device. The original shared kcopyd could deadlock. Configuration:
-rw-r--r--drivers/md/kcopyd.c132
1 files changed, 73 insertions, 59 deletions
diff --git a/drivers/md/kcopyd.c b/drivers/md/kcopyd.c
index 4f2c61acf7c6..3fb6c8334a82 100644
--- a/drivers/md/kcopyd.c
+++ b/drivers/md/kcopyd.c
@@ -26,14 +26,6 @@
26#include "kcopyd.h" 26#include "kcopyd.h"
27#include "dm.h" 27#include "dm.h"
28 28
29static struct workqueue_struct *_kcopyd_wq;
30static struct work_struct _kcopyd_work;
31
32static void wake(void)
33{
34 queue_work(_kcopyd_wq, &_kcopyd_work);
35}
36
37/*----------------------------------------------------------------- 29/*-----------------------------------------------------------------
38 * Each kcopyd client has its own little pool of preallocated 30 * Each kcopyd client has its own little pool of preallocated
39 * pages for kcopyd io. 31 * pages for kcopyd io.
@@ -50,8 +42,30 @@ struct dm_kcopyd_client {
50 42
51 wait_queue_head_t destroyq; 43 wait_queue_head_t destroyq;
52 atomic_t nr_jobs; 44 atomic_t nr_jobs;
45
46 struct workqueue_struct *kcopyd_wq;
47 struct work_struct kcopyd_work;
48
49/*
50 * We maintain three lists of jobs:
51 *
52 * i) jobs waiting for pages
53 * ii) jobs that have pages, and are waiting for the io to be issued.
54 * iii) jobs that have completed.
55 *
56 * All three of these are protected by job_lock.
57 */
58 spinlock_t job_lock;
59 struct list_head complete_jobs;
60 struct list_head io_jobs;
61 struct list_head pages_jobs;
53}; 62};
54 63
64static void wake(struct dm_kcopyd_client *kc)
65{
66 queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
67}
68
55static struct page_list *alloc_pl(void) 69static struct page_list *alloc_pl(void)
56{ 70{
57 struct page_list *pl; 71 struct page_list *pl;
@@ -209,21 +223,6 @@ struct kcopyd_job {
209static struct kmem_cache *_job_cache; 223static struct kmem_cache *_job_cache;
210static mempool_t *_job_pool; 224static mempool_t *_job_pool;
211 225
212/*
213 * We maintain three lists of jobs:
214 *
215 * i) jobs waiting for pages
216 * ii) jobs that have pages, and are waiting for the io to be issued.
217 * iii) jobs that have completed.
218 *
219 * All three of these are protected by job_lock.
220 */
221static DEFINE_SPINLOCK(_job_lock);
222
223static LIST_HEAD(_complete_jobs);
224static LIST_HEAD(_io_jobs);
225static LIST_HEAD(_pages_jobs);
226
227static int jobs_init(void) 226static int jobs_init(void)
228{ 227{
229 _job_cache = KMEM_CACHE(kcopyd_job, 0); 228 _job_cache = KMEM_CACHE(kcopyd_job, 0);
@@ -241,10 +240,6 @@ static int jobs_init(void)
241 240
242static void jobs_exit(void) 241static void jobs_exit(void)
243{ 242{
244 BUG_ON(!list_empty(&_complete_jobs));
245 BUG_ON(!list_empty(&_io_jobs));
246 BUG_ON(!list_empty(&_pages_jobs));
247
248 mempool_destroy(_job_pool); 243 mempool_destroy(_job_pool);
249 kmem_cache_destroy(_job_cache); 244 kmem_cache_destroy(_job_cache);
250 _job_pool = NULL; 245 _job_pool = NULL;
@@ -255,18 +250,19 @@ static void jobs_exit(void)
255 * Functions to push and pop a job onto the head of a given job 250 * Functions to push and pop a job onto the head of a given job
256 * list. 251 * list.
257 */ 252 */
258static struct kcopyd_job *pop(struct list_head *jobs) 253static struct kcopyd_job *pop(struct list_head *jobs,
254 struct dm_kcopyd_client *kc)
259{ 255{
260 struct kcopyd_job *job = NULL; 256 struct kcopyd_job *job = NULL;
261 unsigned long flags; 257 unsigned long flags;
262 258
263 spin_lock_irqsave(&_job_lock, flags); 259 spin_lock_irqsave(&kc->job_lock, flags);
264 260
265 if (!list_empty(jobs)) { 261 if (!list_empty(jobs)) {
266 job = list_entry(jobs->next, struct kcopyd_job, list); 262 job = list_entry(jobs->next, struct kcopyd_job, list);
267 list_del(&job->list); 263 list_del(&job->list);
268 } 264 }
269 spin_unlock_irqrestore(&_job_lock, flags); 265 spin_unlock_irqrestore(&kc->job_lock, flags);
270 266
271 return job; 267 return job;
272} 268}
@@ -274,10 +270,11 @@ static struct kcopyd_job *pop(struct list_head *jobs)
274static void push(struct list_head *jobs, struct kcopyd_job *job) 270static void push(struct list_head *jobs, struct kcopyd_job *job)
275{ 271{
276 unsigned long flags; 272 unsigned long flags;
273 struct dm_kcopyd_client *kc = job->kc;
277 274
278 spin_lock_irqsave(&_job_lock, flags); 275 spin_lock_irqsave(&kc->job_lock, flags);
279 list_add_tail(&job->list, jobs); 276 list_add_tail(&job->list, jobs);
280 spin_unlock_irqrestore(&_job_lock, flags); 277 spin_unlock_irqrestore(&kc->job_lock, flags);
281} 278}
282 279
283/* 280/*
@@ -310,6 +307,7 @@ static int run_complete_job(struct kcopyd_job *job)
310static void complete_io(unsigned long error, void *context) 307static void complete_io(unsigned long error, void *context)
311{ 308{
312 struct kcopyd_job *job = (struct kcopyd_job *) context; 309 struct kcopyd_job *job = (struct kcopyd_job *) context;
310 struct dm_kcopyd_client *kc = job->kc;
313 311
314 if (error) { 312 if (error) {
315 if (job->rw == WRITE) 313 if (job->rw == WRITE)
@@ -318,21 +316,21 @@ static void complete_io(unsigned long error, void *context)
318 job->read_err = 1; 316 job->read_err = 1;
319 317
320 if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) { 318 if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
321 push(&_complete_jobs, job); 319 push(&kc->complete_jobs, job);
322 wake(); 320 wake(kc);
323 return; 321 return;
324 } 322 }
325 } 323 }
326 324
327 if (job->rw == WRITE) 325 if (job->rw == WRITE)
328 push(&_complete_jobs, job); 326 push(&kc->complete_jobs, job);
329 327
330 else { 328 else {
331 job->rw = WRITE; 329 job->rw = WRITE;
332 push(&_io_jobs, job); 330 push(&kc->io_jobs, job);
333 } 331 }
334 332
335 wake(); 333 wake(kc);
336} 334}
337 335
338/* 336/*
@@ -369,7 +367,7 @@ static int run_pages_job(struct kcopyd_job *job)
369 r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages); 367 r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
370 if (!r) { 368 if (!r) {
371 /* this job is ready for io */ 369 /* this job is ready for io */
372 push(&_io_jobs, job); 370 push(&job->kc->io_jobs, job);
373 return 0; 371 return 0;
374 } 372 }
375 373
@@ -384,12 +382,13 @@ static int run_pages_job(struct kcopyd_job *job)
384 * Run through a list for as long as possible. Returns the count 382 * Run through a list for as long as possible. Returns the count
385 * of successful jobs. 383 * of successful jobs.
386 */ 384 */
387static int process_jobs(struct list_head *jobs, int (*fn) (struct kcopyd_job *)) 385static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
386 int (*fn) (struct kcopyd_job *))
388{ 387{
389 struct kcopyd_job *job; 388 struct kcopyd_job *job;
390 int r, count = 0; 389 int r, count = 0;
391 390
392 while ((job = pop(jobs))) { 391 while ((job = pop(jobs, kc))) {
393 392
394 r = fn(job); 393 r = fn(job);
395 394
@@ -399,7 +398,7 @@ static int process_jobs(struct list_head *jobs, int (*fn) (struct kcopyd_job *))
399 job->write_err = (unsigned long) -1L; 398 job->write_err = (unsigned long) -1L;
400 else 399 else
401 job->read_err = 1; 400 job->read_err = 1;
402 push(&_complete_jobs, job); 401 push(&kc->complete_jobs, job);
403 break; 402 break;
404 } 403 }
405 404
@@ -421,8 +420,11 @@ static int process_jobs(struct list_head *jobs, int (*fn) (struct kcopyd_job *))
421/* 420/*
422 * kcopyd does this every time it's woken up. 421 * kcopyd does this every time it's woken up.
423 */ 422 */
424static void do_work(struct work_struct *ignored) 423static void do_work(struct work_struct *work)
425{ 424{
425 struct dm_kcopyd_client *kc = container_of(work,
426 struct dm_kcopyd_client, kcopyd_work);
427
426 /* 428 /*
427 * The order that these are called is *very* important. 429 * The order that these are called is *very* important.
428 * complete jobs can free some pages for pages jobs. 430 * complete jobs can free some pages for pages jobs.
@@ -430,9 +432,9 @@ static void do_work(struct work_struct *ignored)
430 * list. io jobs call wake when they complete and it all 432 * list. io jobs call wake when they complete and it all
431 * starts again. 433 * starts again.
432 */ 434 */
433 process_jobs(&_complete_jobs, run_complete_job); 435 process_jobs(&kc->complete_jobs, kc, run_complete_job);
434 process_jobs(&_pages_jobs, run_pages_job); 436 process_jobs(&kc->pages_jobs, kc, run_pages_job);
435 process_jobs(&_io_jobs, run_io_job); 437 process_jobs(&kc->io_jobs, kc, run_io_job);
436} 438}
437 439
438/* 440/*
@@ -442,9 +444,10 @@ static void do_work(struct work_struct *ignored)
442 */ 444 */
443static void dispatch_job(struct kcopyd_job *job) 445static void dispatch_job(struct kcopyd_job *job)
444{ 446{
445 atomic_inc(&job->kc->nr_jobs); 447 struct dm_kcopyd_client *kc = job->kc;
446 push(&_pages_jobs, job); 448 atomic_inc(&kc->nr_jobs);
447 wake(); 449 push(&kc->pages_jobs, job);
450 wake(kc);
448} 451}
449 452
450#define SUB_JOB_SIZE 128 453#define SUB_JOB_SIZE 128
@@ -625,15 +628,7 @@ static int kcopyd_init(void)
625 return r; 628 return r;
626 } 629 }
627 630
628 _kcopyd_wq = create_singlethread_workqueue("kcopyd");
629 if (!_kcopyd_wq) {
630 jobs_exit();
631 mutex_unlock(&kcopyd_init_lock);
632 return -ENOMEM;
633 }
634
635 kcopyd_clients++; 631 kcopyd_clients++;
636 INIT_WORK(&_kcopyd_work, do_work);
637 mutex_unlock(&kcopyd_init_lock); 632 mutex_unlock(&kcopyd_init_lock);
638 return 0; 633 return 0;
639} 634}
@@ -644,8 +639,6 @@ static void kcopyd_exit(void)
644 kcopyd_clients--; 639 kcopyd_clients--;
645 if (!kcopyd_clients) { 640 if (!kcopyd_clients) {
646 jobs_exit(); 641 jobs_exit();
647 destroy_workqueue(_kcopyd_wq);
648 _kcopyd_wq = NULL;
649 } 642 }
650 mutex_unlock(&kcopyd_init_lock); 643 mutex_unlock(&kcopyd_init_lock);
651} 644}
@@ -662,15 +655,31 @@ int dm_kcopyd_client_create(unsigned int nr_pages,
662 655
663 kc = kmalloc(sizeof(*kc), GFP_KERNEL); 656 kc = kmalloc(sizeof(*kc), GFP_KERNEL);
664 if (!kc) { 657 if (!kc) {
658 r = -ENOMEM;
665 kcopyd_exit(); 659 kcopyd_exit();
666 return -ENOMEM; 660 return r;
667 } 661 }
668 662
669 spin_lock_init(&kc->lock); 663 spin_lock_init(&kc->lock);
664 spin_lock_init(&kc->job_lock);
665 INIT_LIST_HEAD(&kc->complete_jobs);
666 INIT_LIST_HEAD(&kc->io_jobs);
667 INIT_LIST_HEAD(&kc->pages_jobs);
668
669 INIT_WORK(&kc->kcopyd_work, do_work);
670 kc->kcopyd_wq = create_singlethread_workqueue("kcopyd");
671 if (!kc->kcopyd_wq) {
672 r = -ENOMEM;
673 kfree(kc);
674 kcopyd_exit();
675 return r;
676 }
677
670 kc->pages = NULL; 678 kc->pages = NULL;
671 kc->nr_pages = kc->nr_free_pages = 0; 679 kc->nr_pages = kc->nr_free_pages = 0;
672 r = client_alloc_pages(kc, nr_pages); 680 r = client_alloc_pages(kc, nr_pages);
673 if (r) { 681 if (r) {
682 destroy_workqueue(kc->kcopyd_wq);
674 kfree(kc); 683 kfree(kc);
675 kcopyd_exit(); 684 kcopyd_exit();
676 return r; 685 return r;
@@ -680,6 +689,7 @@ int dm_kcopyd_client_create(unsigned int nr_pages,
680 if (IS_ERR(kc->io_client)) { 689 if (IS_ERR(kc->io_client)) {
681 r = PTR_ERR(kc->io_client); 690 r = PTR_ERR(kc->io_client);
682 client_free_pages(kc); 691 client_free_pages(kc);
692 destroy_workqueue(kc->kcopyd_wq);
683 kfree(kc); 693 kfree(kc);
684 kcopyd_exit(); 694 kcopyd_exit();
685 return r; 695 return r;
@@ -699,6 +709,10 @@ void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
699 /* Wait for completion of all jobs submitted by this client. */ 709 /* Wait for completion of all jobs submitted by this client. */
700 wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs)); 710 wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));
701 711
712 BUG_ON(!list_empty(&kc->complete_jobs));
713 BUG_ON(!list_empty(&kc->io_jobs));
714 BUG_ON(!list_empty(&kc->pages_jobs));
715 destroy_workqueue(kc->kcopyd_wq);
702 dm_io_client_destroy(kc->io_client); 716 dm_io_client_destroy(kc->io_client);
703 client_free_pages(kc); 717 client_free_pages(kc);
704 client_del(kc); 718 client_del(kc);