aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm/lowcomms-sctp.c
diff options
context:
space:
mode:
authorPatrick Caulfield <pcaulfie@redhat.com>2006-11-02 11:19:21 -0500
committerSteven Whitehouse <swhiteho@redhat.com>2006-11-30 10:35:00 -0500
commitfdda387f73947e6ae511ec601f5b3c6fbb582aac (patch)
tree77ba5e6a34c801fbf25fd31224fc64b84ca5403d /fs/dlm/lowcomms-sctp.c
parent61057c6bb3a3d14cf2bea6ca20dc6d367e1d852e (diff)
[DLM] Add support for tcp communications
The following patch adds a TCP based communications layer to the DLM which is compile time selectable. The existing SCTP layer gives the advantage of allowing multihoming, whereas the TCP layer has been heavily tested in previous versions of the DLM and is known to be robust and therefore can be used as a baseline for performance testing. Signed-off-by: Patrick Caulfield <pcaulfie@redhat.com> Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
Diffstat (limited to 'fs/dlm/lowcomms-sctp.c')
-rw-r--r--fs/dlm/lowcomms-sctp.c1239
1 files changed, 1239 insertions, 0 deletions
diff --git a/fs/dlm/lowcomms-sctp.c b/fs/dlm/lowcomms-sctp.c
new file mode 100644
index 000000000000..6da6b14d5a61
--- /dev/null
+++ b/fs/dlm/lowcomms-sctp.c
@@ -0,0 +1,1239 @@
1/******************************************************************************
2*******************************************************************************
3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
6**
7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions
9** of the GNU General Public License v.2.
10**
11*******************************************************************************
12******************************************************************************/
13
14/*
15 * lowcomms.c
16 *
17 * This is the "low-level" comms layer.
18 *
19 * It is responsible for sending/receiving messages
20 * from other nodes in the cluster.
21 *
22 * Cluster nodes are referred to by their nodeids. nodeids are
23 * simply 32 bit numbers to the locking module - if they need to
24 * be expanded for the cluster infrastructure then that is it's
25 * responsibility. It is this layer's
26 * responsibility to resolve these into IP address or
27 * whatever it needs for inter-node communication.
28 *
29 * The comms level is two kernel threads that deal mainly with
30 * the receiving of messages from other nodes and passing them
31 * up to the mid-level comms layer (which understands the
32 * message format) for execution by the locking core, and
33 * a send thread which does all the setting up of connections
34 * to remote nodes and the sending of data. Threads are not allowed
35 * to send their own data because it may cause them to wait in times
36 * of high load. Also, this way, the sending thread can collect together
37 * messages bound for one node and send them in one block.
38 *
39 * I don't see any problem with the recv thread executing the locking
40 * code on behalf of remote processes as the locking code is
41 * short, efficient and never (well, hardly ever) waits.
42 *
43 */
44
45#include <asm/ioctls.h>
46#include <net/sock.h>
47#include <net/tcp.h>
48#include <net/sctp/user.h>
49#include <linux/pagemap.h>
50#include <linux/socket.h>
51#include <linux/idr.h>
52
53#include "dlm_internal.h"
54#include "lowcomms.h"
55#include "config.h"
56#include "midcomms.h"
57
58static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
59static int dlm_local_count;
60static int dlm_local_nodeid;
61
62/* One of these per connected node */
63
64#define NI_INIT_PENDING 1
65#define NI_WRITE_PENDING 2
66
67struct nodeinfo {
68 spinlock_t lock;
69 sctp_assoc_t assoc_id;
70 unsigned long flags;
71 struct list_head write_list; /* nodes with pending writes */
72 struct list_head writequeue; /* outgoing writequeue_entries */
73 spinlock_t writequeue_lock;
74 int nodeid;
75};
76
77static DEFINE_IDR(nodeinfo_idr);
78static struct rw_semaphore nodeinfo_lock;
79static int max_nodeid;
80
81struct cbuf {
82 unsigned base;
83 unsigned len;
84 unsigned mask;
85};
86
87/* Just the one of these, now. But this struct keeps
88 the connection-specific variables together */
89
90#define CF_READ_PENDING 1
91
92struct connection {
93 struct socket *sock;
94 unsigned long flags;
95 struct page *rx_page;
96 atomic_t waiting_requests;
97 struct cbuf cb;
98 int eagain_flag;
99};
100
101/* An entry waiting to be sent */
102
103struct writequeue_entry {
104 struct list_head list;
105 struct page *page;
106 int offset;
107 int len;
108 int end;
109 int users;
110 struct nodeinfo *ni;
111};
112
113#define CBUF_ADD(cb, n) do { (cb)->len += n; } while(0)
114#define CBUF_EMPTY(cb) ((cb)->len == 0)
115#define CBUF_MAY_ADD(cb, n) (((cb)->len + (n)) < ((cb)->mask + 1))
116#define CBUF_DATA(cb) (((cb)->base + (cb)->len) & (cb)->mask)
117
118#define CBUF_INIT(cb, size) \
119do { \
120 (cb)->base = (cb)->len = 0; \
121 (cb)->mask = ((size)-1); \
122} while(0)
123
124#define CBUF_EAT(cb, n) \
125do { \
126 (cb)->len -= (n); \
127 (cb)->base += (n); \
128 (cb)->base &= (cb)->mask; \
129} while(0)
130
131
132/* List of nodes which have writes pending */
133static struct list_head write_nodes;
134static spinlock_t write_nodes_lock;
135
136/* Maximum number of incoming messages to process before
137 * doing a schedule()
138 */
139#define MAX_RX_MSG_COUNT 25
140
141/* Manage daemons */
142static struct task_struct *recv_task;
143static struct task_struct *send_task;
144static wait_queue_head_t lowcomms_recv_wait;
145static atomic_t accepting;
146
147/* The SCTP connection */
148static struct connection sctp_con;
149
150
151static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
152{
153 struct sockaddr_storage addr;
154 int error;
155
156 if (!dlm_local_count)
157 return -1;
158
159 error = dlm_nodeid_to_addr(nodeid, &addr);
160 if (error)
161 return error;
162
163 if (dlm_local_addr[0]->ss_family == AF_INET) {
164 struct sockaddr_in *in4 = (struct sockaddr_in *) &addr;
165 struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
166 ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
167 } else {
168 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr;
169 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
170 memcpy(&ret6->sin6_addr, &in6->sin6_addr,
171 sizeof(in6->sin6_addr));
172 }
173
174 return 0;
175}
176
177static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
178{
179 struct nodeinfo *ni;
180 int r;
181 int n;
182
183 down_read(&nodeinfo_lock);
184 ni = idr_find(&nodeinfo_idr, nodeid);
185 up_read(&nodeinfo_lock);
186
187 if (!ni && alloc) {
188 down_write(&nodeinfo_lock);
189
190 ni = idr_find(&nodeinfo_idr, nodeid);
191 if (ni)
192 goto out_up;
193
194 r = idr_pre_get(&nodeinfo_idr, alloc);
195 if (!r)
196 goto out_up;
197
198 ni = kmalloc(sizeof(struct nodeinfo), alloc);
199 if (!ni)
200 goto out_up;
201
202 r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n);
203 if (r) {
204 kfree(ni);
205 ni = NULL;
206 goto out_up;
207 }
208 if (n != nodeid) {
209 idr_remove(&nodeinfo_idr, n);
210 kfree(ni);
211 ni = NULL;
212 goto out_up;
213 }
214 memset(ni, 0, sizeof(struct nodeinfo));
215 spin_lock_init(&ni->lock);
216 INIT_LIST_HEAD(&ni->writequeue);
217 spin_lock_init(&ni->writequeue_lock);
218 ni->nodeid = nodeid;
219
220 if (nodeid > max_nodeid)
221 max_nodeid = nodeid;
222 out_up:
223 up_write(&nodeinfo_lock);
224 }
225
226 return ni;
227}
228
229/* Don't call this too often... */
230static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc)
231{
232 int i;
233 struct nodeinfo *ni;
234
235 for (i=1; i<=max_nodeid; i++) {
236 ni = nodeid2nodeinfo(i, 0);
237 if (ni && ni->assoc_id == assoc)
238 return ni;
239 }
240 return NULL;
241}
242
243/* Data or notification available on socket */
244static void lowcomms_data_ready(struct sock *sk, int count_unused)
245{
246 atomic_inc(&sctp_con.waiting_requests);
247 if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags))
248 return;
249
250 wake_up_interruptible(&lowcomms_recv_wait);
251}
252
253
254/* Add the port number to an IP6 or 4 sockaddr and return the address length.
255 Also padd out the struct with zeros to make comparisons meaningful */
256
257static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
258 int *addr_len)
259{
260 struct sockaddr_in *local4_addr;
261 struct sockaddr_in6 *local6_addr;
262
263 if (!dlm_local_count)
264 return;
265
266 if (!port) {
267 if (dlm_local_addr[0]->ss_family == AF_INET) {
268 local4_addr = (struct sockaddr_in *)dlm_local_addr[0];
269 port = be16_to_cpu(local4_addr->sin_port);
270 } else {
271 local6_addr = (struct sockaddr_in6 *)dlm_local_addr[0];
272 port = be16_to_cpu(local6_addr->sin6_port);
273 }
274 }
275
276 saddr->ss_family = dlm_local_addr[0]->ss_family;
277 if (dlm_local_addr[0]->ss_family == AF_INET) {
278 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
279 in4_addr->sin_port = cpu_to_be16(port);
280 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
281 memset(in4_addr+1, 0, sizeof(struct sockaddr_storage) -
282 sizeof(struct sockaddr_in));
283 *addr_len = sizeof(struct sockaddr_in);
284 } else {
285 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
286 in6_addr->sin6_port = cpu_to_be16(port);
287 memset(in6_addr+1, 0, sizeof(struct sockaddr_storage) -
288 sizeof(struct sockaddr_in6));
289 *addr_len = sizeof(struct sockaddr_in6);
290 }
291}
292
293/* Close the connection and tidy up */
294static void close_connection(void)
295{
296 if (sctp_con.sock) {
297 sock_release(sctp_con.sock);
298 sctp_con.sock = NULL;
299 }
300
301 if (sctp_con.rx_page) {
302 __free_page(sctp_con.rx_page);
303 sctp_con.rx_page = NULL;
304 }
305}
306
307/* We only send shutdown messages to nodes that are not part of the cluster */
308static void send_shutdown(sctp_assoc_t associd)
309{
310 static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
311 struct msghdr outmessage;
312 struct cmsghdr *cmsg;
313 struct sctp_sndrcvinfo *sinfo;
314 int ret;
315
316 outmessage.msg_name = NULL;
317 outmessage.msg_namelen = 0;
318 outmessage.msg_control = outcmsg;
319 outmessage.msg_controllen = sizeof(outcmsg);
320 outmessage.msg_flags = MSG_EOR;
321
322 cmsg = CMSG_FIRSTHDR(&outmessage);
323 cmsg->cmsg_level = IPPROTO_SCTP;
324 cmsg->cmsg_type = SCTP_SNDRCV;
325 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
326 outmessage.msg_controllen = cmsg->cmsg_len;
327 sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
328 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
329
330 sinfo->sinfo_flags |= MSG_EOF;
331 sinfo->sinfo_assoc_id = associd;
332
333 ret = kernel_sendmsg(sctp_con.sock, &outmessage, NULL, 0, 0);
334
335 if (ret != 0)
336 log_print("send EOF to node failed: %d", ret);
337}
338
339
340/* INIT failed but we don't know which node...
341 restart INIT on all pending nodes */
342static void init_failed(void)
343{
344 int i;
345 struct nodeinfo *ni;
346
347 for (i=1; i<=max_nodeid; i++) {
348 ni = nodeid2nodeinfo(i, 0);
349 if (!ni)
350 continue;
351
352 if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
353 ni->assoc_id = 0;
354 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
355 spin_lock_bh(&write_nodes_lock);
356 list_add_tail(&ni->write_list, &write_nodes);
357 spin_unlock_bh(&write_nodes_lock);
358 }
359 }
360 }
361 wake_up_process(send_task);
362}
363
364/* Something happened to an association */
365static void process_sctp_notification(struct msghdr *msg, char *buf)
366{
367 union sctp_notification *sn = (union sctp_notification *)buf;
368
369 if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
370 switch (sn->sn_assoc_change.sac_state) {
371
372 case SCTP_COMM_UP:
373 case SCTP_RESTART:
374 {
375 /* Check that the new node is in the lockspace */
376 struct sctp_prim prim;
377 mm_segment_t fs;
378 int nodeid;
379 int prim_len, ret;
380 int addr_len;
381 struct nodeinfo *ni;
382
383 /* This seems to happen when we received a connection
384 * too early... or something... anyway, it happens but
385 * we always seem to get a real message too, see
386 * receive_from_sock */
387
388 if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
389 log_print("COMM_UP for invalid assoc ID %d",
390 (int)sn->sn_assoc_change.sac_assoc_id);
391 init_failed();
392 return;
393 }
394 memset(&prim, 0, sizeof(struct sctp_prim));
395 prim_len = sizeof(struct sctp_prim);
396 prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
397
398 fs = get_fs();
399 set_fs(get_ds());
400 ret = sctp_con.sock->ops->getsockopt(sctp_con.sock,
401 IPPROTO_SCTP, SCTP_PRIMARY_ADDR,
402 (char*)&prim, &prim_len);
403 set_fs(fs);
404 if (ret < 0) {
405 struct nodeinfo *ni;
406
407 log_print("getsockopt/sctp_primary_addr on "
408 "new assoc %d failed : %d",
409 (int)sn->sn_assoc_change.sac_assoc_id, ret);
410
411 /* Retry INIT later */
412 ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
413 if (ni)
414 clear_bit(NI_INIT_PENDING, &ni->flags);
415 return;
416 }
417 make_sockaddr(&prim.ssp_addr, 0, &addr_len);
418 if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
419 log_print("reject connect from unknown addr");
420 send_shutdown(prim.ssp_assoc_id);
421 return;
422 }
423
424 ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
425 if (!ni)
426 return;
427
428 /* Save the assoc ID */
429 spin_lock(&ni->lock);
430 ni->assoc_id = sn->sn_assoc_change.sac_assoc_id;
431 spin_unlock(&ni->lock);
432
433 log_print("got new/restarted association %d nodeid %d",
434 (int)sn->sn_assoc_change.sac_assoc_id, nodeid);
435
436 /* Send any pending writes */
437 clear_bit(NI_INIT_PENDING, &ni->flags);
438 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
439 spin_lock_bh(&write_nodes_lock);
440 list_add_tail(&ni->write_list, &write_nodes);
441 spin_unlock_bh(&write_nodes_lock);
442 }
443 wake_up_process(send_task);
444 }
445 break;
446
447 case SCTP_COMM_LOST:
448 case SCTP_SHUTDOWN_COMP:
449 {
450 struct nodeinfo *ni;
451
452 ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
453 if (ni) {
454 spin_lock(&ni->lock);
455 ni->assoc_id = 0;
456 spin_unlock(&ni->lock);
457 }
458 }
459 break;
460
461 /* We don't know which INIT failed, so clear the PENDING flags
462 * on them all. if assoc_id is zero then it will then try
463 * again */
464
465 case SCTP_CANT_STR_ASSOC:
466 {
467 log_print("Can't start SCTP association - retrying");
468 init_failed();
469 }
470 break;
471
472 default:
473 log_print("unexpected SCTP assoc change id=%d state=%d",
474 (int)sn->sn_assoc_change.sac_assoc_id,
475 sn->sn_assoc_change.sac_state);
476 }
477 }
478}
479
480/* Data received from remote end */
481static int receive_from_sock(void)
482{
483 int ret = 0;
484 struct msghdr msg;
485 struct kvec iov[2];
486 unsigned len;
487 int r;
488 struct sctp_sndrcvinfo *sinfo;
489 struct cmsghdr *cmsg;
490 struct nodeinfo *ni;
491
492 /* These two are marginally too big for stack allocation, but this
493 * function is (currently) only called by dlm_recvd so static should be
494 * OK.
495 */
496 static struct sockaddr_storage msgname;
497 static char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
498
499 if (sctp_con.sock == NULL)
500 goto out;
501
502 if (sctp_con.rx_page == NULL) {
503 /*
504 * This doesn't need to be atomic, but I think it should
505 * improve performance if it is.
506 */
507 sctp_con.rx_page = alloc_page(GFP_ATOMIC);
508 if (sctp_con.rx_page == NULL)
509 goto out_resched;
510 CBUF_INIT(&sctp_con.cb, PAGE_CACHE_SIZE);
511 }
512
513 memset(&incmsg, 0, sizeof(incmsg));
514 memset(&msgname, 0, sizeof(msgname));
515
516 memset(incmsg, 0, sizeof(incmsg));
517 msg.msg_name = &msgname;
518 msg.msg_namelen = sizeof(msgname);
519 msg.msg_flags = 0;
520 msg.msg_control = incmsg;
521 msg.msg_controllen = sizeof(incmsg);
522 msg.msg_iovlen = 1;
523
524 /* I don't see why this circular buffer stuff is necessary for SCTP
525 * which is a packet-based protocol, but the whole thing breaks under
526 * load without it! The overhead is minimal (and is in the TCP lowcomms
527 * anyway, of course) so I'll leave it in until I can figure out what's
528 * really happening.
529 */
530
531 /*
532 * iov[0] is the bit of the circular buffer between the current end
533 * point (cb.base + cb.len) and the end of the buffer.
534 */
535 iov[0].iov_len = sctp_con.cb.base - CBUF_DATA(&sctp_con.cb);
536 iov[0].iov_base = page_address(sctp_con.rx_page) +
537 CBUF_DATA(&sctp_con.cb);
538 iov[1].iov_len = 0;
539
540 /*
541 * iov[1] is the bit of the circular buffer between the start of the
542 * buffer and the start of the currently used section (cb.base)
543 */
544 if (CBUF_DATA(&sctp_con.cb) >= sctp_con.cb.base) {
545 iov[0].iov_len = PAGE_CACHE_SIZE - CBUF_DATA(&sctp_con.cb);
546 iov[1].iov_len = sctp_con.cb.base;
547 iov[1].iov_base = page_address(sctp_con.rx_page);
548 msg.msg_iovlen = 2;
549 }
550 len = iov[0].iov_len + iov[1].iov_len;
551
552 r = ret = kernel_recvmsg(sctp_con.sock, &msg, iov, msg.msg_iovlen, len,
553 MSG_NOSIGNAL | MSG_DONTWAIT);
554 if (ret <= 0)
555 goto out_close;
556
557 msg.msg_control = incmsg;
558 msg.msg_controllen = sizeof(incmsg);
559 cmsg = CMSG_FIRSTHDR(&msg);
560 sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
561
562 if (msg.msg_flags & MSG_NOTIFICATION) {
563 process_sctp_notification(&msg, page_address(sctp_con.rx_page));
564 return 0;
565 }
566
567 /* Is this a new association ? */
568 ni = nodeid2nodeinfo(le32_to_cpu(sinfo->sinfo_ppid), GFP_KERNEL);
569 if (ni) {
570 ni->assoc_id = sinfo->sinfo_assoc_id;
571 if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
572
573 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
574 spin_lock_bh(&write_nodes_lock);
575 list_add_tail(&ni->write_list, &write_nodes);
576 spin_unlock_bh(&write_nodes_lock);
577 }
578 wake_up_process(send_task);
579 }
580 }
581
582 /* INIT sends a message with length of 1 - ignore it */
583 if (r == 1)
584 return 0;
585
586 CBUF_ADD(&sctp_con.cb, ret);
587 ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
588 page_address(sctp_con.rx_page),
589 sctp_con.cb.base, sctp_con.cb.len,
590 PAGE_CACHE_SIZE);
591 if (ret < 0)
592 goto out_close;
593 CBUF_EAT(&sctp_con.cb, ret);
594
595 out:
596 ret = 0;
597 goto out_ret;
598
599 out_resched:
600 lowcomms_data_ready(sctp_con.sock->sk, 0);
601 ret = 0;
602 schedule();
603 goto out_ret;
604
605 out_close:
606 if (ret != -EAGAIN)
607 log_print("error reading from sctp socket: %d", ret);
608 out_ret:
609 return ret;
610}
611
612/* Bind to an IP address. SCTP allows multiple address so it can do multi-homing */
613static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num)
614{
615 mm_segment_t fs;
616 int result = 0;
617
618 fs = get_fs();
619 set_fs(get_ds());
620 if (num == 1)
621 result = sctp_con.sock->ops->bind(sctp_con.sock,
622 (struct sockaddr *) addr, addr_len);
623 else
624 result = sctp_con.sock->ops->setsockopt(sctp_con.sock, SOL_SCTP,
625 SCTP_SOCKOPT_BINDX_ADD, (char *)addr, addr_len);
626 set_fs(fs);
627
628 if (result < 0)
629 log_print("Can't bind to port %d addr number %d",
630 dlm_config.tcp_port, num);
631
632 return result;
633}
634
635static void init_local(void)
636{
637 struct sockaddr_storage sas, *addr;
638 int i;
639
640 dlm_local_nodeid = dlm_our_nodeid();
641
642 for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) {
643 if (dlm_our_addr(&sas, i))
644 break;
645
646 addr = kmalloc(sizeof(*addr), GFP_KERNEL);
647 if (!addr)
648 break;
649 memcpy(addr, &sas, sizeof(*addr));
650 dlm_local_addr[dlm_local_count++] = addr;
651 }
652}
653
654/* Initialise SCTP socket and bind to all interfaces */
655static int init_sock(void)
656{
657 mm_segment_t fs;
658 struct socket *sock = NULL;
659 struct sockaddr_storage localaddr;
660 struct sctp_event_subscribe subscribe;
661 int result = -EINVAL, num = 1, i, addr_len;
662
663 if (!dlm_local_count) {
664 init_local();
665 if (!dlm_local_count) {
666 log_print("no local IP address has been set");
667 goto out;
668 }
669 }
670
671 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
672 IPPROTO_SCTP, &sock);
673 if (result < 0) {
674 log_print("Can't create comms socket, check SCTP is loaded");
675 goto out;
676 }
677
678 /* Listen for events */
679 memset(&subscribe, 0, sizeof(subscribe));
680 subscribe.sctp_data_io_event = 1;
681 subscribe.sctp_association_event = 1;
682 subscribe.sctp_send_failure_event = 1;
683 subscribe.sctp_shutdown_event = 1;
684 subscribe.sctp_partial_delivery_event = 1;
685
686 fs = get_fs();
687 set_fs(get_ds());
688 result = sock->ops->setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
689 (char *)&subscribe, sizeof(subscribe));
690 set_fs(fs);
691
692 if (result < 0) {
693 log_print("Failed to set SCTP_EVENTS on socket: result=%d",
694 result);
695 goto create_delsock;
696 }
697
698 /* Init con struct */
699 sock->sk->sk_user_data = &sctp_con;
700 sctp_con.sock = sock;
701 sctp_con.sock->sk->sk_data_ready = lowcomms_data_ready;
702
703 /* Bind to all interfaces. */
704 for (i = 0; i < dlm_local_count; i++) {
705 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
706 make_sockaddr(&localaddr, dlm_config.tcp_port, &addr_len);
707
708 result = add_bind_addr(&localaddr, addr_len, num);
709 if (result)
710 goto create_delsock;
711 ++num;
712 }
713
714 result = sock->ops->listen(sock, 5);
715 if (result < 0) {
716 log_print("Can't set socket listening");
717 goto create_delsock;
718 }
719
720 return 0;
721
722 create_delsock:
723 sock_release(sock);
724 sctp_con.sock = NULL;
725 out:
726 return result;
727}
728
729
730static struct writequeue_entry *new_writequeue_entry(gfp_t allocation)
731{
732 struct writequeue_entry *entry;
733
734 entry = kmalloc(sizeof(struct writequeue_entry), allocation);
735 if (!entry)
736 return NULL;
737
738 entry->page = alloc_page(allocation);
739 if (!entry->page) {
740 kfree(entry);
741 return NULL;
742 }
743
744 entry->offset = 0;
745 entry->len = 0;
746 entry->end = 0;
747 entry->users = 0;
748
749 return entry;
750}
751
752void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
753{
754 struct writequeue_entry *e;
755 int offset = 0;
756 int users = 0;
757 struct nodeinfo *ni;
758
759 if (!atomic_read(&accepting))
760 return NULL;
761
762 ni = nodeid2nodeinfo(nodeid, allocation);
763 if (!ni)
764 return NULL;
765
766 spin_lock(&ni->writequeue_lock);
767 e = list_entry(ni->writequeue.prev, struct writequeue_entry, list);
768 if (((struct list_head *) e == &ni->writequeue) ||
769 (PAGE_CACHE_SIZE - e->end < len)) {
770 e = NULL;
771 } else {
772 offset = e->end;
773 e->end += len;
774 users = e->users++;
775 }
776 spin_unlock(&ni->writequeue_lock);
777
778 if (e) {
779 got_one:
780 if (users == 0)
781 kmap(e->page);
782 *ppc = page_address(e->page) + offset;
783 return e;
784 }
785
786 e = new_writequeue_entry(allocation);
787 if (e) {
788 spin_lock(&ni->writequeue_lock);
789 offset = e->end;
790 e->end += len;
791 e->ni = ni;
792 users = e->users++;
793 list_add_tail(&e->list, &ni->writequeue);
794 spin_unlock(&ni->writequeue_lock);
795 goto got_one;
796 }
797 return NULL;
798}
799
800void dlm_lowcomms_commit_buffer(void *arg)
801{
802 struct writequeue_entry *e = (struct writequeue_entry *) arg;
803 int users;
804 struct nodeinfo *ni = e->ni;
805
806 if (!atomic_read(&accepting))
807 return;
808
809 spin_lock(&ni->writequeue_lock);
810 users = --e->users;
811 if (users)
812 goto out;
813 e->len = e->end - e->offset;
814 kunmap(e->page);
815 spin_unlock(&ni->writequeue_lock);
816
817 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
818 spin_lock_bh(&write_nodes_lock);
819 list_add_tail(&ni->write_list, &write_nodes);
820 spin_unlock_bh(&write_nodes_lock);
821 wake_up_process(send_task);
822 }
823 return;
824
825 out:
826 spin_unlock(&ni->writequeue_lock);
827 return;
828}
829
830static void free_entry(struct writequeue_entry *e)
831{
832 __free_page(e->page);
833 kfree(e);
834}
835
836/* Initiate an SCTP association. In theory we could just use sendmsg() on
837 the first IP address and it should work, but this allows us to set up the
838 association before sending any valuable data that we can't afford to lose.
839 It also keeps the send path clean as it can now always use the association ID */
840static void initiate_association(int nodeid)
841{
842 struct sockaddr_storage rem_addr;
843 static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
844 struct msghdr outmessage;
845 struct cmsghdr *cmsg;
846 struct sctp_sndrcvinfo *sinfo;
847 int ret;
848 int addrlen;
849 char buf[1];
850 struct kvec iov[1];
851 struct nodeinfo *ni;
852
853 log_print("Initiating association with node %d", nodeid);
854
855 ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
856 if (!ni)
857 return;
858
859 if (nodeid_to_addr(nodeid, (struct sockaddr *)&rem_addr)) {
860 log_print("no address for nodeid %d", nodeid);
861 return;
862 }
863
864 make_sockaddr(&rem_addr, dlm_config.tcp_port, &addrlen);
865
866 outmessage.msg_name = &rem_addr;
867 outmessage.msg_namelen = addrlen;
868 outmessage.msg_control = outcmsg;
869 outmessage.msg_controllen = sizeof(outcmsg);
870 outmessage.msg_flags = MSG_EOR;
871
872 iov[0].iov_base = buf;
873 iov[0].iov_len = 1;
874
875 /* Real INIT messages seem to cause trouble. Just send a 1 byte message
876 we can afford to lose */
877 cmsg = CMSG_FIRSTHDR(&outmessage);
878 cmsg->cmsg_level = IPPROTO_SCTP;
879 cmsg->cmsg_type = SCTP_SNDRCV;
880 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
881 sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
882 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
883 sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
884
885 outmessage.msg_controllen = cmsg->cmsg_len;
886 ret = kernel_sendmsg(sctp_con.sock, &outmessage, iov, 1, 1);
887 if (ret < 0) {
888 log_print("send INIT to node failed: %d", ret);
889 /* Try again later */
890 clear_bit(NI_INIT_PENDING, &ni->flags);
891 }
892}
893
894/* Send a message */
895static int send_to_sock(struct nodeinfo *ni)
896{
897 int ret = 0;
898 struct writequeue_entry *e;
899 int len, offset;
900 struct msghdr outmsg;
901 static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
902 struct cmsghdr *cmsg;
903 struct sctp_sndrcvinfo *sinfo;
904 struct kvec iov;
905
906 /* See if we need to init an association before we start
907 sending precious messages */
908 spin_lock(&ni->lock);
909 if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
910 spin_unlock(&ni->lock);
911 initiate_association(ni->nodeid);
912 return 0;
913 }
914 spin_unlock(&ni->lock);
915
916 outmsg.msg_name = NULL; /* We use assoc_id */
917 outmsg.msg_namelen = 0;
918 outmsg.msg_control = outcmsg;
919 outmsg.msg_controllen = sizeof(outcmsg);
920 outmsg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | MSG_EOR;
921
922 cmsg = CMSG_FIRSTHDR(&outmsg);
923 cmsg->cmsg_level = IPPROTO_SCTP;
924 cmsg->cmsg_type = SCTP_SNDRCV;
925 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
926 sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
927 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
928 sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
929 sinfo->sinfo_assoc_id = ni->assoc_id;
930 outmsg.msg_controllen = cmsg->cmsg_len;
931
932 spin_lock(&ni->writequeue_lock);
933 for (;;) {
934 if (list_empty(&ni->writequeue))
935 break;
936 e = list_entry(ni->writequeue.next, struct writequeue_entry,
937 list);
938 len = e->len;
939 offset = e->offset;
940 BUG_ON(len == 0 && e->users == 0);
941 spin_unlock(&ni->writequeue_lock);
942 kmap(e->page);
943
944 ret = 0;
945 if (len) {
946 iov.iov_base = page_address(e->page)+offset;
947 iov.iov_len = len;
948
949 ret = kernel_sendmsg(sctp_con.sock, &outmsg, &iov, 1,
950 len);
951 if (ret == -EAGAIN) {
952 sctp_con.eagain_flag = 1;
953 goto out;
954 } else if (ret < 0)
955 goto send_error;
956 } else {
957 /* Don't starve people filling buffers */
958 schedule();
959 }
960
961 spin_lock(&ni->writequeue_lock);
962 e->offset += ret;
963 e->len -= ret;
964
965 if (e->len == 0 && e->users == 0) {
966 list_del(&e->list);
967 free_entry(e);
968 continue;
969 }
970 }
971 spin_unlock(&ni->writequeue_lock);
972 out:
973 return ret;
974
975 send_error:
976 log_print("Error sending to node %d %d", ni->nodeid, ret);
977 spin_lock(&ni->lock);
978 if (!test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
979 ni->assoc_id = 0;
980 spin_unlock(&ni->lock);
981 initiate_association(ni->nodeid);
982 } else
983 spin_unlock(&ni->lock);
984
985 return ret;
986}
987
988/* Try to send any messages that are pending */
989static void process_output_queue(void)
990{
991 struct list_head *list;
992 struct list_head *temp;
993
994 spin_lock_bh(&write_nodes_lock);
995 list_for_each_safe(list, temp, &write_nodes) {
996 struct nodeinfo *ni =
997 list_entry(list, struct nodeinfo, write_list);
998 clear_bit(NI_WRITE_PENDING, &ni->flags);
999 list_del(&ni->write_list);
1000
1001 spin_unlock_bh(&write_nodes_lock);
1002
1003 send_to_sock(ni);
1004 spin_lock_bh(&write_nodes_lock);
1005 }
1006 spin_unlock_bh(&write_nodes_lock);
1007}
1008
1009/* Called after we've had -EAGAIN and been woken up */
1010static void refill_write_queue(void)
1011{
1012 int i;
1013
1014 for (i=1; i<=max_nodeid; i++) {
1015 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
1016
1017 if (ni) {
1018 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
1019 spin_lock_bh(&write_nodes_lock);
1020 list_add_tail(&ni->write_list, &write_nodes);
1021 spin_unlock_bh(&write_nodes_lock);
1022 }
1023 }
1024 }
1025}
1026
1027static void clean_one_writequeue(struct nodeinfo *ni)
1028{
1029 struct list_head *list;
1030 struct list_head *temp;
1031
1032 spin_lock(&ni->writequeue_lock);
1033 list_for_each_safe(list, temp, &ni->writequeue) {
1034 struct writequeue_entry *e =
1035 list_entry(list, struct writequeue_entry, list);
1036 list_del(&e->list);
1037 free_entry(e);
1038 }
1039 spin_unlock(&ni->writequeue_lock);
1040}
1041
1042static void clean_writequeues(void)
1043{
1044 int i;
1045
1046 for (i=1; i<=max_nodeid; i++) {
1047 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
1048 if (ni)
1049 clean_one_writequeue(ni);
1050 }
1051}
1052
1053
1054static void dealloc_nodeinfo(void)
1055{
1056 int i;
1057
1058 for (i=1; i<=max_nodeid; i++) {
1059 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
1060 if (ni) {
1061 idr_remove(&nodeinfo_idr, i);
1062 kfree(ni);
1063 }
1064 }
1065}
1066
1067int dlm_lowcomms_close(int nodeid)
1068{
1069 struct nodeinfo *ni;
1070
1071 ni = nodeid2nodeinfo(nodeid, 0);
1072 if (!ni)
1073 return -1;
1074
1075 spin_lock(&ni->lock);
1076 if (ni->assoc_id) {
1077 ni->assoc_id = 0;
1078 /* Don't send shutdown here, sctp will just queue it
1079 till the node comes back up! */
1080 }
1081 spin_unlock(&ni->lock);
1082
1083 clean_one_writequeue(ni);
1084 clear_bit(NI_INIT_PENDING, &ni->flags);
1085 return 0;
1086}
1087
1088static int write_list_empty(void)
1089{
1090 int status;
1091
1092 spin_lock_bh(&write_nodes_lock);
1093 status = list_empty(&write_nodes);
1094 spin_unlock_bh(&write_nodes_lock);
1095
1096 return status;
1097}
1098
1099static int dlm_recvd(void *data)
1100{
1101 DECLARE_WAITQUEUE(wait, current);
1102
1103 while (!kthread_should_stop()) {
1104 int count = 0;
1105
1106 set_current_state(TASK_INTERRUPTIBLE);
1107 add_wait_queue(&lowcomms_recv_wait, &wait);
1108 if (!test_bit(CF_READ_PENDING, &sctp_con.flags))
1109 schedule();
1110 remove_wait_queue(&lowcomms_recv_wait, &wait);
1111 set_current_state(TASK_RUNNING);
1112
1113 if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
1114 int ret;
1115
1116 do {
1117 ret = receive_from_sock();
1118
1119 /* Don't starve out everyone else */
1120 if (++count >= MAX_RX_MSG_COUNT) {
1121 schedule();
1122 count = 0;
1123 }
1124 } while (!kthread_should_stop() && ret >=0);
1125 }
1126 schedule();
1127 }
1128
1129 return 0;
1130}
1131
1132static int dlm_sendd(void *data)
1133{
1134 DECLARE_WAITQUEUE(wait, current);
1135
1136 add_wait_queue(sctp_con.sock->sk->sk_sleep, &wait);
1137
1138 while (!kthread_should_stop()) {
1139 set_current_state(TASK_INTERRUPTIBLE);
1140 if (write_list_empty())
1141 schedule();
1142 set_current_state(TASK_RUNNING);
1143
1144 if (sctp_con.eagain_flag) {
1145 sctp_con.eagain_flag = 0;
1146 refill_write_queue();
1147 }
1148 process_output_queue();
1149 }
1150
1151 remove_wait_queue(sctp_con.sock->sk->sk_sleep, &wait);
1152
1153 return 0;
1154}
1155
1156static void daemons_stop(void)
1157{
1158 kthread_stop(recv_task);
1159 kthread_stop(send_task);
1160}
1161
1162static int daemons_start(void)
1163{
1164 struct task_struct *p;
1165 int error;
1166
1167 p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
1168 error = IS_ERR(p);
1169 if (error) {
1170 log_print("can't start dlm_recvd %d", error);
1171 return error;
1172 }
1173 recv_task = p;
1174
1175 p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
1176 error = IS_ERR(p);
1177 if (error) {
1178 log_print("can't start dlm_sendd %d", error);
1179 kthread_stop(recv_task);
1180 return error;
1181 }
1182 send_task = p;
1183
1184 return 0;
1185}
1186
1187/*
1188 * This is quite likely to sleep...
1189 */
1190int dlm_lowcomms_start(void)
1191{
1192 int error;
1193
1194 error = init_sock();
1195 if (error)
1196 goto fail_sock;
1197 error = daemons_start();
1198 if (error)
1199 goto fail_sock;
1200 atomic_set(&accepting, 1);
1201 return 0;
1202
1203 fail_sock:
1204 close_connection();
1205 return error;
1206}
1207
1208/* Set all the activity flags to prevent any socket activity. */
1209
1210void dlm_lowcomms_stop(void)
1211{
1212 atomic_set(&accepting, 0);
1213 sctp_con.flags = 0x7;
1214 daemons_stop();
1215 clean_writequeues();
1216 close_connection();
1217 dealloc_nodeinfo();
1218 max_nodeid = 0;
1219}
1220
1221int dlm_lowcomms_init(void)
1222{
1223 init_waitqueue_head(&lowcomms_recv_wait);
1224 spin_lock_init(&write_nodes_lock);
1225 INIT_LIST_HEAD(&write_nodes);
1226 init_rwsem(&nodeinfo_lock);
1227 return 0;
1228}
1229
1230void dlm_lowcomms_exit(void)
1231{
1232 int i;
1233
1234 for (i = 0; i < dlm_local_count; i++)
1235 kfree(dlm_local_addr[i]);
1236 dlm_local_count = 0;
1237 dlm_local_nodeid = 0;
1238}
1239