aboutsummaryrefslogtreecommitdiffstats
path: root/net/rds/ib_rdma.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/rds/ib_rdma.c')
-rw-r--r--net/rds/ib_rdma.c318
1 files changed, 247 insertions, 71 deletions
diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index a54cd63f9e35..18a833c450c8 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -32,11 +32,16 @@
32 */ 32 */
33#include <linux/kernel.h> 33#include <linux/kernel.h>
34#include <linux/slab.h> 34#include <linux/slab.h>
35#include <linux/rculist.h>
35 36
36#include "rds.h" 37#include "rds.h"
37#include "rdma.h"
38#include "ib.h" 38#include "ib.h"
39#include "xlist.h"
39 40
41static struct workqueue_struct *rds_ib_fmr_wq;
42
43static DEFINE_PER_CPU(unsigned long, clean_list_grace);
44#define CLEAN_LIST_BUSY_BIT 0
40 45
41/* 46/*
42 * This is stored as mr->r_trans_private. 47 * This is stored as mr->r_trans_private.
@@ -45,7 +50,11 @@ struct rds_ib_mr {
45 struct rds_ib_device *device; 50 struct rds_ib_device *device;
46 struct rds_ib_mr_pool *pool; 51 struct rds_ib_mr_pool *pool;
47 struct ib_fmr *fmr; 52 struct ib_fmr *fmr;
48 struct list_head list; 53
54 struct xlist_head xlist;
55
56 /* unmap_list is for freeing */
57 struct list_head unmap_list;
49 unsigned int remap_count; 58 unsigned int remap_count;
50 59
51 struct scatterlist *sg; 60 struct scatterlist *sg;
@@ -59,14 +68,16 @@ struct rds_ib_mr {
59 */ 68 */
60struct rds_ib_mr_pool { 69struct rds_ib_mr_pool {
61 struct mutex flush_lock; /* serialize fmr invalidate */ 70 struct mutex flush_lock; /* serialize fmr invalidate */
62 struct work_struct flush_worker; /* flush worker */ 71 struct delayed_work flush_worker; /* flush worker */
63 72
64 spinlock_t list_lock; /* protect variables below */
65 atomic_t item_count; /* total # of MRs */ 73 atomic_t item_count; /* total # of MRs */
66 atomic_t dirty_count; /* # dirty of MRs */ 74 atomic_t dirty_count; /* # dirty of MRs */
67 struct list_head drop_list; /* MRs that have reached their max_maps limit */ 75
68 struct list_head free_list; /* unused MRs */ 76 struct xlist_head drop_list; /* MRs that have reached their max_maps limit */
69 struct list_head clean_list; /* unused & unamapped MRs */ 77 struct xlist_head free_list; /* unused MRs */
78 struct xlist_head clean_list; /* global unused & unamapped MRs */
79 wait_queue_head_t flush_wait;
80
70 atomic_t free_pinned; /* memory pinned by free MRs */ 81 atomic_t free_pinned; /* memory pinned by free MRs */
71 unsigned long max_items; 82 unsigned long max_items;
72 unsigned long max_items_soft; 83 unsigned long max_items_soft;
@@ -74,7 +85,7 @@ struct rds_ib_mr_pool {
74 struct ib_fmr_attr fmr_attr; 85 struct ib_fmr_attr fmr_attr;
75}; 86};
76 87
77static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all); 88static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all, struct rds_ib_mr **);
78static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr); 89static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr);
79static void rds_ib_mr_pool_flush_worker(struct work_struct *work); 90static void rds_ib_mr_pool_flush_worker(struct work_struct *work);
80 91
@@ -83,16 +94,17 @@ static struct rds_ib_device *rds_ib_get_device(__be32 ipaddr)
83 struct rds_ib_device *rds_ibdev; 94 struct rds_ib_device *rds_ibdev;
84 struct rds_ib_ipaddr *i_ipaddr; 95 struct rds_ib_ipaddr *i_ipaddr;
85 96
86 list_for_each_entry(rds_ibdev, &rds_ib_devices, list) { 97 rcu_read_lock();
87 spin_lock_irq(&rds_ibdev->spinlock); 98 list_for_each_entry_rcu(rds_ibdev, &rds_ib_devices, list) {
88 list_for_each_entry(i_ipaddr, &rds_ibdev->ipaddr_list, list) { 99 list_for_each_entry_rcu(i_ipaddr, &rds_ibdev->ipaddr_list, list) {
89 if (i_ipaddr->ipaddr == ipaddr) { 100 if (i_ipaddr->ipaddr == ipaddr) {
90 spin_unlock_irq(&rds_ibdev->spinlock); 101 atomic_inc(&rds_ibdev->refcount);
102 rcu_read_unlock();
91 return rds_ibdev; 103 return rds_ibdev;
92 } 104 }
93 } 105 }
94 spin_unlock_irq(&rds_ibdev->spinlock);
95 } 106 }
107 rcu_read_unlock();
96 108
97 return NULL; 109 return NULL;
98} 110}
@@ -108,7 +120,7 @@ static int rds_ib_add_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
108 i_ipaddr->ipaddr = ipaddr; 120 i_ipaddr->ipaddr = ipaddr;
109 121
110 spin_lock_irq(&rds_ibdev->spinlock); 122 spin_lock_irq(&rds_ibdev->spinlock);
111 list_add_tail(&i_ipaddr->list, &rds_ibdev->ipaddr_list); 123 list_add_tail_rcu(&i_ipaddr->list, &rds_ibdev->ipaddr_list);
112 spin_unlock_irq(&rds_ibdev->spinlock); 124 spin_unlock_irq(&rds_ibdev->spinlock);
113 125
114 return 0; 126 return 0;
@@ -116,17 +128,24 @@ static int rds_ib_add_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
116 128
117static void rds_ib_remove_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr) 129static void rds_ib_remove_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
118{ 130{
119 struct rds_ib_ipaddr *i_ipaddr, *next; 131 struct rds_ib_ipaddr *i_ipaddr;
132 struct rds_ib_ipaddr *to_free = NULL;
133
120 134
121 spin_lock_irq(&rds_ibdev->spinlock); 135 spin_lock_irq(&rds_ibdev->spinlock);
122 list_for_each_entry_safe(i_ipaddr, next, &rds_ibdev->ipaddr_list, list) { 136 list_for_each_entry_rcu(i_ipaddr, &rds_ibdev->ipaddr_list, list) {
123 if (i_ipaddr->ipaddr == ipaddr) { 137 if (i_ipaddr->ipaddr == ipaddr) {
124 list_del(&i_ipaddr->list); 138 list_del_rcu(&i_ipaddr->list);
125 kfree(i_ipaddr); 139 to_free = i_ipaddr;
126 break; 140 break;
127 } 141 }
128 } 142 }
129 spin_unlock_irq(&rds_ibdev->spinlock); 143 spin_unlock_irq(&rds_ibdev->spinlock);
144
145 if (to_free) {
146 synchronize_rcu();
147 kfree(to_free);
148 }
130} 149}
131 150
132int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr) 151int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
@@ -134,8 +153,10 @@ int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
134 struct rds_ib_device *rds_ibdev_old; 153 struct rds_ib_device *rds_ibdev_old;
135 154
136 rds_ibdev_old = rds_ib_get_device(ipaddr); 155 rds_ibdev_old = rds_ib_get_device(ipaddr);
137 if (rds_ibdev_old) 156 if (rds_ibdev_old) {
138 rds_ib_remove_ipaddr(rds_ibdev_old, ipaddr); 157 rds_ib_remove_ipaddr(rds_ibdev_old, ipaddr);
158 rds_ib_dev_put(rds_ibdev_old);
159 }
139 160
140 return rds_ib_add_ipaddr(rds_ibdev, ipaddr); 161 return rds_ib_add_ipaddr(rds_ibdev, ipaddr);
141} 162}
@@ -150,12 +171,13 @@ void rds_ib_add_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *con
150 BUG_ON(list_empty(&ic->ib_node)); 171 BUG_ON(list_empty(&ic->ib_node));
151 list_del(&ic->ib_node); 172 list_del(&ic->ib_node);
152 173
153 spin_lock_irq(&rds_ibdev->spinlock); 174 spin_lock(&rds_ibdev->spinlock);
154 list_add_tail(&ic->ib_node, &rds_ibdev->conn_list); 175 list_add_tail(&ic->ib_node, &rds_ibdev->conn_list);
155 spin_unlock_irq(&rds_ibdev->spinlock); 176 spin_unlock(&rds_ibdev->spinlock);
156 spin_unlock_irq(&ib_nodev_conns_lock); 177 spin_unlock_irq(&ib_nodev_conns_lock);
157 178
158 ic->rds_ibdev = rds_ibdev; 179 ic->rds_ibdev = rds_ibdev;
180 atomic_inc(&rds_ibdev->refcount);
159} 181}
160 182
161void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn) 183void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn)
@@ -175,18 +197,18 @@ void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *
175 spin_unlock(&ib_nodev_conns_lock); 197 spin_unlock(&ib_nodev_conns_lock);
176 198
177 ic->rds_ibdev = NULL; 199 ic->rds_ibdev = NULL;
200 rds_ib_dev_put(rds_ibdev);
178} 201}
179 202
180void __rds_ib_destroy_conns(struct list_head *list, spinlock_t *list_lock) 203void rds_ib_destroy_nodev_conns(void)
181{ 204{
182 struct rds_ib_connection *ic, *_ic; 205 struct rds_ib_connection *ic, *_ic;
183 LIST_HEAD(tmp_list); 206 LIST_HEAD(tmp_list);
184 207
185 /* avoid calling conn_destroy with irqs off */ 208 /* avoid calling conn_destroy with irqs off */
186 spin_lock_irq(list_lock); 209 spin_lock_irq(&ib_nodev_conns_lock);
187 list_splice(list, &tmp_list); 210 list_splice(&ib_nodev_conns, &tmp_list);
188 INIT_LIST_HEAD(list); 211 spin_unlock_irq(&ib_nodev_conns_lock);
189 spin_unlock_irq(list_lock);
190 212
191 list_for_each_entry_safe(ic, _ic, &tmp_list, ib_node) 213 list_for_each_entry_safe(ic, _ic, &tmp_list, ib_node)
192 rds_conn_destroy(ic->conn); 214 rds_conn_destroy(ic->conn);
@@ -200,12 +222,12 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
200 if (!pool) 222 if (!pool)
201 return ERR_PTR(-ENOMEM); 223 return ERR_PTR(-ENOMEM);
202 224
203 INIT_LIST_HEAD(&pool->free_list); 225 INIT_XLIST_HEAD(&pool->free_list);
204 INIT_LIST_HEAD(&pool->drop_list); 226 INIT_XLIST_HEAD(&pool->drop_list);
205 INIT_LIST_HEAD(&pool->clean_list); 227 INIT_XLIST_HEAD(&pool->clean_list);
206 mutex_init(&pool->flush_lock); 228 mutex_init(&pool->flush_lock);
207 spin_lock_init(&pool->list_lock); 229 init_waitqueue_head(&pool->flush_wait);
208 INIT_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker); 230 INIT_DELAYED_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
209 231
210 pool->fmr_attr.max_pages = fmr_message_size; 232 pool->fmr_attr.max_pages = fmr_message_size;
211 pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps; 233 pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
@@ -233,34 +255,60 @@ void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_co
233 255
234void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool) 256void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool)
235{ 257{
236 flush_workqueue(rds_wq); 258 cancel_delayed_work_sync(&pool->flush_worker);
237 rds_ib_flush_mr_pool(pool, 1); 259 rds_ib_flush_mr_pool(pool, 1, NULL);
238 WARN_ON(atomic_read(&pool->item_count)); 260 WARN_ON(atomic_read(&pool->item_count));
239 WARN_ON(atomic_read(&pool->free_pinned)); 261 WARN_ON(atomic_read(&pool->free_pinned));
240 kfree(pool); 262 kfree(pool);
241} 263}
242 264
265static void refill_local(struct rds_ib_mr_pool *pool, struct xlist_head *xl,
266 struct rds_ib_mr **ibmr_ret)
267{
268 struct xlist_head *ibmr_xl;
269 ibmr_xl = xlist_del_head_fast(xl);
270 *ibmr_ret = list_entry(ibmr_xl, struct rds_ib_mr, xlist);
271}
272
243static inline struct rds_ib_mr *rds_ib_reuse_fmr(struct rds_ib_mr_pool *pool) 273static inline struct rds_ib_mr *rds_ib_reuse_fmr(struct rds_ib_mr_pool *pool)
244{ 274{
245 struct rds_ib_mr *ibmr = NULL; 275 struct rds_ib_mr *ibmr = NULL;
246 unsigned long flags; 276 struct xlist_head *ret;
277 unsigned long *flag;
247 278
248 spin_lock_irqsave(&pool->list_lock, flags); 279 preempt_disable();
249 if (!list_empty(&pool->clean_list)) { 280 flag = &__get_cpu_var(clean_list_grace);
250 ibmr = list_entry(pool->clean_list.next, struct rds_ib_mr, list); 281 set_bit(CLEAN_LIST_BUSY_BIT, flag);
251 list_del_init(&ibmr->list); 282 ret = xlist_del_head(&pool->clean_list);
252 } 283 if (ret)
253 spin_unlock_irqrestore(&pool->list_lock, flags); 284 ibmr = list_entry(ret, struct rds_ib_mr, xlist);
254 285
286 clear_bit(CLEAN_LIST_BUSY_BIT, flag);
287 preempt_enable();
255 return ibmr; 288 return ibmr;
256} 289}
257 290
291static inline void wait_clean_list_grace(void)
292{
293 int cpu;
294 unsigned long *flag;
295
296 for_each_online_cpu(cpu) {
297 flag = &per_cpu(clean_list_grace, cpu);
298 while (test_bit(CLEAN_LIST_BUSY_BIT, flag))
299 cpu_relax();
300 }
301}
302
258static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev) 303static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
259{ 304{
260 struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool; 305 struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
261 struct rds_ib_mr *ibmr = NULL; 306 struct rds_ib_mr *ibmr = NULL;
262 int err = 0, iter = 0; 307 int err = 0, iter = 0;
263 308
309 if (atomic_read(&pool->dirty_count) >= pool->max_items / 10)
310 queue_delayed_work(rds_ib_fmr_wq, &pool->flush_worker, 10);
311
264 while (1) { 312 while (1) {
265 ibmr = rds_ib_reuse_fmr(pool); 313 ibmr = rds_ib_reuse_fmr(pool);
266 if (ibmr) 314 if (ibmr)
@@ -287,19 +335,24 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
287 335
288 /* We do have some empty MRs. Flush them out. */ 336 /* We do have some empty MRs. Flush them out. */
289 rds_ib_stats_inc(s_ib_rdma_mr_pool_wait); 337 rds_ib_stats_inc(s_ib_rdma_mr_pool_wait);
290 rds_ib_flush_mr_pool(pool, 0); 338 rds_ib_flush_mr_pool(pool, 0, &ibmr);
339 if (ibmr)
340 return ibmr;
291 } 341 }
292 342
293 ibmr = kzalloc(sizeof(*ibmr), GFP_KERNEL); 343 ibmr = kzalloc_node(sizeof(*ibmr), GFP_KERNEL, rdsibdev_to_node(rds_ibdev));
294 if (!ibmr) { 344 if (!ibmr) {
295 err = -ENOMEM; 345 err = -ENOMEM;
296 goto out_no_cigar; 346 goto out_no_cigar;
297 } 347 }
298 348
349 memset(ibmr, 0, sizeof(*ibmr));
350
299 ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd, 351 ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd,
300 (IB_ACCESS_LOCAL_WRITE | 352 (IB_ACCESS_LOCAL_WRITE |
301 IB_ACCESS_REMOTE_READ | 353 IB_ACCESS_REMOTE_READ |
302 IB_ACCESS_REMOTE_WRITE), 354 IB_ACCESS_REMOTE_WRITE|
355 IB_ACCESS_REMOTE_ATOMIC),
303 &pool->fmr_attr); 356 &pool->fmr_attr);
304 if (IS_ERR(ibmr->fmr)) { 357 if (IS_ERR(ibmr->fmr)) {
305 err = PTR_ERR(ibmr->fmr); 358 err = PTR_ERR(ibmr->fmr);
@@ -367,7 +420,8 @@ static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibm
367 if (page_cnt > fmr_message_size) 420 if (page_cnt > fmr_message_size)
368 return -EINVAL; 421 return -EINVAL;
369 422
370 dma_pages = kmalloc(sizeof(u64) * page_cnt, GFP_ATOMIC); 423 dma_pages = kmalloc_node(sizeof(u64) * page_cnt, GFP_ATOMIC,
424 rdsibdev_to_node(rds_ibdev));
371 if (!dma_pages) 425 if (!dma_pages)
372 return -ENOMEM; 426 return -ENOMEM;
373 427
@@ -441,7 +495,7 @@ static void __rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
441 495
442 /* FIXME we need a way to tell a r/w MR 496 /* FIXME we need a way to tell a r/w MR
443 * from a r/o MR */ 497 * from a r/o MR */
444 BUG_ON(in_interrupt()); 498 BUG_ON(irqs_disabled());
445 set_page_dirty(page); 499 set_page_dirty(page);
446 put_page(page); 500 put_page(page);
447 } 501 }
@@ -477,33 +531,109 @@ static inline unsigned int rds_ib_flush_goal(struct rds_ib_mr_pool *pool, int fr
477} 531}
478 532
479/* 533/*
534 * given an xlist of mrs, put them all into the list_head for more processing
535 */
536static void xlist_append_to_list(struct xlist_head *xlist, struct list_head *list)
537{
538 struct rds_ib_mr *ibmr;
539 struct xlist_head splice;
540 struct xlist_head *cur;
541 struct xlist_head *next;
542
543 splice.next = NULL;
544 xlist_splice(xlist, &splice);
545 cur = splice.next;
546 while (cur) {
547 next = cur->next;
548 ibmr = list_entry(cur, struct rds_ib_mr, xlist);
549 list_add_tail(&ibmr->unmap_list, list);
550 cur = next;
551 }
552}
553
554/*
555 * this takes a list head of mrs and turns it into an xlist of clusters.
556 * each cluster has an xlist of MR_CLUSTER_SIZE mrs that are ready for
557 * reuse.
558 */
559static void list_append_to_xlist(struct rds_ib_mr_pool *pool,
560 struct list_head *list, struct xlist_head *xlist,
561 struct xlist_head **tail_ret)
562{
563 struct rds_ib_mr *ibmr;
564 struct xlist_head *cur_mr = xlist;
565 struct xlist_head *tail_mr = NULL;
566
567 list_for_each_entry(ibmr, list, unmap_list) {
568 tail_mr = &ibmr->xlist;
569 tail_mr->next = NULL;
570 cur_mr->next = tail_mr;
571 cur_mr = tail_mr;
572 }
573 *tail_ret = tail_mr;
574}
575
576/*
480 * Flush our pool of MRs. 577 * Flush our pool of MRs.
481 * At a minimum, all currently unused MRs are unmapped. 578 * At a minimum, all currently unused MRs are unmapped.
482 * If the number of MRs allocated exceeds the limit, we also try 579 * If the number of MRs allocated exceeds the limit, we also try
483 * to free as many MRs as needed to get back to this limit. 580 * to free as many MRs as needed to get back to this limit.
484 */ 581 */
485static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all) 582static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
583 int free_all, struct rds_ib_mr **ibmr_ret)
486{ 584{
487 struct rds_ib_mr *ibmr, *next; 585 struct rds_ib_mr *ibmr, *next;
586 struct xlist_head clean_xlist;
587 struct xlist_head *clean_tail;
488 LIST_HEAD(unmap_list); 588 LIST_HEAD(unmap_list);
489 LIST_HEAD(fmr_list); 589 LIST_HEAD(fmr_list);
490 unsigned long unpinned = 0; 590 unsigned long unpinned = 0;
491 unsigned long flags;
492 unsigned int nfreed = 0, ncleaned = 0, free_goal; 591 unsigned int nfreed = 0, ncleaned = 0, free_goal;
493 int ret = 0; 592 int ret = 0;
494 593
495 rds_ib_stats_inc(s_ib_rdma_mr_pool_flush); 594 rds_ib_stats_inc(s_ib_rdma_mr_pool_flush);
496 595
497 mutex_lock(&pool->flush_lock); 596 if (ibmr_ret) {
597 DEFINE_WAIT(wait);
598 while(!mutex_trylock(&pool->flush_lock)) {
599 ibmr = rds_ib_reuse_fmr(pool);
600 if (ibmr) {
601 *ibmr_ret = ibmr;
602 finish_wait(&pool->flush_wait, &wait);
603 goto out_nolock;
604 }
605
606 prepare_to_wait(&pool->flush_wait, &wait,
607 TASK_UNINTERRUPTIBLE);
608 if (xlist_empty(&pool->clean_list))
609 schedule();
610
611 ibmr = rds_ib_reuse_fmr(pool);
612 if (ibmr) {
613 *ibmr_ret = ibmr;
614 finish_wait(&pool->flush_wait, &wait);
615 goto out_nolock;
616 }
617 }
618 finish_wait(&pool->flush_wait, &wait);
619 } else
620 mutex_lock(&pool->flush_lock);
621
622 if (ibmr_ret) {
623 ibmr = rds_ib_reuse_fmr(pool);
624 if (ibmr) {
625 *ibmr_ret = ibmr;
626 goto out;
627 }
628 }
498 629
499 spin_lock_irqsave(&pool->list_lock, flags);
500 /* Get the list of all MRs to be dropped. Ordering matters - 630 /* Get the list of all MRs to be dropped. Ordering matters -
501 * we want to put drop_list ahead of free_list. */ 631 * we want to put drop_list ahead of free_list.
502 list_splice_init(&pool->free_list, &unmap_list); 632 */
503 list_splice_init(&pool->drop_list, &unmap_list); 633 xlist_append_to_list(&pool->drop_list, &unmap_list);
634 xlist_append_to_list(&pool->free_list, &unmap_list);
504 if (free_all) 635 if (free_all)
505 list_splice_init(&pool->clean_list, &unmap_list); 636 xlist_append_to_list(&pool->clean_list, &unmap_list);
506 spin_unlock_irqrestore(&pool->list_lock, flags);
507 637
508 free_goal = rds_ib_flush_goal(pool, free_all); 638 free_goal = rds_ib_flush_goal(pool, free_all);
509 639
@@ -511,19 +641,20 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
511 goto out; 641 goto out;
512 642
513 /* String all ib_mr's onto one list and hand them to ib_unmap_fmr */ 643 /* String all ib_mr's onto one list and hand them to ib_unmap_fmr */
514 list_for_each_entry(ibmr, &unmap_list, list) 644 list_for_each_entry(ibmr, &unmap_list, unmap_list)
515 list_add(&ibmr->fmr->list, &fmr_list); 645 list_add(&ibmr->fmr->list, &fmr_list);
646
516 ret = ib_unmap_fmr(&fmr_list); 647 ret = ib_unmap_fmr(&fmr_list);
517 if (ret) 648 if (ret)
518 printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret); 649 printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret);
519 650
520 /* Now we can destroy the DMA mapping and unpin any pages */ 651 /* Now we can destroy the DMA mapping and unpin any pages */
521 list_for_each_entry_safe(ibmr, next, &unmap_list, list) { 652 list_for_each_entry_safe(ibmr, next, &unmap_list, unmap_list) {
522 unpinned += ibmr->sg_len; 653 unpinned += ibmr->sg_len;
523 __rds_ib_teardown_mr(ibmr); 654 __rds_ib_teardown_mr(ibmr);
524 if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) { 655 if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) {
525 rds_ib_stats_inc(s_ib_rdma_mr_free); 656 rds_ib_stats_inc(s_ib_rdma_mr_free);
526 list_del(&ibmr->list); 657 list_del(&ibmr->unmap_list);
527 ib_dealloc_fmr(ibmr->fmr); 658 ib_dealloc_fmr(ibmr->fmr);
528 kfree(ibmr); 659 kfree(ibmr);
529 nfreed++; 660 nfreed++;
@@ -531,9 +662,27 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
531 ncleaned++; 662 ncleaned++;
532 } 663 }
533 664
534 spin_lock_irqsave(&pool->list_lock, flags); 665 if (!list_empty(&unmap_list)) {
535 list_splice(&unmap_list, &pool->clean_list); 666 /* we have to make sure that none of the things we're about
536 spin_unlock_irqrestore(&pool->list_lock, flags); 667 * to put on the clean list would race with other cpus trying
668 * to pull items off. The xlist would explode if we managed to
669 * remove something from the clean list and then add it back again
670 * while another CPU was spinning on that same item in xlist_del_head.
671 *
672 * This is pretty unlikely, but just in case wait for an xlist grace period
673 * here before adding anything back into the clean list.
674 */
675 wait_clean_list_grace();
676
677 list_append_to_xlist(pool, &unmap_list, &clean_xlist, &clean_tail);
678 if (ibmr_ret)
679 refill_local(pool, &clean_xlist, ibmr_ret);
680
681 /* refill_local may have emptied our list */
682 if (!xlist_empty(&clean_xlist))
683 xlist_add(clean_xlist.next, clean_tail, &pool->clean_list);
684
685 }
537 686
538 atomic_sub(unpinned, &pool->free_pinned); 687 atomic_sub(unpinned, &pool->free_pinned);
539 atomic_sub(ncleaned, &pool->dirty_count); 688 atomic_sub(ncleaned, &pool->dirty_count);
@@ -541,14 +690,35 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
541 690
542out: 691out:
543 mutex_unlock(&pool->flush_lock); 692 mutex_unlock(&pool->flush_lock);
693 if (waitqueue_active(&pool->flush_wait))
694 wake_up(&pool->flush_wait);
695out_nolock:
544 return ret; 696 return ret;
545} 697}
546 698
699int rds_ib_fmr_init(void)
700{
701 rds_ib_fmr_wq = create_workqueue("rds_fmr_flushd");
702 if (!rds_ib_fmr_wq)
703 return -ENOMEM;
704 return 0;
705}
706
707/*
708 * By the time this is called all the IB devices should have been torn down and
709 * had their pools freed. As each pool is freed its work struct is waited on,
710 * so the pool flushing work queue should be idle by the time we get here.
711 */
712void rds_ib_fmr_exit(void)
713{
714 destroy_workqueue(rds_ib_fmr_wq);
715}
716
547static void rds_ib_mr_pool_flush_worker(struct work_struct *work) 717static void rds_ib_mr_pool_flush_worker(struct work_struct *work)
548{ 718{
549 struct rds_ib_mr_pool *pool = container_of(work, struct rds_ib_mr_pool, flush_worker); 719 struct rds_ib_mr_pool *pool = container_of(work, struct rds_ib_mr_pool, flush_worker.work);
550 720
551 rds_ib_flush_mr_pool(pool, 0); 721 rds_ib_flush_mr_pool(pool, 0, NULL);
552} 722}
553 723
554void rds_ib_free_mr(void *trans_private, int invalidate) 724void rds_ib_free_mr(void *trans_private, int invalidate)
@@ -556,47 +726,49 @@ void rds_ib_free_mr(void *trans_private, int invalidate)
556 struct rds_ib_mr *ibmr = trans_private; 726 struct rds_ib_mr *ibmr = trans_private;
557 struct rds_ib_device *rds_ibdev = ibmr->device; 727 struct rds_ib_device *rds_ibdev = ibmr->device;
558 struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool; 728 struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
559 unsigned long flags;
560 729
561 rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len); 730 rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len);
562 731
563 /* Return it to the pool's free list */ 732 /* Return it to the pool's free list */
564 spin_lock_irqsave(&pool->list_lock, flags);
565 if (ibmr->remap_count >= pool->fmr_attr.max_maps) 733 if (ibmr->remap_count >= pool->fmr_attr.max_maps)
566 list_add(&ibmr->list, &pool->drop_list); 734 xlist_add(&ibmr->xlist, &ibmr->xlist, &pool->drop_list);
567 else 735 else
568 list_add(&ibmr->list, &pool->free_list); 736 xlist_add(&ibmr->xlist, &ibmr->xlist, &pool->free_list);
569 737
570 atomic_add(ibmr->sg_len, &pool->free_pinned); 738 atomic_add(ibmr->sg_len, &pool->free_pinned);
571 atomic_inc(&pool->dirty_count); 739 atomic_inc(&pool->dirty_count);
572 spin_unlock_irqrestore(&pool->list_lock, flags);
573 740
574 /* If we've pinned too many pages, request a flush */ 741 /* If we've pinned too many pages, request a flush */
575 if (atomic_read(&pool->free_pinned) >= pool->max_free_pinned || 742 if (atomic_read(&pool->free_pinned) >= pool->max_free_pinned ||
576 atomic_read(&pool->dirty_count) >= pool->max_items / 10) 743 atomic_read(&pool->dirty_count) >= pool->max_items / 10)
577 queue_work(rds_wq, &pool->flush_worker); 744 queue_delayed_work(rds_ib_fmr_wq, &pool->flush_worker, 10);
578 745
579 if (invalidate) { 746 if (invalidate) {
580 if (likely(!in_interrupt())) { 747 if (likely(!in_interrupt())) {
581 rds_ib_flush_mr_pool(pool, 0); 748 rds_ib_flush_mr_pool(pool, 0, NULL);
582 } else { 749 } else {
583 /* We get here if the user created a MR marked 750 /* We get here if the user created a MR marked
584 * as use_once and invalidate at the same time. */ 751 * as use_once and invalidate at the same time. */
585 queue_work(rds_wq, &pool->flush_worker); 752 queue_delayed_work(rds_ib_fmr_wq,
753 &pool->flush_worker, 10);
586 } 754 }
587 } 755 }
756
757 rds_ib_dev_put(rds_ibdev);
588} 758}
589 759
590void rds_ib_flush_mrs(void) 760void rds_ib_flush_mrs(void)
591{ 761{
592 struct rds_ib_device *rds_ibdev; 762 struct rds_ib_device *rds_ibdev;
593 763
764 down_read(&rds_ib_devices_lock);
594 list_for_each_entry(rds_ibdev, &rds_ib_devices, list) { 765 list_for_each_entry(rds_ibdev, &rds_ib_devices, list) {
595 struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool; 766 struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
596 767
597 if (pool) 768 if (pool)
598 rds_ib_flush_mr_pool(pool, 0); 769 rds_ib_flush_mr_pool(pool, 0, NULL);
599 } 770 }
771 up_read(&rds_ib_devices_lock);
600} 772}
601 773
602void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents, 774void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
@@ -628,6 +800,7 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
628 printk(KERN_WARNING "RDS/IB: map_fmr failed (errno=%d)\n", ret); 800 printk(KERN_WARNING "RDS/IB: map_fmr failed (errno=%d)\n", ret);
629 801
630 ibmr->device = rds_ibdev; 802 ibmr->device = rds_ibdev;
803 rds_ibdev = NULL;
631 804
632 out: 805 out:
633 if (ret) { 806 if (ret) {
@@ -635,5 +808,8 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
635 rds_ib_free_mr(ibmr, 0); 808 rds_ib_free_mr(ibmr, 0);
636 ibmr = ERR_PTR(ret); 809 ibmr = ERR_PTR(ret);
637 } 810 }
811 if (rds_ibdev)
812 rds_ib_dev_put(rds_ibdev);
638 return ibmr; 813 return ibmr;
639} 814}
815