aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm/lowcomms-tcp.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/lowcomms-tcp.c')
-rw-r--r--fs/dlm/lowcomms-tcp.c1263
1 files changed, 1263 insertions, 0 deletions
diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms-tcp.c
new file mode 100644
index 000000000000..7289e59b4bd3
--- /dev/null
+++ b/fs/dlm/lowcomms-tcp.c
@@ -0,0 +1,1263 @@
1/******************************************************************************
2*******************************************************************************
3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2006 Red Hat, Inc. All rights reserved.
6**
7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions
9** of the GNU General Public License v.2.
10**
11*******************************************************************************
12******************************************************************************/
13
14/*
15 * lowcomms.c
16 *
17 * This is the "low-level" comms layer.
18 *
19 * It is responsible for sending/receiving messages
20 * from other nodes in the cluster.
21 *
22 * Cluster nodes are referred to by their nodeids. nodeids are
23 * simply 32 bit numbers to the locking module - if they need to
24 * be expanded for the cluster infrastructure then that is it's
25 * responsibility. It is this layer's
26 * responsibility to resolve these into IP address or
27 * whatever it needs for inter-node communication.
28 *
29 * The comms level is two kernel threads that deal mainly with
30 * the receiving of messages from other nodes and passing them
31 * up to the mid-level comms layer (which understands the
32 * message format) for execution by the locking core, and
33 * a send thread which does all the setting up of connections
34 * to remote nodes and the sending of data. Threads are not allowed
35 * to send their own data because it may cause them to wait in times
36 * of high load. Also, this way, the sending thread can collect together
37 * messages bound for one node and send them in one block.
38 *
39 * I don't see any problem with the recv thread executing the locking
40 * code on behalf of remote processes as the locking code is
41 * short, efficient and never waits.
42 *
43 */
44
45
46#include <asm/ioctls.h>
47#include <net/sock.h>
48#include <net/tcp.h>
49#include <linux/pagemap.h>
50
51#include "dlm_internal.h"
52#include "lowcomms.h"
53#include "midcomms.h"
54#include "config.h"
55
56struct cbuf {
57 unsigned base;
58 unsigned len;
59 unsigned mask;
60};
61
62#ifndef FALSE
63#define FALSE 0
64#define TRUE 1
65#endif
66#define NODE_INCREMENT 32
67
68#define CBUF_INIT(cb, size) do { (cb)->base = (cb)->len = 0; (cb)->mask = ((size)-1); } while(0)
69#define CBUF_ADD(cb, n) do { (cb)->len += n; } while(0)
70#define CBUF_EMPTY(cb) ((cb)->len == 0)
71#define CBUF_MAY_ADD(cb, n) (((cb)->len + (n)) < ((cb)->mask + 1))
72#define CBUF_EAT(cb, n) do { (cb)->len -= (n); \
73 (cb)->base += (n); (cb)->base &= (cb)->mask; } while(0)
74#define CBUF_DATA(cb) (((cb)->base + (cb)->len) & (cb)->mask)
75
76/* Maximum number of incoming messages to process before
77 doing a schedule()
78*/
79#define MAX_RX_MSG_COUNT 25
80
81struct connection {
82 struct socket *sock; /* NULL if not connected */
83 uint32_t nodeid; /* So we know who we are in the list */
84 struct rw_semaphore sock_sem; /* Stop connect races */
85 struct list_head read_list; /* On this list when ready for reading */
86 struct list_head write_list; /* On this list when ready for writing */
87 struct list_head state_list; /* On this list when ready to connect */
88 unsigned long flags; /* bit 1,2 = We are on the read/write lists */
89#define CF_READ_PENDING 1
90#define CF_WRITE_PENDING 2
91#define CF_CONNECT_PENDING 3
92#define CF_IS_OTHERCON 4
93 struct list_head writequeue; /* List of outgoing writequeue_entries */
94 struct list_head listenlist; /* List of allocated listening sockets */
95 spinlock_t writequeue_lock;
96 int (*rx_action) (struct connection *); /* What to do when active */
97 struct page *rx_page;
98 struct cbuf cb;
99 int retries;
100 atomic_t waiting_requests;
101#define MAX_CONNECT_RETRIES 3
102 struct connection *othercon;
103};
104#define sock2con(x) ((struct connection *)(x)->sk_user_data)
105
106/* An entry waiting to be sent */
107struct writequeue_entry {
108 struct list_head list;
109 struct page *page;
110 int offset;
111 int len;
112 int end;
113 int users;
114 struct connection *con;
115};
116
117static struct sockaddr_storage dlm_local_addr;
118
119/* Manage daemons */
120static struct task_struct *recv_task;
121static struct task_struct *send_task;
122
123static wait_queue_t lowcomms_send_waitq_head;
124static wait_queue_head_t lowcomms_send_waitq;
125static wait_queue_t lowcomms_recv_waitq_head;
126static wait_queue_head_t lowcomms_recv_waitq;
127
128/* An array of pointers to connections, indexed by NODEID */
129static struct connection **connections;
130static struct semaphore connections_lock;
131static kmem_cache_t *con_cache;
132static int conn_array_size;
133static atomic_t accepting;
134
135/* List of sockets that have reads pending */
136static struct list_head read_sockets;
137static spinlock_t read_sockets_lock;
138
139/* List of sockets which have writes pending */
140static struct list_head write_sockets;
141static spinlock_t write_sockets_lock;
142
143/* List of sockets which have connects pending */
144static struct list_head state_sockets;
145static spinlock_t state_sockets_lock;
146
147static struct connection *nodeid2con(int nodeid, gfp_t allocation)
148{
149 struct connection *con = NULL;
150
151 down(&connections_lock);
152 if (nodeid >= conn_array_size) {
153 int new_size = nodeid + NODE_INCREMENT;
154 struct connection **new_conns;
155
156 new_conns = kmalloc(sizeof(struct connection *) *
157 new_size, allocation);
158 if (!new_conns)
159 goto finish;
160
161 memset(new_conns, 0, sizeof(struct connection *) * new_size);
162 memcpy(new_conns, connections, sizeof(struct connection *) * conn_array_size);
163 conn_array_size = new_size;
164 kfree(connections);
165 connections = new_conns;
166
167 }
168
169 con = connections[nodeid];
170 if (con == NULL && allocation) {
171 con = kmem_cache_alloc(con_cache, allocation);
172 if (!con)
173 goto finish;
174
175 memset(con, 0, sizeof(*con));
176 con->nodeid = nodeid;
177 init_rwsem(&con->sock_sem);
178 INIT_LIST_HEAD(&con->writequeue);
179 spin_lock_init(&con->writequeue_lock);
180
181 connections[nodeid] = con;
182 }
183
184 finish:
185 up(&connections_lock);
186 return con;
187}
188
189/* Data available on socket or listen socket received a connect */
190static void lowcomms_data_ready(struct sock *sk, int count_unused)
191{
192 struct connection *con = sock2con(sk);
193
194 atomic_inc(&con->waiting_requests);
195 if (test_and_set_bit(CF_READ_PENDING, &con->flags))
196 return;
197
198 spin_lock_bh(&read_sockets_lock);
199 list_add_tail(&con->read_list, &read_sockets);
200 spin_unlock_bh(&read_sockets_lock);
201
202 wake_up_interruptible(&lowcomms_recv_waitq);
203}
204
205static void lowcomms_write_space(struct sock *sk)
206{
207 struct connection *con = sock2con(sk);
208
209 if (test_and_set_bit(CF_WRITE_PENDING, &con->flags))
210 return;
211
212 spin_lock_bh(&write_sockets_lock);
213 list_add_tail(&con->write_list, &write_sockets);
214 spin_unlock_bh(&write_sockets_lock);
215
216 wake_up_interruptible(&lowcomms_send_waitq);
217}
218
219static inline void lowcomms_connect_sock(struct connection *con)
220{
221 if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
222 return;
223 if (!atomic_read(&accepting))
224 return;
225
226 spin_lock_bh(&state_sockets_lock);
227 list_add_tail(&con->state_list, &state_sockets);
228 spin_unlock_bh(&state_sockets_lock);
229
230 wake_up_interruptible(&lowcomms_send_waitq);
231}
232
233static void lowcomms_state_change(struct sock *sk)
234{
235/* struct connection *con = sock2con(sk); */
236
237 switch (sk->sk_state) {
238 case TCP_ESTABLISHED:
239 lowcomms_write_space(sk);
240 break;
241
242 case TCP_FIN_WAIT1:
243 case TCP_FIN_WAIT2:
244 case TCP_TIME_WAIT:
245 case TCP_CLOSE:
246 case TCP_CLOSE_WAIT:
247 case TCP_LAST_ACK:
248 case TCP_CLOSING:
249 /* FIXME: I think this causes more trouble than it solves.
250 lowcomms wil reconnect anyway when there is something to
251 send. This just attempts reconnection if a node goes down!
252 */
253 /* lowcomms_connect_sock(con); */
254 break;
255
256 default:
257 printk("dlm: lowcomms_state_change: state=%d\n", sk->sk_state);
258 break;
259 }
260}
261
262/* Make a socket active */
263static int add_sock(struct socket *sock, struct connection *con)
264{
265 con->sock = sock;
266
267 /* Install a data_ready callback */
268 con->sock->sk->sk_data_ready = lowcomms_data_ready;
269 con->sock->sk->sk_write_space = lowcomms_write_space;
270 con->sock->sk->sk_state_change = lowcomms_state_change;
271
272 return 0;
273}
274
275/* Add the port number to an IP6 or 4 sockaddr and return the address
276 length */
277static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
278 int *addr_len)
279{
280 saddr->ss_family = dlm_local_addr.ss_family;
281 if (saddr->ss_family == AF_INET) {
282 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
283 in4_addr->sin_port = cpu_to_be16(port);
284 *addr_len = sizeof(struct sockaddr_in);
285 }
286 else {
287 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
288 in6_addr->sin6_port = cpu_to_be16(port);
289 *addr_len = sizeof(struct sockaddr_in6);
290 }
291}
292
293/* Close a remote connection and tidy up */
294static void close_connection(struct connection *con, int and_other)
295{
296 down_write(&con->sock_sem);
297
298 if (con->sock) {
299 sock_release(con->sock);
300 con->sock = NULL;
301 }
302 if (con->othercon && and_other) {
303 /* Argh! recursion in kernel code!
304 Actually, this isn't a list so it
305 will only re-enter once.
306 */
307 close_connection(con->othercon, FALSE);
308 }
309 if (con->rx_page) {
310 __free_page(con->rx_page);
311 con->rx_page = NULL;
312 }
313 con->retries = 0;
314 up_write(&con->sock_sem);
315}
316
317/* Data received from remote end */
318static int receive_from_sock(struct connection *con)
319{
320 int ret = 0;
321 struct msghdr msg;
322 struct iovec iov[2];
323 mm_segment_t fs;
324 unsigned len;
325 int r;
326 int call_again_soon = 0;
327
328 down_read(&con->sock_sem);
329
330 if (con->sock == NULL)
331 goto out;
332 if (con->rx_page == NULL) {
333 /*
334 * This doesn't need to be atomic, but I think it should
335 * improve performance if it is.
336 */
337 con->rx_page = alloc_page(GFP_ATOMIC);
338 if (con->rx_page == NULL)
339 goto out_resched;
340 CBUF_INIT(&con->cb, PAGE_CACHE_SIZE);
341 }
342
343 msg.msg_control = NULL;
344 msg.msg_controllen = 0;
345 msg.msg_iovlen = 1;
346 msg.msg_iov = iov;
347 msg.msg_name = NULL;
348 msg.msg_namelen = 0;
349 msg.msg_flags = 0;
350
351 /*
352 * iov[0] is the bit of the circular buffer between the current end
353 * point (cb.base + cb.len) and the end of the buffer.
354 */
355 iov[0].iov_len = con->cb.base - CBUF_DATA(&con->cb);
356 iov[0].iov_base = page_address(con->rx_page) + CBUF_DATA(&con->cb);
357 iov[1].iov_len = 0;
358
359 /*
360 * iov[1] is the bit of the circular buffer between the start of the
361 * buffer and the start of the currently used section (cb.base)
362 */
363 if (CBUF_DATA(&con->cb) >= con->cb.base) {
364 iov[0].iov_len = PAGE_CACHE_SIZE - CBUF_DATA(&con->cb);
365 iov[1].iov_len = con->cb.base;
366 iov[1].iov_base = page_address(con->rx_page);
367 msg.msg_iovlen = 2;
368 }
369 len = iov[0].iov_len + iov[1].iov_len;
370
371 fs = get_fs();
372 set_fs(get_ds());
373 r = ret = sock_recvmsg(con->sock, &msg, len,
374 MSG_DONTWAIT | MSG_NOSIGNAL);
375 set_fs(fs);
376
377 if (ret <= 0)
378 goto out_close;
379 if (ret == len)
380 call_again_soon = 1;
381 CBUF_ADD(&con->cb, ret);
382 ret = dlm_process_incoming_buffer(con->nodeid,
383 page_address(con->rx_page),
384 con->cb.base, con->cb.len,
385 PAGE_CACHE_SIZE);
386 if (ret == -EBADMSG) {
387 printk(KERN_INFO "dlm: lowcomms: addr=%p, base=%u, len=%u, "
388 "iov_len=%u, iov_base[0]=%p, read=%d\n",
389 page_address(con->rx_page), con->cb.base, con->cb.len,
390 len, iov[0].iov_base, r);
391 }
392 if (ret < 0)
393 goto out_close;
394 CBUF_EAT(&con->cb, ret);
395
396 if (CBUF_EMPTY(&con->cb) && !call_again_soon) {
397 __free_page(con->rx_page);
398 con->rx_page = NULL;
399 }
400
401 out:
402 if (call_again_soon)
403 goto out_resched;
404 up_read(&con->sock_sem);
405 ret = 0;
406 goto out_ret;
407
408 out_resched:
409 lowcomms_data_ready(con->sock->sk, 0);
410 up_read(&con->sock_sem);
411 ret = 0;
412 schedule();
413 goto out_ret;
414
415 out_close:
416 up_read(&con->sock_sem);
417 if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) {
418 close_connection(con, FALSE);
419 /* Reconnect when there is something to send */
420 }
421
422 out_ret:
423 return ret;
424}
425
426/* Listening socket is busy, accept a connection */
427static int accept_from_sock(struct connection *con)
428{
429 int result;
430 struct sockaddr_storage peeraddr;
431 struct socket *newsock;
432 int len;
433 int nodeid;
434 struct connection *newcon;
435
436 memset(&peeraddr, 0, sizeof(peeraddr));
437 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &newsock);
438 if (result < 0)
439 return -ENOMEM;
440
441 down_read(&con->sock_sem);
442
443 result = -ENOTCONN;
444 if (con->sock == NULL)
445 goto accept_err;
446
447 newsock->type = con->sock->type;
448 newsock->ops = con->sock->ops;
449
450 result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
451 if (result < 0)
452 goto accept_err;
453
454 /* Get the connected socket's peer */
455 memset(&peeraddr, 0, sizeof(peeraddr));
456 if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
457 &len, 2)) {
458 result = -ECONNABORTED;
459 goto accept_err;
460 }
461
462 /* Get the new node's NODEID */
463 make_sockaddr(&peeraddr, 0, &len);
464 if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
465 printk("dlm: connect from non cluster node\n");
466 sock_release(newsock);
467 up_read(&con->sock_sem);
468 return -1;
469 }
470
471 log_print("got connection from %d", nodeid);
472
473 /* Check to see if we already have a connection to this node. This
474 * could happen if the two nodes initiate a connection at roughly
475 * the same time and the connections cross on the wire.
476 * TEMPORARY FIX:
477 * In this case we store the incoming one in "othercon"
478 */
479 newcon = nodeid2con(nodeid, GFP_KERNEL);
480 if (!newcon) {
481 result = -ENOMEM;
482 goto accept_err;
483 }
484 down_write(&newcon->sock_sem);
485 if (newcon->sock) {
486 struct connection *othercon = newcon->othercon;
487
488 if (!othercon) {
489 othercon = kmem_cache_alloc(con_cache, GFP_KERNEL);
490 if (!othercon) {
491 printk("dlm: failed to allocate incoming socket\n");
492 up_write(&newcon->sock_sem);
493 result = -ENOMEM;
494 goto accept_err;
495 }
496 memset(othercon, 0, sizeof(*othercon));
497 othercon->nodeid = nodeid;
498 othercon->rx_action = receive_from_sock;
499 init_rwsem(&othercon->sock_sem);
500 set_bit(CF_IS_OTHERCON, &othercon->flags);
501 newcon->othercon = othercon;
502 }
503 othercon->sock = newsock;
504 newsock->sk->sk_user_data = othercon;
505 add_sock(newsock, othercon);
506 }
507 else {
508 newsock->sk->sk_user_data = newcon;
509 newcon->rx_action = receive_from_sock;
510 add_sock(newsock, newcon);
511
512 }
513
514 up_write(&newcon->sock_sem);
515
516 /*
517 * Add it to the active queue in case we got data
518 * beween processing the accept adding the socket
519 * to the read_sockets list
520 */
521 lowcomms_data_ready(newsock->sk, 0);
522 up_read(&con->sock_sem);
523
524 return 0;
525
526 accept_err:
527 up_read(&con->sock_sem);
528 sock_release(newsock);
529
530 if (result != -EAGAIN)
531 printk("dlm: error accepting connection from node: %d\n", result);
532 return result;
533}
534
535/* Connect a new socket to its peer */
536static int connect_to_sock(struct connection *con)
537{
538 int result = -EHOSTUNREACH;
539 struct sockaddr_storage saddr;
540 int addr_len;
541 struct socket *sock;
542
543 if (con->nodeid == 0) {
544 log_print("attempt to connect sock 0 foiled");
545 return 0;
546 }
547
548 down_write(&con->sock_sem);
549 if (con->retries++ > MAX_CONNECT_RETRIES)
550 goto out;
551
552 /* Some odd races can cause double-connects, ignore them */
553 if (con->sock) {
554 result = 0;
555 goto out;
556 }
557
558 /* Create a socket to communicate with */
559 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock);
560 if (result < 0)
561 goto out_err;
562
563 memset(&saddr, 0, sizeof(saddr));
564 if (dlm_nodeid_to_addr(con->nodeid, &saddr))
565 goto out_err;
566
567 sock->sk->sk_user_data = con;
568 con->rx_action = receive_from_sock;
569
570 make_sockaddr(&saddr, dlm_config.tcp_port, &addr_len);
571
572 add_sock(sock, con);
573
574 log_print("connecting to %d", con->nodeid);
575 result =
576 sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
577 O_NONBLOCK);
578 if (result == -EINPROGRESS)
579 result = 0;
580 if (result != 0)
581 goto out_err;
582
583 out:
584 up_write(&con->sock_sem);
585 /*
586 * Returning an error here means we've given up trying to connect to
587 * a remote node, otherwise we return 0 and reschedule the connetion
588 * attempt
589 */
590 return result;
591
592 out_err:
593 if (con->sock) {
594 sock_release(con->sock);
595 con->sock = NULL;
596 }
597 /*
598 * Some errors are fatal and this list might need adjusting. For other
599 * errors we try again until the max number of retries is reached.
600 */
601 if (result != -EHOSTUNREACH && result != -ENETUNREACH &&
602 result != -ENETDOWN && result != EINVAL
603 && result != -EPROTONOSUPPORT) {
604 lowcomms_connect_sock(con);
605 result = 0;
606 }
607 goto out;
608}
609
610static struct socket *create_listen_sock(struct connection *con, struct sockaddr_storage *saddr)
611{
612 struct socket *sock = NULL;
613 mm_segment_t fs;
614 int result = 0;
615 int one = 1;
616 int addr_len;
617
618 if (dlm_local_addr.ss_family == AF_INET)
619 addr_len = sizeof(struct sockaddr_in);
620 else
621 addr_len = sizeof(struct sockaddr_in6);
622
623 /* Create a socket to communicate with */
624 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock);
625 if (result < 0) {
626 printk("dlm: Can't create listening comms socket\n");
627 goto create_out;
628 }
629
630 fs = get_fs();
631 set_fs(get_ds());
632 result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one));
633 set_fs(fs);
634 if (result < 0) {
635 printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",result);
636 }
637 sock->sk->sk_user_data = con;
638 con->rx_action = accept_from_sock;
639 con->sock = sock;
640
641 /* Bind to our port */
642 make_sockaddr(saddr, dlm_config.tcp_port, &addr_len);
643 result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
644 if (result < 0) {
645 printk("dlm: Can't bind to port %d\n", dlm_config.tcp_port);
646 sock_release(sock);
647 sock = NULL;
648 con->sock = NULL;
649 goto create_out;
650 }
651
652 fs = get_fs();
653 set_fs(get_ds());
654
655 result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&one, sizeof(one));
656 set_fs(fs);
657 if (result < 0) {
658 printk("dlm: Set keepalive failed: %d\n", result);
659 }
660
661 result = sock->ops->listen(sock, 5);
662 if (result < 0) {
663 printk("dlm: Can't listen on port %d\n", dlm_config.tcp_port);
664 sock_release(sock);
665 sock = NULL;
666 goto create_out;
667 }
668
669 create_out:
670 return sock;
671}
672
673
674/* Listen on all interfaces */
675static int listen_for_all(void)
676{
677 struct socket *sock = NULL;
678 struct connection *con = nodeid2con(0, GFP_KERNEL);
679 int result = -EINVAL;
680
681 /* We don't support multi-homed hosts */
682 memset(con, 0, sizeof(*con));
683 init_rwsem(&con->sock_sem);
684 spin_lock_init(&con->writequeue_lock);
685 INIT_LIST_HEAD(&con->writequeue);
686 set_bit(CF_IS_OTHERCON, &con->flags);
687
688 sock = create_listen_sock(con, &dlm_local_addr);
689 if (sock) {
690 add_sock(sock, con);
691 result = 0;
692 }
693 else {
694 result = -EADDRINUSE;
695 }
696
697 return result;
698}
699
700
701
702static struct writequeue_entry *new_writequeue_entry(struct connection *con,
703 gfp_t allocation)
704{
705 struct writequeue_entry *entry;
706
707 entry = kmalloc(sizeof(struct writequeue_entry), allocation);
708 if (!entry)
709 return NULL;
710
711 entry->page = alloc_page(allocation);
712 if (!entry->page) {
713 kfree(entry);
714 return NULL;
715 }
716
717 entry->offset = 0;
718 entry->len = 0;
719 entry->end = 0;
720 entry->users = 0;
721 entry->con = con;
722
723 return entry;
724}
725
726void *dlm_lowcomms_get_buffer(int nodeid, int len,
727 gfp_t allocation, char **ppc)
728{
729 struct connection *con;
730 struct writequeue_entry *e;
731 int offset = 0;
732 int users = 0;
733
734 if (!atomic_read(&accepting))
735 return NULL;
736
737 con = nodeid2con(nodeid, allocation);
738 if (!con)
739 return NULL;
740
741 spin_lock(&con->writequeue_lock);
742 e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
743 if (((struct list_head *) e == &con->writequeue) ||
744 (PAGE_CACHE_SIZE - e->end < len)) {
745 e = NULL;
746 } else {
747 offset = e->end;
748 e->end += len;
749 users = e->users++;
750 }
751 spin_unlock(&con->writequeue_lock);
752
753 if (e) {
754 got_one:
755 if (users == 0)
756 kmap(e->page);
757 *ppc = page_address(e->page) + offset;
758 return e;
759 }
760
761 e = new_writequeue_entry(con, allocation);
762 if (e) {
763 spin_lock(&con->writequeue_lock);
764 offset = e->end;
765 e->end += len;
766 users = e->users++;
767 list_add_tail(&e->list, &con->writequeue);
768 spin_unlock(&con->writequeue_lock);
769 goto got_one;
770 }
771 return NULL;
772}
773
774void dlm_lowcomms_commit_buffer(void *mh)
775{
776 struct writequeue_entry *e = (struct writequeue_entry *)mh;
777 struct connection *con = e->con;
778 int users;
779
780 if (!atomic_read(&accepting))
781 return;
782
783 spin_lock(&con->writequeue_lock);
784 users = --e->users;
785 if (users)
786 goto out;
787 e->len = e->end - e->offset;
788 kunmap(e->page);
789 spin_unlock(&con->writequeue_lock);
790
791 if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) {
792 spin_lock_bh(&write_sockets_lock);
793 list_add_tail(&con->write_list, &write_sockets);
794 spin_unlock_bh(&write_sockets_lock);
795
796 wake_up_interruptible(&lowcomms_send_waitq);
797 }
798 return;
799
800 out:
801 spin_unlock(&con->writequeue_lock);
802 return;
803}
804
805static void free_entry(struct writequeue_entry *e)
806{
807 __free_page(e->page);
808 kfree(e);
809}
810
811/* Send a message */
812static int send_to_sock(struct connection *con)
813{
814 int ret = 0;
815 ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int);
816 const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
817 struct writequeue_entry *e;
818 int len, offset;
819
820 down_read(&con->sock_sem);
821 if (con->sock == NULL)
822 goto out_connect;
823
824 sendpage = con->sock->ops->sendpage;
825
826 spin_lock(&con->writequeue_lock);
827 for (;;) {
828 e = list_entry(con->writequeue.next, struct writequeue_entry,
829 list);
830 if ((struct list_head *) e == &con->writequeue)
831 break;
832
833 len = e->len;
834 offset = e->offset;
835 BUG_ON(len == 0 && e->users == 0);
836 spin_unlock(&con->writequeue_lock);
837
838 ret = 0;
839 if (len) {
840 ret = sendpage(con->sock, e->page, offset, len,
841 msg_flags);
842 if (ret == -EAGAIN || ret == 0)
843 goto out;
844 if (ret <= 0)
845 goto send_error;
846 }
847 else {
848 /* Don't starve people filling buffers */
849 schedule();
850 }
851
852 spin_lock(&con->writequeue_lock);
853 e->offset += ret;
854 e->len -= ret;
855
856 if (e->len == 0 && e->users == 0) {
857 list_del(&e->list);
858 free_entry(e);
859 continue;
860 }
861 }
862 spin_unlock(&con->writequeue_lock);
863 out:
864 up_read(&con->sock_sem);
865 return ret;
866
867 send_error:
868 up_read(&con->sock_sem);
869 close_connection(con, FALSE);
870 lowcomms_connect_sock(con);
871 return ret;
872
873 out_connect:
874 up_read(&con->sock_sem);
875 lowcomms_connect_sock(con);
876 return 0;
877}
878
879static void clean_one_writequeue(struct connection *con)
880{
881 struct list_head *list;
882 struct list_head *temp;
883
884 spin_lock(&con->writequeue_lock);
885 list_for_each_safe(list, temp, &con->writequeue) {
886 struct writequeue_entry *e =
887 list_entry(list, struct writequeue_entry, list);
888 list_del(&e->list);
889 free_entry(e);
890 }
891 spin_unlock(&con->writequeue_lock);
892}
893
894/* Called from recovery when it knows that a node has
895 left the cluster */
896int dlm_lowcomms_close(int nodeid)
897{
898 struct connection *con;
899
900 if (!connections)
901 goto out;
902
903 log_print("closing connection to node %d", nodeid);
904 con = nodeid2con(nodeid, 0);
905 if (con) {
906 clean_one_writequeue(con);
907 close_connection(con, TRUE);
908 atomic_set(&con->waiting_requests, 0);
909 }
910 return 0;
911
912 out:
913 return -1;
914}
915
916/* API send message call, may queue the request */
917/* N.B. This is the old interface - use the new one for new calls */
918int lowcomms_send_message(int nodeid, char *buf, int len, gfp_t allocation)
919{
920 struct writequeue_entry *e;
921 char *b;
922
923 e = dlm_lowcomms_get_buffer(nodeid, len, allocation, &b);
924 if (e) {
925 memcpy(b, buf, len);
926 dlm_lowcomms_commit_buffer(e);
927 return 0;
928 }
929 return -ENOBUFS;
930}
931
932/* Look for activity on active sockets */
933static void process_sockets(void)
934{
935 struct list_head *list;
936 struct list_head *temp;
937 int count = 0;
938
939 spin_lock_bh(&read_sockets_lock);
940 list_for_each_safe(list, temp, &read_sockets) {
941
942 struct connection *con =
943 list_entry(list, struct connection, read_list);
944 list_del(&con->read_list);
945 clear_bit(CF_READ_PENDING, &con->flags);
946
947 spin_unlock_bh(&read_sockets_lock);
948
949 /* This can reach zero if we are processing requests
950 * as they come in.
951 */
952 if (atomic_read(&con->waiting_requests) == 0) {
953 spin_lock_bh(&read_sockets_lock);
954 continue;
955 }
956
957 do {
958 con->rx_action(con);
959
960 /* Don't starve out everyone else */
961 if (++count >= MAX_RX_MSG_COUNT) {
962 schedule();
963 count = 0;
964 }
965
966 } while (!atomic_dec_and_test(&con->waiting_requests) &&
967 !kthread_should_stop());
968
969 spin_lock_bh(&read_sockets_lock);
970 }
971 spin_unlock_bh(&read_sockets_lock);
972}
973
974/* Try to send any messages that are pending
975 */
976static void process_output_queue(void)
977{
978 struct list_head *list;
979 struct list_head *temp;
980 int ret;
981
982 spin_lock_bh(&write_sockets_lock);
983 list_for_each_safe(list, temp, &write_sockets) {
984 struct connection *con =
985 list_entry(list, struct connection, write_list);
986 clear_bit(CF_WRITE_PENDING, &con->flags);
987 list_del(&con->write_list);
988
989 spin_unlock_bh(&write_sockets_lock);
990
991 ret = send_to_sock(con);
992 if (ret < 0) {
993 }
994 spin_lock_bh(&write_sockets_lock);
995 }
996 spin_unlock_bh(&write_sockets_lock);
997}
998
999static void process_state_queue(void)
1000{
1001 struct list_head *list;
1002 struct list_head *temp;
1003 int ret;
1004
1005 spin_lock_bh(&state_sockets_lock);
1006 list_for_each_safe(list, temp, &state_sockets) {
1007 struct connection *con =
1008 list_entry(list, struct connection, state_list);
1009 list_del(&con->state_list);
1010 clear_bit(CF_CONNECT_PENDING, &con->flags);
1011 spin_unlock_bh(&state_sockets_lock);
1012
1013 ret = connect_to_sock(con);
1014 if (ret < 0) {
1015 }
1016 spin_lock_bh(&state_sockets_lock);
1017 }
1018 spin_unlock_bh(&state_sockets_lock);
1019}
1020
1021
1022/* Discard all entries on the write queues */
1023static void clean_writequeues(void)
1024{
1025 int nodeid;
1026
1027 for (nodeid = 1; nodeid < conn_array_size; nodeid++) {
1028 struct connection *con = nodeid2con(nodeid, 0);
1029
1030 if (con)
1031 clean_one_writequeue(con);
1032 }
1033}
1034
1035static int read_list_empty(void)
1036{
1037 int status;
1038
1039 spin_lock_bh(&read_sockets_lock);
1040 status = list_empty(&read_sockets);
1041 spin_unlock_bh(&read_sockets_lock);
1042
1043 return status;
1044}
1045
1046/* DLM Transport comms receive daemon */
1047static int dlm_recvd(void *data)
1048{
1049 init_waitqueue_head(&lowcomms_recv_waitq);
1050 init_waitqueue_entry(&lowcomms_recv_waitq_head, current);
1051 add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head);
1052
1053 while (!kthread_should_stop()) {
1054 set_current_state(TASK_INTERRUPTIBLE);
1055 if (read_list_empty())
1056 schedule();
1057 set_current_state(TASK_RUNNING);
1058
1059 process_sockets();
1060 }
1061
1062 return 0;
1063}
1064
1065static int write_and_state_lists_empty(void)
1066{
1067 int status;
1068
1069 spin_lock_bh(&write_sockets_lock);
1070 status = list_empty(&write_sockets);
1071 spin_unlock_bh(&write_sockets_lock);
1072
1073 spin_lock_bh(&state_sockets_lock);
1074 if (list_empty(&state_sockets) == 0)
1075 status = 0;
1076 spin_unlock_bh(&state_sockets_lock);
1077
1078 return status;
1079}
1080
1081/* DLM Transport send daemon */
1082static int dlm_sendd(void *data)
1083{
1084 init_waitqueue_head(&lowcomms_send_waitq);
1085 init_waitqueue_entry(&lowcomms_send_waitq_head, current);
1086 add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head);
1087
1088 while (!kthread_should_stop()) {
1089 set_current_state(TASK_INTERRUPTIBLE);
1090 if (write_and_state_lists_empty())
1091 schedule();
1092 set_current_state(TASK_RUNNING);
1093
1094 process_state_queue();
1095 process_output_queue();
1096 }
1097
1098 return 0;
1099}
1100
1101static void daemons_stop(void)
1102{
1103 kthread_stop(recv_task);
1104 kthread_stop(send_task);
1105}
1106
1107static int daemons_start(void)
1108{
1109 struct task_struct *p;
1110 int error;
1111
1112 p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
1113 error = IS_ERR(p);
1114 if (error) {
1115 log_print("can't start dlm_recvd %d", error);
1116 return error;
1117 }
1118 recv_task = p;
1119
1120 p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
1121 error = IS_ERR(p);
1122 if (error) {
1123 log_print("can't start dlm_sendd %d", error);
1124 kthread_stop(recv_task);
1125 return error;
1126 }
1127 send_task = p;
1128
1129 return 0;
1130}
1131
1132/*
1133 * Return the largest buffer size we can cope with.
1134 */
1135int lowcomms_max_buffer_size(void)
1136{
1137 return PAGE_CACHE_SIZE;
1138}
1139
1140void dlm_lowcomms_stop(void)
1141{
1142 int i;
1143
1144 atomic_set(&accepting, 0);
1145
1146 /* Set all the activity flags to prevent any
1147 socket activity.
1148 */
1149 for (i = 0; i < conn_array_size; i++) {
1150 if (connections[i])
1151 connections[i]->flags |= 0x7;
1152 }
1153 daemons_stop();
1154 clean_writequeues();
1155
1156 for (i = 0; i < conn_array_size; i++) {
1157 if (connections[i]) {
1158 close_connection(connections[i], TRUE);
1159 if (connections[i]->othercon)
1160 kmem_cache_free(con_cache, connections[i]->othercon);
1161 kmem_cache_free(con_cache, connections[i]);
1162 }
1163 }
1164
1165 kfree(connections);
1166 connections = NULL;
1167
1168 kmem_cache_destroy(con_cache);
1169}
1170
1171/* This is quite likely to sleep... */
1172int dlm_lowcomms_start(void)
1173{
1174 int error = 0;
1175
1176 error = -ENOTCONN;
1177
1178 /*
1179 * Temporarily initialise the waitq head so that lowcomms_send_message
1180 * doesn't crash if it gets called before the thread is fully
1181 * initialised
1182 */
1183 init_waitqueue_head(&lowcomms_send_waitq);
1184
1185 error = -ENOMEM;
1186 connections = kmalloc(sizeof(struct connection *) *
1187 NODE_INCREMENT, GFP_KERNEL);
1188 if (!connections)
1189 goto out;
1190
1191 memset(connections, 0,
1192 sizeof(struct connection *) * NODE_INCREMENT);
1193
1194 conn_array_size = NODE_INCREMENT;
1195
1196 if (dlm_our_addr(&dlm_local_addr, 0)) {
1197 log_print("no local IP address has been set");
1198 goto fail_free_conn;
1199 }
1200 if (!dlm_our_addr(&dlm_local_addr, 1)) {
1201 log_print("This dlm comms module does not support multi-homed clustering");
1202 goto fail_free_conn;
1203 }
1204
1205 con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
1206 __alignof__(struct connection), 0, NULL, NULL);
1207 if (!con_cache)
1208 goto fail_free_conn;
1209
1210
1211 /* Start listening */
1212 error = listen_for_all();
1213 if (error)
1214 goto fail_unlisten;
1215
1216 error = daemons_start();
1217 if (error)
1218 goto fail_unlisten;
1219
1220 atomic_set(&accepting, 1);
1221
1222 return 0;
1223
1224 fail_unlisten:
1225 close_connection(connections[0], 0);
1226 kmem_cache_free(con_cache, connections[0]);
1227 kmem_cache_destroy(con_cache);
1228
1229 fail_free_conn:
1230 kfree(connections);
1231
1232 out:
1233 return error;
1234}
1235
1236int dlm_lowcomms_init(void)
1237{
1238 INIT_LIST_HEAD(&read_sockets);
1239 INIT_LIST_HEAD(&write_sockets);
1240 INIT_LIST_HEAD(&state_sockets);
1241
1242 spin_lock_init(&read_sockets_lock);
1243 spin_lock_init(&write_sockets_lock);
1244 spin_lock_init(&state_sockets_lock);
1245 init_MUTEX(&connections_lock);
1246
1247 return 0;
1248}
1249
1250void dlm_lowcomms_exit(void)
1251{
1252}
1253
1254/*
1255 * Overrides for Emacs so that we follow Linus's tabbing style.
1256 * Emacs will notice this stuff at the end of the file and automatically
1257 * adjust the settings for this buffer only. This must remain at the end
1258 * of the file.
1259 * ---------------------------------------------------------------------------
1260 * Local variables:
1261 * c-file-style: "linux"
1262 * End:
1263 */