aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPatrick Caulfield <pcaulfie@redhat.com>2007-04-17 10:39:57 -0400
committerSteven Whitehouse <swhiteho@redhat.com>2007-05-01 04:11:23 -0400
commit6ed7257b46709e87d79ac2b6b819b7e0c9184998 (patch)
tree502f68849175f8fb52bb141501df2df9efc8e06c
parentfc7c44f03d95f20b5446d06f5bb9605cddd53203 (diff)
[DLM] Consolidate transport protocols
This patch consolidates the TCP & SCTP protocols for the DLM into a single file and makes it switchable at run-time (well, at least before the DLM actually starts up!) For RHEL5 this patch requires Neil Horman's patch that expands the in-kernel socket API but that has already been twice ACKed so it should be OK. The patch adds a new lowcomms.c file that replaces the existing lowcomms-sctp.c & lowcomms-tcp.c files. Signed-off-By: Patrick Caulfield <pcaulfie@redhat.com> Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
-rw-r--r--fs/dlm/Kconfig31
-rw-r--r--fs/dlm/Makefile6
-rw-r--r--fs/dlm/config.c10
-rw-r--r--fs/dlm/config.h3
-rw-r--r--fs/dlm/lowcomms-sctp.c1210
-rw-r--r--fs/dlm/lowcomms.c (renamed from fs/dlm/lowcomms-tcp.c)742
6 files changed, 621 insertions, 1381 deletions
diff --git a/fs/dlm/Kconfig b/fs/dlm/Kconfig
index 6fa7b0d5c043..69a94690e493 100644
--- a/fs/dlm/Kconfig
+++ b/fs/dlm/Kconfig
@@ -3,36 +3,19 @@ menu "Distributed Lock Manager"
3 3
4config DLM 4config DLM
5 tristate "Distributed Lock Manager (DLM)" 5 tristate "Distributed Lock Manager (DLM)"
6 depends on SYSFS && (IPV6 || IPV6=n) 6 depends on IPV6 || IPV6=n
7 select CONFIGFS_FS 7 select CONFIGFS_FS
8 select IP_SCTP if DLM_SCTP 8 select IP_SCTP
9 help 9 help
10 A general purpose distributed lock manager for kernel or userspace 10 A general purpose distributed lock manager for kernel or userspace
11 applications. 11 applications.
12
13choice
14 prompt "Select DLM communications protocol"
15 depends on DLM
16 default DLM_TCP
17 help
18 The DLM Can use TCP or SCTP for it's network communications.
19 SCTP supports multi-homed operations whereas TCP doesn't.
20 However, SCTP seems to have stability problems at the moment.
21
22config DLM_TCP
23 bool "TCP/IP"
24
25config DLM_SCTP
26 bool "SCTP"
27
28endchoice
29 12
30config DLM_DEBUG 13config DLM_DEBUG
31 bool "DLM debugging" 14 bool "DLM debugging"
32 depends on DLM 15 depends on DLM
33 help 16 help
34 Under the debugfs mount point, the name of each lockspace will 17 Under the debugfs mount point, the name of each lockspace will
35 appear as a file in the "dlm" directory. The output is the 18 appear as a file in the "dlm" directory. The output is the
36 list of resource and locks the local node knows about. 19 list of resource and locks the local node knows about.
37 20
38endmenu 21endmenu
diff --git a/fs/dlm/Makefile b/fs/dlm/Makefile
index 65388944eba0..604cf7dc5f39 100644
--- a/fs/dlm/Makefile
+++ b/fs/dlm/Makefile
@@ -8,14 +8,12 @@ dlm-y := ast.o \
8 member.o \ 8 member.o \
9 memory.o \ 9 memory.o \
10 midcomms.o \ 10 midcomms.o \
11 lowcomms.o \
11 rcom.o \ 12 rcom.o \
12 recover.o \ 13 recover.o \
13 recoverd.o \ 14 recoverd.o \
14 requestqueue.o \ 15 requestqueue.o \
15 user.o \ 16 user.o \
16 util.o 17 util.o
17dlm-$(CONFIG_DLM_DEBUG) += debug_fs.o 18dlm-$(CONFIG_DLM_DEBUG) += debug_fs.o
18 19
19dlm-$(CONFIG_DLM_TCP) += lowcomms-tcp.o
20
21dlm-$(CONFIG_DLM_SCTP) += lowcomms-sctp.o \ No newline at end of file
diff --git a/fs/dlm/config.c b/fs/dlm/config.c
index 8665c88e5af2..822abdcd1434 100644
--- a/fs/dlm/config.c
+++ b/fs/dlm/config.c
@@ -2,7 +2,7 @@
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. 5** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved.
6** 6**
7** This copyrighted material is made available to anyone wishing to use, 7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions 8** modify, copy, or redistribute it subject to the terms and conditions
@@ -89,6 +89,7 @@ struct cluster {
89 unsigned int cl_toss_secs; 89 unsigned int cl_toss_secs;
90 unsigned int cl_scan_secs; 90 unsigned int cl_scan_secs;
91 unsigned int cl_log_debug; 91 unsigned int cl_log_debug;
92 unsigned int cl_protocol;
92}; 93};
93 94
94enum { 95enum {
@@ -101,6 +102,7 @@ enum {
101 CLUSTER_ATTR_TOSS_SECS, 102 CLUSTER_ATTR_TOSS_SECS,
102 CLUSTER_ATTR_SCAN_SECS, 103 CLUSTER_ATTR_SCAN_SECS,
103 CLUSTER_ATTR_LOG_DEBUG, 104 CLUSTER_ATTR_LOG_DEBUG,
105 CLUSTER_ATTR_PROTOCOL,
104}; 106};
105 107
106struct cluster_attribute { 108struct cluster_attribute {
@@ -159,6 +161,7 @@ CLUSTER_ATTR(recover_timer, 1);
159CLUSTER_ATTR(toss_secs, 1); 161CLUSTER_ATTR(toss_secs, 1);
160CLUSTER_ATTR(scan_secs, 1); 162CLUSTER_ATTR(scan_secs, 1);
161CLUSTER_ATTR(log_debug, 0); 163CLUSTER_ATTR(log_debug, 0);
164CLUSTER_ATTR(protocol, 0);
162 165
163static struct configfs_attribute *cluster_attrs[] = { 166static struct configfs_attribute *cluster_attrs[] = {
164 [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr, 167 [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr,
@@ -170,6 +173,7 @@ static struct configfs_attribute *cluster_attrs[] = {
170 [CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr, 173 [CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr,
171 [CLUSTER_ATTR_SCAN_SECS] = &cluster_attr_scan_secs.attr, 174 [CLUSTER_ATTR_SCAN_SECS] = &cluster_attr_scan_secs.attr,
172 [CLUSTER_ATTR_LOG_DEBUG] = &cluster_attr_log_debug.attr, 175 [CLUSTER_ATTR_LOG_DEBUG] = &cluster_attr_log_debug.attr,
176 [CLUSTER_ATTR_PROTOCOL] = &cluster_attr_protocol.attr,
173 NULL, 177 NULL,
174}; 178};
175 179
@@ -904,6 +908,7 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
904#define DEFAULT_TOSS_SECS 10 908#define DEFAULT_TOSS_SECS 10
905#define DEFAULT_SCAN_SECS 5 909#define DEFAULT_SCAN_SECS 5
906#define DEFAULT_LOG_DEBUG 0 910#define DEFAULT_LOG_DEBUG 0
911#define DEFAULT_PROTOCOL 0
907 912
908struct dlm_config_info dlm_config = { 913struct dlm_config_info dlm_config = {
909 .ci_tcp_port = DEFAULT_TCP_PORT, 914 .ci_tcp_port = DEFAULT_TCP_PORT,
@@ -914,6 +919,7 @@ struct dlm_config_info dlm_config = {
914 .ci_recover_timer = DEFAULT_RECOVER_TIMER, 919 .ci_recover_timer = DEFAULT_RECOVER_TIMER,
915 .ci_toss_secs = DEFAULT_TOSS_SECS, 920 .ci_toss_secs = DEFAULT_TOSS_SECS,
916 .ci_scan_secs = DEFAULT_SCAN_SECS, 921 .ci_scan_secs = DEFAULT_SCAN_SECS,
917 .ci_log_debug = DEFAULT_LOG_DEBUG 922 .ci_log_debug = DEFAULT_LOG_DEBUG,
923 .ci_protocol = DEFAULT_PROTOCOL
918}; 924};
919 925
diff --git a/fs/dlm/config.h b/fs/dlm/config.h
index 1e978611a96e..967cc3d72e5e 100644
--- a/fs/dlm/config.h
+++ b/fs/dlm/config.h
@@ -2,7 +2,7 @@
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. 5** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved.
6** 6**
7** This copyrighted material is made available to anyone wishing to use, 7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions 8** modify, copy, or redistribute it subject to the terms and conditions
@@ -26,6 +26,7 @@ struct dlm_config_info {
26 int ci_toss_secs; 26 int ci_toss_secs;
27 int ci_scan_secs; 27 int ci_scan_secs;
28 int ci_log_debug; 28 int ci_log_debug;
29 int ci_protocol;
29}; 30};
30 31
31extern struct dlm_config_info dlm_config; 32extern struct dlm_config_info dlm_config;
diff --git a/fs/dlm/lowcomms-sctp.c b/fs/dlm/lowcomms-sctp.c
deleted file mode 100644
index dc83a9d979b5..000000000000
--- a/fs/dlm/lowcomms-sctp.c
+++ /dev/null
@@ -1,1210 +0,0 @@
1/******************************************************************************
2*******************************************************************************
3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2006 Red Hat, Inc. All rights reserved.
6**
7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions
9** of the GNU General Public License v.2.
10**
11*******************************************************************************
12******************************************************************************/
13
14/*
15 * lowcomms.c
16 *
17 * This is the "low-level" comms layer.
18 *
19 * It is responsible for sending/receiving messages
20 * from other nodes in the cluster.
21 *
22 * Cluster nodes are referred to by their nodeids. nodeids are
23 * simply 32 bit numbers to the locking module - if they need to
24 * be expanded for the cluster infrastructure then that is it's
25 * responsibility. It is this layer's
26 * responsibility to resolve these into IP address or
27 * whatever it needs for inter-node communication.
28 *
29 * The comms level is two kernel threads that deal mainly with
30 * the receiving of messages from other nodes and passing them
31 * up to the mid-level comms layer (which understands the
32 * message format) for execution by the locking core, and
33 * a send thread which does all the setting up of connections
34 * to remote nodes and the sending of data. Threads are not allowed
35 * to send their own data because it may cause them to wait in times
36 * of high load. Also, this way, the sending thread can collect together
37 * messages bound for one node and send them in one block.
38 *
39 * I don't see any problem with the recv thread executing the locking
40 * code on behalf of remote processes as the locking code is
41 * short, efficient and never (well, hardly ever) waits.
42 *
43 */
44
45#include <asm/ioctls.h>
46#include <net/sock.h>
47#include <net/tcp.h>
48#include <net/sctp/user.h>
49#include <linux/pagemap.h>
50#include <linux/socket.h>
51#include <linux/idr.h>
52
53#include "dlm_internal.h"
54#include "lowcomms.h"
55#include "config.h"
56#include "midcomms.h"
57
58static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
59static int dlm_local_count;
60static int dlm_local_nodeid;
61
62/* One of these per connected node */
63
64#define NI_INIT_PENDING 1
65#define NI_WRITE_PENDING 2
66
67struct nodeinfo {
68 spinlock_t lock;
69 sctp_assoc_t assoc_id;
70 unsigned long flags;
71 struct list_head write_list; /* nodes with pending writes */
72 struct list_head writequeue; /* outgoing writequeue_entries */
73 spinlock_t writequeue_lock;
74 int nodeid;
75 struct work_struct swork; /* Send workqueue */
76 struct work_struct lwork; /* Locking workqueue */
77};
78
79static DEFINE_IDR(nodeinfo_idr);
80static DECLARE_RWSEM(nodeinfo_lock);
81static int max_nodeid;
82
83struct cbuf {
84 unsigned int base;
85 unsigned int len;
86 unsigned int mask;
87};
88
89/* Just the one of these, now. But this struct keeps
90 the connection-specific variables together */
91
92#define CF_READ_PENDING 1
93
94struct connection {
95 struct socket *sock;
96 unsigned long flags;
97 struct page *rx_page;
98 atomic_t waiting_requests;
99 struct cbuf cb;
100 int eagain_flag;
101 struct work_struct work; /* Send workqueue */
102};
103
104/* An entry waiting to be sent */
105
106struct writequeue_entry {
107 struct list_head list;
108 struct page *page;
109 int offset;
110 int len;
111 int end;
112 int users;
113 struct nodeinfo *ni;
114};
115
116static void cbuf_add(struct cbuf *cb, int n)
117{
118 cb->len += n;
119}
120
121static int cbuf_data(struct cbuf *cb)
122{
123 return ((cb->base + cb->len) & cb->mask);
124}
125
126static void cbuf_init(struct cbuf *cb, int size)
127{
128 cb->base = cb->len = 0;
129 cb->mask = size-1;
130}
131
132static void cbuf_eat(struct cbuf *cb, int n)
133{
134 cb->len -= n;
135 cb->base += n;
136 cb->base &= cb->mask;
137}
138
139/* List of nodes which have writes pending */
140static LIST_HEAD(write_nodes);
141static DEFINE_SPINLOCK(write_nodes_lock);
142
143
144/* Maximum number of incoming messages to process before
145 * doing a schedule()
146 */
147#define MAX_RX_MSG_COUNT 25
148
149/* Work queues */
150static struct workqueue_struct *recv_workqueue;
151static struct workqueue_struct *send_workqueue;
152static struct workqueue_struct *lock_workqueue;
153
154/* The SCTP connection */
155static struct connection sctp_con;
156
157static void process_send_sockets(struct work_struct *work);
158static void process_recv_sockets(struct work_struct *work);
159static void process_lock_request(struct work_struct *work);
160
161static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
162{
163 struct sockaddr_storage addr;
164 int error;
165
166 if (!dlm_local_count)
167 return -1;
168
169 error = dlm_nodeid_to_addr(nodeid, &addr);
170 if (error)
171 return error;
172
173 if (dlm_local_addr[0]->ss_family == AF_INET) {
174 struct sockaddr_in *in4 = (struct sockaddr_in *) &addr;
175 struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
176 ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
177 } else {
178 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr;
179 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
180 memcpy(&ret6->sin6_addr, &in6->sin6_addr,
181 sizeof(in6->sin6_addr));
182 }
183
184 return 0;
185}
186
187/* If alloc is 0 here we will not attempt to allocate a new
188 nodeinfo struct */
189static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
190{
191 struct nodeinfo *ni;
192 int r;
193 int n;
194
195 down_read(&nodeinfo_lock);
196 ni = idr_find(&nodeinfo_idr, nodeid);
197 up_read(&nodeinfo_lock);
198
199 if (ni || !alloc)
200 return ni;
201
202 down_write(&nodeinfo_lock);
203
204 ni = idr_find(&nodeinfo_idr, nodeid);
205 if (ni)
206 goto out_up;
207
208 r = idr_pre_get(&nodeinfo_idr, alloc);
209 if (!r)
210 goto out_up;
211
212 ni = kmalloc(sizeof(struct nodeinfo), alloc);
213 if (!ni)
214 goto out_up;
215
216 r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n);
217 if (r) {
218 kfree(ni);
219 ni = NULL;
220 goto out_up;
221 }
222 if (n != nodeid) {
223 idr_remove(&nodeinfo_idr, n);
224 kfree(ni);
225 ni = NULL;
226 goto out_up;
227 }
228 memset(ni, 0, sizeof(struct nodeinfo));
229 spin_lock_init(&ni->lock);
230 INIT_LIST_HEAD(&ni->writequeue);
231 spin_lock_init(&ni->writequeue_lock);
232 INIT_WORK(&ni->lwork, process_lock_request);
233 INIT_WORK(&ni->swork, process_send_sockets);
234 ni->nodeid = nodeid;
235
236 if (nodeid > max_nodeid)
237 max_nodeid = nodeid;
238out_up:
239 up_write(&nodeinfo_lock);
240
241 return ni;
242}
243
244/* Don't call this too often... */
245static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc)
246{
247 int i;
248 struct nodeinfo *ni;
249
250 for (i=1; i<=max_nodeid; i++) {
251 ni = nodeid2nodeinfo(i, 0);
252 if (ni && ni->assoc_id == assoc)
253 return ni;
254 }
255 return NULL;
256}
257
258/* Data or notification available on socket */
259static void lowcomms_data_ready(struct sock *sk, int count_unused)
260{
261 if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags))
262 queue_work(recv_workqueue, &sctp_con.work);
263}
264
265
266/* Add the port number to an IP6 or 4 sockaddr and return the address length.
267 Also padd out the struct with zeros to make comparisons meaningful */
268
269static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
270 int *addr_len)
271{
272 struct sockaddr_in *local4_addr;
273 struct sockaddr_in6 *local6_addr;
274
275 if (!dlm_local_count)
276 return;
277
278 if (!port) {
279 if (dlm_local_addr[0]->ss_family == AF_INET) {
280 local4_addr = (struct sockaddr_in *)dlm_local_addr[0];
281 port = be16_to_cpu(local4_addr->sin_port);
282 } else {
283 local6_addr = (struct sockaddr_in6 *)dlm_local_addr[0];
284 port = be16_to_cpu(local6_addr->sin6_port);
285 }
286 }
287
288 saddr->ss_family = dlm_local_addr[0]->ss_family;
289 if (dlm_local_addr[0]->ss_family == AF_INET) {
290 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
291 in4_addr->sin_port = cpu_to_be16(port);
292 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
293 memset(in4_addr+1, 0, sizeof(struct sockaddr_storage) -
294 sizeof(struct sockaddr_in));
295 *addr_len = sizeof(struct sockaddr_in);
296 } else {
297 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
298 in6_addr->sin6_port = cpu_to_be16(port);
299 memset(in6_addr+1, 0, sizeof(struct sockaddr_storage) -
300 sizeof(struct sockaddr_in6));
301 *addr_len = sizeof(struct sockaddr_in6);
302 }
303}
304
305/* Close the connection and tidy up */
306static void close_connection(void)
307{
308 if (sctp_con.sock) {
309 sock_release(sctp_con.sock);
310 sctp_con.sock = NULL;
311 }
312
313 if (sctp_con.rx_page) {
314 __free_page(sctp_con.rx_page);
315 sctp_con.rx_page = NULL;
316 }
317}
318
319/* We only send shutdown messages to nodes that are not part of the cluster */
320static void send_shutdown(sctp_assoc_t associd)
321{
322 static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
323 struct msghdr outmessage;
324 struct cmsghdr *cmsg;
325 struct sctp_sndrcvinfo *sinfo;
326 int ret;
327
328 outmessage.msg_name = NULL;
329 outmessage.msg_namelen = 0;
330 outmessage.msg_control = outcmsg;
331 outmessage.msg_controllen = sizeof(outcmsg);
332 outmessage.msg_flags = MSG_EOR;
333
334 cmsg = CMSG_FIRSTHDR(&outmessage);
335 cmsg->cmsg_level = IPPROTO_SCTP;
336 cmsg->cmsg_type = SCTP_SNDRCV;
337 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
338 outmessage.msg_controllen = cmsg->cmsg_len;
339 sinfo = CMSG_DATA(cmsg);
340 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
341
342 sinfo->sinfo_flags |= MSG_EOF;
343 sinfo->sinfo_assoc_id = associd;
344
345 ret = kernel_sendmsg(sctp_con.sock, &outmessage, NULL, 0, 0);
346
347 if (ret != 0)
348 log_print("send EOF to node failed: %d", ret);
349}
350
351
352/* INIT failed but we don't know which node...
353 restart INIT on all pending nodes */
354static void init_failed(void)
355{
356 int i;
357 struct nodeinfo *ni;
358
359 for (i=1; i<=max_nodeid; i++) {
360 ni = nodeid2nodeinfo(i, 0);
361 if (!ni)
362 continue;
363
364 if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
365 ni->assoc_id = 0;
366 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
367 spin_lock_bh(&write_nodes_lock);
368 list_add_tail(&ni->write_list, &write_nodes);
369 spin_unlock_bh(&write_nodes_lock);
370 queue_work(send_workqueue, &ni->swork);
371 }
372 }
373 }
374}
375
376/* Something happened to an association */
377static void process_sctp_notification(struct msghdr *msg, char *buf)
378{
379 union sctp_notification *sn = (union sctp_notification *)buf;
380
381 if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
382 switch (sn->sn_assoc_change.sac_state) {
383
384 case SCTP_COMM_UP:
385 case SCTP_RESTART:
386 {
387 /* Check that the new node is in the lockspace */
388 struct sctp_prim prim;
389 mm_segment_t fs;
390 int nodeid;
391 int prim_len, ret;
392 int addr_len;
393 struct nodeinfo *ni;
394
395 /* This seems to happen when we received a connection
396 * too early... or something... anyway, it happens but
397 * we always seem to get a real message too, see
398 * receive_from_sock */
399
400 if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
401 log_print("COMM_UP for invalid assoc ID %d",
402 (int)sn->sn_assoc_change.sac_assoc_id);
403 init_failed();
404 return;
405 }
406 memset(&prim, 0, sizeof(struct sctp_prim));
407 prim_len = sizeof(struct sctp_prim);
408 prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
409
410 fs = get_fs();
411 set_fs(get_ds());
412 ret = sctp_con.sock->ops->getsockopt(sctp_con.sock,
413 IPPROTO_SCTP,
414 SCTP_PRIMARY_ADDR,
415 (char*)&prim,
416 &prim_len);
417 set_fs(fs);
418 if (ret < 0) {
419 struct nodeinfo *ni;
420
421 log_print("getsockopt/sctp_primary_addr on "
422 "new assoc %d failed : %d",
423 (int)sn->sn_assoc_change.sac_assoc_id,
424 ret);
425
426 /* Retry INIT later */
427 ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
428 if (ni)
429 clear_bit(NI_INIT_PENDING, &ni->flags);
430 return;
431 }
432 make_sockaddr(&prim.ssp_addr, 0, &addr_len);
433 if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
434 log_print("reject connect from unknown addr");
435 send_shutdown(prim.ssp_assoc_id);
436 return;
437 }
438
439 ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
440 if (!ni)
441 return;
442
443 /* Save the assoc ID */
444 ni->assoc_id = sn->sn_assoc_change.sac_assoc_id;
445
446 log_print("got new/restarted association %d nodeid %d",
447 (int)sn->sn_assoc_change.sac_assoc_id, nodeid);
448
449 /* Send any pending writes */
450 clear_bit(NI_INIT_PENDING, &ni->flags);
451 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
452 spin_lock_bh(&write_nodes_lock);
453 list_add_tail(&ni->write_list, &write_nodes);
454 spin_unlock_bh(&write_nodes_lock);
455 queue_work(send_workqueue, &ni->swork);
456 }
457 }
458 break;
459
460 case SCTP_COMM_LOST:
461 case SCTP_SHUTDOWN_COMP:
462 {
463 struct nodeinfo *ni;
464
465 ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
466 if (ni) {
467 spin_lock(&ni->lock);
468 ni->assoc_id = 0;
469 spin_unlock(&ni->lock);
470 }
471 }
472 break;
473
474 /* We don't know which INIT failed, so clear the PENDING flags
475 * on them all. if assoc_id is zero then it will then try
476 * again */
477
478 case SCTP_CANT_STR_ASSOC:
479 {
480 log_print("Can't start SCTP association - retrying");
481 init_failed();
482 }
483 break;
484
485 default:
486 log_print("unexpected SCTP assoc change id=%d state=%d",
487 (int)sn->sn_assoc_change.sac_assoc_id,
488 sn->sn_assoc_change.sac_state);
489 }
490 }
491}
492
493/* Data received from remote end */
494static int receive_from_sock(void)
495{
496 int ret = 0;
497 struct msghdr msg;
498 struct kvec iov[2];
499 unsigned len;
500 int r;
501 struct sctp_sndrcvinfo *sinfo;
502 struct cmsghdr *cmsg;
503 struct nodeinfo *ni;
504
505 /* These two are marginally too big for stack allocation, but this
506 * function is (currently) only called by dlm_recvd so static should be
507 * OK.
508 */
509 static struct sockaddr_storage msgname;
510 static char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
511
512 if (sctp_con.sock == NULL)
513 goto out;
514
515 if (sctp_con.rx_page == NULL) {
516 /*
517 * This doesn't need to be atomic, but I think it should
518 * improve performance if it is.
519 */
520 sctp_con.rx_page = alloc_page(GFP_ATOMIC);
521 if (sctp_con.rx_page == NULL)
522 goto out_resched;
523 cbuf_init(&sctp_con.cb, PAGE_CACHE_SIZE);
524 }
525
526 memset(&incmsg, 0, sizeof(incmsg));
527 memset(&msgname, 0, sizeof(msgname));
528
529 msg.msg_name = &msgname;
530 msg.msg_namelen = sizeof(msgname);
531 msg.msg_flags = 0;
532 msg.msg_control = incmsg;
533 msg.msg_controllen = sizeof(incmsg);
534 msg.msg_iovlen = 1;
535
536 /* I don't see why this circular buffer stuff is necessary for SCTP
537 * which is a packet-based protocol, but the whole thing breaks under
538 * load without it! The overhead is minimal (and is in the TCP lowcomms
539 * anyway, of course) so I'll leave it in until I can figure out what's
540 * really happening.
541 */
542
543 /*
544 * iov[0] is the bit of the circular buffer between the current end
545 * point (cb.base + cb.len) and the end of the buffer.
546 */
547 iov[0].iov_len = sctp_con.cb.base - cbuf_data(&sctp_con.cb);
548 iov[0].iov_base = page_address(sctp_con.rx_page) +
549 cbuf_data(&sctp_con.cb);
550 iov[1].iov_len = 0;
551
552 /*
553 * iov[1] is the bit of the circular buffer between the start of the
554 * buffer and the start of the currently used section (cb.base)
555 */
556 if (cbuf_data(&sctp_con.cb) >= sctp_con.cb.base) {
557 iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&sctp_con.cb);
558 iov[1].iov_len = sctp_con.cb.base;
559 iov[1].iov_base = page_address(sctp_con.rx_page);
560 msg.msg_iovlen = 2;
561 }
562 len = iov[0].iov_len + iov[1].iov_len;
563
564 r = ret = kernel_recvmsg(sctp_con.sock, &msg, iov, msg.msg_iovlen, len,
565 MSG_NOSIGNAL | MSG_DONTWAIT);
566 if (ret <= 0)
567 goto out_close;
568
569 msg.msg_control = incmsg;
570 msg.msg_controllen = sizeof(incmsg);
571 cmsg = CMSG_FIRSTHDR(&msg);
572 sinfo = CMSG_DATA(cmsg);
573
574 if (msg.msg_flags & MSG_NOTIFICATION) {
575 process_sctp_notification(&msg, page_address(sctp_con.rx_page));
576 return 0;
577 }
578
579 /* Is this a new association ? */
580 ni = nodeid2nodeinfo(le32_to_cpu(sinfo->sinfo_ppid), GFP_KERNEL);
581 if (ni) {
582 ni->assoc_id = sinfo->sinfo_assoc_id;
583 if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
584
585 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
586 spin_lock_bh(&write_nodes_lock);
587 list_add_tail(&ni->write_list, &write_nodes);
588 spin_unlock_bh(&write_nodes_lock);
589 queue_work(send_workqueue, &ni->swork);
590 }
591 }
592 }
593
594 /* INIT sends a message with length of 1 - ignore it */
595 if (r == 1)
596 return 0;
597
598 cbuf_add(&sctp_con.cb, ret);
599 // PJC: TODO: Add to node's workqueue....can we ??
600 ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
601 page_address(sctp_con.rx_page),
602 sctp_con.cb.base, sctp_con.cb.len,
603 PAGE_CACHE_SIZE);
604 if (ret < 0)
605 goto out_close;
606 cbuf_eat(&sctp_con.cb, ret);
607
608out:
609 ret = 0;
610 goto out_ret;
611
612out_resched:
613 lowcomms_data_ready(sctp_con.sock->sk, 0);
614 ret = 0;
615 cond_resched();
616 goto out_ret;
617
618out_close:
619 if (ret != -EAGAIN)
620 log_print("error reading from sctp socket: %d", ret);
621out_ret:
622 return ret;
623}
624
625/* Bind to an IP address. SCTP allows multiple address so it can do multi-homing */
626static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num)
627{
628 mm_segment_t fs;
629 int result = 0;
630
631 fs = get_fs();
632 set_fs(get_ds());
633 if (num == 1)
634 result = sctp_con.sock->ops->bind(sctp_con.sock,
635 (struct sockaddr *) addr,
636 addr_len);
637 else
638 result = sctp_con.sock->ops->setsockopt(sctp_con.sock, SOL_SCTP,
639 SCTP_SOCKOPT_BINDX_ADD,
640 (char *)addr, addr_len);
641 set_fs(fs);
642
643 if (result < 0)
644 log_print("Can't bind to port %d addr number %d",
645 dlm_config.ci_tcp_port, num);
646
647 return result;
648}
649
650static void init_local(void)
651{
652 struct sockaddr_storage sas, *addr;
653 int i;
654
655 dlm_local_nodeid = dlm_our_nodeid();
656
657 for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) {
658 if (dlm_our_addr(&sas, i))
659 break;
660
661 addr = kmalloc(sizeof(*addr), GFP_KERNEL);
662 if (!addr)
663 break;
664 memcpy(addr, &sas, sizeof(*addr));
665 dlm_local_addr[dlm_local_count++] = addr;
666 }
667}
668
669/* Initialise SCTP socket and bind to all interfaces */
670static int init_sock(void)
671{
672 mm_segment_t fs;
673 struct socket *sock = NULL;
674 struct sockaddr_storage localaddr;
675 struct sctp_event_subscribe subscribe;
676 int result = -EINVAL, num = 1, i, addr_len;
677
678 if (!dlm_local_count) {
679 init_local();
680 if (!dlm_local_count) {
681 log_print("no local IP address has been set");
682 goto out;
683 }
684 }
685
686 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
687 IPPROTO_SCTP, &sock);
688 if (result < 0) {
689 log_print("Can't create comms socket, check SCTP is loaded");
690 goto out;
691 }
692
693 /* Listen for events */
694 memset(&subscribe, 0, sizeof(subscribe));
695 subscribe.sctp_data_io_event = 1;
696 subscribe.sctp_association_event = 1;
697 subscribe.sctp_send_failure_event = 1;
698 subscribe.sctp_shutdown_event = 1;
699 subscribe.sctp_partial_delivery_event = 1;
700
701 fs = get_fs();
702 set_fs(get_ds());
703 result = sock->ops->setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
704 (char *)&subscribe, sizeof(subscribe));
705 set_fs(fs);
706
707 if (result < 0) {
708 log_print("Failed to set SCTP_EVENTS on socket: result=%d",
709 result);
710 goto create_delsock;
711 }
712
713 /* Init con struct */
714 sock->sk->sk_user_data = &sctp_con;
715 sctp_con.sock = sock;
716 sctp_con.sock->sk->sk_data_ready = lowcomms_data_ready;
717
718 /* Bind to all interfaces. */
719 for (i = 0; i < dlm_local_count; i++) {
720 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
721 make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
722
723 result = add_bind_addr(&localaddr, addr_len, num);
724 if (result)
725 goto create_delsock;
726 ++num;
727 }
728
729 result = sock->ops->listen(sock, 5);
730 if (result < 0) {
731 log_print("Can't set socket listening");
732 goto create_delsock;
733 }
734
735 return 0;
736
737create_delsock:
738 sock_release(sock);
739 sctp_con.sock = NULL;
740out:
741 return result;
742}
743
744
745static struct writequeue_entry *new_writequeue_entry(gfp_t allocation)
746{
747 struct writequeue_entry *entry;
748
749 entry = kmalloc(sizeof(struct writequeue_entry), allocation);
750 if (!entry)
751 return NULL;
752
753 entry->page = alloc_page(allocation);
754 if (!entry->page) {
755 kfree(entry);
756 return NULL;
757 }
758
759 entry->offset = 0;
760 entry->len = 0;
761 entry->end = 0;
762 entry->users = 0;
763
764 return entry;
765}
766
767void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
768{
769 struct writequeue_entry *e;
770 int offset = 0;
771 int users = 0;
772 struct nodeinfo *ni;
773
774 ni = nodeid2nodeinfo(nodeid, allocation);
775 if (!ni)
776 return NULL;
777
778 spin_lock(&ni->writequeue_lock);
779 e = list_entry(ni->writequeue.prev, struct writequeue_entry, list);
780 if ((&e->list == &ni->writequeue) ||
781 (PAGE_CACHE_SIZE - e->end < len)) {
782 e = NULL;
783 } else {
784 offset = e->end;
785 e->end += len;
786 users = e->users++;
787 }
788 spin_unlock(&ni->writequeue_lock);
789
790 if (e) {
791 got_one:
792 if (users == 0)
793 kmap(e->page);
794 *ppc = page_address(e->page) + offset;
795 return e;
796 }
797
798 e = new_writequeue_entry(allocation);
799 if (e) {
800 spin_lock(&ni->writequeue_lock);
801 offset = e->end;
802 e->end += len;
803 e->ni = ni;
804 users = e->users++;
805 list_add_tail(&e->list, &ni->writequeue);
806 spin_unlock(&ni->writequeue_lock);
807 goto got_one;
808 }
809 return NULL;
810}
811
812void dlm_lowcomms_commit_buffer(void *arg)
813{
814 struct writequeue_entry *e = (struct writequeue_entry *) arg;
815 int users;
816 struct nodeinfo *ni = e->ni;
817
818 spin_lock(&ni->writequeue_lock);
819 users = --e->users;
820 if (users)
821 goto out;
822 e->len = e->end - e->offset;
823 kunmap(e->page);
824 spin_unlock(&ni->writequeue_lock);
825
826 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
827 spin_lock_bh(&write_nodes_lock);
828 list_add_tail(&ni->write_list, &write_nodes);
829 spin_unlock_bh(&write_nodes_lock);
830
831 queue_work(send_workqueue, &ni->swork);
832 }
833 return;
834
835out:
836 spin_unlock(&ni->writequeue_lock);
837 return;
838}
839
840static void free_entry(struct writequeue_entry *e)
841{
842 __free_page(e->page);
843 kfree(e);
844}
845
846/* Initiate an SCTP association. In theory we could just use sendmsg() on
847 the first IP address and it should work, but this allows us to set up the
848 association before sending any valuable data that we can't afford to lose.
849 It also keeps the send path clean as it can now always use the association ID */
850static void initiate_association(int nodeid)
851{
852 struct sockaddr_storage rem_addr;
853 static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
854 struct msghdr outmessage;
855 struct cmsghdr *cmsg;
856 struct sctp_sndrcvinfo *sinfo;
857 int ret;
858 int addrlen;
859 char buf[1];
860 struct kvec iov[1];
861 struct nodeinfo *ni;
862
863 log_print("Initiating association with node %d", nodeid);
864
865 ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
866 if (!ni)
867 return;
868
869 if (nodeid_to_addr(nodeid, (struct sockaddr *)&rem_addr)) {
870 log_print("no address for nodeid %d", nodeid);
871 return;
872 }
873
874 make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);
875
876 outmessage.msg_name = &rem_addr;
877 outmessage.msg_namelen = addrlen;
878 outmessage.msg_control = outcmsg;
879 outmessage.msg_controllen = sizeof(outcmsg);
880 outmessage.msg_flags = MSG_EOR;
881
882 iov[0].iov_base = buf;
883 iov[0].iov_len = 1;
884
885 /* Real INIT messages seem to cause trouble. Just send a 1 byte message
886 we can afford to lose */
887 cmsg = CMSG_FIRSTHDR(&outmessage);
888 cmsg->cmsg_level = IPPROTO_SCTP;
889 cmsg->cmsg_type = SCTP_SNDRCV;
890 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
891 sinfo = CMSG_DATA(cmsg);
892 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
893 sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
894
895 outmessage.msg_controllen = cmsg->cmsg_len;
896 ret = kernel_sendmsg(sctp_con.sock, &outmessage, iov, 1, 1);
897 if (ret < 0) {
898 log_print("send INIT to node failed: %d", ret);
899 /* Try again later */
900 clear_bit(NI_INIT_PENDING, &ni->flags);
901 }
902}
903
904/* Send a message */
905static void send_to_sock(struct nodeinfo *ni)
906{
907 int ret = 0;
908 struct writequeue_entry *e;
909 int len, offset;
910 struct msghdr outmsg;
911 static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
912 struct cmsghdr *cmsg;
913 struct sctp_sndrcvinfo *sinfo;
914 struct kvec iov;
915
916 /* See if we need to init an association before we start
917 sending precious messages */
918 spin_lock(&ni->lock);
919 if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
920 spin_unlock(&ni->lock);
921 initiate_association(ni->nodeid);
922 return;
923 }
924 spin_unlock(&ni->lock);
925
926 outmsg.msg_name = NULL; /* We use assoc_id */
927 outmsg.msg_namelen = 0;
928 outmsg.msg_control = outcmsg;
929 outmsg.msg_controllen = sizeof(outcmsg);
930 outmsg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | MSG_EOR;
931
932 cmsg = CMSG_FIRSTHDR(&outmsg);
933 cmsg->cmsg_level = IPPROTO_SCTP;
934 cmsg->cmsg_type = SCTP_SNDRCV;
935 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
936 sinfo = CMSG_DATA(cmsg);
937 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
938 sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
939 sinfo->sinfo_assoc_id = ni->assoc_id;
940 outmsg.msg_controllen = cmsg->cmsg_len;
941
942 spin_lock(&ni->writequeue_lock);
943 for (;;) {
944 if (list_empty(&ni->writequeue))
945 break;
946 e = list_entry(ni->writequeue.next, struct writequeue_entry,
947 list);
948 len = e->len;
949 offset = e->offset;
950 BUG_ON(len == 0 && e->users == 0);
951 spin_unlock(&ni->writequeue_lock);
952 kmap(e->page);
953
954 ret = 0;
955 if (len) {
956 iov.iov_base = page_address(e->page)+offset;
957 iov.iov_len = len;
958
959 ret = kernel_sendmsg(sctp_con.sock, &outmsg, &iov, 1,
960 len);
961 if (ret == -EAGAIN) {
962 sctp_con.eagain_flag = 1;
963 goto out;
964 } else if (ret < 0)
965 goto send_error;
966 } else {
967 /* Don't starve people filling buffers */
968 cond_resched();
969 }
970
971 spin_lock(&ni->writequeue_lock);
972 e->offset += ret;
973 e->len -= ret;
974
975 if (e->len == 0 && e->users == 0) {
976 list_del(&e->list);
977 kunmap(e->page);
978 free_entry(e);
979 continue;
980 }
981 }
982 spin_unlock(&ni->writequeue_lock);
983out:
984 return;
985
986send_error:
987 log_print("Error sending to node %d %d", ni->nodeid, ret);
988 spin_lock(&ni->lock);
989 if (!test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
990 ni->assoc_id = 0;
991 spin_unlock(&ni->lock);
992 initiate_association(ni->nodeid);
993 } else
994 spin_unlock(&ni->lock);
995
996 return;
997}
998
999/* Try to send any messages that are pending */
1000static void process_output_queue(void)
1001{
1002 struct list_head *list;
1003 struct list_head *temp;
1004
1005 spin_lock_bh(&write_nodes_lock);
1006 list_for_each_safe(list, temp, &write_nodes) {
1007 struct nodeinfo *ni =
1008 list_entry(list, struct nodeinfo, write_list);
1009 clear_bit(NI_WRITE_PENDING, &ni->flags);
1010 list_del(&ni->write_list);
1011
1012 spin_unlock_bh(&write_nodes_lock);
1013
1014 send_to_sock(ni);
1015 spin_lock_bh(&write_nodes_lock);
1016 }
1017 spin_unlock_bh(&write_nodes_lock);
1018}
1019
1020/* Called after we've had -EAGAIN and been woken up */
1021static void refill_write_queue(void)
1022{
1023 int i;
1024
1025 for (i=1; i<=max_nodeid; i++) {
1026 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
1027
1028 if (ni) {
1029 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
1030 spin_lock_bh(&write_nodes_lock);
1031 list_add_tail(&ni->write_list, &write_nodes);
1032 spin_unlock_bh(&write_nodes_lock);
1033 }
1034 }
1035 }
1036}
1037
1038static void clean_one_writequeue(struct nodeinfo *ni)
1039{
1040 struct list_head *list;
1041 struct list_head *temp;
1042
1043 spin_lock(&ni->writequeue_lock);
1044 list_for_each_safe(list, temp, &ni->writequeue) {
1045 struct writequeue_entry *e =
1046 list_entry(list, struct writequeue_entry, list);
1047 list_del(&e->list);
1048 free_entry(e);
1049 }
1050 spin_unlock(&ni->writequeue_lock);
1051}
1052
1053static void clean_writequeues(void)
1054{
1055 int i;
1056
1057 for (i=1; i<=max_nodeid; i++) {
1058 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
1059 if (ni)
1060 clean_one_writequeue(ni);
1061 }
1062}
1063
1064
1065static void dealloc_nodeinfo(void)
1066{
1067 int i;
1068
1069 for (i=1; i<=max_nodeid; i++) {
1070 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
1071 if (ni) {
1072 idr_remove(&nodeinfo_idr, i);
1073 kfree(ni);
1074 }
1075 }
1076}
1077
1078int dlm_lowcomms_close(int nodeid)
1079{
1080 struct nodeinfo *ni;
1081
1082 ni = nodeid2nodeinfo(nodeid, 0);
1083 if (!ni)
1084 return -1;
1085
1086 spin_lock(&ni->lock);
1087 if (ni->assoc_id) {
1088 ni->assoc_id = 0;
1089 /* Don't send shutdown here, sctp will just queue it
1090 till the node comes back up! */
1091 }
1092 spin_unlock(&ni->lock);
1093
1094 clean_one_writequeue(ni);
1095 clear_bit(NI_INIT_PENDING, &ni->flags);
1096 return 0;
1097}
1098
1099// PJC: The work queue function for receiving.
1100static void process_recv_sockets(struct work_struct *work)
1101{
1102 if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
1103 int ret;
1104 int count = 0;
1105
1106 do {
1107 ret = receive_from_sock();
1108
1109 /* Don't starve out everyone else */
1110 if (++count >= MAX_RX_MSG_COUNT) {
1111 cond_resched();
1112 count = 0;
1113 }
1114 } while (!kthread_should_stop() && ret >=0);
1115 }
1116 cond_resched();
1117}
1118
1119// PJC: the work queue function for sending
1120static void process_send_sockets(struct work_struct *work)
1121{
1122 if (sctp_con.eagain_flag) {
1123 sctp_con.eagain_flag = 0;
1124 refill_write_queue();
1125 }
1126 process_output_queue();
1127}
1128
1129// PJC: Process lock requests from a particular node.
1130// TODO: can we optimise this out on UP ??
1131static void process_lock_request(struct work_struct *work)
1132{
1133}
1134
1135static void daemons_stop(void)
1136{
1137 destroy_workqueue(recv_workqueue);
1138 destroy_workqueue(send_workqueue);
1139 destroy_workqueue(lock_workqueue);
1140}
1141
1142static int daemons_start(void)
1143{
1144 int error;
1145 recv_workqueue = create_workqueue("dlm_recv");
1146 error = IS_ERR(recv_workqueue);
1147 if (error) {
1148 log_print("can't start dlm_recv %d", error);
1149 return error;
1150 }
1151
1152 send_workqueue = create_singlethread_workqueue("dlm_send");
1153 error = IS_ERR(send_workqueue);
1154 if (error) {
1155 log_print("can't start dlm_send %d", error);
1156 destroy_workqueue(recv_workqueue);
1157 return error;
1158 }
1159
1160 lock_workqueue = create_workqueue("dlm_rlock");
1161 error = IS_ERR(lock_workqueue);
1162 if (error) {
1163 log_print("can't start dlm_rlock %d", error);
1164 destroy_workqueue(send_workqueue);
1165 destroy_workqueue(recv_workqueue);
1166 return error;
1167 }
1168
1169 return 0;
1170}
1171
1172/*
1173 * This is quite likely to sleep...
1174 */
1175int dlm_lowcomms_start(void)
1176{
1177 int error;
1178
1179 INIT_WORK(&sctp_con.work, process_recv_sockets);
1180
1181 error = init_sock();
1182 if (error)
1183 goto fail_sock;
1184 error = daemons_start();
1185 if (error)
1186 goto fail_sock;
1187 return 0;
1188
1189fail_sock:
1190 close_connection();
1191 return error;
1192}
1193
1194void dlm_lowcomms_stop(void)
1195{
1196 int i;
1197
1198 sctp_con.flags = 0x7;
1199 daemons_stop();
1200 clean_writequeues();
1201 close_connection();
1202 dealloc_nodeinfo();
1203 max_nodeid = 0;
1204
1205 dlm_local_count = 0;
1206 dlm_local_nodeid = 0;
1207
1208 for (i = 0; i < dlm_local_count; i++)
1209 kfree(dlm_local_addr[i]);
1210}
diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms.c
index 919e92a6aebb..76399b7819b4 100644
--- a/fs/dlm/lowcomms-tcp.c
+++ b/fs/dlm/lowcomms.c
@@ -36,30 +36,36 @@
36 * of high load. Also, this way, the sending thread can collect together 36 * of high load. Also, this way, the sending thread can collect together
37 * messages bound for one node and send them in one block. 37 * messages bound for one node and send them in one block.
38 * 38 *
39 * I don't see any problem with the recv thread executing the locking 39 * lowcomms will choose to use wither TCP or SCTP as its transport layer
40 * code on behalf of remote processes as the locking code is 40 * depending on the configuration variable 'protocol'. This should be set
41 * short, efficient and never waits. 41 * to 0 (default) for TCP or 1 for SCTP. It shouldbe configured using a
42 * cluster-wide mechanism as it must be the same on all nodes of the cluster
43 * for the DLM to function.
42 * 44 *
43 */ 45 */
44 46
45
46#include <asm/ioctls.h> 47#include <asm/ioctls.h>
47#include <net/sock.h> 48#include <net/sock.h>
48#include <net/tcp.h> 49#include <net/tcp.h>
49#include <linux/pagemap.h> 50#include <linux/pagemap.h>
51#include <linux/idr.h>
52#include <linux/file.h>
53#include <linux/sctp.h>
54#include <net/sctp/user.h>
50 55
51#include "dlm_internal.h" 56#include "dlm_internal.h"
52#include "lowcomms.h" 57#include "lowcomms.h"
53#include "midcomms.h" 58#include "midcomms.h"
54#include "config.h" 59#include "config.h"
55 60
61#define NEEDED_RMEM (4*1024*1024)
62
56struct cbuf { 63struct cbuf {
57 unsigned int base; 64 unsigned int base;
58 unsigned int len; 65 unsigned int len;
59 unsigned int mask; 66 unsigned int mask;
60}; 67};
61 68
62#define NODE_INCREMENT 32
63static void cbuf_add(struct cbuf *cb, int n) 69static void cbuf_add(struct cbuf *cb, int n)
64{ 70{
65 cb->len += n; 71 cb->len += n;
@@ -88,28 +94,25 @@ static bool cbuf_empty(struct cbuf *cb)
88 return cb->len == 0; 94 return cb->len == 0;
89} 95}
90 96
91/* Maximum number of incoming messages to process before
92 doing a cond_resched()
93*/
94#define MAX_RX_MSG_COUNT 25
95
96struct connection { 97struct connection {
97 struct socket *sock; /* NULL if not connected */ 98 struct socket *sock; /* NULL if not connected */
98 uint32_t nodeid; /* So we know who we are in the list */ 99 uint32_t nodeid; /* So we know who we are in the list */
99 struct mutex sock_mutex; 100 struct mutex sock_mutex;
100 unsigned long flags; /* bit 1,2 = We are on the read/write lists */ 101 unsigned long flags;
101#define CF_READ_PENDING 1 102#define CF_READ_PENDING 1
102#define CF_WRITE_PENDING 2 103#define CF_WRITE_PENDING 2
103#define CF_CONNECT_PENDING 3 104#define CF_CONNECT_PENDING 3
104#define CF_IS_OTHERCON 4 105#define CF_INIT_PENDING 4
106#define CF_IS_OTHERCON 5
105 struct list_head writequeue; /* List of outgoing writequeue_entries */ 107 struct list_head writequeue; /* List of outgoing writequeue_entries */
106 struct list_head listenlist; /* List of allocated listening sockets */
107 spinlock_t writequeue_lock; 108 spinlock_t writequeue_lock;
108 int (*rx_action) (struct connection *); /* What to do when active */ 109 int (*rx_action) (struct connection *); /* What to do when active */
110 void (*connect_action) (struct connection *); /* What to do to connect */
109 struct page *rx_page; 111 struct page *rx_page;
110 struct cbuf cb; 112 struct cbuf cb;
111 int retries; 113 int retries;
112#define MAX_CONNECT_RETRIES 3 114#define MAX_CONNECT_RETRIES 3
115 int sctp_assoc;
113 struct connection *othercon; 116 struct connection *othercon;
114 struct work_struct rwork; /* Receive workqueue */ 117 struct work_struct rwork; /* Receive workqueue */
115 struct work_struct swork; /* Send workqueue */ 118 struct work_struct swork; /* Send workqueue */
@@ -127,68 +130,136 @@ struct writequeue_entry {
127 struct connection *con; 130 struct connection *con;
128}; 131};
129 132
130static struct sockaddr_storage dlm_local_addr; 133static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
134static int dlm_local_count;
131 135
132/* Work queues */ 136/* Work queues */
133static struct workqueue_struct *recv_workqueue; 137static struct workqueue_struct *recv_workqueue;
134static struct workqueue_struct *send_workqueue; 138static struct workqueue_struct *send_workqueue;
135 139
136/* An array of pointers to connections, indexed by NODEID */ 140static DEFINE_IDR(connections_idr);
137static struct connection **connections;
138static DECLARE_MUTEX(connections_lock); 141static DECLARE_MUTEX(connections_lock);
142static int max_nodeid;
139static struct kmem_cache *con_cache; 143static struct kmem_cache *con_cache;
140static int conn_array_size;
141 144
142static void process_recv_sockets(struct work_struct *work); 145static void process_recv_sockets(struct work_struct *work);
143static void process_send_sockets(struct work_struct *work); 146static void process_send_sockets(struct work_struct *work);
144 147
145static struct connection *nodeid2con(int nodeid, gfp_t allocation) 148/*
149 * If 'allocation' is zero then we don't attempt to create a new
150 * connection structure for this node.
151 */
152static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
146{ 153{
147 struct connection *con = NULL; 154 struct connection *con = NULL;
155 int r;
156 int n;
148 157
149 down(&connections_lock); 158 con = idr_find(&connections_idr, nodeid);
150 if (nodeid >= conn_array_size) { 159 if (con || !alloc)
151 int new_size = nodeid + NODE_INCREMENT; 160 return con;
152 struct connection **new_conns;
153 161
154 new_conns = kzalloc(sizeof(struct connection *) * 162 r = idr_pre_get(&connections_idr, alloc);
155 new_size, allocation); 163 if (!r)
156 if (!new_conns) 164 return NULL;
157 goto finish; 165
166 con = kmem_cache_zalloc(con_cache, alloc);
167 if (!con)
168 return NULL;
158 169
159 memcpy(new_conns, connections, sizeof(struct connection *) * conn_array_size); 170 r = idr_get_new_above(&connections_idr, con, nodeid, &n);
160 conn_array_size = new_size; 171 if (r) {
161 kfree(connections); 172 kmem_cache_free(con_cache, con);
162 connections = new_conns; 173 return NULL;
174 }
163 175
176 if (n != nodeid) {
177 idr_remove(&connections_idr, n);
178 kmem_cache_free(con_cache, con);
179 return NULL;
164 } 180 }
165 181
166 con = connections[nodeid]; 182 con->nodeid = nodeid;
167 if (con == NULL && allocation) { 183 mutex_init(&con->sock_mutex);
168 con = kmem_cache_zalloc(con_cache, allocation); 184 INIT_LIST_HEAD(&con->writequeue);
169 if (!con) 185 spin_lock_init(&con->writequeue_lock);
170 goto finish; 186 INIT_WORK(&con->swork, process_send_sockets);
187 INIT_WORK(&con->rwork, process_recv_sockets);
171 188
172 con->nodeid = nodeid; 189 /* Setup action pointers for child sockets */
173 mutex_init(&con->sock_mutex); 190 if (con->nodeid) {
174 INIT_LIST_HEAD(&con->writequeue); 191 struct connection *zerocon = idr_find(&connections_idr, 0);
175 spin_lock_init(&con->writequeue_lock);
176 INIT_WORK(&con->swork, process_send_sockets);
177 INIT_WORK(&con->rwork, process_recv_sockets);
178 192
179 connections[nodeid] = con; 193 con->connect_action = zerocon->connect_action;
194 if (!con->rx_action)
195 con->rx_action = zerocon->rx_action;
180 } 196 }
181 197
182finish: 198 if (nodeid > max_nodeid)
199 max_nodeid = nodeid;
200
201 return con;
202}
203
204static struct connection *nodeid2con(int nodeid, gfp_t allocation)
205{
206 struct connection *con;
207
208 down(&connections_lock);
209 con = __nodeid2con(nodeid, allocation);
183 up(&connections_lock); 210 up(&connections_lock);
211
184 return con; 212 return con;
185} 213}
186 214
215/* This is a bit drastic, but only called when things go wrong */
216static struct connection *assoc2con(int assoc_id)
217{
218 int i;
219 struct connection *con;
220
221 down(&connections_lock);
222 for (i=0; i<max_nodeid; i++) {
223 con = __nodeid2con(i, 0);
224 if (con && con->sctp_assoc == assoc_id) {
225 up(&connections_lock);
226 return con;
227 }
228 }
229 up(&connections_lock);
230 return NULL;
231}
232
233static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
234{
235 struct sockaddr_storage addr;
236 int error;
237
238 if (!dlm_local_count)
239 return -1;
240
241 error = dlm_nodeid_to_addr(nodeid, &addr);
242 if (error)
243 return error;
244
245 if (dlm_local_addr[0]->ss_family == AF_INET) {
246 struct sockaddr_in *in4 = (struct sockaddr_in *) &addr;
247 struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
248 ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
249 } else {
250 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr;
251 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
252 memcpy(&ret6->sin6_addr, &in6->sin6_addr,
253 sizeof(in6->sin6_addr));
254 }
255
256 return 0;
257}
258
187/* Data available on socket or listen socket received a connect */ 259/* Data available on socket or listen socket received a connect */
188static void lowcomms_data_ready(struct sock *sk, int count_unused) 260static void lowcomms_data_ready(struct sock *sk, int count_unused)
189{ 261{
190 struct connection *con = sock2con(sk); 262 struct connection *con = sock2con(sk);
191
192 if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) 263 if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
193 queue_work(recv_workqueue, &con->rwork); 264 queue_work(recv_workqueue, &con->rwork);
194} 265}
@@ -222,20 +293,21 @@ static int add_sock(struct socket *sock, struct connection *con)
222 con->sock->sk->sk_data_ready = lowcomms_data_ready; 293 con->sock->sk->sk_data_ready = lowcomms_data_ready;
223 con->sock->sk->sk_write_space = lowcomms_write_space; 294 con->sock->sk->sk_write_space = lowcomms_write_space;
224 con->sock->sk->sk_state_change = lowcomms_state_change; 295 con->sock->sk->sk_state_change = lowcomms_state_change;
225 296 con->sock->sk->sk_user_data = con;
226 return 0; 297 return 0;
227} 298}
228 299
229/* Add the port number to an IP6 or 4 sockaddr and return the address 300/* Add the port number to an IPv6 or 4 sockaddr and return the address
230 length */ 301 length */
231static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, 302static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
232 int *addr_len) 303 int *addr_len)
233{ 304{
234 saddr->ss_family = dlm_local_addr.ss_family; 305 saddr->ss_family = dlm_local_addr[0]->ss_family;
235 if (saddr->ss_family == AF_INET) { 306 if (saddr->ss_family == AF_INET) {
236 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; 307 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
237 in4_addr->sin_port = cpu_to_be16(port); 308 in4_addr->sin_port = cpu_to_be16(port);
238 *addr_len = sizeof(struct sockaddr_in); 309 *addr_len = sizeof(struct sockaddr_in);
310 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
239 } else { 311 } else {
240 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; 312 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
241 in6_addr->sin6_port = cpu_to_be16(port); 313 in6_addr->sin6_port = cpu_to_be16(port);
@@ -264,6 +336,192 @@ static void close_connection(struct connection *con, bool and_other)
264 mutex_unlock(&con->sock_mutex); 336 mutex_unlock(&con->sock_mutex);
265} 337}
266 338
339/* We only send shutdown messages to nodes that are not part of the cluster */
340static void sctp_send_shutdown(sctp_assoc_t associd)
341{
342 static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
343 struct msghdr outmessage;
344 struct cmsghdr *cmsg;
345 struct sctp_sndrcvinfo *sinfo;
346 int ret;
347 struct connection *con;
348
349 con = nodeid2con(0,0);
350 BUG_ON(con == NULL);
351
352 outmessage.msg_name = NULL;
353 outmessage.msg_namelen = 0;
354 outmessage.msg_control = outcmsg;
355 outmessage.msg_controllen = sizeof(outcmsg);
356 outmessage.msg_flags = MSG_EOR;
357
358 cmsg = CMSG_FIRSTHDR(&outmessage);
359 cmsg->cmsg_level = IPPROTO_SCTP;
360 cmsg->cmsg_type = SCTP_SNDRCV;
361 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
362 outmessage.msg_controllen = cmsg->cmsg_len;
363 sinfo = CMSG_DATA(cmsg);
364 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
365
366 sinfo->sinfo_flags |= MSG_EOF;
367 sinfo->sinfo_assoc_id = associd;
368
369 ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0);
370
371 if (ret != 0)
372 log_print("send EOF to node failed: %d", ret);
373}
374
375/* INIT failed but we don't know which node...
376 restart INIT on all pending nodes */
377static void sctp_init_failed(void)
378{
379 int i;
380 struct connection *con;
381
382 down(&connections_lock);
383 for (i=1; i<=max_nodeid; i++) {
384 con = __nodeid2con(i, 0);
385 if (!con)
386 continue;
387 con->sctp_assoc = 0;
388 if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
389 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
390 queue_work(send_workqueue, &con->swork);
391 }
392 }
393 }
394 up(&connections_lock);
395}
396
397/* Something happened to an association */
398static void process_sctp_notification(struct connection *con, struct msghdr *msg, char *buf)
399{
400 union sctp_notification *sn = (union sctp_notification *)buf;
401
402 if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
403 switch (sn->sn_assoc_change.sac_state) {
404
405 case SCTP_COMM_UP:
406 case SCTP_RESTART:
407 {
408 /* Check that the new node is in the lockspace */
409 struct sctp_prim prim;
410 int nodeid;
411 int prim_len, ret;
412 int addr_len;
413 struct connection *new_con;
414 struct file *file;
415 sctp_peeloff_arg_t parg;
416 int parglen = sizeof(parg);
417
418 /*
419 * We get this before any data for an association.
420 * We verify that the node is in the cluster and
421 * then peel off a socket for it.
422 */
423 if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
424 log_print("COMM_UP for invalid assoc ID %d",
425 (int)sn->sn_assoc_change.sac_assoc_id);
426 sctp_init_failed();
427 return;
428 }
429 memset(&prim, 0, sizeof(struct sctp_prim));
430 prim_len = sizeof(struct sctp_prim);
431 prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
432
433 ret = kernel_getsockopt(con->sock,
434 IPPROTO_SCTP,
435 SCTP_PRIMARY_ADDR,
436 (char*)&prim,
437 &prim_len);
438 if (ret < 0) {
439 log_print("getsockopt/sctp_primary_addr on "
440 "new assoc %d failed : %d",
441 (int)sn->sn_assoc_change.sac_assoc_id,
442 ret);
443
444 /* Retry INIT later */
445 new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
446 if (new_con)
447 clear_bit(CF_CONNECT_PENDING, &con->flags);
448 return;
449 }
450 make_sockaddr(&prim.ssp_addr, 0, &addr_len);
451 if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
452 int i;
453 unsigned char *b=(unsigned char *)&prim.ssp_addr;
454 log_print("reject connect from unknown addr");
455 for (i=0; i<sizeof(struct sockaddr_storage);i++)
456 printk("%02x ", b[i]);
457 printk("\n");
458 sctp_send_shutdown(prim.ssp_assoc_id);
459 return;
460 }
461
462 new_con = nodeid2con(nodeid, GFP_KERNEL);
463 if (!new_con)
464 return;
465
466 /* Peel off a new sock */
467 parg.associd = sn->sn_assoc_change.sac_assoc_id;
468 ret = kernel_getsockopt(con->sock, IPPROTO_SCTP, SCTP_SOCKOPT_PEELOFF,
469 (void *)&parg, &parglen);
470 if (ret < 0) {
471 log_print("Can't peel off a socket for connection %d to node %d: err=%d\n",
472 parg.associd, nodeid, ret);
473 return;
474 }
475
476 file = fget(parg.sd);
477 new_con->sock = SOCKET_I(file->f_dentry->d_inode);
478 add_sock(new_con->sock, new_con);
479 fput(file);
480 put_unused_fd(parg.sd);
481
482 log_print("got new/restarted association %d nodeid %d",
483 (int)sn->sn_assoc_change.sac_assoc_id, nodeid);
484
485 /* Send any pending writes */
486 clear_bit(CF_CONNECT_PENDING, &new_con->flags);
487 clear_bit(CF_INIT_PENDING, &con->flags);
488 if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) {
489 queue_work(send_workqueue, &new_con->swork);
490 }
491 if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags))
492 queue_work(recv_workqueue, &new_con->rwork);
493 }
494 break;
495
496 case SCTP_COMM_LOST:
497 case SCTP_SHUTDOWN_COMP:
498 {
499 con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
500 if (con) {
501 con->sctp_assoc = 0;
502 }
503 }
504 break;
505
506 /* We don't know which INIT failed, so clear the PENDING flags
507 * on them all. if assoc_id is zero then it will then try
508 * again */
509
510 case SCTP_CANT_STR_ASSOC:
511 {
512 log_print("Can't start SCTP association - retrying");
513 sctp_init_failed();
514 }
515 break;
516
517 default:
518 log_print("unexpected SCTP assoc change id=%d state=%d",
519 (int)sn->sn_assoc_change.sac_assoc_id,
520 sn->sn_assoc_change.sac_state);
521 }
522 }
523}
524
267/* Data received from remote end */ 525/* Data received from remote end */
268static int receive_from_sock(struct connection *con) 526static int receive_from_sock(struct connection *con)
269{ 527{
@@ -274,6 +532,7 @@ static int receive_from_sock(struct connection *con)
274 int r; 532 int r;
275 int call_again_soon = 0; 533 int call_again_soon = 0;
276 int nvec; 534 int nvec;
535 char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
277 536
278 mutex_lock(&con->sock_mutex); 537 mutex_lock(&con->sock_mutex);
279 538
@@ -293,6 +552,11 @@ static int receive_from_sock(struct connection *con)
293 cbuf_init(&con->cb, PAGE_CACHE_SIZE); 552 cbuf_init(&con->cb, PAGE_CACHE_SIZE);
294 } 553 }
295 554
555 /* Only SCTP needs these really */
556 memset(&incmsg, 0, sizeof(incmsg));
557 msg.msg_control = incmsg;
558 msg.msg_controllen = sizeof(incmsg);
559
296 /* 560 /*
297 * iov[0] is the bit of the circular buffer between the current end 561 * iov[0] is the bit of the circular buffer between the current end
298 * point (cb.base + cb.len) and the end of the buffer. 562 * point (cb.base + cb.len) and the end of the buffer.
@@ -316,10 +580,22 @@ static int receive_from_sock(struct connection *con)
316 580
317 r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len, 581 r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
318 MSG_DONTWAIT | MSG_NOSIGNAL); 582 MSG_DONTWAIT | MSG_NOSIGNAL);
319
320 if (ret <= 0) 583 if (ret <= 0)
321 goto out_close; 584 goto out_close;
322 585
586 /* Process SCTP notifications */
587 if (msg.msg_flags & MSG_NOTIFICATION) {
588 BUG_ON(con->nodeid != 0);
589 msg.msg_control = incmsg;
590 msg.msg_controllen = sizeof(incmsg);
591
592 process_sctp_notification(con, &msg,
593 page_address(con->rx_page) + con->cb.base);
594 mutex_unlock(&con->sock_mutex);
595 return 0;
596 }
597 BUG_ON(con->nodeid == 0);
598
323 if (ret == len) 599 if (ret == len)
324 call_again_soon = 1; 600 call_again_soon = 1;
325 cbuf_add(&con->cb, ret); 601 cbuf_add(&con->cb, ret);
@@ -367,7 +643,7 @@ out_close:
367} 643}
368 644
369/* Listening socket is busy, accept a connection */ 645/* Listening socket is busy, accept a connection */
370static int accept_from_sock(struct connection *con) 646static int tcp_accept_from_sock(struct connection *con)
371{ 647{
372 int result; 648 int result;
373 struct sockaddr_storage peeraddr; 649 struct sockaddr_storage peeraddr;
@@ -378,7 +654,7 @@ static int accept_from_sock(struct connection *con)
378 struct connection *addcon; 654 struct connection *addcon;
379 655
380 memset(&peeraddr, 0, sizeof(peeraddr)); 656 memset(&peeraddr, 0, sizeof(peeraddr));
381 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, 657 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
382 IPPROTO_TCP, &newsock); 658 IPPROTO_TCP, &newsock);
383 if (result < 0) 659 if (result < 0)
384 return -ENOMEM; 660 return -ENOMEM;
@@ -418,7 +694,6 @@ static int accept_from_sock(struct connection *con)
418 /* Check to see if we already have a connection to this node. This 694 /* Check to see if we already have a connection to this node. This
419 * could happen if the two nodes initiate a connection at roughly 695 * could happen if the two nodes initiate a connection at roughly
420 * the same time and the connections cross on the wire. 696 * the same time and the connections cross on the wire.
421 * TEMPORARY FIX:
422 * In this case we store the incoming one in "othercon" 697 * In this case we store the incoming one in "othercon"
423 */ 698 */
424 newcon = nodeid2con(nodeid, GFP_KERNEL); 699 newcon = nodeid2con(nodeid, GFP_KERNEL);
@@ -480,8 +755,102 @@ accept_err:
480 return result; 755 return result;
481} 756}
482 757
758static void free_entry(struct writequeue_entry *e)
759{
760 __free_page(e->page);
761 kfree(e);
762}
763
764/* Initiate an SCTP association.
765 This is a special case of send_to_sock() in that we don't yet have a
766 peeled-off socket for this association, so we use the listening socket
767 and add the primary IP address of the remote node.
768 */
769static void sctp_init_assoc(struct connection *con)
770{
771 struct sockaddr_storage rem_addr;
772 char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
773 struct msghdr outmessage;
774 struct cmsghdr *cmsg;
775 struct sctp_sndrcvinfo *sinfo;
776 struct connection *base_con;
777 struct writequeue_entry *e;
778 int len, offset;
779 int ret;
780 int addrlen;
781 struct kvec iov[1];
782
783 if (test_and_set_bit(CF_INIT_PENDING, &con->flags))
784 return;
785
786 if (con->retries++ > MAX_CONNECT_RETRIES)
787 return;
788
789 log_print("Initiating association with node %d", con->nodeid);
790
791 if (nodeid_to_addr(con->nodeid, (struct sockaddr *)&rem_addr)) {
792 log_print("no address for nodeid %d", con->nodeid);
793 return;
794 }
795 base_con = nodeid2con(0, 0);
796 BUG_ON(base_con == NULL);
797
798 make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);
799
800 outmessage.msg_name = &rem_addr;
801 outmessage.msg_namelen = addrlen;
802 outmessage.msg_control = outcmsg;
803 outmessage.msg_controllen = sizeof(outcmsg);
804 outmessage.msg_flags = MSG_EOR;
805
806 spin_lock(&con->writequeue_lock);
807 e = list_entry(con->writequeue.next, struct writequeue_entry,
808 list);
809
810 BUG_ON((struct list_head *) e == &con->writequeue);
811
812 len = e->len;
813 offset = e->offset;
814 spin_unlock(&con->writequeue_lock);
815 kmap(e->page);
816
817 /* Send the first block off the write queue */
818 iov[0].iov_base = page_address(e->page)+offset;
819 iov[0].iov_len = len;
820
821 cmsg = CMSG_FIRSTHDR(&outmessage);
822 cmsg->cmsg_level = IPPROTO_SCTP;
823 cmsg->cmsg_type = SCTP_SNDRCV;
824 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
825 sinfo = CMSG_DATA(cmsg);
826 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
827 sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid());
828 outmessage.msg_controllen = cmsg->cmsg_len;
829
830 ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len);
831 if (ret < 0) {
832 log_print("Send first packet to node %d failed: %d", con->nodeid, ret);
833
834 /* Try again later */
835 clear_bit(CF_CONNECT_PENDING, &con->flags);
836 clear_bit(CF_INIT_PENDING, &con->flags);
837 }
838 else {
839 spin_lock(&con->writequeue_lock);
840 e->offset += ret;
841 e->len -= ret;
842
843 if (e->len == 0 && e->users == 0) {
844 list_del(&e->list);
845 kunmap(e->page);
846 free_entry(e);
847 }
848 spin_unlock(&con->writequeue_lock);
849 }
850}
851
483/* Connect a new socket to its peer */ 852/* Connect a new socket to its peer */
484static void connect_to_sock(struct connection *con) 853static void tcp_connect_to_sock(struct connection *con)
485{ 854{
486 int result = -EHOSTUNREACH; 855 int result = -EHOSTUNREACH;
487 struct sockaddr_storage saddr; 856 struct sockaddr_storage saddr;
@@ -504,7 +873,7 @@ static void connect_to_sock(struct connection *con)
504 } 873 }
505 874
506 /* Create a socket to communicate with */ 875 /* Create a socket to communicate with */
507 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, 876 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
508 IPPROTO_TCP, &sock); 877 IPPROTO_TCP, &sock);
509 if (result < 0) 878 if (result < 0)
510 goto out_err; 879 goto out_err;
@@ -515,11 +884,11 @@ static void connect_to_sock(struct connection *con)
515 884
516 sock->sk->sk_user_data = con; 885 sock->sk->sk_user_data = con;
517 con->rx_action = receive_from_sock; 886 con->rx_action = receive_from_sock;
887 con->connect_action = tcp_connect_to_sock;
888 add_sock(sock, con);
518 889
519 make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len); 890 make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
520 891
521 add_sock(sock, con);
522
523 log_print("connecting to %d", con->nodeid); 892 log_print("connecting to %d", con->nodeid);
524 result = 893 result =
525 sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, 894 sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
@@ -549,38 +918,36 @@ out:
549 return; 918 return;
550} 919}
551 920
552static struct socket *create_listen_sock(struct connection *con, 921static struct socket *tcp_create_listen_sock(struct connection *con,
553 struct sockaddr_storage *saddr) 922 struct sockaddr_storage *saddr)
554{ 923{
555 struct socket *sock = NULL; 924 struct socket *sock = NULL;
556 mm_segment_t fs;
557 int result = 0; 925 int result = 0;
558 int one = 1; 926 int one = 1;
559 int addr_len; 927 int addr_len;
560 928
561 if (dlm_local_addr.ss_family == AF_INET) 929 if (dlm_local_addr[0]->ss_family == AF_INET)
562 addr_len = sizeof(struct sockaddr_in); 930 addr_len = sizeof(struct sockaddr_in);
563 else 931 else
564 addr_len = sizeof(struct sockaddr_in6); 932 addr_len = sizeof(struct sockaddr_in6);
565 933
566 /* Create a socket to communicate with */ 934 /* Create a socket to communicate with */
567 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock); 935 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, IPPROTO_TCP, &sock);
568 if (result < 0) { 936 if (result < 0) {
569 printk("dlm: Can't create listening comms socket\n"); 937 printk("dlm: Can't create listening comms socket\n");
570 goto create_out; 938 goto create_out;
571 } 939 }
572 940
573 fs = get_fs(); 941 result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
574 set_fs(get_ds()); 942 (char *)&one, sizeof(one));
575 result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 943
576 (char *)&one, sizeof(one));
577 set_fs(fs);
578 if (result < 0) { 944 if (result < 0) {
579 printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n", 945 printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",
580 result); 946 result);
581 } 947 }
582 sock->sk->sk_user_data = con; 948 sock->sk->sk_user_data = con;
583 con->rx_action = accept_from_sock; 949 con->rx_action = tcp_accept_from_sock;
950 con->connect_action = tcp_connect_to_sock;
584 con->sock = sock; 951 con->sock = sock;
585 952
586 /* Bind to our port */ 953 /* Bind to our port */
@@ -593,13 +960,8 @@ static struct socket *create_listen_sock(struct connection *con,
593 con->sock = NULL; 960 con->sock = NULL;
594 goto create_out; 961 goto create_out;
595 } 962 }
596 963 result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
597 fs = get_fs();
598 set_fs(get_ds());
599
600 result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
601 (char *)&one, sizeof(one)); 964 (char *)&one, sizeof(one));
602 set_fs(fs);
603 if (result < 0) { 965 if (result < 0) {
604 printk("dlm: Set keepalive failed: %d\n", result); 966 printk("dlm: Set keepalive failed: %d\n", result);
605 } 967 }
@@ -616,18 +978,141 @@ create_out:
616 return sock; 978 return sock;
617} 979}
618 980
981/* Get local addresses */
982static void init_local(void)
983{
984 struct sockaddr_storage sas, *addr;
985 int i;
986
987 for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) {
988 if (dlm_our_addr(&sas, i))
989 break;
990
991 addr = kmalloc(sizeof(*addr), GFP_KERNEL);
992 if (!addr)
993 break;
994 memcpy(addr, &sas, sizeof(*addr));
995 dlm_local_addr[dlm_local_count++] = addr;
996 }
997}
998
999/* Bind to an IP address. SCTP allows multiple address so it can do multi-homing */
1000static int add_sctp_bind_addr(struct connection *sctp_con, struct sockaddr_storage *addr, int addr_len, int num)
1001{
1002 int result = 0;
1003
1004 if (num == 1)
1005 result = kernel_bind(sctp_con->sock,
1006 (struct sockaddr *) addr,
1007 addr_len);
1008 else
1009 result = kernel_setsockopt(sctp_con->sock, SOL_SCTP,
1010 SCTP_SOCKOPT_BINDX_ADD,
1011 (char *)addr, addr_len);
1012
1013 if (result < 0)
1014 log_print("Can't bind to port %d addr number %d",
1015 dlm_config.ci_tcp_port, num);
1016
1017 return result;
1018}
619 1019
620/* Listen on all interfaces */ 1020/* Initialise SCTP socket and bind to all interfaces */
621static int listen_for_all(void) 1021static int sctp_listen_for_all(void)
1022{
1023 struct socket *sock = NULL;
1024 struct sockaddr_storage localaddr;
1025 struct sctp_event_subscribe subscribe;
1026 int result = -EINVAL, num = 1, i, addr_len;
1027 struct connection *con = nodeid2con(0, GFP_KERNEL);
1028 int bufsize = NEEDED_RMEM;
1029
1030 if (!con)
1031 return -ENOMEM;
1032
1033 log_print("Using SCTP for communications");
1034
1035 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
1036 IPPROTO_SCTP, &sock);
1037 if (result < 0) {
1038 log_print("Can't create comms socket, check SCTP is loaded");
1039 goto out;
1040 }
1041
1042 /* Listen for events */
1043 memset(&subscribe, 0, sizeof(subscribe));
1044 subscribe.sctp_data_io_event = 1;
1045 subscribe.sctp_association_event = 1;
1046 subscribe.sctp_send_failure_event = 1;
1047 subscribe.sctp_shutdown_event = 1;
1048 subscribe.sctp_partial_delivery_event = 1;
1049
1050 result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUF,
1051 (char *)&bufsize, sizeof(bufsize));
1052 if (result)
1053 log_print("Error increasing buffer space on socket: %d", result);
1054
1055 result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
1056 (char *)&subscribe, sizeof(subscribe));
1057 if (result < 0) {
1058 log_print("Failed to set SCTP_EVENTS on socket: result=%d",
1059 result);
1060 goto create_delsock;
1061 }
1062
1063 /* Init con struct */
1064 sock->sk->sk_user_data = con;
1065 con->sock = sock;
1066 con->sock->sk->sk_data_ready = lowcomms_data_ready;
1067 con->rx_action = receive_from_sock;
1068 con->connect_action = sctp_init_assoc;
1069
1070 /* Bind to all interfaces. */
1071 for (i = 0; i < dlm_local_count; i++) {
1072 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
1073 make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
1074
1075 result = add_sctp_bind_addr(con, &localaddr, addr_len, num);
1076 if (result)
1077 goto create_delsock;
1078 ++num;
1079 }
1080
1081 result = sock->ops->listen(sock, 5);
1082 if (result < 0) {
1083 log_print("Can't set socket listening");
1084 goto create_delsock;
1085 }
1086
1087 return 0;
1088
1089create_delsock:
1090 sock_release(sock);
1091 con->sock = NULL;
1092out:
1093 return result;
1094}
1095
1096static int tcp_listen_for_all(void)
622{ 1097{
623 struct socket *sock = NULL; 1098 struct socket *sock = NULL;
624 struct connection *con = nodeid2con(0, GFP_KERNEL); 1099 struct connection *con = nodeid2con(0, GFP_KERNEL);
625 int result = -EINVAL; 1100 int result = -EINVAL;
626 1101
1102 if (!con)
1103 return -ENOMEM;
1104
627 /* We don't support multi-homed hosts */ 1105 /* We don't support multi-homed hosts */
1106 if (dlm_local_addr[1] != NULL) {
1107 log_print("TCP protocol can't handle multi-homed hosts, try SCTP");
1108 return -EINVAL;
1109 }
1110
1111 log_print("Using TCP for communications");
1112
628 set_bit(CF_IS_OTHERCON, &con->flags); 1113 set_bit(CF_IS_OTHERCON, &con->flags);
629 1114
630 sock = create_listen_sock(con, &dlm_local_addr); 1115 sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
631 if (sock) { 1116 if (sock) {
632 add_sock(sock, con); 1117 add_sock(sock, con);
633 result = 0; 1118 result = 0;
@@ -734,12 +1219,6 @@ out:
734 return; 1219 return;
735} 1220}
736 1221
737static void free_entry(struct writequeue_entry *e)
738{
739 __free_page(e->page);
740 kfree(e);
741}
742
743/* Send a message */ 1222/* Send a message */
744static void send_to_sock(struct connection *con) 1223static void send_to_sock(struct connection *con)
745{ 1224{
@@ -806,7 +1285,8 @@ send_error:
806 1285
807out_connect: 1286out_connect:
808 mutex_unlock(&con->sock_mutex); 1287 mutex_unlock(&con->sock_mutex);
809 connect_to_sock(con); 1288 if (!test_bit(CF_INIT_PENDING, &con->flags))
1289 lowcomms_connect_sock(con);
810 return; 1290 return;
811} 1291}
812 1292
@@ -831,9 +1311,6 @@ int dlm_lowcomms_close(int nodeid)
831{ 1311{
832 struct connection *con; 1312 struct connection *con;
833 1313
834 if (!connections)
835 goto out;
836
837 log_print("closing connection to node %d", nodeid); 1314 log_print("closing connection to node %d", nodeid);
838 con = nodeid2con(nodeid, 0); 1315 con = nodeid2con(nodeid, 0);
839 if (con) { 1316 if (con) {
@@ -841,12 +1318,9 @@ int dlm_lowcomms_close(int nodeid)
841 close_connection(con, true); 1318 close_connection(con, true);
842 } 1319 }
843 return 0; 1320 return 0;
844
845out:
846 return -1;
847} 1321}
848 1322
849/* Look for activity on active sockets */ 1323/* Receive workqueue function */
850static void process_recv_sockets(struct work_struct *work) 1324static void process_recv_sockets(struct work_struct *work)
851{ 1325{
852 struct connection *con = container_of(work, struct connection, rwork); 1326 struct connection *con = container_of(work, struct connection, rwork);
@@ -858,15 +1332,14 @@ static void process_recv_sockets(struct work_struct *work)
858 } while (!err); 1332 } while (!err);
859} 1333}
860 1334
861 1335/* Send workqueue function */
862static void process_send_sockets(struct work_struct *work) 1336static void process_send_sockets(struct work_struct *work)
863{ 1337{
864 struct connection *con = container_of(work, struct connection, swork); 1338 struct connection *con = container_of(work, struct connection, swork);
865 1339
866 if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { 1340 if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
867 connect_to_sock(con); 1341 con->connect_action(con);
868 } 1342 }
869
870 clear_bit(CF_WRITE_PENDING, &con->flags); 1343 clear_bit(CF_WRITE_PENDING, &con->flags);
871 send_to_sock(con); 1344 send_to_sock(con);
872} 1345}
@@ -877,7 +1350,7 @@ static void clean_writequeues(void)
877{ 1350{
878 int nodeid; 1351 int nodeid;
879 1352
880 for (nodeid = 1; nodeid < conn_array_size; nodeid++) { 1353 for (nodeid = 1; nodeid < max_nodeid; nodeid++) {
881 struct connection *con = nodeid2con(nodeid, 0); 1354 struct connection *con = nodeid2con(nodeid, 0);
882 1355
883 if (con) 1356 if (con)
@@ -915,64 +1388,64 @@ static int work_start(void)
915void dlm_lowcomms_stop(void) 1388void dlm_lowcomms_stop(void)
916{ 1389{
917 int i; 1390 int i;
1391 struct connection *con;
918 1392
919 /* Set all the flags to prevent any 1393 /* Set all the flags to prevent any
920 socket activity. 1394 socket activity.
921 */ 1395 */
922 for (i = 0; i < conn_array_size; i++) { 1396 down(&connections_lock);
923 if (connections[i]) 1397 for (i = 0; i < max_nodeid; i++) {
924 connections[i]->flags |= 0xFF; 1398 con = __nodeid2con(i, 0);
1399 if (con)
1400 con->flags |= 0xFF;
925 } 1401 }
1402 up(&connections_lock);
926 1403
927 work_stop(); 1404 work_stop();
1405
1406 down(&connections_lock);
928 clean_writequeues(); 1407 clean_writequeues();
929 1408
930 for (i = 0; i < conn_array_size; i++) { 1409 for (i = 0; i < max_nodeid; i++) {
931 if (connections[i]) { 1410 con = nodeid2con(i, 0);
932 close_connection(connections[i], true); 1411 if (con) {
933 if (connections[i]->othercon) 1412 close_connection(con, true);
934 kmem_cache_free(con_cache, connections[i]->othercon); 1413 if (con->othercon)
935 kmem_cache_free(con_cache, connections[i]); 1414 kmem_cache_free(con_cache, con->othercon);
1415 kmem_cache_free(con_cache, con);
936 } 1416 }
937 } 1417 }
938 1418 up(&connections_lock);
939 kfree(connections);
940 connections = NULL;
941
942 kmem_cache_destroy(con_cache); 1419 kmem_cache_destroy(con_cache);
943} 1420}
944 1421
945/* This is quite likely to sleep... */
946int dlm_lowcomms_start(void) 1422int dlm_lowcomms_start(void)
947{ 1423{
948 int error = 0; 1424 int error = -EINVAL;
949 1425 struct connection *con;
950 error = -ENOMEM;
951 connections = kzalloc(sizeof(struct connection *) *
952 NODE_INCREMENT, GFP_KERNEL);
953 if (!connections)
954 goto out;
955
956 conn_array_size = NODE_INCREMENT;
957 1426
958 if (dlm_our_addr(&dlm_local_addr, 0)) { 1427 init_local();
1428 if (!dlm_local_count) {
959 log_print("no local IP address has been set"); 1429 log_print("no local IP address has been set");
960 goto fail_free_conn; 1430 goto out;
961 }
962 if (!dlm_our_addr(&dlm_local_addr, 1)) {
963 log_print("This dlm comms module does not support multi-homed clustering");
964 goto fail_free_conn;
965 } 1431 }
966 1432
1433 error = -ENOMEM;
967 con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection), 1434 con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
968 __alignof__(struct connection), 0, 1435 __alignof__(struct connection), 0,
969 NULL, NULL); 1436 NULL, NULL);
970 if (!con_cache) 1437 if (!con_cache)
971 goto fail_free_conn; 1438 goto out;
972 1439
1440 /* Set some sysctl minima */
1441 if (sysctl_rmem_max < NEEDED_RMEM)
1442 sysctl_rmem_max = NEEDED_RMEM;
973 1443
974 /* Start listening */ 1444 /* Start listening */
975 error = listen_for_all(); 1445 if (dlm_config.ci_protocol == 0)
1446 error = tcp_listen_for_all();
1447 else
1448 error = sctp_listen_for_all();
976 if (error) 1449 if (error)
977 goto fail_unlisten; 1450 goto fail_unlisten;
978 1451
@@ -983,24 +1456,13 @@ int dlm_lowcomms_start(void)
983 return 0; 1456 return 0;
984 1457
985fail_unlisten: 1458fail_unlisten:
986 close_connection(connections[0], false); 1459 con = nodeid2con(0,0);
987 kmem_cache_free(con_cache, connections[0]); 1460 if (con) {
1461 close_connection(con, false);
1462 kmem_cache_free(con_cache, con);
1463 }
988 kmem_cache_destroy(con_cache); 1464 kmem_cache_destroy(con_cache);
989 1465
990fail_free_conn:
991 kfree(connections);
992
993out: 1466out:
994 return error; 1467 return error;
995} 1468}
996
997/*
998 * Overrides for Emacs so that we follow Linus's tabbing style.
999 * Emacs will notice this stuff at the end of the file and automatically
1000 * adjust the settings for this buffer only. This must remain at the end
1001 * of the file.
1002 * ---------------------------------------------------------------------------
1003 * Local variables:
1004 * c-file-style: "linux"
1005 * End:
1006 */