aboutsummaryrefslogtreecommitdiffstats
path: root/net/rds/ib_recv.c
diff options
context:
space:
mode:
authorChris Mason <chris.mason@oracle.com>2010-05-27 01:05:37 -0400
committerAndy Grover <andy.grover@oracle.com>2010-09-08 21:15:23 -0400
commit33244125871734ebc0d8d147680a0d7e99385e0b (patch)
treec4dad958a4de167f5b954e91bc90d69000cd89c1 /net/rds/ib_recv.c
parentfc24f78085e8771670af42f2b8929b16a0c98a22 (diff)
RDS/IB: Add caching of frags and incs
This patch is based heavily on an initial patch by Chris Mason. Instead of freeing slab memory and pages, it keeps them, and funnels them back to be reused. The lock minimization strategy uses xchg and cmpxchg atomic ops for manipulation of pointers to list heads. We anchor the lists with a pointer to a list_head struct instead of a static list_head struct. We just have to carefully use the existing primitives with the difference between a pointer and a static head struct. For example, 'list_empty()' means that our anchor pointer points to a list with a single item instead of meaning that our static head element doesn't point to any list items. Original patch by Chris, with significant mods and fixes by Andy and Zach. Signed-off-by: Chris Mason <chris.mason@oracle.com> Signed-off-by: Andy Grover <andy.grover@oracle.com> Signed-off-by: Zach Brown <zach.brown@oracle.com>
Diffstat (limited to 'net/rds/ib_recv.c')
-rw-r--r--net/rds/ib_recv.c321
1 files changed, 269 insertions, 52 deletions
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c
index 48add108ddee..5b429b7fd81c 100644
--- a/net/rds/ib_recv.c
+++ b/net/rds/ib_recv.c
@@ -43,14 +43,6 @@ static struct kmem_cache *rds_ib_incoming_slab;
43static struct kmem_cache *rds_ib_frag_slab; 43static struct kmem_cache *rds_ib_frag_slab;
44static atomic_t rds_ib_allocation = ATOMIC_INIT(0); 44static atomic_t rds_ib_allocation = ATOMIC_INIT(0);
45 45
46/* Free frag and attached recv buffer f_sg */
47static void rds_ib_frag_free(struct rds_page_frag *frag)
48{
49 rdsdebug("frag %p page %p\n", frag, sg_page(&frag->f_sg));
50 __free_page(sg_page(&frag->f_sg));
51 kmem_cache_free(rds_ib_frag_slab, frag);
52}
53
54void rds_ib_recv_init_ring(struct rds_ib_connection *ic) 46void rds_ib_recv_init_ring(struct rds_ib_connection *ic)
55{ 47{
56 struct rds_ib_recv_work *recv; 48 struct rds_ib_recv_work *recv;
@@ -79,6 +71,151 @@ void rds_ib_recv_init_ring(struct rds_ib_connection *ic)
79 } 71 }
80} 72}
81 73
74/*
75 * The entire 'from' list, including the from element itself, is put on
76 * to the tail of the 'to' list.
77 */
78static void list_splice_entire_tail(struct list_head *from,
79 struct list_head *to)
80{
81 struct list_head *from_last = from->prev;
82
83 list_splice_tail(from_last, to);
84 list_add_tail(from_last, to);
85}
86
87static void rds_ib_cache_xfer_to_ready(struct rds_ib_refill_cache *cache)
88{
89 struct list_head *tmp;
90
91 tmp = xchg(&cache->xfer, NULL);
92 if (tmp) {
93 if (cache->ready)
94 list_splice_entire_tail(tmp, cache->ready);
95 else
96 cache->ready = tmp;
97 }
98}
99
100static int rds_ib_recv_alloc_cache(struct rds_ib_refill_cache *cache)
101{
102 struct rds_ib_cache_head *head;
103 int cpu;
104
105 cache->percpu = alloc_percpu(struct rds_ib_cache_head);
106 if (!cache->percpu)
107 return -ENOMEM;
108
109 for_each_possible_cpu(cpu) {
110 head = per_cpu_ptr(cache->percpu, cpu);
111 head->first = NULL;
112 head->count = 0;
113 }
114 cache->xfer = NULL;
115 cache->ready = NULL;
116
117 return 0;
118}
119
120int rds_ib_recv_alloc_caches(struct rds_ib_connection *ic)
121{
122 int ret;
123
124 ret = rds_ib_recv_alloc_cache(&ic->i_cache_incs);
125 if (!ret) {
126 ret = rds_ib_recv_alloc_cache(&ic->i_cache_frags);
127 if (ret)
128 free_percpu(ic->i_cache_incs.percpu);
129 }
130
131 return ret;
132}
133
134static void rds_ib_cache_splice_all_lists(struct rds_ib_refill_cache *cache,
135 struct list_head *caller_list)
136{
137 struct rds_ib_cache_head *head;
138 int cpu;
139
140 for_each_possible_cpu(cpu) {
141 head = per_cpu_ptr(cache->percpu, cpu);
142 if (head->first) {
143 list_splice_entire_tail(head->first, caller_list);
144 head->first = NULL;
145 }
146 }
147
148 if (cache->ready) {
149 list_splice_entire_tail(cache->ready, caller_list);
150 cache->ready = NULL;
151 }
152}
153
154void rds_ib_recv_free_caches(struct rds_ib_connection *ic)
155{
156 struct rds_ib_incoming *inc;
157 struct rds_ib_incoming *inc_tmp;
158 struct rds_page_frag *frag;
159 struct rds_page_frag *frag_tmp;
160 LIST_HEAD(list);
161
162 rds_ib_cache_xfer_to_ready(&ic->i_cache_incs);
163 rds_ib_cache_splice_all_lists(&ic->i_cache_incs, &list);
164 free_percpu(ic->i_cache_incs.percpu);
165
166 list_for_each_entry_safe(inc, inc_tmp, &list, ii_cache_entry) {
167 list_del(&inc->ii_cache_entry);
168 WARN_ON(!list_empty(&inc->ii_frags));
169 kmem_cache_free(rds_ib_incoming_slab, inc);
170 }
171
172 rds_ib_cache_xfer_to_ready(&ic->i_cache_frags);
173 rds_ib_cache_splice_all_lists(&ic->i_cache_frags, &list);
174 free_percpu(ic->i_cache_frags.percpu);
175
176 list_for_each_entry_safe(frag, frag_tmp, &list, f_cache_entry) {
177 list_del(&frag->f_cache_entry);
178 WARN_ON(!list_empty(&frag->f_item));
179 kmem_cache_free(rds_ib_frag_slab, frag);
180 }
181}
182
183/* fwd decl */
184static void rds_ib_recv_cache_put(struct list_head *new_item,
185 struct rds_ib_refill_cache *cache);
186static struct list_head *rds_ib_recv_cache_get(struct rds_ib_refill_cache *cache);
187
188
189/* Recycle frag and attached recv buffer f_sg */
190static void rds_ib_frag_free(struct rds_ib_connection *ic,
191 struct rds_page_frag *frag)
192{
193 rdsdebug("frag %p page %p\n", frag, sg_page(&frag->f_sg));
194
195 rds_ib_recv_cache_put(&frag->f_cache_entry, &ic->i_cache_frags);
196}
197
198/* Recycle inc after freeing attached frags */
199void rds_ib_inc_free(struct rds_incoming *inc)
200{
201 struct rds_ib_incoming *ibinc;
202 struct rds_page_frag *frag;
203 struct rds_page_frag *pos;
204 struct rds_ib_connection *ic = inc->i_conn->c_transport_data;
205
206 ibinc = container_of(inc, struct rds_ib_incoming, ii_inc);
207
208 /* Free attached frags */
209 list_for_each_entry_safe(frag, pos, &ibinc->ii_frags, f_item) {
210 list_del_init(&frag->f_item);
211 rds_ib_frag_free(ic, frag);
212 }
213 BUG_ON(!list_empty(&ibinc->ii_frags));
214
215 rdsdebug("freeing ibinc %p inc %p\n", ibinc, inc);
216 rds_ib_recv_cache_put(&ibinc->ii_cache_entry, &ic->i_cache_incs);
217}
218
82static void rds_ib_recv_clear_one(struct rds_ib_connection *ic, 219static void rds_ib_recv_clear_one(struct rds_ib_connection *ic,
83 struct rds_ib_recv_work *recv) 220 struct rds_ib_recv_work *recv)
84{ 221{
@@ -88,7 +225,7 @@ static void rds_ib_recv_clear_one(struct rds_ib_connection *ic,
88 } 225 }
89 if (recv->r_frag) { 226 if (recv->r_frag) {
90 ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1, DMA_FROM_DEVICE); 227 ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1, DMA_FROM_DEVICE);
91 rds_ib_frag_free(recv->r_frag); 228 rds_ib_frag_free(ic, recv->r_frag);
92 recv->r_frag = NULL; 229 recv->r_frag = NULL;
93 } 230 }
94} 231}
@@ -101,6 +238,61 @@ void rds_ib_recv_clear_ring(struct rds_ib_connection *ic)
101 rds_ib_recv_clear_one(ic, &ic->i_recvs[i]); 238 rds_ib_recv_clear_one(ic, &ic->i_recvs[i]);
102} 239}
103 240
241static struct rds_ib_incoming *rds_ib_refill_one_inc(struct rds_ib_connection *ic)
242{
243 struct rds_ib_incoming *ibinc;
244 struct list_head *cache_item;
245 int avail_allocs;
246
247 cache_item = rds_ib_recv_cache_get(&ic->i_cache_incs);
248 if (cache_item) {
249 ibinc = container_of(cache_item, struct rds_ib_incoming, ii_cache_entry);
250 } else {
251 avail_allocs = atomic_add_unless(&rds_ib_allocation,
252 1, rds_ib_sysctl_max_recv_allocation);
253 if (!avail_allocs) {
254 rds_ib_stats_inc(s_ib_rx_alloc_limit);
255 return NULL;
256 }
257 ibinc = kmem_cache_alloc(rds_ib_incoming_slab, GFP_NOWAIT);
258 if (!ibinc) {
259 atomic_dec(&rds_ib_allocation);
260 return NULL;
261 }
262 }
263 INIT_LIST_HEAD(&ibinc->ii_frags);
264 rds_inc_init(&ibinc->ii_inc, ic->conn, ic->conn->c_faddr);
265
266 return ibinc;
267}
268
269static struct rds_page_frag *rds_ib_refill_one_frag(struct rds_ib_connection *ic)
270{
271 struct rds_page_frag *frag;
272 struct list_head *cache_item;
273 int ret;
274
275 cache_item = rds_ib_recv_cache_get(&ic->i_cache_frags);
276 if (cache_item) {
277 frag = container_of(cache_item, struct rds_page_frag, f_cache_entry);
278 } else {
279 frag = kmem_cache_alloc(rds_ib_frag_slab, GFP_NOWAIT);
280 if (!frag)
281 return NULL;
282
283 ret = rds_page_remainder_alloc(&frag->f_sg,
284 RDS_FRAG_SIZE, GFP_NOWAIT);
285 if (ret) {
286 kmem_cache_free(rds_ib_frag_slab, frag);
287 return NULL;
288 }
289 }
290
291 INIT_LIST_HEAD(&frag->f_item);
292
293 return frag;
294}
295
104static int rds_ib_recv_refill_one(struct rds_connection *conn, 296static int rds_ib_recv_refill_one(struct rds_connection *conn,
105 struct rds_ib_recv_work *recv) 297 struct rds_ib_recv_work *recv)
106{ 298{
@@ -108,37 +300,25 @@ static int rds_ib_recv_refill_one(struct rds_connection *conn,
108 struct ib_sge *sge; 300 struct ib_sge *sge;
109 int ret = -ENOMEM; 301 int ret = -ENOMEM;
110 302
303 if (!ic->i_cache_incs.ready)
304 rds_ib_cache_xfer_to_ready(&ic->i_cache_incs);
305 if (!ic->i_cache_frags.ready)
306 rds_ib_cache_xfer_to_ready(&ic->i_cache_frags);
307
111 /* 308 /*
112 * ibinc was taken from recv if recv contained the start of a message. 309 * ibinc was taken from recv if recv contained the start of a message.
113 * recvs that were continuations will still have this allocated. 310 * recvs that were continuations will still have this allocated.
114 */ 311 */
115 if (!recv->r_ibinc) { 312 if (!recv->r_ibinc) {
116 if (!atomic_add_unless(&rds_ib_allocation, 1, rds_ib_sysctl_max_recv_allocation)) { 313 recv->r_ibinc = rds_ib_refill_one_inc(ic);
117 rds_ib_stats_inc(s_ib_rx_alloc_limit); 314 if (!recv->r_ibinc)
118 goto out;
119 }
120 recv->r_ibinc = kmem_cache_alloc(rds_ib_incoming_slab, GFP_NOWAIT);
121 if (!recv->r_ibinc) {
122 atomic_dec(&rds_ib_allocation);
123 goto out; 315 goto out;
124 }
125 INIT_LIST_HEAD(&recv->r_ibinc->ii_frags);
126 rds_inc_init(&recv->r_ibinc->ii_inc, conn, conn->c_faddr);
127 } 316 }
128 317
129 WARN_ON(recv->r_frag); /* leak! */ 318 WARN_ON(recv->r_frag); /* leak! */
130 recv->r_frag = kmem_cache_alloc(rds_ib_frag_slab, GFP_NOWAIT); 319 recv->r_frag = rds_ib_refill_one_frag(ic);
131 if (!recv->r_frag) 320 if (!recv->r_frag)
132 goto out; 321 goto out;
133 INIT_LIST_HEAD(&recv->r_frag->f_item);
134 sg_init_table(&recv->r_frag->f_sg, 1);
135 ret = rds_page_remainder_alloc(&recv->r_frag->f_sg,
136 RDS_FRAG_SIZE, GFP_NOWAIT);
137 if (ret) {
138 kmem_cache_free(rds_ib_frag_slab, recv->r_frag);
139 recv->r_frag = NULL;
140 goto out;
141 }
142 322
143 ret = ib_dma_map_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 323 ret = ib_dma_map_sg(ic->i_cm_id->device, &recv->r_frag->f_sg,
144 1, DMA_FROM_DEVICE); 324 1, DMA_FROM_DEVICE);
@@ -160,8 +340,7 @@ out:
160/* 340/*
161 * This tries to allocate and post unused work requests after making sure that 341 * This tries to allocate and post unused work requests after making sure that
162 * they have all the allocations they need to queue received fragments into 342 * they have all the allocations they need to queue received fragments into
163 * sockets. The i_recv_mutex is held here so that ring_alloc and _unalloc 343 * sockets.
164 * pairs don't go unmatched.
165 * 344 *
166 * -1 is returned if posting fails due to temporary resource exhaustion. 345 * -1 is returned if posting fails due to temporary resource exhaustion.
167 */ 346 */
@@ -216,33 +395,71 @@ int rds_ib_recv_refill(struct rds_connection *conn, int prefill)
216 return ret; 395 return ret;
217} 396}
218 397
219static void rds_ib_inc_purge(struct rds_incoming *inc) 398/*
399 * We want to recycle several types of recv allocations, like incs and frags.
400 * To use this, the *_free() function passes in the ptr to a list_head within
401 * the recyclee, as well as the cache to put it on.
402 *
403 * First, we put the memory on a percpu list. When this reaches a certain size,
404 * We move it to an intermediate non-percpu list in a lockless manner, with some
405 * xchg/compxchg wizardry.
406 *
407 * N.B. Instead of a list_head as the anchor, we use a single pointer, which can
408 * be NULL and xchg'd. The list is actually empty when the pointer is NULL, and
409 * list_empty() will return true with one element is actually present.
410 */
411static void rds_ib_recv_cache_put(struct list_head *new_item,
412 struct rds_ib_refill_cache *cache)
220{ 413{
221 struct rds_ib_incoming *ibinc; 414 unsigned long flags;
222 struct rds_page_frag *frag; 415 struct rds_ib_cache_head *chp;
223 struct rds_page_frag *pos; 416 struct list_head *old;
224 417
225 ibinc = container_of(inc, struct rds_ib_incoming, ii_inc); 418 local_irq_save(flags);
226 rdsdebug("purging ibinc %p inc %p\n", ibinc, inc);
227 419
228 list_for_each_entry_safe(frag, pos, &ibinc->ii_frags, f_item) { 420 chp = per_cpu_ptr(cache->percpu, smp_processor_id());
229 list_del_init(&frag->f_item); 421 if (!chp->first)
230 rds_ib_frag_free(frag); 422 INIT_LIST_HEAD(new_item);
231 } 423 else /* put on front */
424 list_add_tail(new_item, chp->first);
425 chp->first = new_item;
426 chp->count++;
427
428 if (chp->count < RDS_IB_RECYCLE_BATCH_COUNT)
429 goto end;
430
431 /*
432 * Return our per-cpu first list to the cache's xfer by atomically
433 * grabbing the current xfer list, appending it to our per-cpu list,
434 * and then atomically returning that entire list back to the
435 * cache's xfer list as long as it's still empty.
436 */
437 do {
438 old = xchg(&cache->xfer, NULL);
439 if (old)
440 list_splice_entire_tail(old, chp->first);
441 old = cmpxchg(&cache->xfer, NULL, chp->first);
442 } while (old);
443
444 chp->first = NULL;
445 chp->count = 0;
446end:
447 local_irq_restore(flags);
232} 448}
233 449
234void rds_ib_inc_free(struct rds_incoming *inc) 450static struct list_head *rds_ib_recv_cache_get(struct rds_ib_refill_cache *cache)
235{ 451{
236 struct rds_ib_incoming *ibinc; 452 struct list_head *head = cache->ready;
237 453
238 ibinc = container_of(inc, struct rds_ib_incoming, ii_inc); 454 if (head) {
455 if (!list_empty(head)) {
456 cache->ready = head->next;
457 list_del_init(head);
458 } else
459 cache->ready = NULL;
460 }
239 461
240 rds_ib_inc_purge(inc); 462 return head;
241 rdsdebug("freeing ibinc %p inc %p\n", ibinc, inc);
242 BUG_ON(!list_empty(&ibinc->ii_frags));
243 kmem_cache_free(rds_ib_incoming_slab, ibinc);
244 atomic_dec(&rds_ib_allocation);
245 BUG_ON(atomic_read(&rds_ib_allocation) < 0);
246} 463}
247 464
248int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iovec *first_iov, 465int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iovec *first_iov,
@@ -647,7 +864,7 @@ static void rds_ib_process_recv(struct rds_connection *conn,
647 * 864 *
648 * FIXME: Fold this into the code path below. 865 * FIXME: Fold this into the code path below.
649 */ 866 */
650 rds_ib_frag_free(recv->r_frag); 867 rds_ib_frag_free(ic, recv->r_frag);
651 recv->r_frag = NULL; 868 recv->r_frag = NULL;
652 return; 869 return;
653 } 870 }