path: root/fs/dlm/lowcomms-tcp.c
author	Patrick Caulfield <pcaulfie@redhat.com>	2006-11-02 11:19:21 -0500
committer	Steven Whitehouse <swhiteho@redhat.com>	2006-11-30 10:35:00 -0500
commit	fdda387f73947e6ae511ec601f5b3c6fbb582aac (patch)
tree	77ba5e6a34c801fbf25fd31224fc64b84ca5403d /fs/dlm/lowcomms-tcp.c
parent	61057c6bb3a3d14cf2bea6ca20dc6d367e1d852e (diff)
[DLM] Add support for tcp communications
The following patch adds a TCP-based communications layer to the DLM which is
compile-time selectable. The existing SCTP layer gives the advantage of
allowing multihoming, whereas the TCP layer has been heavily tested in
previous versions of the DLM and is known to be robust, and can therefore be
used as a baseline for performance testing.

Signed-off-by: Patrick Caulfield <pcaulfie@redhat.com>
Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
Diffstat (limited to 'fs/dlm/lowcomms-tcp.c')
-rw-r--r--	fs/dlm/lowcomms-tcp.c	1263
1 file changed, 1263 insertions, 0 deletions
diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms-tcp.c
new file mode 100644
index 000000000000..7289e59b4bd3
--- /dev/null
+++ b/fs/dlm/lowcomms-tcp.c
@@ -0,0 +1,1263 @@
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2006 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP addresses or
 * whatever it needs for inter-node communication.
 *
 * The comms level is two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * I don't see any problem with the recv thread executing the locking
 * code on behalf of remote processes as the locking code is
 * short, efficient and never waits.
 *
 */


#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "config.h"

struct cbuf {
	unsigned base;
	unsigned len;
	unsigned mask;
};

#ifndef FALSE
#define FALSE 0
#define TRUE 1
#endif
#define NODE_INCREMENT 32

#define CBUF_INIT(cb, size) do { (cb)->base = (cb)->len = 0; (cb)->mask = ((size)-1); } while(0)
#define CBUF_ADD(cb, n) do { (cb)->len += n; } while(0)
#define CBUF_EMPTY(cb) ((cb)->len == 0)
#define CBUF_MAY_ADD(cb, n) (((cb)->len + (n)) < ((cb)->mask + 1))
#define CBUF_EAT(cb, n) do { (cb)->len -= (n); \
			     (cb)->base += (n); (cb)->base &= (cb)->mask; } while(0)
#define CBUF_DATA(cb) (((cb)->base + (cb)->len) & (cb)->mask)

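/*
 * A worked example of the cbuf macros above, for illustration only and
 * assuming a hypothetical 16-byte buffer (the code below always uses a
 * full page):
 *
 *	struct cbuf cb;
 *	CBUF_INIT(&cb, 16);	// base = 0, len = 0, mask = 15
 *	CBUF_ADD(&cb, 10);	// len = 10, write point CBUF_DATA() == 10
 *	CBUF_EAT(&cb, 4);	// base = 4, len = 6
 *	CBUF_ADD(&cb, 8);	// len = 14, CBUF_DATA() == (4 + 14) & 15 == 2,
 *				// i.e. the write point has wrapped to the start
 */
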
/* Maximum number of incoming messages to process before
   doing a schedule()
*/
#define MAX_RX_MSG_COUNT 25

struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct rw_semaphore sock_sem;	/* Stop connect races */
	struct list_head read_list;	/* On this list when ready for reading */
	struct list_head write_list;	/* On this list when ready for writing */
	struct list_head state_list;	/* On this list when ready to connect */
	unsigned long flags;	/* bit 1,2 = We are on the read/write lists */
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_CONNECT_PENDING 3
#define CF_IS_OTHERCON 4
	struct list_head writequeue;	/* List of outgoing writequeue_entries */
	struct list_head listenlist;	/* List of allocated listening sockets */
	spinlock_t writequeue_lock;
	int (*rx_action) (struct connection *);	/* What to do when active */
	struct page *rx_page;
	struct cbuf cb;
	int retries;
	atomic_t waiting_requests;
#define MAX_CONNECT_RETRIES 3
	struct connection *othercon;
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	struct connection *con;
};

static struct sockaddr_storage dlm_local_addr;

/* Manage daemons */
static struct task_struct *recv_task;
static struct task_struct *send_task;

static wait_queue_t lowcomms_send_waitq_head;
static wait_queue_head_t lowcomms_send_waitq;
static wait_queue_t lowcomms_recv_waitq_head;
static wait_queue_head_t lowcomms_recv_waitq;

/* An array of pointers to connections, indexed by NODEID */
static struct connection **connections;
static struct semaphore connections_lock;
static kmem_cache_t *con_cache;
static int conn_array_size;
static atomic_t accepting;

/* List of sockets that have reads pending */
static struct list_head read_sockets;
static spinlock_t read_sockets_lock;

/* List of sockets which have writes pending */
static struct list_head write_sockets;
static spinlock_t write_sockets_lock;

/* List of sockets which have connects pending */
static struct list_head state_sockets;
static spinlock_t state_sockets_lock;

static struct connection *nodeid2con(int nodeid, gfp_t allocation)
{
	struct connection *con = NULL;

	down(&connections_lock);
	if (nodeid >= conn_array_size) {
		int new_size = nodeid + NODE_INCREMENT;
		struct connection **new_conns;

		new_conns = kmalloc(sizeof(struct connection *) *
				    new_size, allocation);
		if (!new_conns)
			goto finish;

		memset(new_conns, 0, sizeof(struct connection *) * new_size);
		memcpy(new_conns, connections, sizeof(struct connection *) * conn_array_size);
		conn_array_size = new_size;
		kfree(connections);
		connections = new_conns;

	}

	con = connections[nodeid];
	if (con == NULL && allocation) {
		con = kmem_cache_alloc(con_cache, allocation);
		if (!con)
			goto finish;

		memset(con, 0, sizeof(*con));
		con->nodeid = nodeid;
		init_rwsem(&con->sock_sem);
		INIT_LIST_HEAD(&con->writequeue);
		spin_lock_init(&con->writequeue_lock);

		connections[nodeid] = con;
	}

 finish:
	up(&connections_lock);
	return con;
}

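/*
 * Illustrative numbers (not from the patch itself): with NODE_INCREMENT
 * at 32 and conn_array_size still at its initial 32, a first lookup of
 * nodeid 40 grows the array to 40 + 32 = 72 entries, so each expansion
 * leaves headroom above the highest nodeid seen so far.
 */
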
/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk, int count_unused)
{
	struct connection *con = sock2con(sk);

	atomic_inc(&con->waiting_requests);
	if (test_and_set_bit(CF_READ_PENDING, &con->flags))
		return;

	spin_lock_bh(&read_sockets_lock);
	list_add_tail(&con->read_list, &read_sockets);
	spin_unlock_bh(&read_sockets_lock);

	wake_up_interruptible(&lowcomms_recv_waitq);
}

static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	if (test_and_set_bit(CF_WRITE_PENDING, &con->flags))
		return;

	spin_lock_bh(&write_sockets_lock);
	list_add_tail(&con->write_list, &write_sockets);
	spin_unlock_bh(&write_sockets_lock);

	wake_up_interruptible(&lowcomms_send_waitq);
}

static inline void lowcomms_connect_sock(struct connection *con)
{
	if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
		return;
	if (!atomic_read(&accepting))
		return;

	spin_lock_bh(&state_sockets_lock);
	list_add_tail(&con->state_list, &state_sockets);
	spin_unlock_bh(&state_sockets_lock);

	wake_up_interruptible(&lowcomms_send_waitq);
}

static void lowcomms_state_change(struct sock *sk)
{
/*	struct connection *con = sock2con(sk); */

	switch (sk->sk_state) {
	case TCP_ESTABLISHED:
		lowcomms_write_space(sk);
		break;

	case TCP_FIN_WAIT1:
	case TCP_FIN_WAIT2:
	case TCP_TIME_WAIT:
	case TCP_CLOSE:
	case TCP_CLOSE_WAIT:
	case TCP_LAST_ACK:
	case TCP_CLOSING:
		/* FIXME: I think this causes more trouble than it solves.
		   lowcomms will reconnect anyway when there is something to
		   send. This just attempts reconnection if a node goes down!
		*/
		/* lowcomms_connect_sock(con); */
		break;

	default:
		printk("dlm: lowcomms_state_change: state=%d\n", sk->sk_state);
		break;
	}
}

/* Make a socket active */
static int add_sock(struct socket *sock, struct connection *con)
{
	con->sock = sock;

	/* Install a data_ready callback */
	con->sock->sk->sk_data_ready = lowcomms_data_ready;
	con->sock->sk->sk_write_space = lowcomms_write_space;
	con->sock->sk->sk_state_change = lowcomms_state_change;

	return 0;
}

/* Add the port number to an IPv4 or IPv6 sockaddr and return the address
   length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr.ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
	}
	else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
}
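
/*
 * Usage sketch (hypothetical caller, illustrative only): both real
 * callers below follow this pattern - fill in a sockaddr_storage, then
 * let make_sockaddr() stamp in the family and port and report the
 * length to hand to the socket ops:
 *
 *	struct sockaddr_storage saddr;
 *	int addr_len;
 *
 *	make_sockaddr(&saddr, dlm_config.tcp_port, &addr_len);
 *	// addr_len is now sizeof(struct sockaddr_in) or
 *	// sizeof(struct sockaddr_in6), matching dlm_local_addr.ss_family
 */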

/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, int and_other)
{
	down_write(&con->sock_sem);

	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	}
	if (con->othercon && and_other) {
		/* Argh! recursion in kernel code!
		   Actually, this isn't a list so it
		   will only re-enter once.
		*/
		close_connection(con->othercon, FALSE);
	}
	if (con->rx_page) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}
	con->retries = 0;
	up_write(&con->sock_sem);
}

/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
	int ret = 0;
	struct msghdr msg;
	struct iovec iov[2];
	mm_segment_t fs;
	unsigned len;
	int r;
	int call_again_soon = 0;

	down_read(&con->sock_sem);

	if (con->sock == NULL)
		goto out;
	if (con->rx_page == NULL) {
		/*
		 * This doesn't need to be atomic, but I think it should
		 * improve performance if it is.
		 */
		con->rx_page = alloc_page(GFP_ATOMIC);
		if (con->rx_page == NULL)
			goto out_resched;
		CBUF_INIT(&con->cb, PAGE_CACHE_SIZE);
	}

	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_iovlen = 1;
	msg.msg_iov = iov;
	msg.msg_name = NULL;
	msg.msg_namelen = 0;
	msg.msg_flags = 0;

	/*
	 * iov[0] is the bit of the circular buffer between the current end
	 * point (cb.base + cb.len) and the end of the buffer.
	 */
	iov[0].iov_len = con->cb.base - CBUF_DATA(&con->cb);
	iov[0].iov_base = page_address(con->rx_page) + CBUF_DATA(&con->cb);
	iov[1].iov_len = 0;

	/*
	 * iov[1] is the bit of the circular buffer between the start of the
	 * buffer and the start of the currently used section (cb.base)
	 */
	if (CBUF_DATA(&con->cb) >= con->cb.base) {
		iov[0].iov_len = PAGE_CACHE_SIZE - CBUF_DATA(&con->cb);
		iov[1].iov_len = con->cb.base;
		iov[1].iov_base = page_address(con->rx_page);
		msg.msg_iovlen = 2;
	}
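
	/*
	 * A concrete example of the split above, with assumed values:
	 * if PAGE_CACHE_SIZE is 4096, cb.base is 1000 and cb.len is 3000,
	 * then CBUF_DATA() == 4000, so iov[0] covers bytes 4000..4095
	 * (96 bytes up to the end of the page) and iov[1] covers bytes
	 * 0..999 (wrapping around up to cb.base), giving msg_iovlen = 2.
	 */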
	len = iov[0].iov_len + iov[1].iov_len;

	fs = get_fs();
	set_fs(get_ds());
	r = ret = sock_recvmsg(con->sock, &msg, len,
			       MSG_DONTWAIT | MSG_NOSIGNAL);
	set_fs(fs);

	if (ret <= 0)
		goto out_close;
	if (ret == len)
		call_again_soon = 1;
	CBUF_ADD(&con->cb, ret);
	ret = dlm_process_incoming_buffer(con->nodeid,
					  page_address(con->rx_page),
					  con->cb.base, con->cb.len,
					  PAGE_CACHE_SIZE);
	if (ret == -EBADMSG) {
		printk(KERN_INFO "dlm: lowcomms: addr=%p, base=%u, len=%u, "
		       "iov_len=%u, iov_base[0]=%p, read=%d\n",
		       page_address(con->rx_page), con->cb.base, con->cb.len,
		       len, iov[0].iov_base, r);
	}
	if (ret < 0)
		goto out_close;
	CBUF_EAT(&con->cb, ret);

	if (CBUF_EMPTY(&con->cb) && !call_again_soon) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

 out:
	if (call_again_soon)
		goto out_resched;
	up_read(&con->sock_sem);
	ret = 0;
	goto out_ret;

 out_resched:
	lowcomms_data_ready(con->sock->sk, 0);
	up_read(&con->sock_sem);
	ret = 0;
	schedule();
	goto out_ret;

 out_close:
	up_read(&con->sock_sem);
	if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) {
		close_connection(con, FALSE);
		/* Reconnect when there is something to send */
	}

 out_ret:
	return ret;
}

/* Listening socket is busy, accept a connection */
static int accept_from_sock(struct connection *con)
{
	int result;
	struct sockaddr_storage peeraddr;
	struct socket *newsock;
	int len;
	int nodeid;
	struct connection *newcon;

	memset(&peeraddr, 0, sizeof(peeraddr));
	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &newsock);
	if (result < 0)
		return -ENOMEM;

	down_read(&con->sock_sem);

	result = -ENOTCONN;
	if (con->sock == NULL)
		goto accept_err;

	newsock->type = con->sock->type;
	newsock->ops = con->sock->ops;

	result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
	if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
				  &len, 2)) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
		printk("dlm: connect from non cluster node\n");
		sock_release(newsock);
		up_read(&con->sock_sem);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/* Check to see if we already have a connection to this node. This
	 * could happen if the two nodes initiate a connection at roughly
	 * the same time and the connections cross on the wire.
	 * TEMPORARY FIX:
	 * In this case we store the incoming one in "othercon"
	 */
	newcon = nodeid2con(nodeid, GFP_KERNEL);
	if (!newcon) {
		result = -ENOMEM;
		goto accept_err;
	}
	down_write(&newcon->sock_sem);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kmem_cache_alloc(con_cache, GFP_KERNEL);
			if (!othercon) {
				printk("dlm: failed to allocate incoming socket\n");
				up_write(&newcon->sock_sem);
				result = -ENOMEM;
				goto accept_err;
			}
			memset(othercon, 0, sizeof(*othercon));
			othercon->nodeid = nodeid;
			othercon->rx_action = receive_from_sock;
			init_rwsem(&othercon->sock_sem);
			set_bit(CF_IS_OTHERCON, &othercon->flags);
			newcon->othercon = othercon;
		}
		othercon->sock = newsock;
		newsock->sk->sk_user_data = othercon;
		add_sock(newsock, othercon);
	}
	else {
		newsock->sk->sk_user_data = newcon;
		newcon->rx_action = receive_from_sock;
		add_sock(newsock, newcon);

	}

	up_write(&newcon->sock_sem);

	/*
	 * Add it to the active queue in case we got data
	 * between processing the accept and adding the socket
	 * to the read_sockets list
	 */
	lowcomms_data_ready(newsock->sk, 0);
	up_read(&con->sock_sem);

	return 0;

 accept_err:
	up_read(&con->sock_sem);
	sock_release(newsock);

	if (result != -EAGAIN)
		printk("dlm: error accepting connection from node: %d\n", result);
	return result;
}

/* Connect a new socket to its peer */
static int connect_to_sock(struct connection *con)
{
	int result = -EHOSTUNREACH;
	struct sockaddr_storage saddr;
	int addr_len;
	struct socket *sock;

	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return 0;
	}

	down_write(&con->sock_sem);
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock) {
		result = 0;
		goto out;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	memset(&saddr, 0, sizeof(saddr));
	if (dlm_nodeid_to_addr(con->nodeid, &saddr))
		goto out_err;

	sock->sk->sk_user_data = con;
	con->rx_action = receive_from_sock;

	make_sockaddr(&saddr, dlm_config.tcp_port, &addr_len);

	add_sock(sock, con);

	log_print("connecting to %d", con->nodeid);
	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				    O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result != 0)
		goto out_err;

 out:
	up_write(&con->sock_sem);
	/*
	 * Returning an error here means we've given up trying to connect to
	 * a remote node, otherwise we return 0 and reschedule the connection
	 * attempt
	 */
	return result;

 out_err:
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH && result != -ENETUNREACH &&
	    result != -ENETDOWN && result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		lowcomms_connect_sock(con);
		result = 0;
	}
	goto out;
}
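
/*
 * Retry flow, sketched: a transient connect failure (any error not in
 * the fatal list above) re-queues the connection via
 * lowcomms_connect_sock() and returns 0, so dlm_sendd() calls back into
 * connect_to_sock() until con->retries exceeds MAX_CONNECT_RETRIES and
 * the attempt is quietly dropped.
 */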

static struct socket *create_listen_sock(struct connection *con,
					 struct sockaddr_storage *saddr)
{
	struct socket *sock = NULL;
	mm_segment_t fs;
	int result = 0;
	int one = 1;
	int addr_len;

	if (dlm_local_addr.ss_family == AF_INET)
		addr_len = sizeof(struct sockaddr_in);
	else
		addr_len = sizeof(struct sockaddr_in6);

	/* Create a socket to communicate with */
	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &sock);
	if (result < 0) {
		printk("dlm: Can't create listening comms socket\n");
		goto create_out;
	}

	fs = get_fs();
	set_fs(get_ds());
	result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
				 (char *)&one, sizeof(one));
	set_fs(fs);
	if (result < 0) {
		printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",
		       result);
	}
	sock->sk->sk_user_data = con;
	con->rx_action = accept_from_sock;
	con->sock = sock;

	/* Bind to our port */
	make_sockaddr(saddr, dlm_config.tcp_port, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
	if (result < 0) {
		printk("dlm: Can't bind to port %d\n", dlm_config.tcp_port);
		sock_release(sock);
		sock = NULL;
		con->sock = NULL;
		goto create_out;
	}

	fs = get_fs();
	set_fs(get_ds());

	result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
				 (char *)&one, sizeof(one));
	set_fs(fs);
	if (result < 0) {
		printk("dlm: Set keepalive failed: %d\n", result);
	}

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		printk("dlm: Can't listen on port %d\n", dlm_config.tcp_port);
		sock_release(sock);
		sock = NULL;
		goto create_out;
	}

 create_out:
	return sock;
}


/* Listen on all interfaces */
static int listen_for_all(void)
{
	struct socket *sock = NULL;
	struct connection *con = nodeid2con(0, GFP_KERNEL);
	int result = -EINVAL;

	/* We don't support multi-homed hosts */
	memset(con, 0, sizeof(*con));
	init_rwsem(&con->sock_sem);
	spin_lock_init(&con->writequeue_lock);
	INIT_LIST_HEAD(&con->writequeue);
	set_bit(CF_IS_OTHERCON, &con->flags);

	sock = create_listen_sock(con, &dlm_local_addr);
	if (sock) {
		add_sock(sock, con);
		result = 0;
	}
	else {
		result = -EADDRINUSE;
	}

	return result;
}


static struct writequeue_entry *new_writequeue_entry(struct connection *con,
						     gfp_t allocation)
{
	struct writequeue_entry *entry;

	entry = kmalloc(sizeof(struct writequeue_entry), allocation);
	if (!entry)
		return NULL;

	entry->page = alloc_page(allocation);
	if (!entry->page) {
		kfree(entry);
		return NULL;
	}

	entry->offset = 0;
	entry->len = 0;
	entry->end = 0;
	entry->users = 0;
	entry->con = con;

	return entry;
}

void *dlm_lowcomms_get_buffer(int nodeid, int len,
			      gfp_t allocation, char **ppc)
{
	struct connection *con;
	struct writequeue_entry *e;
	int offset = 0;
	int users = 0;

	if (!atomic_read(&accepting))
		return NULL;

	con = nodeid2con(nodeid, allocation);
	if (!con)
		return NULL;

	spin_lock(&con->writequeue_lock);
	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
	if (((struct list_head *) e == &con->writequeue) ||
	    (PAGE_CACHE_SIZE - e->end < len)) {
		e = NULL;
	} else {
		offset = e->end;
		e->end += len;
		users = e->users++;
	}
	spin_unlock(&con->writequeue_lock);

	if (e) {
	got_one:
		if (users == 0)
			kmap(e->page);
		*ppc = page_address(e->page) + offset;
		return e;
	}

	e = new_writequeue_entry(con, allocation);
	if (e) {
		spin_lock(&con->writequeue_lock);
		offset = e->end;
		e->end += len;
		users = e->users++;
		list_add_tail(&e->list, &con->writequeue);
		spin_unlock(&con->writequeue_lock);
		goto got_one;
	}
	return NULL;
}
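
/*
 * Usage sketch: lowcomms_send_message() below is the canonical caller,
 * but any user of this pair is expected to follow the same pattern -
 * reserve space, fill it, commit:
 *
 *	char *p;
 *	void *mh = dlm_lowcomms_get_buffer(nodeid, len, GFP_KERNEL, &p);
 *	if (mh) {
 *		memcpy(p, data, len);		// fill the reserved bytes
 *		dlm_lowcomms_commit_buffer(mh);	// hand off to dlm_sendd
 *	}
 */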

void dlm_lowcomms_commit_buffer(void *mh)
{
	struct writequeue_entry *e = (struct writequeue_entry *)mh;
	struct connection *con = e->con;
	int users;

	if (!atomic_read(&accepting))
		return;

	spin_lock(&con->writequeue_lock);
	users = --e->users;
	if (users)
		goto out;
	e->len = e->end - e->offset;
	kunmap(e->page);
	spin_unlock(&con->writequeue_lock);

	if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) {
		spin_lock_bh(&write_sockets_lock);
		list_add_tail(&con->write_list, &write_sockets);
		spin_unlock_bh(&write_sockets_lock);

		wake_up_interruptible(&lowcomms_send_waitq);
	}
	return;

 out:
	spin_unlock(&con->writequeue_lock);
	return;
}

static void free_entry(struct writequeue_entry *e)
{
	__free_page(e->page);
	kfree(e);
}

/* Send a message */
static int send_to_sock(struct connection *con)
{
	int ret = 0;
	ssize_t (*sendpage)(struct socket *, struct page *, int, size_t, int);
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset;

	down_read(&con->sock_sem);
	if (con->sock == NULL)
		goto out_connect;

	sendpage = con->sock->ops->sendpage;

	spin_lock(&con->writequeue_lock);
	for (;;) {
		e = list_entry(con->writequeue.next, struct writequeue_entry,
			       list);
		if ((struct list_head *) e == &con->writequeue)
			break;

		len = e->len;
		offset = e->offset;
		BUG_ON(len == 0 && e->users == 0);
		spin_unlock(&con->writequeue_lock);

		ret = 0;
		if (len) {
			ret = sendpage(con->sock, e->page, offset, len,
				       msg_flags);
			if (ret == -EAGAIN || ret == 0)
				goto out;
			if (ret <= 0)
				goto send_error;
		}
		else {
			/* Don't starve people filling buffers */
			schedule();
		}

		spin_lock(&con->writequeue_lock);
		e->offset += ret;
		e->len -= ret;

		if (e->len == 0 && e->users == 0) {
			list_del(&e->list);
			free_entry(e);
			continue;
		}
	}
	spin_unlock(&con->writequeue_lock);
 out:
	up_read(&con->sock_sem);
	return ret;

 send_error:
	up_read(&con->sock_sem);
	close_connection(con, FALSE);
	lowcomms_connect_sock(con);
	return ret;

 out_connect:
	up_read(&con->sock_sem);
	lowcomms_connect_sock(con);
	return 0;
}

static void clean_one_writequeue(struct connection *con)
{
	struct list_head *list;
	struct list_head *temp;

	spin_lock(&con->writequeue_lock);
	list_for_each_safe(list, temp, &con->writequeue) {
		struct writequeue_entry *e =
			list_entry(list, struct writequeue_entry, list);
		list_del(&e->list);
		free_entry(e);
	}
	spin_unlock(&con->writequeue_lock);
}

/* Called from recovery when it knows that a node has
   left the cluster */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;

	if (!connections)
		goto out;

	log_print("closing connection to node %d", nodeid);
	con = nodeid2con(nodeid, 0);
	if (con) {
		clean_one_writequeue(con);
		close_connection(con, TRUE);
		atomic_set(&con->waiting_requests, 0);
	}
	return 0;

 out:
	return -1;
}

/* API send message call, may queue the request */
/* N.B. This is the old interface - use the new one for new calls */
int lowcomms_send_message(int nodeid, char *buf, int len, gfp_t allocation)
{
	struct writequeue_entry *e;
	char *b;

	e = dlm_lowcomms_get_buffer(nodeid, len, allocation, &b);
	if (e) {
		memcpy(b, buf, len);
		dlm_lowcomms_commit_buffer(e);
		return 0;
	}
	return -ENOBUFS;
}

/* Look for activity on active sockets */
static void process_sockets(void)
{
	struct list_head *list;
	struct list_head *temp;
	int count = 0;

	spin_lock_bh(&read_sockets_lock);
	list_for_each_safe(list, temp, &read_sockets) {

		struct connection *con =
			list_entry(list, struct connection, read_list);
		list_del(&con->read_list);
		clear_bit(CF_READ_PENDING, &con->flags);

		spin_unlock_bh(&read_sockets_lock);

		/* This can reach zero if we are processing requests
		 * as they come in.
		 */
		if (atomic_read(&con->waiting_requests) == 0) {
			spin_lock_bh(&read_sockets_lock);
			continue;
		}

		do {
			con->rx_action(con);

			/* Don't starve out everyone else */
			if (++count >= MAX_RX_MSG_COUNT) {
				schedule();
				count = 0;
			}

		} while (!atomic_dec_and_test(&con->waiting_requests) &&
			 !kthread_should_stop());

		spin_lock_bh(&read_sockets_lock);
	}
	spin_unlock_bh(&read_sockets_lock);
}

/* Try to send any messages that are pending
 */
static void process_output_queue(void)
{
	struct list_head *list;
	struct list_head *temp;
	int ret;

	spin_lock_bh(&write_sockets_lock);
	list_for_each_safe(list, temp, &write_sockets) {
		struct connection *con =
			list_entry(list, struct connection, write_list);
		clear_bit(CF_WRITE_PENDING, &con->flags);
		list_del(&con->write_list);

		spin_unlock_bh(&write_sockets_lock);

		ret = send_to_sock(con);
		if (ret < 0) {
		}
		spin_lock_bh(&write_sockets_lock);
	}
	spin_unlock_bh(&write_sockets_lock);
}

static void process_state_queue(void)
{
	struct list_head *list;
	struct list_head *temp;
	int ret;

	spin_lock_bh(&state_sockets_lock);
	list_for_each_safe(list, temp, &state_sockets) {
		struct connection *con =
			list_entry(list, struct connection, state_list);
		list_del(&con->state_list);
		clear_bit(CF_CONNECT_PENDING, &con->flags);
		spin_unlock_bh(&state_sockets_lock);

		ret = connect_to_sock(con);
		if (ret < 0) {
		}
		spin_lock_bh(&state_sockets_lock);
	}
	spin_unlock_bh(&state_sockets_lock);
}


/* Discard all entries on the write queues */
static void clean_writequeues(void)
{
	int nodeid;

	for (nodeid = 1; nodeid < conn_array_size; nodeid++) {
		struct connection *con = nodeid2con(nodeid, 0);

		if (con)
			clean_one_writequeue(con);
	}
}

static int read_list_empty(void)
{
	int status;

	spin_lock_bh(&read_sockets_lock);
	status = list_empty(&read_sockets);
	spin_unlock_bh(&read_sockets_lock);

	return status;
}

/* DLM Transport comms receive daemon */
static int dlm_recvd(void *data)
{
	init_waitqueue_head(&lowcomms_recv_waitq);
	init_waitqueue_entry(&lowcomms_recv_waitq_head, current);
	add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head);

	while (!kthread_should_stop()) {
		set_current_state(TASK_INTERRUPTIBLE);
		if (read_list_empty())
			schedule();
		set_current_state(TASK_RUNNING);

		process_sockets();
	}

	return 0;
}

static int write_and_state_lists_empty(void)
{
	int status;

	spin_lock_bh(&write_sockets_lock);
	status = list_empty(&write_sockets);
	spin_unlock_bh(&write_sockets_lock);

	spin_lock_bh(&state_sockets_lock);
	if (list_empty(&state_sockets) == 0)
		status = 0;
	spin_unlock_bh(&state_sockets_lock);

	return status;
}

/* DLM Transport send daemon */
static int dlm_sendd(void *data)
{
	init_waitqueue_head(&lowcomms_send_waitq);
	init_waitqueue_entry(&lowcomms_send_waitq_head, current);
	add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head);

	while (!kthread_should_stop()) {
		set_current_state(TASK_INTERRUPTIBLE);
		if (write_and_state_lists_empty())
			schedule();
		set_current_state(TASK_RUNNING);

		process_state_queue();
		process_output_queue();
	}

	return 0;
}

static void daemons_stop(void)
{
	kthread_stop(recv_task);
	kthread_stop(send_task);
}

static int daemons_start(void)
{
	struct task_struct *p;
	int error;

	p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
	error = IS_ERR(p);
	if (error) {
		log_print("can't start dlm_recvd %d", error);
		return error;
	}
	recv_task = p;

	p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
	error = IS_ERR(p);
	if (error) {
		log_print("can't start dlm_sendd %d", error);
		kthread_stop(recv_task);
		return error;
	}
	send_task = p;

	return 0;
}

/*
 * Return the largest buffer size we can cope with.
 */
int lowcomms_max_buffer_size(void)
{
	return PAGE_CACHE_SIZE;
}

void dlm_lowcomms_stop(void)
{
	int i;

	atomic_set(&accepting, 0);

	/* Set all the activity flags to prevent any
	   socket activity.
	*/
	for (i = 0; i < conn_array_size; i++) {
		if (connections[i])
			connections[i]->flags |= 0x7;
	}
	daemons_stop();
	clean_writequeues();

	for (i = 0; i < conn_array_size; i++) {
		if (connections[i]) {
			close_connection(connections[i], TRUE);
			if (connections[i]->othercon)
				kmem_cache_free(con_cache, connections[i]->othercon);
			kmem_cache_free(con_cache, connections[i]);
		}
	}

	kfree(connections);
	connections = NULL;

	kmem_cache_destroy(con_cache);
}

/* This is quite likely to sleep... */
int dlm_lowcomms_start(void)
{
	int error = 0;

	error = -ENOTCONN;

	/*
	 * Temporarily initialise the waitq head so that lowcomms_send_message
	 * doesn't crash if it gets called before the thread is fully
	 * initialised
	 */
	init_waitqueue_head(&lowcomms_send_waitq);

	error = -ENOMEM;
	connections = kmalloc(sizeof(struct connection *) *
			      NODE_INCREMENT, GFP_KERNEL);
	if (!connections)
		goto out;

	memset(connections, 0,
	       sizeof(struct connection *) * NODE_INCREMENT);

	conn_array_size = NODE_INCREMENT;

	if (dlm_our_addr(&dlm_local_addr, 0)) {
		log_print("no local IP address has been set");
		goto fail_free_conn;
	}
	if (!dlm_our_addr(&dlm_local_addr, 1)) {
		log_print("This dlm comms module does not support multi-homed clustering");
		goto fail_free_conn;
	}

	con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
				      __alignof__(struct connection), 0,
				      NULL, NULL);
	if (!con_cache)
		goto fail_free_conn;


	/* Start listening */
	error = listen_for_all();
	if (error)
		goto fail_unlisten;

	error = daemons_start();
	if (error)
		goto fail_unlisten;

	atomic_set(&accepting, 1);

	return 0;

 fail_unlisten:
	close_connection(connections[0], 0);
	kmem_cache_free(con_cache, connections[0]);
	kmem_cache_destroy(con_cache);

 fail_free_conn:
	kfree(connections);

 out:
	return error;
}

int dlm_lowcomms_init(void)
{
	INIT_LIST_HEAD(&read_sockets);
	INIT_LIST_HEAD(&write_sockets);
	INIT_LIST_HEAD(&state_sockets);

	spin_lock_init(&read_sockets_lock);
	spin_lock_init(&write_sockets_lock);
	spin_lock_init(&state_sockets_lock);
	init_MUTEX(&connections_lock);

	return 0;
}

void dlm_lowcomms_exit(void)
{
}

/*
 * Overrides for Emacs so that we follow Linus's tabbing style.
 * Emacs will notice this stuff at the end of the file and automatically
 * adjust the settings for this buffer only. This must remain at the end
 * of the file.
 * ---------------------------------------------------------------------------
 * Local variables:
 * c-file-style: "linux"
 * End:
 */