aboutsummaryrefslogtreecommitdiffstats
path: root/net/rds
diff options
context:
space:
mode:
Diffstat (limited to 'net/rds')
-rw-r--r--net/rds/Kconfig26
-rw-r--r--net/rds/Makefile11
-rw-r--r--net/rds/af_rds.c8
-rw-r--r--net/rds/bind.c3
-rw-r--r--net/rds/cong.c1
-rw-r--r--net/rds/connection.c54
-rw-r--r--net/rds/ib.c7
-rw-r--r--net/rds/ib.h18
-rw-r--r--net/rds/ib_cm.c62
-rw-r--r--net/rds/ib_rdma.c12
-rw-r--r--net/rds/ib_recv.c53
-rw-r--r--net/rds/ib_stats.c2
-rw-r--r--net/rds/ib_sysctl.c12
-rw-r--r--net/rds/info.c3
-rw-r--r--net/rds/iw.c16
-rw-r--r--net/rds/iw.h1
-rw-r--r--net/rds/iw_rdma.c28
-rw-r--r--net/rds/iw_send.c2
-rw-r--r--net/rds/iw_stats.c2
-rw-r--r--net/rds/message.c6
-rw-r--r--net/rds/page.c1
-rw-r--r--net/rds/rdma_transport.c16
-rw-r--r--net/rds/rds.h9
-rw-r--r--net/rds/recv.c28
-rw-r--r--net/rds/send.c3
-rw-r--r--net/rds/stats.c6
-rw-r--r--net/rds/tcp.c320
-rw-r--r--net/rds/tcp.h93
-rw-r--r--net/rds/tcp_connect.c153
-rw-r--r--net/rds/tcp_listen.c199
-rw-r--r--net/rds/tcp_recv.c356
-rw-r--r--net/rds/tcp_send.c263
-rw-r--r--net/rds/tcp_stats.c74
-rw-r--r--net/rds/threads.c2
-rw-r--r--net/rds/transport.c31
35 files changed, 1744 insertions, 137 deletions
diff --git a/net/rds/Kconfig b/net/rds/Kconfig
index 796773b5df9b..ec753b3ae72a 100644
--- a/net/rds/Kconfig
+++ b/net/rds/Kconfig
@@ -1,14 +1,28 @@
1 1
2config RDS 2config RDS
3 tristate "Reliable Datagram Sockets (RDS) (EXPERIMENTAL)" 3 tristate "The RDS Protocol (EXPERIMENTAL)"
4 depends on INET && INFINIBAND_IPOIB && EXPERIMENTAL 4 depends on INET && EXPERIMENTAL
5 depends on INFINIBAND && INFINIBAND_ADDR_TRANS
6 ---help--- 5 ---help---
7 RDS provides reliable, sequenced delivery of datagrams 6 The RDS (Reliable Datagram Sockets) protocol provides reliable,
8 over Infiniband. 7 sequenced delivery of datagrams over Infiniband, iWARP,
8 or TCP.
9
10config RDS_RDMA
11 tristate "RDS over Infiniband and iWARP"
12 depends on RDS && INFINIBAND && INFINIBAND_ADDR_TRANS
13 ---help---
14 Allow RDS to use Infiniband and iWARP as a transport.
15 This transport supports RDMA operations.
16
17config RDS_TCP
18 tristate "RDS over TCP"
19 depends on RDS
20 ---help---
21 Allow RDS to use TCP as a transport.
22 This transport does not support RDMA operations.
9 23
10config RDS_DEBUG 24config RDS_DEBUG
11 bool "Debugging messages" 25 bool "RDS debugging messages"
12 depends on RDS 26 depends on RDS
13 default n 27 default n
14 28
diff --git a/net/rds/Makefile b/net/rds/Makefile
index 51f27585fa08..b46eca109688 100644
--- a/net/rds/Makefile
+++ b/net/rds/Makefile
@@ -1,13 +1,20 @@
1obj-$(CONFIG_RDS) += rds.o 1obj-$(CONFIG_RDS) += rds.o
2rds-y := af_rds.o bind.o cong.o connection.o info.o message.o \ 2rds-y := af_rds.o bind.o cong.o connection.o info.o message.o \
3 recv.o send.o stats.o sysctl.o threads.o transport.o \ 3 recv.o send.o stats.o sysctl.o threads.o transport.o \
4 loop.o page.o rdma.o \ 4 loop.o page.o rdma.o
5 rdma_transport.o \ 5
6obj-$(CONFIG_RDS_RDMA) += rds_rdma.o
7rds_rdma-objs := rdma_transport.o \
6 ib.o ib_cm.o ib_recv.o ib_ring.o ib_send.o ib_stats.o \ 8 ib.o ib_cm.o ib_recv.o ib_ring.o ib_send.o ib_stats.o \
7 ib_sysctl.o ib_rdma.o \ 9 ib_sysctl.o ib_rdma.o \
8 iw.o iw_cm.o iw_recv.o iw_ring.o iw_send.o iw_stats.o \ 10 iw.o iw_cm.o iw_recv.o iw_ring.o iw_send.o iw_stats.o \
9 iw_sysctl.o iw_rdma.o 11 iw_sysctl.o iw_rdma.o
10 12
13
14obj-$(CONFIG_RDS_TCP) += rds_tcp.o
15rds_tcp-objs := tcp.o tcp_connect.o tcp_listen.o tcp_recv.o \
16 tcp_send.o tcp_stats.o
17
11ifeq ($(CONFIG_RDS_DEBUG), y) 18ifeq ($(CONFIG_RDS_DEBUG), y)
12EXTRA_CFLAGS += -DDEBUG 19EXTRA_CFLAGS += -DDEBUG
13endif 20endif
diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c
index b11e7e527864..108ed2e671c5 100644
--- a/net/rds/af_rds.c
+++ b/net/rds/af_rds.c
@@ -39,7 +39,6 @@
39 39
40#include "rds.h" 40#include "rds.h"
41#include "rdma.h" 41#include "rdma.h"
42#include "rdma_transport.h"
43 42
44/* this is just used for stats gathering :/ */ 43/* this is just used for stats gathering :/ */
45static DEFINE_SPINLOCK(rds_sock_lock); 44static DEFINE_SPINLOCK(rds_sock_lock);
@@ -509,7 +508,6 @@ out:
509 508
510static void __exit rds_exit(void) 509static void __exit rds_exit(void)
511{ 510{
512 rds_rdma_exit();
513 sock_unregister(rds_family_ops.family); 511 sock_unregister(rds_family_ops.family);
514 proto_unregister(&rds_proto); 512 proto_unregister(&rds_proto);
515 rds_conn_exit(); 513 rds_conn_exit();
@@ -549,14 +547,8 @@ static int __init rds_init(void)
549 rds_info_register_func(RDS_INFO_SOCKETS, rds_sock_info); 547 rds_info_register_func(RDS_INFO_SOCKETS, rds_sock_info);
550 rds_info_register_func(RDS_INFO_RECV_MESSAGES, rds_sock_inc_info); 548 rds_info_register_func(RDS_INFO_RECV_MESSAGES, rds_sock_inc_info);
551 549
552 /* ib/iwarp transports currently compiled-in */
553 ret = rds_rdma_init();
554 if (ret)
555 goto out_sock;
556 goto out; 550 goto out;
557 551
558out_sock:
559 sock_unregister(rds_family_ops.family);
560out_proto: 552out_proto:
561 proto_unregister(&rds_proto); 553 proto_unregister(&rds_proto);
562out_stats: 554out_stats:
diff --git a/net/rds/bind.c b/net/rds/bind.c
index c17cc39160ce..5d95fc007f1a 100644
--- a/net/rds/bind.c
+++ b/net/rds/bind.c
@@ -187,6 +187,9 @@ int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len)
187 if (trans == NULL) { 187 if (trans == NULL) {
188 ret = -EADDRNOTAVAIL; 188 ret = -EADDRNOTAVAIL;
189 rds_remove_bound(rs); 189 rds_remove_bound(rs);
190 if (printk_ratelimit())
191 printk(KERN_INFO "RDS: rds_bind() could not find a transport, "
192 "load rds_tcp or rds_rdma?\n");
190 goto out; 193 goto out;
191 } 194 }
192 195
diff --git a/net/rds/cong.c b/net/rds/cong.c
index 710e4599d76c..dd2711df640b 100644
--- a/net/rds/cong.c
+++ b/net/rds/cong.c
@@ -254,6 +254,7 @@ void rds_cong_map_updated(struct rds_cong_map *map, uint64_t portmask)
254 read_unlock_irqrestore(&rds_cong_monitor_lock, flags); 254 read_unlock_irqrestore(&rds_cong_monitor_lock, flags);
255 } 255 }
256} 256}
257EXPORT_SYMBOL_GPL(rds_cong_map_updated);
257 258
258int rds_cong_updated_since(unsigned long *recent) 259int rds_cong_updated_since(unsigned long *recent)
259{ 260{
diff --git a/net/rds/connection.c b/net/rds/connection.c
index d14445c48304..cc8b568c0c84 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -126,7 +126,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
126 struct rds_transport *trans, gfp_t gfp, 126 struct rds_transport *trans, gfp_t gfp,
127 int is_outgoing) 127 int is_outgoing)
128{ 128{
129 struct rds_connection *conn, *tmp, *parent = NULL; 129 struct rds_connection *conn, *parent = NULL;
130 struct hlist_head *head = rds_conn_bucket(laddr, faddr); 130 struct hlist_head *head = rds_conn_bucket(laddr, faddr);
131 unsigned long flags; 131 unsigned long flags;
132 int ret; 132 int ret;
@@ -155,7 +155,6 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
155 } 155 }
156 156
157 INIT_HLIST_NODE(&conn->c_hash_node); 157 INIT_HLIST_NODE(&conn->c_hash_node);
158 conn->c_version = RDS_PROTOCOL_3_0;
159 conn->c_laddr = laddr; 158 conn->c_laddr = laddr;
160 conn->c_faddr = faddr; 159 conn->c_faddr = faddr;
161 spin_lock_init(&conn->c_lock); 160 spin_lock_init(&conn->c_lock);
@@ -211,26 +210,40 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
211 trans->t_name ? trans->t_name : "[unknown]", 210 trans->t_name ? trans->t_name : "[unknown]",
212 is_outgoing ? "(outgoing)" : ""); 211 is_outgoing ? "(outgoing)" : "");
213 212
213 /*
214 * Since we ran without holding the conn lock, someone could
215 * have created the same conn (either normal or passive) in the
216 * interim. We check while holding the lock. If we won, we complete
217 * init and return our conn. If we lost, we rollback and return the
218 * other one.
219 */
214 spin_lock_irqsave(&rds_conn_lock, flags); 220 spin_lock_irqsave(&rds_conn_lock, flags);
215 if (parent == NULL) { 221 if (parent) {
216 tmp = rds_conn_lookup(head, laddr, faddr, trans); 222 /* Creating passive conn */
217 if (tmp == NULL) 223 if (parent->c_passive) {
218 hlist_add_head(&conn->c_hash_node, head); 224 trans->conn_free(conn->c_transport_data);
219 } else { 225 kmem_cache_free(rds_conn_slab, conn);
220 tmp = parent->c_passive; 226 conn = parent->c_passive;
221 if (!tmp) 227 } else {
222 parent->c_passive = conn; 228 parent->c_passive = conn;
223 } 229 rds_cong_add_conn(conn);
224 230 rds_conn_count++;
225 if (tmp) { 231 }
226 trans->conn_free(conn->c_transport_data);
227 kmem_cache_free(rds_conn_slab, conn);
228 conn = tmp;
229 } else { 232 } else {
230 rds_cong_add_conn(conn); 233 /* Creating normal conn */
231 rds_conn_count++; 234 struct rds_connection *found;
235
236 found = rds_conn_lookup(head, laddr, faddr, trans);
237 if (found) {
238 trans->conn_free(conn->c_transport_data);
239 kmem_cache_free(rds_conn_slab, conn);
240 conn = found;
241 } else {
242 hlist_add_head(&conn->c_hash_node, head);
243 rds_cong_add_conn(conn);
244 rds_conn_count++;
245 }
232 } 246 }
233
234 spin_unlock_irqrestore(&rds_conn_lock, flags); 247 spin_unlock_irqrestore(&rds_conn_lock, flags);
235 248
236out: 249out:
@@ -242,12 +255,14 @@ struct rds_connection *rds_conn_create(__be32 laddr, __be32 faddr,
242{ 255{
243 return __rds_conn_create(laddr, faddr, trans, gfp, 0); 256 return __rds_conn_create(laddr, faddr, trans, gfp, 0);
244} 257}
258EXPORT_SYMBOL_GPL(rds_conn_create);
245 259
246struct rds_connection *rds_conn_create_outgoing(__be32 laddr, __be32 faddr, 260struct rds_connection *rds_conn_create_outgoing(__be32 laddr, __be32 faddr,
247 struct rds_transport *trans, gfp_t gfp) 261 struct rds_transport *trans, gfp_t gfp)
248{ 262{
249 return __rds_conn_create(laddr, faddr, trans, gfp, 1); 263 return __rds_conn_create(laddr, faddr, trans, gfp, 1);
250} 264}
265EXPORT_SYMBOL_GPL(rds_conn_create_outgoing);
251 266
252void rds_conn_destroy(struct rds_connection *conn) 267void rds_conn_destroy(struct rds_connection *conn)
253{ 268{
@@ -290,6 +305,7 @@ void rds_conn_destroy(struct rds_connection *conn)
290 305
291 rds_conn_count--; 306 rds_conn_count--;
292} 307}
308EXPORT_SYMBOL_GPL(rds_conn_destroy);
293 309
294static void rds_conn_message_info(struct socket *sock, unsigned int len, 310static void rds_conn_message_info(struct socket *sock, unsigned int len,
295 struct rds_info_iterator *iter, 311 struct rds_info_iterator *iter,
@@ -393,6 +409,7 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
393 409
394 spin_unlock_irqrestore(&rds_conn_lock, flags); 410 spin_unlock_irqrestore(&rds_conn_lock, flags);
395} 411}
412EXPORT_SYMBOL_GPL(rds_for_each_conn_info);
396 413
397static int rds_conn_info_visitor(struct rds_connection *conn, 414static int rds_conn_info_visitor(struct rds_connection *conn,
398 void *buffer) 415 void *buffer)
@@ -468,6 +485,7 @@ void rds_conn_drop(struct rds_connection *conn)
468 atomic_set(&conn->c_state, RDS_CONN_ERROR); 485 atomic_set(&conn->c_state, RDS_CONN_ERROR);
469 queue_work(rds_wq, &conn->c_down_w); 486 queue_work(rds_wq, &conn->c_down_w);
470} 487}
488EXPORT_SYMBOL_GPL(rds_conn_drop);
471 489
472/* 490/*
473 * An error occurred on the connection 491 * An error occurred on the connection
diff --git a/net/rds/ib.c b/net/rds/ib.c
index b9bcd32431e1..536ebe5d3f6b 100644
--- a/net/rds/ib.c
+++ b/net/rds/ib.c
@@ -43,11 +43,14 @@
43 43
44unsigned int fmr_pool_size = RDS_FMR_POOL_SIZE; 44unsigned int fmr_pool_size = RDS_FMR_POOL_SIZE;
45unsigned int fmr_message_size = RDS_FMR_SIZE + 1; /* +1 allows for unaligned MRs */ 45unsigned int fmr_message_size = RDS_FMR_SIZE + 1; /* +1 allows for unaligned MRs */
46unsigned int rds_ib_retry_count = RDS_IB_DEFAULT_RETRY_COUNT;
46 47
47module_param(fmr_pool_size, int, 0444); 48module_param(fmr_pool_size, int, 0444);
48MODULE_PARM_DESC(fmr_pool_size, " Max number of fmr per HCA"); 49MODULE_PARM_DESC(fmr_pool_size, " Max number of fmr per HCA");
49module_param(fmr_message_size, int, 0444); 50module_param(fmr_message_size, int, 0444);
50MODULE_PARM_DESC(fmr_message_size, " Max size of a RDMA transfer"); 51MODULE_PARM_DESC(fmr_message_size, " Max size of a RDMA transfer");
52module_param(rds_ib_retry_count, int, 0444);
53MODULE_PARM_DESC(rds_ib_retry_count, " Number of hw retries before reporting an error");
51 54
52struct list_head rds_ib_devices; 55struct list_head rds_ib_devices;
53 56
@@ -82,9 +85,6 @@ void rds_ib_add_one(struct ib_device *device)
82 rds_ibdev->max_wrs = dev_attr->max_qp_wr; 85 rds_ibdev->max_wrs = dev_attr->max_qp_wr;
83 rds_ibdev->max_sge = min(dev_attr->max_sge, RDS_IB_MAX_SGE); 86 rds_ibdev->max_sge = min(dev_attr->max_sge, RDS_IB_MAX_SGE);
84 87
85 rds_ibdev->fmr_page_shift = max(9, ffs(dev_attr->page_size_cap) - 1);
86 rds_ibdev->fmr_page_size = 1 << rds_ibdev->fmr_page_shift;
87 rds_ibdev->fmr_page_mask = ~((u64) rds_ibdev->fmr_page_size - 1);
88 rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr?: 32; 88 rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr?: 32;
89 rds_ibdev->max_fmrs = dev_attr->max_fmr ? 89 rds_ibdev->max_fmrs = dev_attr->max_fmr ?
90 min_t(unsigned int, dev_attr->max_fmr, fmr_pool_size) : 90 min_t(unsigned int, dev_attr->max_fmr, fmr_pool_size) :
@@ -282,6 +282,7 @@ struct rds_transport rds_ib_transport = {
282 .flush_mrs = rds_ib_flush_mrs, 282 .flush_mrs = rds_ib_flush_mrs,
283 .t_owner = THIS_MODULE, 283 .t_owner = THIS_MODULE,
284 .t_name = "infiniband", 284 .t_name = "infiniband",
285 .t_type = RDS_TRANS_IB
285}; 286};
286 287
287int __init rds_ib_init(void) 288int __init rds_ib_init(void)
diff --git a/net/rds/ib.h b/net/rds/ib.h
index 455ae73047fe..1378b854cac0 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -15,6 +15,8 @@
15#define RDS_IB_DEFAULT_RECV_WR 1024 15#define RDS_IB_DEFAULT_RECV_WR 1024
16#define RDS_IB_DEFAULT_SEND_WR 256 16#define RDS_IB_DEFAULT_SEND_WR 256
17 17
18#define RDS_IB_DEFAULT_RETRY_COUNT 2
19
18#define RDS_IB_SUPPORTED_PROTOCOLS 0x00000003 /* minor versions supported */ 20#define RDS_IB_SUPPORTED_PROTOCOLS 0x00000003 /* minor versions supported */
19 21
20extern struct list_head rds_ib_devices; 22extern struct list_head rds_ib_devices;
@@ -157,9 +159,6 @@ struct rds_ib_device {
157 struct ib_pd *pd; 159 struct ib_pd *pd;
158 struct ib_mr *mr; 160 struct ib_mr *mr;
159 struct rds_ib_mr_pool *mr_pool; 161 struct rds_ib_mr_pool *mr_pool;
160 int fmr_page_shift;
161 int fmr_page_size;
162 u64 fmr_page_mask;
163 unsigned int fmr_max_remaps; 162 unsigned int fmr_max_remaps;
164 unsigned int max_fmrs; 163 unsigned int max_fmrs;
165 int max_sge; 164 int max_sge;
@@ -247,6 +246,7 @@ extern struct ib_client rds_ib_client;
247 246
248extern unsigned int fmr_pool_size; 247extern unsigned int fmr_pool_size;
249extern unsigned int fmr_message_size; 248extern unsigned int fmr_message_size;
249extern unsigned int rds_ib_retry_count;
250 250
251extern spinlock_t ib_nodev_conns_lock; 251extern spinlock_t ib_nodev_conns_lock;
252extern struct list_head ib_nodev_conns; 252extern struct list_head ib_nodev_conns;
@@ -355,17 +355,25 @@ extern ctl_table rds_ib_sysctl_table[];
355/* 355/*
356 * Helper functions for getting/setting the header and data SGEs in 356 * Helper functions for getting/setting the header and data SGEs in
357 * RDS packets (not RDMA) 357 * RDS packets (not RDMA)
358 *
359 * From version 3.1 onwards, header is in front of data in the sge.
358 */ 360 */
359static inline struct ib_sge * 361static inline struct ib_sge *
360rds_ib_header_sge(struct rds_ib_connection *ic, struct ib_sge *sge) 362rds_ib_header_sge(struct rds_ib_connection *ic, struct ib_sge *sge)
361{ 363{
362 return &sge[0]; 364 if (ic->conn->c_version > RDS_PROTOCOL_3_0)
365 return &sge[0];
366 else
367 return &sge[1];
363} 368}
364 369
365static inline struct ib_sge * 370static inline struct ib_sge *
366rds_ib_data_sge(struct rds_ib_connection *ic, struct ib_sge *sge) 371rds_ib_data_sge(struct rds_ib_connection *ic, struct ib_sge *sge)
367{ 372{
368 return &sge[1]; 373 if (ic->conn->c_version > RDS_PROTOCOL_3_0)
374 return &sge[1];
375 else
376 return &sge[0];
369} 377}
370 378
371#endif 379#endif
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index f8e40e1a6038..c2d372f13dbb 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -98,21 +98,34 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even
98 struct ib_qp_attr qp_attr; 98 struct ib_qp_attr qp_attr;
99 int err; 99 int err;
100 100
101 if (event->param.conn.private_data_len) { 101 if (event->param.conn.private_data_len >= sizeof(*dp)) {
102 dp = event->param.conn.private_data; 102 dp = event->param.conn.private_data;
103 103
104 rds_ib_set_protocol(conn, 104 /* make sure it isn't empty data */
105 if (dp->dp_protocol_major) {
106 rds_ib_set_protocol(conn,
105 RDS_PROTOCOL(dp->dp_protocol_major, 107 RDS_PROTOCOL(dp->dp_protocol_major,
106 dp->dp_protocol_minor)); 108 dp->dp_protocol_minor));
107 rds_ib_set_flow_control(conn, be32_to_cpu(dp->dp_credit)); 109 rds_ib_set_flow_control(conn, be32_to_cpu(dp->dp_credit));
110 }
108 } 111 }
109 112
110 printk(KERN_NOTICE "RDS/IB: connected to %pI4 version %u.%u%s\n", 113 printk(KERN_NOTICE "RDS/IB: connected to %pI4 version %u.%u%s\n",
111 &conn->c_laddr, 114 &conn->c_faddr,
112 RDS_PROTOCOL_MAJOR(conn->c_version), 115 RDS_PROTOCOL_MAJOR(conn->c_version),
113 RDS_PROTOCOL_MINOR(conn->c_version), 116 RDS_PROTOCOL_MINOR(conn->c_version),
114 ic->i_flowctl ? ", flow control" : ""); 117 ic->i_flowctl ? ", flow control" : "");
115 118
119 /*
120 * Init rings and fill recv. this needs to wait until protocol negotiation
121 * is complete, since ring layout is different from 3.0 to 3.1.
122 */
123 rds_ib_send_init_ring(ic);
124 rds_ib_recv_init_ring(ic);
125 /* Post receive buffers - as a side effect, this will update
126 * the posted credit count. */
127 rds_ib_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 1);
128
116 /* Tune RNR behavior */ 129 /* Tune RNR behavior */
117 rds_ib_tune_rnr(ic, &qp_attr); 130 rds_ib_tune_rnr(ic, &qp_attr);
118 131
@@ -145,7 +158,7 @@ static void rds_ib_cm_fill_conn_param(struct rds_connection *conn,
145 /* XXX tune these? */ 158 /* XXX tune these? */
146 conn_param->responder_resources = 1; 159 conn_param->responder_resources = 1;
147 conn_param->initiator_depth = 1; 160 conn_param->initiator_depth = 1;
148 conn_param->retry_count = 7; 161 conn_param->retry_count = min_t(unsigned int, rds_ib_retry_count, 7);
149 conn_param->rnr_retry_count = 7; 162 conn_param->rnr_retry_count = 7;
150 163
151 if (dp) { 164 if (dp) {
@@ -190,9 +203,9 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
190 rdma_notify(ic->i_cm_id, IB_EVENT_COMM_EST); 203 rdma_notify(ic->i_cm_id, IB_EVENT_COMM_EST);
191 break; 204 break;
192 default: 205 default:
193 printk(KERN_WARNING "RDS/ib: unhandled QP event %u " 206 rds_ib_conn_error(conn, "RDS/IB: Fatal QP Event %u "
194 "on connection to %pI4\n", event->event, 207 "- connection %pI4->%pI4, reconnecting\n",
195 &conn->c_faddr); 208 event->event, &conn->c_laddr, &conn->c_faddr);
196 break; 209 break;
197 } 210 }
198} 211}
@@ -321,7 +334,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
321 rdsdebug("send allocation failed\n"); 334 rdsdebug("send allocation failed\n");
322 goto out; 335 goto out;
323 } 336 }
324 rds_ib_send_init_ring(ic); 337 memset(ic->i_sends, 0, ic->i_send_ring.w_nr * sizeof(struct rds_ib_send_work));
325 338
326 ic->i_recvs = vmalloc(ic->i_recv_ring.w_nr * sizeof(struct rds_ib_recv_work)); 339 ic->i_recvs = vmalloc(ic->i_recv_ring.w_nr * sizeof(struct rds_ib_recv_work));
327 if (ic->i_recvs == NULL) { 340 if (ic->i_recvs == NULL) {
@@ -329,14 +342,10 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
329 rdsdebug("recv allocation failed\n"); 342 rdsdebug("recv allocation failed\n");
330 goto out; 343 goto out;
331 } 344 }
345 memset(ic->i_recvs, 0, ic->i_recv_ring.w_nr * sizeof(struct rds_ib_recv_work));
332 346
333 rds_ib_recv_init_ring(ic);
334 rds_ib_recv_init_ack(ic); 347 rds_ib_recv_init_ack(ic);
335 348
336 /* Post receive buffers - as a side effect, this will update
337 * the posted credit count. */
338 rds_ib_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 1);
339
340 rdsdebug("conn %p pd %p mr %p cq %p %p\n", conn, ic->i_pd, ic->i_mr, 349 rdsdebug("conn %p pd %p mr %p cq %p %p\n", conn, ic->i_pd, ic->i_mr,
341 ic->i_send_cq, ic->i_recv_cq); 350 ic->i_send_cq, ic->i_recv_cq);
342 351
@@ -344,19 +353,32 @@ out:
344 return ret; 353 return ret;
345} 354}
346 355
347static u32 rds_ib_protocol_compatible(const struct rds_ib_connect_private *dp) 356static u32 rds_ib_protocol_compatible(struct rdma_cm_event *event)
348{ 357{
358 const struct rds_ib_connect_private *dp = event->param.conn.private_data;
349 u16 common; 359 u16 common;
350 u32 version = 0; 360 u32 version = 0;
351 361
352 /* rdma_cm private data is odd - when there is any private data in the 362 /*
363 * rdma_cm private data is odd - when there is any private data in the
353 * request, we will be given a pretty large buffer without telling us the 364 * request, we will be given a pretty large buffer without telling us the
354 * original size. The only way to tell the difference is by looking at 365 * original size. The only way to tell the difference is by looking at
355 * the contents, which are initialized to zero. 366 * the contents, which are initialized to zero.
356 * If the protocol version fields aren't set, this is a connection attempt 367 * If the protocol version fields aren't set, this is a connection attempt
357 * from an older version. This could could be 3.0 or 2.0 - we can't tell. 368 * from an older version. This could could be 3.0 or 2.0 - we can't tell.
358 * We really should have changed this for OFED 1.3 :-( */ 369 * We really should have changed this for OFED 1.3 :-(
359 if (dp->dp_protocol_major == 0) 370 */
371
372 /* Be paranoid. RDS always has privdata */
373 if (!event->param.conn.private_data_len) {
374 printk(KERN_NOTICE "RDS incoming connection has no private data, "
375 "rejecting\n");
376 return 0;
377 }
378
379 /* Even if len is crap *now* I still want to check it. -ASG */
380 if (event->param.conn.private_data_len < sizeof (*dp)
381 || dp->dp_protocol_major == 0)
360 return RDS_PROTOCOL_3_0; 382 return RDS_PROTOCOL_3_0;
361 383
362 common = be16_to_cpu(dp->dp_protocol_minor_mask) & RDS_IB_SUPPORTED_PROTOCOLS; 384 common = be16_to_cpu(dp->dp_protocol_minor_mask) & RDS_IB_SUPPORTED_PROTOCOLS;
@@ -388,7 +410,7 @@ int rds_ib_cm_handle_connect(struct rdma_cm_id *cm_id,
388 int err, destroy = 1; 410 int err, destroy = 1;
389 411
390 /* Check whether the remote protocol version matches ours. */ 412 /* Check whether the remote protocol version matches ours. */
391 version = rds_ib_protocol_compatible(dp); 413 version = rds_ib_protocol_compatible(event);
392 if (!version) 414 if (!version)
393 goto out; 415 goto out;
394 416
diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index 81033af93020..ef3ab5b7283e 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -211,7 +211,7 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
211 211
212 pool->fmr_attr.max_pages = fmr_message_size; 212 pool->fmr_attr.max_pages = fmr_message_size;
213 pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps; 213 pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
214 pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift; 214 pool->fmr_attr.page_shift = PAGE_SHIFT;
215 pool->max_free_pinned = rds_ibdev->max_fmrs * fmr_message_size / 4; 215 pool->max_free_pinned = rds_ibdev->max_fmrs * fmr_message_size / 4;
216 216
217 /* We never allow more than max_items MRs to be allocated. 217 /* We never allow more than max_items MRs to be allocated.
@@ -349,13 +349,13 @@ static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibm
349 unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]); 349 unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]);
350 u64 dma_addr = ib_sg_dma_address(dev, &scat[i]); 350 u64 dma_addr = ib_sg_dma_address(dev, &scat[i]);
351 351
352 if (dma_addr & ~rds_ibdev->fmr_page_mask) { 352 if (dma_addr & ~PAGE_MASK) {
353 if (i > 0) 353 if (i > 0)
354 return -EINVAL; 354 return -EINVAL;
355 else 355 else
356 ++page_cnt; 356 ++page_cnt;
357 } 357 }
358 if ((dma_addr + dma_len) & ~rds_ibdev->fmr_page_mask) { 358 if ((dma_addr + dma_len) & ~PAGE_MASK) {
359 if (i < sg_dma_len - 1) 359 if (i < sg_dma_len - 1)
360 return -EINVAL; 360 return -EINVAL;
361 else 361 else
@@ -365,7 +365,7 @@ static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibm
365 len += dma_len; 365 len += dma_len;
366 } 366 }
367 367
368 page_cnt += len >> rds_ibdev->fmr_page_shift; 368 page_cnt += len >> PAGE_SHIFT;
369 if (page_cnt > fmr_message_size) 369 if (page_cnt > fmr_message_size)
370 return -EINVAL; 370 return -EINVAL;
371 371
@@ -378,9 +378,9 @@ static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibm
378 unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]); 378 unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]);
379 u64 dma_addr = ib_sg_dma_address(dev, &scat[i]); 379 u64 dma_addr = ib_sg_dma_address(dev, &scat[i]);
380 380
381 for (j = 0; j < dma_len; j += rds_ibdev->fmr_page_size) 381 for (j = 0; j < dma_len; j += PAGE_SIZE)
382 dma_pages[page_cnt++] = 382 dma_pages[page_cnt++] =
383 (dma_addr & rds_ibdev->fmr_page_mask) + j; 383 (dma_addr & PAGE_MASK) + j;
384 } 384 }
385 385
386 ret = ib_map_phys_fmr(ibmr->fmr, 386 ret = ib_map_phys_fmr(ibmr->fmr,
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c
index 5709bad28329..cd7a6cfcab03 100644
--- a/net/rds/ib_recv.c
+++ b/net/rds/ib_recv.c
@@ -555,6 +555,47 @@ u64 rds_ib_piggyb_ack(struct rds_ib_connection *ic)
555 return rds_ib_get_ack(ic); 555 return rds_ib_get_ack(ic);
556} 556}
557 557
558static struct rds_header *rds_ib_get_header(struct rds_connection *conn,
559 struct rds_ib_recv_work *recv,
560 u32 data_len)
561{
562 struct rds_ib_connection *ic = conn->c_transport_data;
563 void *hdr_buff = &ic->i_recv_hdrs[recv - ic->i_recvs];
564 void *addr;
565 u32 misplaced_hdr_bytes;
566
567 /*
568 * Support header at the front (RDS 3.1+) as well as header-at-end.
569 *
570 * Cases:
571 * 1) header all in header buff (great!)
572 * 2) header all in data page (copy all to header buff)
573 * 3) header split across hdr buf + data page
574 * (move bit in hdr buff to end before copying other bit from data page)
575 */
576 if (conn->c_version > RDS_PROTOCOL_3_0 || data_len == RDS_FRAG_SIZE)
577 return hdr_buff;
578
579 if (data_len <= (RDS_FRAG_SIZE - sizeof(struct rds_header))) {
580 addr = kmap_atomic(recv->r_frag->f_page, KM_SOFTIRQ0);
581 memcpy(hdr_buff,
582 addr + recv->r_frag->f_offset + data_len,
583 sizeof(struct rds_header));
584 kunmap_atomic(addr, KM_SOFTIRQ0);
585 return hdr_buff;
586 }
587
588 misplaced_hdr_bytes = (sizeof(struct rds_header) - (RDS_FRAG_SIZE - data_len));
589
590 memmove(hdr_buff + misplaced_hdr_bytes, hdr_buff, misplaced_hdr_bytes);
591
592 addr = kmap_atomic(recv->r_frag->f_page, KM_SOFTIRQ0);
593 memcpy(hdr_buff, addr + recv->r_frag->f_offset + data_len,
594 sizeof(struct rds_header) - misplaced_hdr_bytes);
595 kunmap_atomic(addr, KM_SOFTIRQ0);
596 return hdr_buff;
597}
598
558/* 599/*
559 * It's kind of lame that we're copying from the posted receive pages into 600 * It's kind of lame that we're copying from the posted receive pages into
560 * long-lived bitmaps. We could have posted the bitmaps and rdma written into 601 * long-lived bitmaps. We could have posted the bitmaps and rdma written into
@@ -645,7 +686,7 @@ struct rds_ib_ack_state {
645}; 686};
646 687
647static void rds_ib_process_recv(struct rds_connection *conn, 688static void rds_ib_process_recv(struct rds_connection *conn,
648 struct rds_ib_recv_work *recv, u32 byte_len, 689 struct rds_ib_recv_work *recv, u32 data_len,
649 struct rds_ib_ack_state *state) 690 struct rds_ib_ack_state *state)
650{ 691{
651 struct rds_ib_connection *ic = conn->c_transport_data; 692 struct rds_ib_connection *ic = conn->c_transport_data;
@@ -655,9 +696,9 @@ static void rds_ib_process_recv(struct rds_connection *conn,
655 /* XXX shut down the connection if port 0,0 are seen? */ 696 /* XXX shut down the connection if port 0,0 are seen? */
656 697
657 rdsdebug("ic %p ibinc %p recv %p byte len %u\n", ic, ibinc, recv, 698 rdsdebug("ic %p ibinc %p recv %p byte len %u\n", ic, ibinc, recv,
658 byte_len); 699 data_len);
659 700
660 if (byte_len < sizeof(struct rds_header)) { 701 if (data_len < sizeof(struct rds_header)) {
661 rds_ib_conn_error(conn, "incoming message " 702 rds_ib_conn_error(conn, "incoming message "
662 "from %pI4 didn't inclue a " 703 "from %pI4 didn't inclue a "
663 "header, disconnecting and " 704 "header, disconnecting and "
@@ -665,9 +706,9 @@ static void rds_ib_process_recv(struct rds_connection *conn,
665 &conn->c_faddr); 706 &conn->c_faddr);
666 return; 707 return;
667 } 708 }
668 byte_len -= sizeof(struct rds_header); 709 data_len -= sizeof(struct rds_header);
669 710
670 ihdr = &ic->i_recv_hdrs[recv - ic->i_recvs]; 711 ihdr = rds_ib_get_header(conn, recv, data_len);
671 712
672 /* Validate the checksum. */ 713 /* Validate the checksum. */
673 if (!rds_message_verify_checksum(ihdr)) { 714 if (!rds_message_verify_checksum(ihdr)) {
@@ -687,7 +728,7 @@ static void rds_ib_process_recv(struct rds_connection *conn,
687 if (ihdr->h_credit) 728 if (ihdr->h_credit)
688 rds_ib_send_add_credits(conn, ihdr->h_credit); 729 rds_ib_send_add_credits(conn, ihdr->h_credit);
689 730
690 if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && byte_len == 0) { 731 if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && data_len == 0) {
691 /* This is an ACK-only packet. The fact that it gets 732 /* This is an ACK-only packet. The fact that it gets
692 * special treatment here is that historically, ACKs 733 * special treatment here is that historically, ACKs
693 * were rather special beasts. 734 * were rather special beasts.
diff --git a/net/rds/ib_stats.c b/net/rds/ib_stats.c
index 02e3e3d50d4a..8d8488306fe4 100644
--- a/net/rds/ib_stats.c
+++ b/net/rds/ib_stats.c
@@ -39,7 +39,7 @@
39 39
40DEFINE_PER_CPU(struct rds_ib_statistics, rds_ib_stats) ____cacheline_aligned; 40DEFINE_PER_CPU(struct rds_ib_statistics, rds_ib_stats) ____cacheline_aligned;
41 41
42static char *rds_ib_stat_names[] = { 42static const char *const rds_ib_stat_names[] = {
43 "ib_connect_raced", 43 "ib_connect_raced",
44 "ib_listen_closed_stale", 44 "ib_listen_closed_stale",
45 "ib_tx_cq_call", 45 "ib_tx_cq_call",
diff --git a/net/rds/ib_sysctl.c b/net/rds/ib_sysctl.c
index d87830db93a0..84b5ffcb280f 100644
--- a/net/rds/ib_sysctl.c
+++ b/net/rds/ib_sysctl.c
@@ -53,7 +53,17 @@ unsigned long rds_ib_sysctl_max_unsig_bytes = (16 << 20);
53static unsigned long rds_ib_sysctl_max_unsig_bytes_min = 1; 53static unsigned long rds_ib_sysctl_max_unsig_bytes_min = 1;
54static unsigned long rds_ib_sysctl_max_unsig_bytes_max = ~0UL; 54static unsigned long rds_ib_sysctl_max_unsig_bytes_max = ~0UL;
55 55
56unsigned int rds_ib_sysctl_flow_control = 1; 56/*
57 * This sysctl does nothing.
58 *
59 * Backwards compatibility with RDS 3.0 wire protocol
60 * disables initial FC credit exchange.
61 * If it's ever possible to drop 3.0 support,
62 * setting this to 1 and moving init/refill of send/recv
63 * rings from ib_cm_connect_complete() back into ib_setup_qp()
64 * will cause credits to be added before protocol negotiation.
65 */
66unsigned int rds_ib_sysctl_flow_control = 0;
57 67
58ctl_table rds_ib_sysctl_table[] = { 68ctl_table rds_ib_sysctl_table[] = {
59 { 69 {
diff --git a/net/rds/info.c b/net/rds/info.c
index 62aeef37aefe..814a91a6f4a7 100644
--- a/net/rds/info.c
+++ b/net/rds/info.c
@@ -79,6 +79,7 @@ void rds_info_register_func(int optname, rds_info_func func)
79 rds_info_funcs[offset] = func; 79 rds_info_funcs[offset] = func;
80 spin_unlock(&rds_info_lock); 80 spin_unlock(&rds_info_lock);
81} 81}
82EXPORT_SYMBOL_GPL(rds_info_register_func);
82 83
83void rds_info_deregister_func(int optname, rds_info_func func) 84void rds_info_deregister_func(int optname, rds_info_func func)
84{ 85{
@@ -91,6 +92,7 @@ void rds_info_deregister_func(int optname, rds_info_func func)
91 rds_info_funcs[offset] = NULL; 92 rds_info_funcs[offset] = NULL;
92 spin_unlock(&rds_info_lock); 93 spin_unlock(&rds_info_lock);
93} 94}
95EXPORT_SYMBOL_GPL(rds_info_deregister_func);
94 96
95/* 97/*
96 * Typically we hold an atomic kmap across multiple rds_info_copy() calls 98 * Typically we hold an atomic kmap across multiple rds_info_copy() calls
@@ -137,6 +139,7 @@ void rds_info_copy(struct rds_info_iterator *iter, void *data,
137 } 139 }
138 } 140 }
139} 141}
142EXPORT_SYMBOL_GPL(rds_info_copy);
140 143
141/* 144/*
142 * @optval points to the userspace buffer that the information snapshot 145 * @optval points to the userspace buffer that the information snapshot
diff --git a/net/rds/iw.c b/net/rds/iw.c
index d16e1cbc8e83..db224f7c2937 100644
--- a/net/rds/iw.c
+++ b/net/rds/iw.c
@@ -83,23 +83,16 @@ void rds_iw_add_one(struct ib_device *device)
83 rds_iwdev->max_wrs = dev_attr->max_qp_wr; 83 rds_iwdev->max_wrs = dev_attr->max_qp_wr;
84 rds_iwdev->max_sge = min(dev_attr->max_sge, RDS_IW_MAX_SGE); 84 rds_iwdev->max_sge = min(dev_attr->max_sge, RDS_IW_MAX_SGE);
85 85
86 rds_iwdev->page_shift = max(PAGE_SHIFT, ffs(dev_attr->page_size_cap) - 1);
87
88 rds_iwdev->dev = device; 86 rds_iwdev->dev = device;
89 rds_iwdev->pd = ib_alloc_pd(device); 87 rds_iwdev->pd = ib_alloc_pd(device);
90 if (IS_ERR(rds_iwdev->pd)) 88 if (IS_ERR(rds_iwdev->pd))
91 goto free_dev; 89 goto free_dev;
92 90
93 if (!rds_iwdev->dma_local_lkey) { 91 if (!rds_iwdev->dma_local_lkey) {
94 if (device->node_type != RDMA_NODE_RNIC) { 92 rds_iwdev->mr = ib_get_dma_mr(rds_iwdev->pd,
95 rds_iwdev->mr = ib_get_dma_mr(rds_iwdev->pd, 93 IB_ACCESS_REMOTE_READ |
96 IB_ACCESS_LOCAL_WRITE); 94 IB_ACCESS_REMOTE_WRITE |
97 } else { 95 IB_ACCESS_LOCAL_WRITE);
98 rds_iwdev->mr = ib_get_dma_mr(rds_iwdev->pd,
99 IB_ACCESS_REMOTE_READ |
100 IB_ACCESS_REMOTE_WRITE |
101 IB_ACCESS_LOCAL_WRITE);
102 }
103 if (IS_ERR(rds_iwdev->mr)) 96 if (IS_ERR(rds_iwdev->mr))
104 goto err_pd; 97 goto err_pd;
105 } else 98 } else
@@ -291,6 +284,7 @@ struct rds_transport rds_iw_transport = {
291 .flush_mrs = rds_iw_flush_mrs, 284 .flush_mrs = rds_iw_flush_mrs,
292 .t_owner = THIS_MODULE, 285 .t_owner = THIS_MODULE,
293 .t_name = "iwarp", 286 .t_name = "iwarp",
287 .t_type = RDS_TRANS_IWARP,
294 .t_prefer_loopback = 1, 288 .t_prefer_loopback = 1,
295}; 289};
296 290
diff --git a/net/rds/iw.h b/net/rds/iw.h
index 0715dde323e7..dd72b62bd506 100644
--- a/net/rds/iw.h
+++ b/net/rds/iw.h
@@ -181,7 +181,6 @@ struct rds_iw_device {
181 struct ib_pd *pd; 181 struct ib_pd *pd;
182 struct ib_mr *mr; 182 struct ib_mr *mr;
183 struct rds_iw_mr_pool *mr_pool; 183 struct rds_iw_mr_pool *mr_pool;
184 int page_shift;
185 int max_sge; 184 int max_sge;
186 unsigned int max_wrs; 185 unsigned int max_wrs;
187 unsigned int dma_local_lkey:1; 186 unsigned int dma_local_lkey:1;
diff --git a/net/rds/iw_rdma.c b/net/rds/iw_rdma.c
index dcdb37da80f2..de4a1b16bf7b 100644
--- a/net/rds/iw_rdma.c
+++ b/net/rds/iw_rdma.c
@@ -263,18 +263,12 @@ static void rds_iw_set_scatterlist(struct rds_iw_scatterlist *sg,
263} 263}
264 264
265static u64 *rds_iw_map_scatterlist(struct rds_iw_device *rds_iwdev, 265static u64 *rds_iw_map_scatterlist(struct rds_iw_device *rds_iwdev,
266 struct rds_iw_scatterlist *sg, 266 struct rds_iw_scatterlist *sg)
267 unsigned int dma_page_shift)
268{ 267{
269 struct ib_device *dev = rds_iwdev->dev; 268 struct ib_device *dev = rds_iwdev->dev;
270 u64 *dma_pages = NULL; 269 u64 *dma_pages = NULL;
271 u64 dma_mask;
272 unsigned int dma_page_size;
273 int i, j, ret; 270 int i, j, ret;
274 271
275 dma_page_size = 1 << dma_page_shift;
276 dma_mask = dma_page_size - 1;
277
278 WARN_ON(sg->dma_len); 272 WARN_ON(sg->dma_len);
279 273
280 sg->dma_len = ib_dma_map_sg(dev, sg->list, sg->len, DMA_BIDIRECTIONAL); 274 sg->dma_len = ib_dma_map_sg(dev, sg->list, sg->len, DMA_BIDIRECTIONAL);
@@ -295,18 +289,18 @@ static u64 *rds_iw_map_scatterlist(struct rds_iw_device *rds_iwdev,
295 sg->bytes += dma_len; 289 sg->bytes += dma_len;
296 290
297 end_addr = dma_addr + dma_len; 291 end_addr = dma_addr + dma_len;
298 if (dma_addr & dma_mask) { 292 if (dma_addr & PAGE_MASK) {
299 if (i > 0) 293 if (i > 0)
300 goto out_unmap; 294 goto out_unmap;
301 dma_addr &= ~dma_mask; 295 dma_addr &= ~PAGE_MASK;
302 } 296 }
303 if (end_addr & dma_mask) { 297 if (end_addr & PAGE_MASK) {
304 if (i < sg->dma_len - 1) 298 if (i < sg->dma_len - 1)
305 goto out_unmap; 299 goto out_unmap;
306 end_addr = (end_addr + dma_mask) & ~dma_mask; 300 end_addr = (end_addr + PAGE_MASK) & ~PAGE_MASK;
307 } 301 }
308 302
309 sg->dma_npages += (end_addr - dma_addr) >> dma_page_shift; 303 sg->dma_npages += (end_addr - dma_addr) >> PAGE_SHIFT;
310 } 304 }
311 305
312 /* Now gather the dma addrs into one list */ 306 /* Now gather the dma addrs into one list */
@@ -325,8 +319,8 @@ static u64 *rds_iw_map_scatterlist(struct rds_iw_device *rds_iwdev,
325 u64 end_addr; 319 u64 end_addr;
326 320
327 end_addr = dma_addr + dma_len; 321 end_addr = dma_addr + dma_len;
328 dma_addr &= ~dma_mask; 322 dma_addr &= ~PAGE_MASK;
329 for (; dma_addr < end_addr; dma_addr += dma_page_size) 323 for (; dma_addr < end_addr; dma_addr += PAGE_SIZE)
330 dma_pages[j++] = dma_addr; 324 dma_pages[j++] = dma_addr;
331 BUG_ON(j > sg->dma_npages); 325 BUG_ON(j > sg->dma_npages);
332 } 326 }
@@ -727,7 +721,7 @@ static int rds_iw_rdma_build_fastreg(struct rds_iw_mapping *mapping)
727 f_wr.wr.fast_reg.rkey = mapping->m_rkey; 721 f_wr.wr.fast_reg.rkey = mapping->m_rkey;
728 f_wr.wr.fast_reg.page_list = ibmr->page_list; 722 f_wr.wr.fast_reg.page_list = ibmr->page_list;
729 f_wr.wr.fast_reg.page_list_len = mapping->m_sg.dma_len; 723 f_wr.wr.fast_reg.page_list_len = mapping->m_sg.dma_len;
730 f_wr.wr.fast_reg.page_shift = ibmr->device->page_shift; 724 f_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
731 f_wr.wr.fast_reg.access_flags = IB_ACCESS_LOCAL_WRITE | 725 f_wr.wr.fast_reg.access_flags = IB_ACCESS_LOCAL_WRITE |
732 IB_ACCESS_REMOTE_READ | 726 IB_ACCESS_REMOTE_READ |
733 IB_ACCESS_REMOTE_WRITE; 727 IB_ACCESS_REMOTE_WRITE;
@@ -780,9 +774,7 @@ static int rds_iw_map_fastreg(struct rds_iw_mr_pool *pool,
780 774
781 rds_iw_set_scatterlist(&mapping->m_sg, sg, sg_len); 775 rds_iw_set_scatterlist(&mapping->m_sg, sg, sg_len);
782 776
783 dma_pages = rds_iw_map_scatterlist(rds_iwdev, 777 dma_pages = rds_iw_map_scatterlist(rds_iwdev, &mapping->m_sg);
784 &mapping->m_sg,
785 rds_iwdev->page_shift);
786 if (IS_ERR(dma_pages)) { 778 if (IS_ERR(dma_pages)) {
787 ret = PTR_ERR(dma_pages); 779 ret = PTR_ERR(dma_pages);
788 dma_pages = NULL; 780 dma_pages = NULL;
diff --git a/net/rds/iw_send.c b/net/rds/iw_send.c
index 44a6a0551f28..1f5abe3cf2b4 100644
--- a/net/rds/iw_send.c
+++ b/net/rds/iw_send.c
@@ -779,7 +779,7 @@ static void rds_iw_build_send_fastreg(struct rds_iw_device *rds_iwdev, struct rd
779 send->s_wr.wr.fast_reg.rkey = send->s_mr->rkey; 779 send->s_wr.wr.fast_reg.rkey = send->s_mr->rkey;
780 send->s_wr.wr.fast_reg.page_list = send->s_page_list; 780 send->s_wr.wr.fast_reg.page_list = send->s_page_list;
781 send->s_wr.wr.fast_reg.page_list_len = nent; 781 send->s_wr.wr.fast_reg.page_list_len = nent;
782 send->s_wr.wr.fast_reg.page_shift = rds_iwdev->page_shift; 782 send->s_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
783 send->s_wr.wr.fast_reg.access_flags = IB_ACCESS_REMOTE_WRITE; 783 send->s_wr.wr.fast_reg.access_flags = IB_ACCESS_REMOTE_WRITE;
784 send->s_wr.wr.fast_reg.iova_start = sg_addr; 784 send->s_wr.wr.fast_reg.iova_start = sg_addr;
785 785
diff --git a/net/rds/iw_stats.c b/net/rds/iw_stats.c
index ccc7e8f0bf0e..d33ea790484e 100644
--- a/net/rds/iw_stats.c
+++ b/net/rds/iw_stats.c
@@ -39,7 +39,7 @@
39 39
40DEFINE_PER_CPU(struct rds_iw_statistics, rds_iw_stats) ____cacheline_aligned; 40DEFINE_PER_CPU(struct rds_iw_statistics, rds_iw_stats) ____cacheline_aligned;
41 41
42static char *rds_iw_stat_names[] = { 42static const char *const rds_iw_stat_names[] = {
43 "iw_connect_raced", 43 "iw_connect_raced",
44 "iw_listen_closed_stale", 44 "iw_listen_closed_stale",
45 "iw_tx_cq_call", 45 "iw_tx_cq_call",
diff --git a/net/rds/message.c b/net/rds/message.c
index 5a15dc8d0cd7..ca50a8ec9742 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -50,6 +50,7 @@ void rds_message_addref(struct rds_message *rm)
50 rdsdebug("addref rm %p ref %d\n", rm, atomic_read(&rm->m_refcount)); 50 rdsdebug("addref rm %p ref %d\n", rm, atomic_read(&rm->m_refcount));
51 atomic_inc(&rm->m_refcount); 51 atomic_inc(&rm->m_refcount);
52} 52}
53EXPORT_SYMBOL_GPL(rds_message_addref);
53 54
54/* 55/*
55 * This relies on dma_map_sg() not touching sg[].page during merging. 56 * This relies on dma_map_sg() not touching sg[].page during merging.
@@ -92,6 +93,7 @@ void rds_message_put(struct rds_message *rm)
92 kfree(rm); 93 kfree(rm);
93 } 94 }
94} 95}
96EXPORT_SYMBOL_GPL(rds_message_put);
95 97
96void rds_message_inc_free(struct rds_incoming *inc) 98void rds_message_inc_free(struct rds_incoming *inc)
97{ 99{
@@ -108,6 +110,7 @@ void rds_message_populate_header(struct rds_header *hdr, __be16 sport,
108 hdr->h_sequence = cpu_to_be64(seq); 110 hdr->h_sequence = cpu_to_be64(seq);
109 hdr->h_exthdr[0] = RDS_EXTHDR_NONE; 111 hdr->h_exthdr[0] = RDS_EXTHDR_NONE;
110} 112}
113EXPORT_SYMBOL_GPL(rds_message_populate_header);
111 114
112int rds_message_add_extension(struct rds_header *hdr, 115int rds_message_add_extension(struct rds_header *hdr,
113 unsigned int type, const void *data, unsigned int len) 116 unsigned int type, const void *data, unsigned int len)
@@ -133,6 +136,7 @@ int rds_message_add_extension(struct rds_header *hdr,
133 dst[len] = RDS_EXTHDR_NONE; 136 dst[len] = RDS_EXTHDR_NONE;
134 return 1; 137 return 1;
135} 138}
139EXPORT_SYMBOL_GPL(rds_message_add_extension);
136 140
137/* 141/*
138 * If a message has extension headers, retrieve them here. 142 * If a message has extension headers, retrieve them here.
@@ -208,6 +212,7 @@ int rds_message_add_rdma_dest_extension(struct rds_header *hdr, u32 r_key, u32 o
208 ext_hdr.h_rdma_offset = cpu_to_be32(offset); 212 ext_hdr.h_rdma_offset = cpu_to_be32(offset);
209 return rds_message_add_extension(hdr, RDS_EXTHDR_RDMA_DEST, &ext_hdr, sizeof(ext_hdr)); 213 return rds_message_add_extension(hdr, RDS_EXTHDR_RDMA_DEST, &ext_hdr, sizeof(ext_hdr));
210} 214}
215EXPORT_SYMBOL_GPL(rds_message_add_rdma_dest_extension);
211 216
212struct rds_message *rds_message_alloc(unsigned int nents, gfp_t gfp) 217struct rds_message *rds_message_alloc(unsigned int nents, gfp_t gfp)
213{ 218{
@@ -399,4 +404,5 @@ void rds_message_unmapped(struct rds_message *rm)
399 if (waitqueue_active(&rds_message_flush_waitq)) 404 if (waitqueue_active(&rds_message_flush_waitq))
400 wake_up(&rds_message_flush_waitq); 405 wake_up(&rds_message_flush_waitq);
401} 406}
407EXPORT_SYMBOL_GPL(rds_message_unmapped);
402 408
diff --git a/net/rds/page.c b/net/rds/page.c
index c460743a89ad..55c21efdb62e 100644
--- a/net/rds/page.c
+++ b/net/rds/page.c
@@ -81,6 +81,7 @@ int rds_page_copy_user(struct page *page, unsigned long offset,
81 81
82 return 0; 82 return 0;
83} 83}
84EXPORT_SYMBOL_GPL(rds_page_copy_user);
84 85
85/* 86/*
86 * Message allocation uses this to build up regions of a message. 87 * Message allocation uses this to build up regions of a message.
diff --git a/net/rds/rdma_transport.c b/net/rds/rdma_transport.c
index 7d0f901c93d5..9ece910ea394 100644
--- a/net/rds/rdma_transport.c
+++ b/net/rds/rdma_transport.c
@@ -101,7 +101,7 @@ int rds_rdma_cm_event_handler(struct rdma_cm_id *cm_id,
101 break; 101 break;
102 102
103 case RDMA_CM_EVENT_DISCONNECTED: 103 case RDMA_CM_EVENT_DISCONNECTED:
104 printk(KERN_WARNING "RDS/IW: DISCONNECT event - dropping connection " 104 printk(KERN_WARNING "RDS/RDMA: DISCONNECT event - dropping connection "
105 "%pI4->%pI4\n", &conn->c_laddr, 105 "%pI4->%pI4\n", &conn->c_laddr,
106 &conn->c_faddr); 106 &conn->c_faddr);
107 rds_conn_drop(conn); 107 rds_conn_drop(conn);
@@ -132,12 +132,12 @@ static int __init rds_rdma_listen_init(void)
132 cm_id = rdma_create_id(rds_rdma_cm_event_handler, NULL, RDMA_PS_TCP); 132 cm_id = rdma_create_id(rds_rdma_cm_event_handler, NULL, RDMA_PS_TCP);
133 if (IS_ERR(cm_id)) { 133 if (IS_ERR(cm_id)) {
134 ret = PTR_ERR(cm_id); 134 ret = PTR_ERR(cm_id);
135 printk(KERN_ERR "RDS/IW: failed to setup listener, " 135 printk(KERN_ERR "RDS/RDMA: failed to setup listener, "
136 "rdma_create_id() returned %d\n", ret); 136 "rdma_create_id() returned %d\n", ret);
137 goto out; 137 goto out;
138 } 138 }
139 139
140 sin.sin_family = PF_INET, 140 sin.sin_family = AF_INET,
141 sin.sin_addr.s_addr = (__force u32)htonl(INADDR_ANY); 141 sin.sin_addr.s_addr = (__force u32)htonl(INADDR_ANY);
142 sin.sin_port = (__force u16)htons(RDS_PORT); 142 sin.sin_port = (__force u16)htons(RDS_PORT);
143 143
@@ -147,14 +147,14 @@ static int __init rds_rdma_listen_init(void)
147 */ 147 */
148 ret = rdma_bind_addr(cm_id, (struct sockaddr *)&sin); 148 ret = rdma_bind_addr(cm_id, (struct sockaddr *)&sin);
149 if (ret) { 149 if (ret) {
150 printk(KERN_ERR "RDS/IW: failed to setup listener, " 150 printk(KERN_ERR "RDS/RDMA: failed to setup listener, "
151 "rdma_bind_addr() returned %d\n", ret); 151 "rdma_bind_addr() returned %d\n", ret);
152 goto out; 152 goto out;
153 } 153 }
154 154
155 ret = rdma_listen(cm_id, 128); 155 ret = rdma_listen(cm_id, 128);
156 if (ret) { 156 if (ret) {
157 printk(KERN_ERR "RDS/IW: failed to setup listener, " 157 printk(KERN_ERR "RDS/RDMA: failed to setup listener, "
158 "rdma_listen() returned %d\n", ret); 158 "rdma_listen() returned %d\n", ret);
159 goto out; 159 goto out;
160 } 160 }
@@ -203,6 +203,7 @@ err_iw_init:
203out: 203out:
204 return ret; 204 return ret;
205} 205}
206module_init(rds_rdma_init);
206 207
207void rds_rdma_exit(void) 208void rds_rdma_exit(void)
208{ 209{
@@ -211,4 +212,9 @@ void rds_rdma_exit(void)
211 rds_ib_exit(); 212 rds_ib_exit();
212 rds_iw_exit(); 213 rds_iw_exit();
213} 214}
215module_exit(rds_rdma_exit);
216
217MODULE_AUTHOR("Oracle Corporation <rds-devel@oss.oracle.com>");
218MODULE_DESCRIPTION("RDS: IB/iWARP transport");
219MODULE_LICENSE("Dual BSD/GPL");
214 220
diff --git a/net/rds/rds.h b/net/rds/rds.h
index dbe111236783..85d6f897ecc7 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -311,11 +311,17 @@ struct rds_notifier {
311 * flag and header. 311 * flag and header.
312 */ 312 */
313 313
314#define RDS_TRANS_IB 0
315#define RDS_TRANS_IWARP 1
316#define RDS_TRANS_TCP 2
317#define RDS_TRANS_COUNT 3
318
314struct rds_transport { 319struct rds_transport {
315 char t_name[TRANSNAMSIZ]; 320 char t_name[TRANSNAMSIZ];
316 struct list_head t_item; 321 struct list_head t_item;
317 struct module *t_owner; 322 struct module *t_owner;
318 unsigned int t_prefer_loopback:1; 323 unsigned int t_prefer_loopback:1;
324 unsigned int t_type;
319 325
320 int (*laddr_check)(__be32 addr); 326 int (*laddr_check)(__be32 addr);
321 int (*conn_alloc)(struct rds_connection *conn, gfp_t gfp); 327 int (*conn_alloc)(struct rds_connection *conn, gfp_t gfp);
@@ -652,7 +658,8 @@ DECLARE_PER_CPU_SHARED_ALIGNED(struct rds_statistics, rds_stats);
652int __init rds_stats_init(void); 658int __init rds_stats_init(void);
653void rds_stats_exit(void); 659void rds_stats_exit(void);
654void rds_stats_info_copy(struct rds_info_iterator *iter, 660void rds_stats_info_copy(struct rds_info_iterator *iter,
655 uint64_t *values, char **names, size_t nr); 661 uint64_t *values, const char *const *names,
662 size_t nr);
656 663
657/* sysctl.c */ 664/* sysctl.c */
658int __init rds_sysctl_init(void); 665int __init rds_sysctl_init(void);
diff --git a/net/rds/recv.c b/net/rds/recv.c
index f2118c51cfa3..fdff33c7b432 100644
--- a/net/rds/recv.c
+++ b/net/rds/recv.c
@@ -46,12 +46,14 @@ void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
46 inc->i_saddr = saddr; 46 inc->i_saddr = saddr;
47 inc->i_rdma_cookie = 0; 47 inc->i_rdma_cookie = 0;
48} 48}
49EXPORT_SYMBOL_GPL(rds_inc_init);
49 50
50void rds_inc_addref(struct rds_incoming *inc) 51void rds_inc_addref(struct rds_incoming *inc)
51{ 52{
52 rdsdebug("addref inc %p ref %d\n", inc, atomic_read(&inc->i_refcount)); 53 rdsdebug("addref inc %p ref %d\n", inc, atomic_read(&inc->i_refcount));
53 atomic_inc(&inc->i_refcount); 54 atomic_inc(&inc->i_refcount);
54} 55}
56EXPORT_SYMBOL_GPL(rds_inc_addref);
55 57
56void rds_inc_put(struct rds_incoming *inc) 58void rds_inc_put(struct rds_incoming *inc)
57{ 59{
@@ -62,6 +64,7 @@ void rds_inc_put(struct rds_incoming *inc)
62 inc->i_conn->c_trans->inc_free(inc); 64 inc->i_conn->c_trans->inc_free(inc);
63 } 65 }
64} 66}
67EXPORT_SYMBOL_GPL(rds_inc_put);
65 68
66static void rds_recv_rcvbuf_delta(struct rds_sock *rs, struct sock *sk, 69static void rds_recv_rcvbuf_delta(struct rds_sock *rs, struct sock *sk,
67 struct rds_cong_map *map, 70 struct rds_cong_map *map,
@@ -237,6 +240,7 @@ out:
237 if (rs) 240 if (rs)
238 rds_sock_put(rs); 241 rds_sock_put(rs);
239} 242}
243EXPORT_SYMBOL_GPL(rds_recv_incoming);
240 244
241/* 245/*
242 * be very careful here. This is being called as the condition in 246 * be very careful here. This is being called as the condition in
@@ -409,18 +413,18 @@ int rds_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
409 if (msg_flags & MSG_OOB) 413 if (msg_flags & MSG_OOB)
410 goto out; 414 goto out;
411 415
412 /* If there are pending notifications, do those - and nothing else */ 416 while (1) {
413 if (!list_empty(&rs->rs_notify_queue)) { 417 /* If there are pending notifications, do those - and nothing else */
414 ret = rds_notify_queue_get(rs, msg); 418 if (!list_empty(&rs->rs_notify_queue)) {
415 goto out; 419 ret = rds_notify_queue_get(rs, msg);
416 } 420 break;
421 }
417 422
418 if (rs->rs_cong_notify) { 423 if (rs->rs_cong_notify) {
419 ret = rds_notify_cong(rs, msg); 424 ret = rds_notify_cong(rs, msg);
420 goto out; 425 break;
421 } 426 }
422 427
423 while (1) {
424 if (!rds_next_incoming(rs, &inc)) { 428 if (!rds_next_incoming(rs, &inc)) {
425 if (nonblock) { 429 if (nonblock) {
426 ret = -EAGAIN; 430 ret = -EAGAIN;
@@ -428,7 +432,9 @@ int rds_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
428 } 432 }
429 433
430 timeo = wait_event_interruptible_timeout(*sk->sk_sleep, 434 timeo = wait_event_interruptible_timeout(*sk->sk_sleep,
431 rds_next_incoming(rs, &inc), 435 (!list_empty(&rs->rs_notify_queue)
436 || rs->rs_cong_notify
437 || rds_next_incoming(rs, &inc)),
432 timeo); 438 timeo);
433 rdsdebug("recvmsg woke inc %p timeo %ld\n", inc, 439 rdsdebug("recvmsg woke inc %p timeo %ld\n", inc,
434 timeo); 440 timeo);
diff --git a/net/rds/send.c b/net/rds/send.c
index a4a7f428cd76..28c88ff3d038 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -439,6 +439,7 @@ void rds_rdma_send_complete(struct rds_message *rm, int status)
439 sock_put(rds_rs_to_sk(rs)); 439 sock_put(rds_rs_to_sk(rs));
440 } 440 }
441} 441}
442EXPORT_SYMBOL_GPL(rds_rdma_send_complete);
442 443
443/* 444/*
444 * This is the same as rds_rdma_send_complete except we 445 * This is the same as rds_rdma_send_complete except we
@@ -494,6 +495,7 @@ out:
494 495
495 return found; 496 return found;
496} 497}
498EXPORT_SYMBOL_GPL(rds_send_get_message);
497 499
498/* 500/*
499 * This removes messages from the socket's list if they're on it. The list 501 * This removes messages from the socket's list if they're on it. The list
@@ -610,6 +612,7 @@ void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
610 /* now remove the messages from the sock list as needed */ 612 /* now remove the messages from the sock list as needed */
611 rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS); 613 rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);
612} 614}
615EXPORT_SYMBOL_GPL(rds_send_drop_acked);
613 616
614void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) 617void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
615{ 618{
diff --git a/net/rds/stats.c b/net/rds/stats.c
index 637146893cf3..7598eb07cfb1 100644
--- a/net/rds/stats.c
+++ b/net/rds/stats.c
@@ -37,10 +37,11 @@
37#include "rds.h" 37#include "rds.h"
38 38
39DEFINE_PER_CPU_SHARED_ALIGNED(struct rds_statistics, rds_stats); 39DEFINE_PER_CPU_SHARED_ALIGNED(struct rds_statistics, rds_stats);
40EXPORT_PER_CPU_SYMBOL_GPL(rds_stats);
40 41
41/* :.,$s/unsigned long\>.*\<s_\(.*\);/"\1",/g */ 42/* :.,$s/unsigned long\>.*\<s_\(.*\);/"\1",/g */
42 43
43static char *rds_stat_names[] = { 44static const char *const rds_stat_names[] = {
44 "conn_reset", 45 "conn_reset",
45 "recv_drop_bad_checksum", 46 "recv_drop_bad_checksum",
46 "recv_drop_old_seq", 47 "recv_drop_old_seq",
@@ -77,7 +78,7 @@ static char *rds_stat_names[] = {
77}; 78};
78 79
79void rds_stats_info_copy(struct rds_info_iterator *iter, 80void rds_stats_info_copy(struct rds_info_iterator *iter,
80 uint64_t *values, char **names, size_t nr) 81 uint64_t *values, const char *const *names, size_t nr)
81{ 82{
82 struct rds_info_counter ctr; 83 struct rds_info_counter ctr;
83 size_t i; 84 size_t i;
@@ -90,6 +91,7 @@ void rds_stats_info_copy(struct rds_info_iterator *iter,
90 rds_info_copy(iter, &ctr, sizeof(ctr)); 91 rds_info_copy(iter, &ctr, sizeof(ctr));
91 } 92 }
92} 93}
94EXPORT_SYMBOL_GPL(rds_stats_info_copy);
93 95
94/* 96/*
95 * This gives global counters across all the transports. The strings 97 * This gives global counters across all the transports. The strings
diff --git a/net/rds/tcp.c b/net/rds/tcp.c
new file mode 100644
index 000000000000..b5198aee45d3
--- /dev/null
+++ b/net/rds/tcp.c
@@ -0,0 +1,320 @@
1/*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33#include <linux/kernel.h>
34#include <linux/in.h>
35#include <net/tcp.h>
36
37#include "rds.h"
38#include "tcp.h"
39
40/* only for info exporting */
41static DEFINE_SPINLOCK(rds_tcp_tc_list_lock);
42static LIST_HEAD(rds_tcp_tc_list);
43unsigned int rds_tcp_tc_count;
44
45/* Track rds_tcp_connection structs so they can be cleaned up */
46static DEFINE_SPINLOCK(rds_tcp_conn_lock);
47static LIST_HEAD(rds_tcp_conn_list);
48
49static struct kmem_cache *rds_tcp_conn_slab;
50
51#define RDS_TCP_DEFAULT_BUFSIZE (128 * 1024)
52
53/* doing it this way avoids calling tcp_sk() */
54void rds_tcp_nonagle(struct socket *sock)
55{
56 mm_segment_t oldfs = get_fs();
57 int val = 1;
58
59 set_fs(KERNEL_DS);
60 sock->ops->setsockopt(sock, SOL_TCP, TCP_NODELAY, (char __user *)&val,
61 sizeof(val));
62 set_fs(oldfs);
63}
64
65void rds_tcp_tune(struct socket *sock)
66{
67 struct sock *sk = sock->sk;
68
69 rds_tcp_nonagle(sock);
70
71 /*
72 * We're trying to saturate gigabit with the default,
73 * see svc_sock_setbufsize().
74 */
75 lock_sock(sk);
76 sk->sk_sndbuf = RDS_TCP_DEFAULT_BUFSIZE;
77 sk->sk_rcvbuf = RDS_TCP_DEFAULT_BUFSIZE;
78 sk->sk_userlocks |= SOCK_SNDBUF_LOCK|SOCK_RCVBUF_LOCK;
79 release_sock(sk);
80}
81
82u32 rds_tcp_snd_nxt(struct rds_tcp_connection *tc)
83{
84 return tcp_sk(tc->t_sock->sk)->snd_nxt;
85}
86
87u32 rds_tcp_snd_una(struct rds_tcp_connection *tc)
88{
89 return tcp_sk(tc->t_sock->sk)->snd_una;
90}
91
92void rds_tcp_restore_callbacks(struct socket *sock,
93 struct rds_tcp_connection *tc)
94{
95 rdsdebug("restoring sock %p callbacks from tc %p\n", sock, tc);
96 write_lock_bh(&sock->sk->sk_callback_lock);
97
98 /* done under the callback_lock to serialize with write_space */
99 spin_lock(&rds_tcp_tc_list_lock);
100 list_del_init(&tc->t_list_item);
101 rds_tcp_tc_count--;
102 spin_unlock(&rds_tcp_tc_list_lock);
103
104 tc->t_sock = NULL;
105
106 sock->sk->sk_write_space = tc->t_orig_write_space;
107 sock->sk->sk_data_ready = tc->t_orig_data_ready;
108 sock->sk->sk_state_change = tc->t_orig_state_change;
109 sock->sk->sk_user_data = NULL;
110
111 write_unlock_bh(&sock->sk->sk_callback_lock);
112}
113
114/*
115 * This is the only path that sets tc->t_sock. Send and receive trust that
116 * it is set. The RDS_CONN_CONNECTED bit protects those paths from being
117 * called while it isn't set.
118 */
119void rds_tcp_set_callbacks(struct socket *sock, struct rds_connection *conn)
120{
121 struct rds_tcp_connection *tc = conn->c_transport_data;
122
123 rdsdebug("setting sock %p callbacks to tc %p\n", sock, tc);
124 write_lock_bh(&sock->sk->sk_callback_lock);
125
126 /* done under the callback_lock to serialize with write_space */
127 spin_lock(&rds_tcp_tc_list_lock);
128 list_add_tail(&tc->t_list_item, &rds_tcp_tc_list);
129 rds_tcp_tc_count++;
130 spin_unlock(&rds_tcp_tc_list_lock);
131
132 /* accepted sockets need our listen data ready undone */
133 if (sock->sk->sk_data_ready == rds_tcp_listen_data_ready)
134 sock->sk->sk_data_ready = sock->sk->sk_user_data;
135
136 tc->t_sock = sock;
137 tc->conn = conn;
138 tc->t_orig_data_ready = sock->sk->sk_data_ready;
139 tc->t_orig_write_space = sock->sk->sk_write_space;
140 tc->t_orig_state_change = sock->sk->sk_state_change;
141
142 sock->sk->sk_user_data = conn;
143 sock->sk->sk_data_ready = rds_tcp_data_ready;
144 sock->sk->sk_write_space = rds_tcp_write_space;
145 sock->sk->sk_state_change = rds_tcp_state_change;
146
147 write_unlock_bh(&sock->sk->sk_callback_lock);
148}
149
150static void rds_tcp_tc_info(struct socket *sock, unsigned int len,
151 struct rds_info_iterator *iter,
152 struct rds_info_lengths *lens)
153{
154 struct rds_info_tcp_socket tsinfo;
155 struct rds_tcp_connection *tc;
156 unsigned long flags;
157 struct sockaddr_in sin;
158 int sinlen;
159
160 spin_lock_irqsave(&rds_tcp_tc_list_lock, flags);
161
162 if (len / sizeof(tsinfo) < rds_tcp_tc_count)
163 goto out;
164
165 list_for_each_entry(tc, &rds_tcp_tc_list, t_list_item) {
166
167 sock->ops->getname(sock, (struct sockaddr *)&sin, &sinlen, 0);
168 tsinfo.local_addr = sin.sin_addr.s_addr;
169 tsinfo.local_port = sin.sin_port;
170 sock->ops->getname(sock, (struct sockaddr *)&sin, &sinlen, 1);
171 tsinfo.peer_addr = sin.sin_addr.s_addr;
172 tsinfo.peer_port = sin.sin_port;
173
174 tsinfo.hdr_rem = tc->t_tinc_hdr_rem;
175 tsinfo.data_rem = tc->t_tinc_data_rem;
176 tsinfo.last_sent_nxt = tc->t_last_sent_nxt;
177 tsinfo.last_expected_una = tc->t_last_expected_una;
178 tsinfo.last_seen_una = tc->t_last_seen_una;
179
180 rds_info_copy(iter, &tsinfo, sizeof(tsinfo));
181 }
182
183out:
184 lens->nr = rds_tcp_tc_count;
185 lens->each = sizeof(tsinfo);
186
187 spin_unlock_irqrestore(&rds_tcp_tc_list_lock, flags);
188}
189
190static int rds_tcp_laddr_check(__be32 addr)
191{
192 if (inet_addr_type(&init_net, addr) == RTN_LOCAL)
193 return 0;
194 return -EADDRNOTAVAIL;
195}
196
197static int rds_tcp_conn_alloc(struct rds_connection *conn, gfp_t gfp)
198{
199 struct rds_tcp_connection *tc;
200
201 tc = kmem_cache_alloc(rds_tcp_conn_slab, gfp);
202 if (tc == NULL)
203 return -ENOMEM;
204
205 tc->t_sock = NULL;
206 tc->t_tinc = NULL;
207 tc->t_tinc_hdr_rem = sizeof(struct rds_header);
208 tc->t_tinc_data_rem = 0;
209
210 conn->c_transport_data = tc;
211
212 spin_lock_irq(&rds_tcp_conn_lock);
213 list_add_tail(&tc->t_tcp_node, &rds_tcp_conn_list);
214 spin_unlock_irq(&rds_tcp_conn_lock);
215
216 rdsdebug("alloced tc %p\n", conn->c_transport_data);
217 return 0;
218}
219
220static void rds_tcp_conn_free(void *arg)
221{
222 struct rds_tcp_connection *tc = arg;
223 rdsdebug("freeing tc %p\n", tc);
224 kmem_cache_free(rds_tcp_conn_slab, tc);
225}
226
227static void rds_tcp_destroy_conns(void)
228{
229 struct rds_tcp_connection *tc, *_tc;
230 LIST_HEAD(tmp_list);
231
232 /* avoid calling conn_destroy with irqs off */
233 spin_lock_irq(&rds_tcp_conn_lock);
234 list_splice(&rds_tcp_conn_list, &tmp_list);
235 INIT_LIST_HEAD(&rds_tcp_conn_list);
236 spin_unlock_irq(&rds_tcp_conn_lock);
237
238 list_for_each_entry_safe(tc, _tc, &tmp_list, t_tcp_node) {
239 if (tc->conn->c_passive)
240 rds_conn_destroy(tc->conn->c_passive);
241 rds_conn_destroy(tc->conn);
242 }
243}
244
245void rds_tcp_exit(void)
246{
247 rds_info_deregister_func(RDS_INFO_TCP_SOCKETS, rds_tcp_tc_info);
248 rds_tcp_listen_stop();
249 rds_tcp_destroy_conns();
250 rds_trans_unregister(&rds_tcp_transport);
251 rds_tcp_recv_exit();
252 kmem_cache_destroy(rds_tcp_conn_slab);
253}
254module_exit(rds_tcp_exit);
255
256struct rds_transport rds_tcp_transport = {
257 .laddr_check = rds_tcp_laddr_check,
258 .xmit_prepare = rds_tcp_xmit_prepare,
259 .xmit_complete = rds_tcp_xmit_complete,
260 .xmit_cong_map = rds_tcp_xmit_cong_map,
261 .xmit = rds_tcp_xmit,
262 .recv = rds_tcp_recv,
263 .conn_alloc = rds_tcp_conn_alloc,
264 .conn_free = rds_tcp_conn_free,
265 .conn_connect = rds_tcp_conn_connect,
266 .conn_shutdown = rds_tcp_conn_shutdown,
267 .inc_copy_to_user = rds_tcp_inc_copy_to_user,
268 .inc_purge = rds_tcp_inc_purge,
269 .inc_free = rds_tcp_inc_free,
270 .stats_info_copy = rds_tcp_stats_info_copy,
271 .exit = rds_tcp_exit,
272 .t_owner = THIS_MODULE,
273 .t_name = "tcp",
274 .t_type = RDS_TRANS_TCP,
275 .t_prefer_loopback = 1,
276};
277
278int __init rds_tcp_init(void)
279{
280 int ret;
281
282 rds_tcp_conn_slab = kmem_cache_create("rds_tcp_connection",
283 sizeof(struct rds_tcp_connection),
284 0, 0, NULL);
285 if (rds_tcp_conn_slab == NULL) {
286 ret = -ENOMEM;
287 goto out;
288 }
289
290 ret = rds_tcp_recv_init();
291 if (ret)
292 goto out_slab;
293
294 ret = rds_trans_register(&rds_tcp_transport);
295 if (ret)
296 goto out_recv;
297
298 ret = rds_tcp_listen_init();
299 if (ret)
300 goto out_register;
301
302 rds_info_register_func(RDS_INFO_TCP_SOCKETS, rds_tcp_tc_info);
303
304 goto out;
305
306out_register:
307 rds_trans_unregister(&rds_tcp_transport);
308out_recv:
309 rds_tcp_recv_exit();
310out_slab:
311 kmem_cache_destroy(rds_tcp_conn_slab);
312out:
313 return ret;
314}
315module_init(rds_tcp_init);
316
317MODULE_AUTHOR("Oracle Corporation <rds-devel@oss.oracle.com>");
318MODULE_DESCRIPTION("RDS: TCP transport");
319MODULE_LICENSE("Dual BSD/GPL");
320
diff --git a/net/rds/tcp.h b/net/rds/tcp.h
new file mode 100644
index 000000000000..844fa6b9cf5a
--- /dev/null
+++ b/net/rds/tcp.h
@@ -0,0 +1,93 @@
1#ifndef _RDS_TCP_H
2#define _RDS_TCP_H
3
4#define RDS_TCP_PORT 16385
5
6struct rds_tcp_incoming {
7 struct rds_incoming ti_inc;
8 struct sk_buff_head ti_skb_list;
9};
10
11struct rds_tcp_connection {
12
13 struct list_head t_tcp_node;
14 struct rds_connection *conn;
15 struct socket *t_sock;
16 void *t_orig_write_space;
17 void *t_orig_data_ready;
18 void *t_orig_state_change;
19
20 struct rds_tcp_incoming *t_tinc;
21 size_t t_tinc_hdr_rem;
22 size_t t_tinc_data_rem;
23
24 /* XXX error report? */
25 struct work_struct t_conn_w;
26 struct work_struct t_send_w;
27 struct work_struct t_down_w;
28 struct work_struct t_recv_w;
29
30 /* for info exporting only */
31 struct list_head t_list_item;
32 u32 t_last_sent_nxt;
33 u32 t_last_expected_una;
34 u32 t_last_seen_una;
35};
36
37struct rds_tcp_statistics {
38 uint64_t s_tcp_data_ready_calls;
39 uint64_t s_tcp_write_space_calls;
40 uint64_t s_tcp_sndbuf_full;
41 uint64_t s_tcp_connect_raced;
42 uint64_t s_tcp_listen_closed_stale;
43};
44
45/* tcp.c */
46int __init rds_tcp_init(void);
47void rds_tcp_exit(void);
48void rds_tcp_tune(struct socket *sock);
49void rds_tcp_nonagle(struct socket *sock);
50void rds_tcp_set_callbacks(struct socket *sock, struct rds_connection *conn);
51void rds_tcp_restore_callbacks(struct socket *sock,
52 struct rds_tcp_connection *tc);
53u32 rds_tcp_snd_nxt(struct rds_tcp_connection *tc);
54u32 rds_tcp_snd_una(struct rds_tcp_connection *tc);
55u64 rds_tcp_map_seq(struct rds_tcp_connection *tc, u32 seq);
56extern struct rds_transport rds_tcp_transport;
57
58/* tcp_connect.c */
59int rds_tcp_conn_connect(struct rds_connection *conn);
60void rds_tcp_conn_shutdown(struct rds_connection *conn);
61void rds_tcp_state_change(struct sock *sk);
62
63/* tcp_listen.c */
64int __init rds_tcp_listen_init(void);
65void rds_tcp_listen_stop(void);
66void rds_tcp_listen_data_ready(struct sock *sk, int bytes);
67
68/* tcp_recv.c */
69int __init rds_tcp_recv_init(void);
70void rds_tcp_recv_exit(void);
71void rds_tcp_data_ready(struct sock *sk, int bytes);
72int rds_tcp_recv(struct rds_connection *conn);
73void rds_tcp_inc_purge(struct rds_incoming *inc);
74void rds_tcp_inc_free(struct rds_incoming *inc);
75int rds_tcp_inc_copy_to_user(struct rds_incoming *inc, struct iovec *iov,
76 size_t size);
77
78/* tcp_send.c */
79void rds_tcp_xmit_prepare(struct rds_connection *conn);
80void rds_tcp_xmit_complete(struct rds_connection *conn);
81int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
82 unsigned int hdr_off, unsigned int sg, unsigned int off);
83void rds_tcp_write_space(struct sock *sk);
84int rds_tcp_xmit_cong_map(struct rds_connection *conn,
85 struct rds_cong_map *map, unsigned long offset);
86
87/* tcp_stats.c */
88DECLARE_PER_CPU(struct rds_tcp_statistics, rds_tcp_stats);
89#define rds_tcp_stats_inc(member) rds_stats_inc_which(rds_tcp_stats, member)
90unsigned int rds_tcp_stats_info_copy(struct rds_info_iterator *iter,
91 unsigned int avail);
92
93#endif
diff --git a/net/rds/tcp_connect.c b/net/rds/tcp_connect.c
new file mode 100644
index 000000000000..211522f9a9a2
--- /dev/null
+++ b/net/rds/tcp_connect.c
@@ -0,0 +1,153 @@
1/*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33#include <linux/kernel.h>
34#include <linux/in.h>
35#include <net/tcp.h>
36
37#include "rds.h"
38#include "tcp.h"
39
40void rds_tcp_state_change(struct sock *sk)
41{
42 void (*state_change)(struct sock *sk);
43 struct rds_connection *conn;
44 struct rds_tcp_connection *tc;
45
46 read_lock(&sk->sk_callback_lock);
47 conn = sk->sk_user_data;
48 if (conn == NULL) {
49 state_change = sk->sk_state_change;
50 goto out;
51 }
52 tc = conn->c_transport_data;
53 state_change = tc->t_orig_state_change;
54
55 rdsdebug("sock %p state_change to %d\n", tc->t_sock, sk->sk_state);
56
57 switch(sk->sk_state) {
58 /* ignore connecting sockets as they make progress */
59 case TCP_SYN_SENT:
60 case TCP_SYN_RECV:
61 break;
62 case TCP_ESTABLISHED:
63 rds_connect_complete(conn);
64 break;
65 case TCP_CLOSE:
66 rds_conn_drop(conn);
67 default:
68 break;
69 }
70out:
71 read_unlock(&sk->sk_callback_lock);
72 state_change(sk);
73}
74
75int rds_tcp_conn_connect(struct rds_connection *conn)
76{
77 struct socket *sock = NULL;
78 struct sockaddr_in src, dest;
79 int ret;
80
81 ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
82 if (ret < 0)
83 goto out;
84
85 rds_tcp_tune(sock);
86
87 src.sin_family = AF_INET;
88 src.sin_addr.s_addr = (__force u32)conn->c_laddr;
89 src.sin_port = (__force u16)htons(0);
90
91 ret = sock->ops->bind(sock, (struct sockaddr *)&src, sizeof(src));
92 if (ret) {
93 rdsdebug("bind failed with %d at address %u.%u.%u.%u\n",
94 ret, NIPQUAD(conn->c_laddr));
95 goto out;
96 }
97
98 dest.sin_family = AF_INET;
99 dest.sin_addr.s_addr = (__force u32)conn->c_faddr;
100 dest.sin_port = (__force u16)htons(RDS_TCP_PORT);
101
102 /*
103 * once we call connect() we can start getting callbacks and they
104 * own the socket
105 */
106 rds_tcp_set_callbacks(sock, conn);
107 ret = sock->ops->connect(sock, (struct sockaddr *)&dest, sizeof(dest),
108 O_NONBLOCK);
109 sock = NULL;
110
111 rdsdebug("connect to address %u.%u.%u.%u returned %d\n",
112 NIPQUAD(conn->c_faddr), ret);
113 if (ret == -EINPROGRESS)
114 ret = 0;
115
116out:
117 if (sock)
118 sock_release(sock);
119 return ret;
120}
121
122/*
123 * Before killing the tcp socket this needs to serialize with callbacks. The
124 * caller has already grabbed the sending sem so we're serialized with other
125 * senders.
126 *
127 * TCP calls the callbacks with the sock lock so we hold it while we reset the
128 * callbacks to those set by TCP. Our callbacks won't execute again once we
129 * hold the sock lock.
130 */
131void rds_tcp_conn_shutdown(struct rds_connection *conn)
132{
133 struct rds_tcp_connection *tc = conn->c_transport_data;
134 struct socket *sock = tc->t_sock;
135
136 rdsdebug("shutting down conn %p tc %p sock %p\n", conn, tc, sock);
137
138 if (sock) {
139 sock->ops->shutdown(sock, RCV_SHUTDOWN | SEND_SHUTDOWN);
140 lock_sock(sock->sk);
141 rds_tcp_restore_callbacks(sock, tc); /* tc->tc_sock = NULL */
142
143 release_sock(sock->sk);
144 sock_release(sock);
145 };
146
147 if (tc->t_tinc) {
148 rds_inc_put(&tc->t_tinc->ti_inc);
149 tc->t_tinc = NULL;
150 }
151 tc->t_tinc_hdr_rem = sizeof(struct rds_header);
152 tc->t_tinc_data_rem = 0;
153}
diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c
new file mode 100644
index 000000000000..24b743eb0b1b
--- /dev/null
+++ b/net/rds/tcp_listen.c
@@ -0,0 +1,199 @@
1/*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33#include <linux/kernel.h>
34#include <linux/in.h>
35#include <net/tcp.h>
36
37#include "rds.h"
38#include "tcp.h"
39
40/*
41 * cheesy, but simple..
42 */
43static void rds_tcp_accept_worker(struct work_struct *work);
44static DECLARE_WORK(rds_tcp_listen_work, rds_tcp_accept_worker);
45static struct socket *rds_tcp_listen_sock;
46
47static int rds_tcp_accept_one(struct socket *sock)
48{
49 struct socket *new_sock = NULL;
50 struct rds_connection *conn;
51 int ret;
52 struct inet_sock *inet;
53
54 ret = sock_create_lite(sock->sk->sk_family, sock->sk->sk_type,
55 sock->sk->sk_protocol, &new_sock);
56 if (ret)
57 goto out;
58
59 new_sock->type = sock->type;
60 new_sock->ops = sock->ops;
61 ret = sock->ops->accept(sock, new_sock, O_NONBLOCK);
62 if (ret < 0)
63 goto out;
64
65 rds_tcp_tune(new_sock);
66
67 inet = inet_sk(new_sock->sk);
68
69 rdsdebug("accepted tcp %u.%u.%u.%u:%u -> %u.%u.%u.%u:%u\n",
70 NIPQUAD(inet->saddr), ntohs(inet->sport),
71 NIPQUAD(inet->daddr), ntohs(inet->dport));
72
73 conn = rds_conn_create(inet->saddr, inet->daddr, &rds_tcp_transport,
74 GFP_KERNEL);
75 if (IS_ERR(conn)) {
76 ret = PTR_ERR(conn);
77 goto out;
78 }
79
80 /*
81 * see the comment above rds_queue_delayed_reconnect()
82 */
83 if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) {
84 if (rds_conn_state(conn) == RDS_CONN_UP)
85 rds_tcp_stats_inc(s_tcp_listen_closed_stale);
86 else
87 rds_tcp_stats_inc(s_tcp_connect_raced);
88 rds_conn_drop(conn);
89 ret = 0;
90 goto out;
91 }
92
93 rds_tcp_set_callbacks(new_sock, conn);
94 rds_connect_complete(conn);
95 new_sock = NULL;
96 ret = 0;
97
98out:
99 if (new_sock)
100 sock_release(new_sock);
101 return ret;
102}
103
104static void rds_tcp_accept_worker(struct work_struct *work)
105{
106 while (rds_tcp_accept_one(rds_tcp_listen_sock) == 0)
107 cond_resched();
108}
109
110void rds_tcp_listen_data_ready(struct sock *sk, int bytes)
111{
112 void (*ready)(struct sock *sk, int bytes);
113
114 rdsdebug("listen data ready sk %p\n", sk);
115
116 read_lock(&sk->sk_callback_lock);
117 ready = sk->sk_user_data;
118 if (ready == NULL) { /* check for teardown race */
119 ready = sk->sk_data_ready;
120 goto out;
121 }
122
123 /*
124 * ->sk_data_ready is also called for a newly established child socket
125 * before it has been accepted and the accepter has set up their
126 * data_ready.. we only want to queue listen work for our listening
127 * socket
128 */
129 if (sk->sk_state == TCP_LISTEN)
130 queue_work(rds_wq, &rds_tcp_listen_work);
131
132out:
133 read_unlock(&sk->sk_callback_lock);
134 ready(sk, bytes);
135}
136
137int __init rds_tcp_listen_init(void)
138{
139 struct sockaddr_in sin;
140 struct socket *sock = NULL;
141 int ret;
142
143 ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
144 if (ret < 0)
145 goto out;
146
147 sock->sk->sk_reuse = 1;
148 rds_tcp_nonagle(sock);
149
150 write_lock_bh(&sock->sk->sk_callback_lock);
151 sock->sk->sk_user_data = sock->sk->sk_data_ready;
152 sock->sk->sk_data_ready = rds_tcp_listen_data_ready;
153 write_unlock_bh(&sock->sk->sk_callback_lock);
154
155 sin.sin_family = PF_INET,
156 sin.sin_addr.s_addr = (__force u32)htonl(INADDR_ANY);
157 sin.sin_port = (__force u16)htons(RDS_TCP_PORT);
158
159 ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin));
160 if (ret < 0)
161 goto out;
162
163 ret = sock->ops->listen(sock, 64);
164 if (ret < 0)
165 goto out;
166
167 rds_tcp_listen_sock = sock;
168 sock = NULL;
169out:
170 if (sock)
171 sock_release(sock);
172 return ret;
173}
174
175void rds_tcp_listen_stop(void)
176{
177 struct socket *sock = rds_tcp_listen_sock;
178 struct sock *sk;
179
180 if (sock == NULL)
181 return;
182
183 sk = sock->sk;
184
185 /* serialize with and prevent further callbacks */
186 lock_sock(sk);
187 write_lock_bh(&sk->sk_callback_lock);
188 if (sk->sk_user_data) {
189 sk->sk_data_ready = sk->sk_user_data;
190 sk->sk_user_data = NULL;
191 }
192 write_unlock_bh(&sk->sk_callback_lock);
193 release_sock(sk);
194
195 /* wait for accepts to stop and close the socket */
196 flush_workqueue(rds_wq);
197 sock_release(sock);
198 rds_tcp_listen_sock = NULL;
199}
diff --git a/net/rds/tcp_recv.c b/net/rds/tcp_recv.c
new file mode 100644
index 000000000000..c00dafffbb5a
--- /dev/null
+++ b/net/rds/tcp_recv.c
@@ -0,0 +1,356 @@
1/*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33#include <linux/kernel.h>
34#include <net/tcp.h>
35
36#include "rds.h"
37#include "tcp.h"
38
39static struct kmem_cache *rds_tcp_incoming_slab;
40
41void rds_tcp_inc_purge(struct rds_incoming *inc)
42{
43 struct rds_tcp_incoming *tinc;
44 tinc = container_of(inc, struct rds_tcp_incoming, ti_inc);
45 rdsdebug("purging tinc %p inc %p\n", tinc, inc);
46 skb_queue_purge(&tinc->ti_skb_list);
47}
48
49void rds_tcp_inc_free(struct rds_incoming *inc)
50{
51 struct rds_tcp_incoming *tinc;
52 tinc = container_of(inc, struct rds_tcp_incoming, ti_inc);
53 rds_tcp_inc_purge(inc);
54 rdsdebug("freeing tinc %p inc %p\n", tinc, inc);
55 kmem_cache_free(rds_tcp_incoming_slab, tinc);
56}
57
58/*
59 * this is pretty lame, but, whatever.
60 */
61int rds_tcp_inc_copy_to_user(struct rds_incoming *inc, struct iovec *first_iov,
62 size_t size)
63{
64 struct rds_tcp_incoming *tinc;
65 struct iovec *iov, tmp;
66 struct sk_buff *skb;
67 unsigned long to_copy, skb_off;
68 int ret = 0;
69
70 if (size == 0)
71 goto out;
72
73 tinc = container_of(inc, struct rds_tcp_incoming, ti_inc);
74 iov = first_iov;
75 tmp = *iov;
76
77 skb_queue_walk(&tinc->ti_skb_list, skb) {
78 skb_off = 0;
79 while (skb_off < skb->len) {
80 while (tmp.iov_len == 0) {
81 iov++;
82 tmp = *iov;
83 }
84
85 to_copy = min(tmp.iov_len, size);
86 to_copy = min(to_copy, skb->len - skb_off);
87
88 rdsdebug("ret %d size %zu skb %p skb_off %lu "
89 "skblen %d iov_base %p iov_len %zu cpy %lu\n",
90 ret, size, skb, skb_off, skb->len,
91 tmp.iov_base, tmp.iov_len, to_copy);
92
93 /* modifies tmp as it copies */
94 if (skb_copy_datagram_iovec(skb, skb_off, &tmp,
95 to_copy)) {
96 ret = -EFAULT;
97 goto out;
98 }
99
100 size -= to_copy;
101 ret += to_copy;
102 skb_off += to_copy;
103 if (size == 0)
104 goto out;
105 }
106 }
107out:
108 return ret;
109}
110
111/*
112 * We have a series of skbs that have fragmented pieces of the congestion
113 * bitmap. They must add up to the exact size of the congestion bitmap. We
114 * use the skb helpers to copy those into the pages that make up the in-memory
115 * congestion bitmap for the remote address of this connection. We then tell
116 * the congestion core that the bitmap has been changed so that it can wake up
117 * sleepers.
118 *
119 * This is racing with sending paths which are using test_bit to see if the
120 * bitmap indicates that their recipient is congested.
121 */
122
123static void rds_tcp_cong_recv(struct rds_connection *conn,
124 struct rds_tcp_incoming *tinc)
125{
126 struct sk_buff *skb;
127 unsigned int to_copy, skb_off;
128 unsigned int map_off;
129 unsigned int map_page;
130 struct rds_cong_map *map;
131 int ret;
132
133 /* catch completely corrupt packets */
134 if (be32_to_cpu(tinc->ti_inc.i_hdr.h_len) != RDS_CONG_MAP_BYTES)
135 return;
136
137 map_page = 0;
138 map_off = 0;
139 map = conn->c_fcong;
140
141 skb_queue_walk(&tinc->ti_skb_list, skb) {
142 skb_off = 0;
143 while (skb_off < skb->len) {
144 to_copy = min_t(unsigned int, PAGE_SIZE - map_off,
145 skb->len - skb_off);
146
147 BUG_ON(map_page >= RDS_CONG_MAP_PAGES);
148
149 /* only returns 0 or -error */
150 ret = skb_copy_bits(skb, skb_off,
151 (void *)map->m_page_addrs[map_page] + map_off,
152 to_copy);
153 BUG_ON(ret != 0);
154
155 skb_off += to_copy;
156 map_off += to_copy;
157 if (map_off == PAGE_SIZE) {
158 map_off = 0;
159 map_page++;
160 }
161 }
162 }
163
164 rds_cong_map_updated(map, ~(u64) 0);
165}
166
167struct rds_tcp_desc_arg {
168 struct rds_connection *conn;
169 gfp_t gfp;
170 enum km_type km;
171};
172
173static int rds_tcp_data_recv(read_descriptor_t *desc, struct sk_buff *skb,
174 unsigned int offset, size_t len)
175{
176 struct rds_tcp_desc_arg *arg = desc->arg.data;
177 struct rds_connection *conn = arg->conn;
178 struct rds_tcp_connection *tc = conn->c_transport_data;
179 struct rds_tcp_incoming *tinc = tc->t_tinc;
180 struct sk_buff *clone;
181 size_t left = len, to_copy;
182
183 rdsdebug("tcp data tc %p skb %p offset %u len %zu\n", tc, skb, offset,
184 len);
185
186 /*
187 * tcp_read_sock() interprets partial progress as an indication to stop
188 * processing.
189 */
190 while (left) {
191 if (tinc == NULL) {
192 tinc = kmem_cache_alloc(rds_tcp_incoming_slab,
193 arg->gfp);
194 if (tinc == NULL) {
195 desc->error = -ENOMEM;
196 goto out;
197 }
198 tc->t_tinc = tinc;
199 rdsdebug("alloced tinc %p\n", tinc);
200 rds_inc_init(&tinc->ti_inc, conn, conn->c_faddr);
201 /*
202 * XXX * we might be able to use the __ variants when
203 * we've already serialized at a higher level.
204 */
205 skb_queue_head_init(&tinc->ti_skb_list);
206 }
207
208 if (left && tc->t_tinc_hdr_rem) {
209 to_copy = min(tc->t_tinc_hdr_rem, left);
210 rdsdebug("copying %zu header from skb %p\n", to_copy,
211 skb);
212 skb_copy_bits(skb, offset,
213 (char *)&tinc->ti_inc.i_hdr +
214 sizeof(struct rds_header) -
215 tc->t_tinc_hdr_rem,
216 to_copy);
217 tc->t_tinc_hdr_rem -= to_copy;
218 left -= to_copy;
219 offset += to_copy;
220
221 if (tc->t_tinc_hdr_rem == 0) {
222 /* could be 0 for a 0 len message */
223 tc->t_tinc_data_rem =
224 be32_to_cpu(tinc->ti_inc.i_hdr.h_len);
225 }
226 }
227
228 if (left && tc->t_tinc_data_rem) {
229 clone = skb_clone(skb, arg->gfp);
230 if (clone == NULL) {
231 desc->error = -ENOMEM;
232 goto out;
233 }
234
235 to_copy = min(tc->t_tinc_data_rem, left);
236 pskb_pull(clone, offset);
237 pskb_trim(clone, to_copy);
238 skb_queue_tail(&tinc->ti_skb_list, clone);
239
240 rdsdebug("skb %p data %p len %d off %u to_copy %zu -> "
241 "clone %p data %p len %d\n",
242 skb, skb->data, skb->len, offset, to_copy,
243 clone, clone->data, clone->len);
244
245 tc->t_tinc_data_rem -= to_copy;
246 left -= to_copy;
247 offset += to_copy;
248 }
249
250 if (tc->t_tinc_hdr_rem == 0 && tc->t_tinc_data_rem == 0) {
251 if (tinc->ti_inc.i_hdr.h_flags == RDS_FLAG_CONG_BITMAP)
252 rds_tcp_cong_recv(conn, tinc);
253 else
254 rds_recv_incoming(conn, conn->c_faddr,
255 conn->c_laddr, &tinc->ti_inc,
256 arg->gfp, arg->km);
257
258 tc->t_tinc_hdr_rem = sizeof(struct rds_header);
259 tc->t_tinc_data_rem = 0;
260 tc->t_tinc = NULL;
261 rds_inc_put(&tinc->ti_inc);
262 tinc = NULL;
263 }
264 }
265out:
266 rdsdebug("returning len %zu left %zu skb len %d rx queue depth %d\n",
267 len, left, skb->len,
268 skb_queue_len(&tc->t_sock->sk->sk_receive_queue));
269 return len - left;
270}
271
272/* the caller has to hold the sock lock */
273int rds_tcp_read_sock(struct rds_connection *conn, gfp_t gfp, enum km_type km)
274{
275 struct rds_tcp_connection *tc = conn->c_transport_data;
276 struct socket *sock = tc->t_sock;
277 read_descriptor_t desc;
278 struct rds_tcp_desc_arg arg;
279
280 /* It's like glib in the kernel! */
281 arg.conn = conn;
282 arg.gfp = gfp;
283 arg.km = km;
284 desc.arg.data = &arg;
285 desc.error = 0;
286 desc.count = 1; /* give more than one skb per call */
287
288 tcp_read_sock(sock->sk, &desc, rds_tcp_data_recv);
289 rdsdebug("tcp_read_sock for tc %p gfp 0x%x returned %d\n", tc, gfp,
290 desc.error);
291
292 return desc.error;
293}
294
295/*
296 * We hold the sock lock to serialize our rds_tcp_recv->tcp_read_sock from
297 * data_ready.
298 *
299 * if we fail to allocate we're in trouble.. blindly wait some time before
300 * trying again to see if the VM can free up something for us.
301 */
302int rds_tcp_recv(struct rds_connection *conn)
303{
304 struct rds_tcp_connection *tc = conn->c_transport_data;
305 struct socket *sock = tc->t_sock;
306 int ret = 0;
307
308 rdsdebug("recv worker conn %p tc %p sock %p\n", conn, tc, sock);
309
310 lock_sock(sock->sk);
311 ret = rds_tcp_read_sock(conn, GFP_KERNEL, KM_USER0);
312 release_sock(sock->sk);
313
314 return ret;
315}
316
317void rds_tcp_data_ready(struct sock *sk, int bytes)
318{
319 void (*ready)(struct sock *sk, int bytes);
320 struct rds_connection *conn;
321 struct rds_tcp_connection *tc;
322
323 rdsdebug("data ready sk %p bytes %d\n", sk, bytes);
324
325 read_lock(&sk->sk_callback_lock);
326 conn = sk->sk_user_data;
327 if (conn == NULL) { /* check for teardown race */
328 ready = sk->sk_data_ready;
329 goto out;
330 }
331
332 tc = conn->c_transport_data;
333 ready = tc->t_orig_data_ready;
334 rds_tcp_stats_inc(s_tcp_data_ready_calls);
335
336 if (rds_tcp_read_sock(conn, GFP_ATOMIC, KM_SOFTIRQ0) == -ENOMEM)
337 queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
338out:
339 read_unlock(&sk->sk_callback_lock);
340 ready(sk, bytes);
341}
342
343int __init rds_tcp_recv_init(void)
344{
345 rds_tcp_incoming_slab = kmem_cache_create("rds_tcp_incoming",
346 sizeof(struct rds_tcp_incoming),
347 0, 0, NULL);
348 if (rds_tcp_incoming_slab == NULL)
349 return -ENOMEM;
350 return 0;
351}
352
353void rds_tcp_recv_exit(void)
354{
355 kmem_cache_destroy(rds_tcp_incoming_slab);
356}
diff --git a/net/rds/tcp_send.c b/net/rds/tcp_send.c
new file mode 100644
index 000000000000..ab545e0cd5d6
--- /dev/null
+++ b/net/rds/tcp_send.c
@@ -0,0 +1,263 @@
1/*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33#include <linux/kernel.h>
34#include <linux/in.h>
35#include <net/tcp.h>
36
37#include "rds.h"
38#include "tcp.h"
39
40static void rds_tcp_cork(struct socket *sock, int val)
41{
42 mm_segment_t oldfs;
43
44 oldfs = get_fs();
45 set_fs(KERNEL_DS);
46 sock->ops->setsockopt(sock, SOL_TCP, TCP_CORK, (char __user *)&val,
47 sizeof(val));
48 set_fs(oldfs);
49}
50
51void rds_tcp_xmit_prepare(struct rds_connection *conn)
52{
53 struct rds_tcp_connection *tc = conn->c_transport_data;
54
55 rds_tcp_cork(tc->t_sock, 1);
56}
57
58void rds_tcp_xmit_complete(struct rds_connection *conn)
59{
60 struct rds_tcp_connection *tc = conn->c_transport_data;
61
62 rds_tcp_cork(tc->t_sock, 0);
63}
64
65/* the core send_sem serializes this with other xmit and shutdown */
66int rds_tcp_sendmsg(struct socket *sock, void *data, unsigned int len)
67{
68 struct kvec vec = {
69 .iov_base = data,
70 .iov_len = len,
71 };
72 struct msghdr msg = {
73 .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL,
74 };
75
76 return kernel_sendmsg(sock, &msg, &vec, 1, vec.iov_len);
77}
78
79/* the core send_sem serializes this with other xmit and shutdown */
80int rds_tcp_xmit_cong_map(struct rds_connection *conn,
81 struct rds_cong_map *map, unsigned long offset)
82{
83 static struct rds_header rds_tcp_map_header = {
84 .h_flags = RDS_FLAG_CONG_BITMAP,
85 };
86 struct rds_tcp_connection *tc = conn->c_transport_data;
87 unsigned long i;
88 int ret;
89 int copied = 0;
90
91 /* Some problem claims cpu_to_be32(constant) isn't a constant. */
92 rds_tcp_map_header.h_len = cpu_to_be32(RDS_CONG_MAP_BYTES);
93
94 if (offset < sizeof(struct rds_header)) {
95 ret = rds_tcp_sendmsg(tc->t_sock,
96 (void *)&rds_tcp_map_header + offset,
97 sizeof(struct rds_header) - offset);
98 if (ret <= 0)
99 return ret;
100 offset += ret;
101 copied = ret;
102 if (offset < sizeof(struct rds_header))
103 return ret;
104 }
105
106 offset -= sizeof(struct rds_header);
107 i = offset / PAGE_SIZE;
108 offset = offset % PAGE_SIZE;
109 BUG_ON(i >= RDS_CONG_MAP_PAGES);
110
111 do {
112 ret = tc->t_sock->ops->sendpage(tc->t_sock,
113 virt_to_page(map->m_page_addrs[i]),
114 offset, PAGE_SIZE - offset,
115 MSG_DONTWAIT);
116 if (ret <= 0)
117 break;
118 copied += ret;
119 offset += ret;
120 if (offset == PAGE_SIZE) {
121 offset = 0;
122 i++;
123 }
124 } while (i < RDS_CONG_MAP_PAGES);
125
126 return copied ? copied : ret;
127}
128
129/* the core send_sem serializes this with other xmit and shutdown */
130int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
131 unsigned int hdr_off, unsigned int sg, unsigned int off)
132{
133 struct rds_tcp_connection *tc = conn->c_transport_data;
134 int done = 0;
135 int ret = 0;
136
137 if (hdr_off == 0) {
138 /*
139 * m_ack_seq is set to the sequence number of the last byte of
140 * header and data. see rds_tcp_is_acked().
141 */
142 tc->t_last_sent_nxt = rds_tcp_snd_nxt(tc);
143 rm->m_ack_seq = tc->t_last_sent_nxt +
144 sizeof(struct rds_header) +
145 be32_to_cpu(rm->m_inc.i_hdr.h_len) - 1;
146 smp_mb__before_clear_bit();
147 set_bit(RDS_MSG_HAS_ACK_SEQ, &rm->m_flags);
148 tc->t_last_expected_una = rm->m_ack_seq + 1;
149
150 rdsdebug("rm %p tcp nxt %u ack_seq %llu\n",
151 rm, rds_tcp_snd_nxt(tc),
152 (unsigned long long)rm->m_ack_seq);
153 }
154
155 if (hdr_off < sizeof(struct rds_header)) {
156 /* see rds_tcp_write_space() */
157 set_bit(SOCK_NOSPACE, &tc->t_sock->sk->sk_socket->flags);
158
159 ret = rds_tcp_sendmsg(tc->t_sock,
160 (void *)&rm->m_inc.i_hdr + hdr_off,
161 sizeof(rm->m_inc.i_hdr) - hdr_off);
162 if (ret < 0)
163 goto out;
164 done += ret;
165 if (hdr_off + done != sizeof(struct rds_header))
166 goto out;
167 }
168
169 while (sg < rm->m_nents) {
170 ret = tc->t_sock->ops->sendpage(tc->t_sock,
171 sg_page(&rm->m_sg[sg]),
172 rm->m_sg[sg].offset + off,
173 rm->m_sg[sg].length - off,
174 MSG_DONTWAIT|MSG_NOSIGNAL);
175 rdsdebug("tcp sendpage %p:%u:%u ret %d\n", (void *)sg_page(&rm->m_sg[sg]),
176 rm->m_sg[sg].offset + off, rm->m_sg[sg].length - off,
177 ret);
178 if (ret <= 0)
179 break;
180
181 off += ret;
182 done += ret;
183 if (off == rm->m_sg[sg].length) {
184 off = 0;
185 sg++;
186 }
187 }
188
189out:
190 if (ret <= 0) {
191 /* write_space will hit after EAGAIN, all else fatal */
192 if (ret == -EAGAIN) {
193 rds_tcp_stats_inc(s_tcp_sndbuf_full);
194 ret = 0;
195 } else {
196 printk(KERN_WARNING "RDS/tcp: send to %u.%u.%u.%u "
197 "returned %d, disconnecting and reconnecting\n",
198 NIPQUAD(conn->c_faddr), ret);
199 rds_conn_drop(conn);
200 }
201 }
202 if (done == 0)
203 done = ret;
204 return done;
205}
206
207/*
208 * rm->m_ack_seq is set to the tcp sequence number that corresponds to the
209 * last byte of the message, including the header. This means that the
210 * entire message has been received if rm->m_ack_seq is "before" the next
211 * unacked byte of the TCP sequence space. We have to do very careful
212 * wrapping 32bit comparisons here.
213 */
214static int rds_tcp_is_acked(struct rds_message *rm, uint64_t ack)
215{
216 if (!test_bit(RDS_MSG_HAS_ACK_SEQ, &rm->m_flags))
217 return 0;
218 return (__s32)((u32)rm->m_ack_seq - (u32)ack) < 0;
219}
220
221void rds_tcp_write_space(struct sock *sk)
222{
223 void (*write_space)(struct sock *sk);
224 struct rds_connection *conn;
225 struct rds_tcp_connection *tc;
226
227 read_lock(&sk->sk_callback_lock);
228 conn = sk->sk_user_data;
229 if (conn == NULL) {
230 write_space = sk->sk_write_space;
231 goto out;
232 }
233
234 tc = conn->c_transport_data;
235 rdsdebug("write_space for tc %p\n", tc);
236 write_space = tc->t_orig_write_space;
237 rds_tcp_stats_inc(s_tcp_write_space_calls);
238
239 rdsdebug("tcp una %u\n", rds_tcp_snd_una(tc));
240 tc->t_last_seen_una = rds_tcp_snd_una(tc);
241 rds_send_drop_acked(conn, rds_tcp_snd_una(tc), rds_tcp_is_acked);
242
243 queue_delayed_work(rds_wq, &conn->c_send_w, 0);
244out:
245 read_unlock(&sk->sk_callback_lock);
246
247 /*
248 * write_space is only called when data leaves tcp's send queue if
249 * SOCK_NOSPACE is set. We set SOCK_NOSPACE every time we put
250 * data in tcp's send queue because we use write_space to parse the
251 * sequence numbers and notice that rds messages have been fully
252 * received.
253 *
254 * tcp's write_space clears SOCK_NOSPACE if the send queue has more
255 * than a certain amount of space. So we need to set it again *after*
256 * we call tcp's write_space or else we might only get called on the
257 * first of a series of incoming tcp acks.
258 */
259 write_space(sk);
260
261 if (sk->sk_socket)
262 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
263}
diff --git a/net/rds/tcp_stats.c b/net/rds/tcp_stats.c
new file mode 100644
index 000000000000..d5898d03cd68
--- /dev/null
+++ b/net/rds/tcp_stats.c
@@ -0,0 +1,74 @@
1/*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33#include <linux/percpu.h>
34#include <linux/seq_file.h>
35#include <linux/proc_fs.h>
36
37#include "rds.h"
38#include "tcp.h"
39
40DEFINE_PER_CPU(struct rds_tcp_statistics, rds_tcp_stats)
41 ____cacheline_aligned;
42
43static const char const *rds_tcp_stat_names[] = {
44 "tcp_data_ready_calls",
45 "tcp_write_space_calls",
46 "tcp_sndbuf_full",
47 "tcp_connect_raced",
48 "tcp_listen_closed_stale",
49};
50
51unsigned int rds_tcp_stats_info_copy(struct rds_info_iterator *iter,
52 unsigned int avail)
53{
54 struct rds_tcp_statistics stats = {0, };
55 uint64_t *src;
56 uint64_t *sum;
57 size_t i;
58 int cpu;
59
60 if (avail < ARRAY_SIZE(rds_tcp_stat_names))
61 goto out;
62
63 for_each_online_cpu(cpu) {
64 src = (uint64_t *)&(per_cpu(rds_tcp_stats, cpu));
65 sum = (uint64_t *)&stats;
66 for (i = 0; i < sizeof(stats) / sizeof(uint64_t); i++)
67 *(sum++) += *(src++);
68 }
69
70 rds_stats_info_copy(iter, (uint64_t *)&stats, rds_tcp_stat_names,
71 ARRAY_SIZE(rds_tcp_stat_names));
72out:
73 return ARRAY_SIZE(rds_tcp_stat_names);
74}
diff --git a/net/rds/threads.c b/net/rds/threads.c
index 828a1bf9ea92..dd7e0cad1e7c 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -68,6 +68,7 @@
68 * (TCP, IB/RDMA) to provide the necessary synchronisation. 68 * (TCP, IB/RDMA) to provide the necessary synchronisation.
69 */ 69 */
70struct workqueue_struct *rds_wq; 70struct workqueue_struct *rds_wq;
71EXPORT_SYMBOL_GPL(rds_wq);
71 72
72void rds_connect_complete(struct rds_connection *conn) 73void rds_connect_complete(struct rds_connection *conn)
73{ 74{
@@ -89,6 +90,7 @@ void rds_connect_complete(struct rds_connection *conn)
89 queue_delayed_work(rds_wq, &conn->c_send_w, 0); 90 queue_delayed_work(rds_wq, &conn->c_send_w, 0);
90 queue_delayed_work(rds_wq, &conn->c_recv_w, 0); 91 queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
91} 92}
93EXPORT_SYMBOL_GPL(rds_connect_complete);
92 94
93/* 95/*
94 * This random exponential backoff is relied on to eventually resolve racing 96 * This random exponential backoff is relied on to eventually resolve racing
diff --git a/net/rds/transport.c b/net/rds/transport.c
index 767da61ad2f3..7e1067901353 100644
--- a/net/rds/transport.c
+++ b/net/rds/transport.c
@@ -37,7 +37,7 @@
37#include "rds.h" 37#include "rds.h"
38#include "loop.h" 38#include "loop.h"
39 39
40static LIST_HEAD(rds_transports); 40static struct rds_transport *transports[RDS_TRANS_COUNT];
41static DECLARE_RWSEM(rds_trans_sem); 41static DECLARE_RWSEM(rds_trans_sem);
42 42
43int rds_trans_register(struct rds_transport *trans) 43int rds_trans_register(struct rds_transport *trans)
@@ -46,36 +46,44 @@ int rds_trans_register(struct rds_transport *trans)
46 46
47 down_write(&rds_trans_sem); 47 down_write(&rds_trans_sem);
48 48
49 list_add_tail(&trans->t_item, &rds_transports); 49 if (transports[trans->t_type])
50 printk(KERN_INFO "Registered RDS/%s transport\n", trans->t_name); 50 printk(KERN_ERR "RDS Transport type %d already registered\n",
51 trans->t_type);
52 else {
53 transports[trans->t_type] = trans;
54 printk(KERN_INFO "Registered RDS/%s transport\n", trans->t_name);
55 }
51 56
52 up_write(&rds_trans_sem); 57 up_write(&rds_trans_sem);
53 58
54 return 0; 59 return 0;
55} 60}
61EXPORT_SYMBOL_GPL(rds_trans_register);
56 62
57void rds_trans_unregister(struct rds_transport *trans) 63void rds_trans_unregister(struct rds_transport *trans)
58{ 64{
59 down_write(&rds_trans_sem); 65 down_write(&rds_trans_sem);
60 66
61 list_del_init(&trans->t_item); 67 transports[trans->t_type] = NULL;
62 printk(KERN_INFO "Unregistered RDS/%s transport\n", trans->t_name); 68 printk(KERN_INFO "Unregistered RDS/%s transport\n", trans->t_name);
63 69
64 up_write(&rds_trans_sem); 70 up_write(&rds_trans_sem);
65} 71}
72EXPORT_SYMBOL_GPL(rds_trans_unregister);
66 73
67struct rds_transport *rds_trans_get_preferred(__be32 addr) 74struct rds_transport *rds_trans_get_preferred(__be32 addr)
68{ 75{
69 struct rds_transport *trans;
70 struct rds_transport *ret = NULL; 76 struct rds_transport *ret = NULL;
77 int i;
71 78
72 if (IN_LOOPBACK(ntohl(addr))) 79 if (IN_LOOPBACK(ntohl(addr)))
73 return &rds_loop_transport; 80 return &rds_loop_transport;
74 81
75 down_read(&rds_trans_sem); 82 down_read(&rds_trans_sem);
76 list_for_each_entry(trans, &rds_transports, t_item) { 83 for (i = 0; i < RDS_TRANS_COUNT; i++)
77 if (trans->laddr_check(addr) == 0) { 84 {
78 ret = trans; 85 if (transports[i] && (transports[i]->laddr_check(addr) == 0)) {
86 ret = transports[i];
79 break; 87 break;
80 } 88 }
81 } 89 }
@@ -97,12 +105,15 @@ unsigned int rds_trans_stats_info_copy(struct rds_info_iterator *iter,
97 struct rds_transport *trans; 105 struct rds_transport *trans;
98 unsigned int total = 0; 106 unsigned int total = 0;
99 unsigned int part; 107 unsigned int part;
108 int i;
100 109
101 rds_info_iter_unmap(iter); 110 rds_info_iter_unmap(iter);
102 down_read(&rds_trans_sem); 111 down_read(&rds_trans_sem);
103 112
104 list_for_each_entry(trans, &rds_transports, t_item) { 113 for (i = 0; i < RDS_TRANS_COUNT; i++)
105 if (trans->stats_info_copy == NULL) 114 {
115 trans = transports[i];
116 if (!trans || !trans->stats_info_copy)
106 continue; 117 continue;
107 118
108 part = trans->stats_info_copy(iter, avail); 119 part = trans->stats_info_copy(iter, avail);