aboutsummaryrefslogtreecommitdiffstats
path: root/net/rds/message.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/rds/message.c')
-rw-r--r--net/rds/message.c142
1 files changed, 64 insertions, 78 deletions
diff --git a/net/rds/message.c b/net/rds/message.c
index 9a1d67e001ba..a84545dae370 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -34,9 +34,6 @@
34#include <linux/slab.h> 34#include <linux/slab.h>
35 35
36#include "rds.h" 36#include "rds.h"
37#include "rdma.h"
38
39static DECLARE_WAIT_QUEUE_HEAD(rds_message_flush_waitq);
40 37
41static unsigned int rds_exthdr_size[__RDS_EXTHDR_MAX] = { 38static unsigned int rds_exthdr_size[__RDS_EXTHDR_MAX] = {
42[RDS_EXTHDR_NONE] = 0, 39[RDS_EXTHDR_NONE] = 0,
@@ -63,29 +60,31 @@ static void rds_message_purge(struct rds_message *rm)
63 if (unlikely(test_bit(RDS_MSG_PAGEVEC, &rm->m_flags))) 60 if (unlikely(test_bit(RDS_MSG_PAGEVEC, &rm->m_flags)))
64 return; 61 return;
65 62
66 for (i = 0; i < rm->m_nents; i++) { 63 for (i = 0; i < rm->data.op_nents; i++) {
67 rdsdebug("putting data page %p\n", (void *)sg_page(&rm->m_sg[i])); 64 rdsdebug("putting data page %p\n", (void *)sg_page(&rm->data.op_sg[i]));
68 /* XXX will have to put_page for page refs */ 65 /* XXX will have to put_page for page refs */
69 __free_page(sg_page(&rm->m_sg[i])); 66 __free_page(sg_page(&rm->data.op_sg[i]));
70 } 67 }
71 rm->m_nents = 0; 68 rm->data.op_nents = 0;
72 69
73 if (rm->m_rdma_op) 70 if (rm->rdma.op_active)
74 rds_rdma_free_op(rm->m_rdma_op); 71 rds_rdma_free_op(&rm->rdma);
75 if (rm->m_rdma_mr) 72 if (rm->rdma.op_rdma_mr)
76 rds_mr_put(rm->m_rdma_mr); 73 rds_mr_put(rm->rdma.op_rdma_mr);
77}
78 74
79void rds_message_inc_purge(struct rds_incoming *inc) 75 if (rm->atomic.op_active)
80{ 76 rds_atomic_free_op(&rm->atomic);
81 struct rds_message *rm = container_of(inc, struct rds_message, m_inc); 77 if (rm->atomic.op_rdma_mr)
82 rds_message_purge(rm); 78 rds_mr_put(rm->atomic.op_rdma_mr);
83} 79}
84 80
85void rds_message_put(struct rds_message *rm) 81void rds_message_put(struct rds_message *rm)
86{ 82{
87 rdsdebug("put rm %p ref %d\n", rm, atomic_read(&rm->m_refcount)); 83 rdsdebug("put rm %p ref %d\n", rm, atomic_read(&rm->m_refcount));
88 84 if (atomic_read(&rm->m_refcount) == 0) {
85printk(KERN_CRIT "danger refcount zero on %p\n", rm);
86WARN_ON(1);
87 }
89 if (atomic_dec_and_test(&rm->m_refcount)) { 88 if (atomic_dec_and_test(&rm->m_refcount)) {
90 BUG_ON(!list_empty(&rm->m_sock_item)); 89 BUG_ON(!list_empty(&rm->m_sock_item));
91 BUG_ON(!list_empty(&rm->m_conn_item)); 90 BUG_ON(!list_empty(&rm->m_conn_item));
@@ -96,12 +95,6 @@ void rds_message_put(struct rds_message *rm)
96} 95}
97EXPORT_SYMBOL_GPL(rds_message_put); 96EXPORT_SYMBOL_GPL(rds_message_put);
98 97
99void rds_message_inc_free(struct rds_incoming *inc)
100{
101 struct rds_message *rm = container_of(inc, struct rds_message, m_inc);
102 rds_message_put(rm);
103}
104
105void rds_message_populate_header(struct rds_header *hdr, __be16 sport, 98void rds_message_populate_header(struct rds_header *hdr, __be16 sport,
106 __be16 dport, u64 seq) 99 __be16 dport, u64 seq)
107{ 100{
@@ -113,8 +106,8 @@ void rds_message_populate_header(struct rds_header *hdr, __be16 sport,
113} 106}
114EXPORT_SYMBOL_GPL(rds_message_populate_header); 107EXPORT_SYMBOL_GPL(rds_message_populate_header);
115 108
116int rds_message_add_extension(struct rds_header *hdr, 109int rds_message_add_extension(struct rds_header *hdr, unsigned int type,
117 unsigned int type, const void *data, unsigned int len) 110 const void *data, unsigned int len)
118{ 111{
119 unsigned int ext_len = sizeof(u8) + len; 112 unsigned int ext_len = sizeof(u8) + len;
120 unsigned char *dst; 113 unsigned char *dst;
@@ -184,26 +177,6 @@ none:
184 return RDS_EXTHDR_NONE; 177 return RDS_EXTHDR_NONE;
185} 178}
186 179
187int rds_message_add_version_extension(struct rds_header *hdr, unsigned int version)
188{
189 struct rds_ext_header_version ext_hdr;
190
191 ext_hdr.h_version = cpu_to_be32(version);
192 return rds_message_add_extension(hdr, RDS_EXTHDR_VERSION, &ext_hdr, sizeof(ext_hdr));
193}
194
195int rds_message_get_version_extension(struct rds_header *hdr, unsigned int *version)
196{
197 struct rds_ext_header_version ext_hdr;
198 unsigned int pos = 0, len = sizeof(ext_hdr);
199
200 /* We assume the version extension is the only one present */
201 if (rds_message_next_extension(hdr, &pos, &ext_hdr, &len) != RDS_EXTHDR_VERSION)
202 return 0;
203 *version = be32_to_cpu(ext_hdr.h_version);
204 return 1;
205}
206
207int rds_message_add_rdma_dest_extension(struct rds_header *hdr, u32 r_key, u32 offset) 180int rds_message_add_rdma_dest_extension(struct rds_header *hdr, u32 r_key, u32 offset)
208{ 181{
209 struct rds_ext_header_rdma_dest ext_hdr; 182 struct rds_ext_header_rdma_dest ext_hdr;
@@ -214,41 +187,68 @@ int rds_message_add_rdma_dest_extension(struct rds_header *hdr, u32 r_key, u32 o
214} 187}
215EXPORT_SYMBOL_GPL(rds_message_add_rdma_dest_extension); 188EXPORT_SYMBOL_GPL(rds_message_add_rdma_dest_extension);
216 189
217struct rds_message *rds_message_alloc(unsigned int nents, gfp_t gfp) 190/*
191 * Each rds_message is allocated with extra space for the scatterlist entries
192 * rds ops will need. This is to minimize memory allocation count. Then, each rds op
193 * can grab SGs when initializing its part of the rds_message.
194 */
195struct rds_message *rds_message_alloc(unsigned int extra_len, gfp_t gfp)
218{ 196{
219 struct rds_message *rm; 197 struct rds_message *rm;
220 198
221 rm = kzalloc(sizeof(struct rds_message) + 199 rm = kzalloc(sizeof(struct rds_message) + extra_len, gfp);
222 (nents * sizeof(struct scatterlist)), gfp);
223 if (!rm) 200 if (!rm)
224 goto out; 201 goto out;
225 202
226 if (nents) 203 rm->m_used_sgs = 0;
227 sg_init_table(rm->m_sg, nents); 204 rm->m_total_sgs = extra_len / sizeof(struct scatterlist);
205
228 atomic_set(&rm->m_refcount, 1); 206 atomic_set(&rm->m_refcount, 1);
229 INIT_LIST_HEAD(&rm->m_sock_item); 207 INIT_LIST_HEAD(&rm->m_sock_item);
230 INIT_LIST_HEAD(&rm->m_conn_item); 208 INIT_LIST_HEAD(&rm->m_conn_item);
231 spin_lock_init(&rm->m_rs_lock); 209 spin_lock_init(&rm->m_rs_lock);
210 init_waitqueue_head(&rm->m_flush_wait);
232 211
233out: 212out:
234 return rm; 213 return rm;
235} 214}
236 215
216/*
217 * RDS ops use this to grab SG entries from the rm's sg pool.
218 */
219struct scatterlist *rds_message_alloc_sgs(struct rds_message *rm, int nents)
220{
221 struct scatterlist *sg_first = (struct scatterlist *) &rm[1];
222 struct scatterlist *sg_ret;
223
224 WARN_ON(rm->m_used_sgs + nents > rm->m_total_sgs);
225 WARN_ON(!nents);
226
227 sg_ret = &sg_first[rm->m_used_sgs];
228 sg_init_table(sg_ret, nents);
229 rm->m_used_sgs += nents;
230
231 return sg_ret;
232}
233
237struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned int total_len) 234struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned int total_len)
238{ 235{
239 struct rds_message *rm; 236 struct rds_message *rm;
240 unsigned int i; 237 unsigned int i;
238 int num_sgs = ceil(total_len, PAGE_SIZE);
239 int extra_bytes = num_sgs * sizeof(struct scatterlist);
241 240
242 rm = rds_message_alloc(ceil(total_len, PAGE_SIZE), GFP_KERNEL); 241 rm = rds_message_alloc(extra_bytes, GFP_NOWAIT);
243 if (rm == NULL) 242 if (!rm)
244 return ERR_PTR(-ENOMEM); 243 return ERR_PTR(-ENOMEM);
245 244
246 set_bit(RDS_MSG_PAGEVEC, &rm->m_flags); 245 set_bit(RDS_MSG_PAGEVEC, &rm->m_flags);
247 rm->m_inc.i_hdr.h_len = cpu_to_be32(total_len); 246 rm->m_inc.i_hdr.h_len = cpu_to_be32(total_len);
248 rm->m_nents = ceil(total_len, PAGE_SIZE); 247 rm->data.op_nents = ceil(total_len, PAGE_SIZE);
248 rm->data.op_sg = rds_message_alloc_sgs(rm, num_sgs);
249 249
250 for (i = 0; i < rm->m_nents; ++i) { 250 for (i = 0; i < rm->data.op_nents; ++i) {
251 sg_set_page(&rm->m_sg[i], 251 sg_set_page(&rm->data.op_sg[i],
252 virt_to_page(page_addrs[i]), 252 virt_to_page(page_addrs[i]),
253 PAGE_SIZE, 0); 253 PAGE_SIZE, 0);
254 } 254 }
@@ -256,40 +256,33 @@ struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned in
256 return rm; 256 return rm;
257} 257}
258 258
259struct rds_message *rds_message_copy_from_user(struct iovec *first_iov, 259int rds_message_copy_from_user(struct rds_message *rm, struct iovec *first_iov,
260 size_t total_len) 260 size_t total_len)
261{ 261{
262 unsigned long to_copy; 262 unsigned long to_copy;
263 unsigned long iov_off; 263 unsigned long iov_off;
264 unsigned long sg_off; 264 unsigned long sg_off;
265 struct rds_message *rm;
266 struct iovec *iov; 265 struct iovec *iov;
267 struct scatterlist *sg; 266 struct scatterlist *sg;
268 int ret; 267 int ret = 0;
269
270 rm = rds_message_alloc(ceil(total_len, PAGE_SIZE), GFP_KERNEL);
271 if (rm == NULL) {
272 ret = -ENOMEM;
273 goto out;
274 }
275 268
276 rm->m_inc.i_hdr.h_len = cpu_to_be32(total_len); 269 rm->m_inc.i_hdr.h_len = cpu_to_be32(total_len);
277 270
278 /* 271 /*
279 * now allocate and copy in the data payload. 272 * now allocate and copy in the data payload.
280 */ 273 */
281 sg = rm->m_sg; 274 sg = rm->data.op_sg;
282 iov = first_iov; 275 iov = first_iov;
283 iov_off = 0; 276 iov_off = 0;
284 sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */ 277 sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */
285 278
286 while (total_len) { 279 while (total_len) {
287 if (sg_page(sg) == NULL) { 280 if (!sg_page(sg)) {
288 ret = rds_page_remainder_alloc(sg, total_len, 281 ret = rds_page_remainder_alloc(sg, total_len,
289 GFP_HIGHUSER); 282 GFP_HIGHUSER);
290 if (ret) 283 if (ret)
291 goto out; 284 goto out;
292 rm->m_nents++; 285 rm->data.op_nents++;
293 sg_off = 0; 286 sg_off = 0;
294 } 287 }
295 288
@@ -320,14 +313,8 @@ struct rds_message *rds_message_copy_from_user(struct iovec *first_iov,
320 sg++; 313 sg++;
321 } 314 }
322 315
323 ret = 0;
324out: 316out:
325 if (ret) { 317 return ret;
326 if (rm)
327 rds_message_put(rm);
328 rm = ERR_PTR(ret);
329 }
330 return rm;
331} 318}
332 319
333int rds_message_inc_copy_to_user(struct rds_incoming *inc, 320int rds_message_inc_copy_to_user(struct rds_incoming *inc,
@@ -348,7 +335,7 @@ int rds_message_inc_copy_to_user(struct rds_incoming *inc,
348 335
349 iov = first_iov; 336 iov = first_iov;
350 iov_off = 0; 337 iov_off = 0;
351 sg = rm->m_sg; 338 sg = rm->data.op_sg;
352 vec_off = 0; 339 vec_off = 0;
353 copied = 0; 340 copied = 0;
354 341
@@ -394,15 +381,14 @@ int rds_message_inc_copy_to_user(struct rds_incoming *inc,
394 */ 381 */
395void rds_message_wait(struct rds_message *rm) 382void rds_message_wait(struct rds_message *rm)
396{ 383{
397 wait_event(rds_message_flush_waitq, 384 wait_event_interruptible(rm->m_flush_wait,
398 !test_bit(RDS_MSG_MAPPED, &rm->m_flags)); 385 !test_bit(RDS_MSG_MAPPED, &rm->m_flags));
399} 386}
400 387
401void rds_message_unmapped(struct rds_message *rm) 388void rds_message_unmapped(struct rds_message *rm)
402{ 389{
403 clear_bit(RDS_MSG_MAPPED, &rm->m_flags); 390 clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
404 if (waitqueue_active(&rds_message_flush_waitq)) 391 wake_up_interruptible(&rm->m_flush_wait);
405 wake_up(&rds_message_flush_waitq);
406} 392}
407EXPORT_SYMBOL_GPL(rds_message_unmapped); 393EXPORT_SYMBOL_GPL(rds_message_unmapped);
408 394