diff options
Diffstat (limited to 'fs/dlm')
-rw-r--r-- | fs/dlm/Kconfig | 17 | ||||
-rw-r--r-- | fs/dlm/Makefile | 4 | ||||
-rw-r--r-- | fs/dlm/lowcomms-sctp.c (renamed from fs/dlm/lowcomms.c) | 0 | ||||
-rw-r--r-- | fs/dlm/lowcomms-tcp.c | 1263 |
4 files changed, 1283 insertions, 1 deletions
diff --git a/fs/dlm/Kconfig b/fs/dlm/Kconfig index 81b2c6465eeb..c5985b883b2c 100644 --- a/fs/dlm/Kconfig +++ b/fs/dlm/Kconfig | |||
@@ -9,6 +9,23 @@ config DLM | |||
9 | A general purpose distributed lock manager for kernel or userspace | 9 | A general purpose distributed lock manager for kernel or userspace |
10 | applications. | 10 | applications. |
11 | 11 | ||
12 | choice | ||
13 | prompt "Select DLM communications protocol" | ||
14 | depends on DLM | ||
15 | default DLM_TCP | ||
16 | help | ||
17 | The DLM can use TCP or SCTP for its network communications. | ||
18 | SCTP supports multi-homed operations whereas TCP doesn't. | ||
19 | However, SCTP seems to have stability problems at the moment. | ||
20 | |||
21 | config DLM_TCP | ||
22 | bool "TCP/IP" | ||
23 | |||
24 | config DLM_SCTP | ||
25 | bool "SCTP" | ||
26 | |||
27 | endchoice | ||
28 | |||
12 | config DLM_DEBUG | 29 | config DLM_DEBUG |
13 | bool "DLM debugging" | 30 | bool "DLM debugging" |
14 | depends on DLM | 31 | depends on DLM |
diff --git a/fs/dlm/Makefile b/fs/dlm/Makefile index 1832e0297f7d..65388944eba0 100644 --- a/fs/dlm/Makefile +++ b/fs/dlm/Makefile | |||
@@ -4,7 +4,6 @@ dlm-y := ast.o \ | |||
4 | dir.o \ | 4 | dir.o \ |
5 | lock.o \ | 5 | lock.o \ |
6 | lockspace.o \ | 6 | lockspace.o \ |
7 | lowcomms.o \ | ||
8 | main.o \ | 7 | main.o \ |
9 | member.o \ | 8 | member.o \ |
10 | memory.o \ | 9 | memory.o \ |
@@ -17,3 +16,6 @@ dlm-y := ast.o \ | |||
17 | util.o | 16 | util.o |
18 | dlm-$(CONFIG_DLM_DEBUG) += debug_fs.o | 17 | dlm-$(CONFIG_DLM_DEBUG) += debug_fs.o |
19 | 18 | ||
19 | dlm-$(CONFIG_DLM_TCP) += lowcomms-tcp.o | ||
20 | |||
21 | dlm-$(CONFIG_DLM_SCTP) += lowcomms-sctp.o \ No newline at end of file | ||
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms-sctp.c index 6da6b14d5a61..6da6b14d5a61 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms-sctp.c | |||
diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms-tcp.c new file mode 100644 index 000000000000..7289e59b4bd3 --- /dev/null +++ b/fs/dlm/lowcomms-tcp.c | |||
@@ -0,0 +1,1263 @@ | |||
1 | /****************************************************************************** | ||
2 | ******************************************************************************* | ||
3 | ** | ||
4 | ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. | ||
5 | ** Copyright (C) 2004-2006 Red Hat, Inc. All rights reserved. | ||
6 | ** | ||
7 | ** This copyrighted material is made available to anyone wishing to use, | ||
8 | ** modify, copy, or redistribute it subject to the terms and conditions | ||
9 | ** of the GNU General Public License v.2. | ||
10 | ** | ||
11 | ******************************************************************************* | ||
12 | ******************************************************************************/ | ||
13 | |||
14 | /* | ||
15 | * lowcomms.c | ||
16 | * | ||
17 | * This is the "low-level" comms layer. | ||
18 | * | ||
19 | * It is responsible for sending/receiving messages | ||
20 | * from other nodes in the cluster. | ||
21 | * | ||
22 | * Cluster nodes are referred to by their nodeids. nodeids are | ||
23 | * simply 32 bit numbers to the locking module - if they need to | ||
24 | * be expanded for the cluster infrastructure then that is its | ||
25 | * responsibility. It is this layer's | ||
26 | * responsibility to resolve these into IP address or | ||
27 | * whatever it needs for inter-node communication. | ||
28 | * | ||
29 | * The comms level is two kernel threads that deal mainly with | ||
30 | * the receiving of messages from other nodes and passing them | ||
31 | * up to the mid-level comms layer (which understands the | ||
32 | * message format) for execution by the locking core, and | ||
33 | * a send thread which does all the setting up of connections | ||
34 | * to remote nodes and the sending of data. Threads are not allowed | ||
35 | * to send their own data because it may cause them to wait in times | ||
36 | * of high load. Also, this way, the sending thread can collect together | ||
37 | * messages bound for one node and send them in one block. | ||
38 | * | ||
39 | * I don't see any problem with the recv thread executing the locking | ||
40 | * code on behalf of remote processes as the locking code is | ||
41 | * short, efficient and never waits. | ||
42 | * | ||
43 | */ | ||
44 | |||
45 | |||
46 | #include <asm/ioctls.h> | ||
47 | #include <net/sock.h> | ||
48 | #include <net/tcp.h> | ||
49 | #include <linux/pagemap.h> | ||
50 | |||
51 | #include "dlm_internal.h" | ||
52 | #include "lowcomms.h" | ||
53 | #include "midcomms.h" | ||
54 | #include "config.h" | ||
55 | |||
/*
 * Bookkeeping for a circular buffer whose size is a power of two.
 * Only offsets are tracked here; the storage itself is held by the user
 * of the cbuf.
 */
struct cbuf {
	unsigned base;	/* offset of the first byte of valid data */
	unsigned len;	/* number of valid bytes starting at base */
	unsigned mask;	/* buffer size - 1, used to wrap offsets */
};

#ifndef FALSE
#define FALSE 0
#define TRUE 1
#endif
#define NODE_INCREMENT 32

/*
 * Circular buffer helpers.  "size" passed to CBUF_INIT must be a power of
 * two so that (size - 1) can be used as a wrap-around mask.  Every macro
 * argument is parenthesized so that arbitrary expressions may be passed.
 *
 * CBUF_INIT    - reset to empty and record the buffer size
 * CBUF_ADD     - account for n bytes appended after the valid data
 * CBUF_EMPTY   - true when no valid data is held
 * CBUF_MAY_ADD - true when n more bytes keep len strictly below the size
 * CBUF_EAT     - consume n bytes from the front, wrapping base
 * CBUF_DATA    - offset one past the last valid byte (the write position)
 */
#define CBUF_INIT(cb, size) do { (cb)->base = (cb)->len = 0; (cb)->mask = ((size)-1); } while(0)
#define CBUF_ADD(cb, n) do { (cb)->len += (n); } while(0)
#define CBUF_EMPTY(cb) ((cb)->len == 0)
#define CBUF_MAY_ADD(cb, n) (((cb)->len + (n)) < ((cb)->mask + 1))
#define CBUF_EAT(cb, n) do { (cb)->len -= (n); \
			     (cb)->base += (n); (cb)->base &= (cb)->mask; } while(0)
#define CBUF_DATA(cb) (((cb)->base + (cb)->len) & (cb)->mask)
75 | |||
76 | /* Maximum number of incoming messages to process before | ||
77 | doing a schedule() | ||
78 | */ | ||
79 | #define MAX_RX_MSG_COUNT 25 | ||
80 | |||
/*
 * Per-node connection state.  One of these is kept per nodeid in the
 * connections array; a second one may hang off "othercon" to hold an
 * incoming socket when an outgoing one already exists (see
 * accept_from_sock).
 */
struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct rw_semaphore sock_sem;	/* Stop connect races */
	struct list_head read_list;	/* On this list when ready for reading */
	struct list_head write_list;	/* On this list when ready for writing */
	struct list_head state_list;	/* On this list when ready to connect */
	unsigned long flags;	/* bit 1,2 = We are on the read/write lists */
/* Bit numbers within "flags", used with test_and_set_bit()/set_bit() */
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_CONNECT_PENDING 3
#define CF_IS_OTHERCON 4
	struct list_head writequeue;	/* List of outgoing writequeue_entries */
	struct list_head listenlist;	/* List of allocated listening sockets */
	spinlock_t writequeue_lock;	/* Protects writequeue and its entries */
	int (*rx_action) (struct connection *);	/* What to do when active */
	struct page *rx_page;	/* Receive buffer page, allocated on demand */
	struct cbuf cb;		/* Circular-buffer offsets into rx_page */
	int retries;		/* Connect attempts, see MAX_CONNECT_RETRIES */
	atomic_t waiting_requests;	/* Incremented by each data_ready callback */
#define MAX_CONNECT_RETRIES 3
	struct connection *othercon;	/* Holder for a second, incoming socket */
};
/* Map a struct sock back to the connection stored in sk_user_data */
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
105 | |||
/*
 * An entry waiting to be sent.  Each entry owns one page; callers reserve
 * space in it with dlm_lowcomms_get_buffer() and release it for sending
 * with dlm_lowcomms_commit_buffer().
 */
struct writequeue_entry {
	struct list_head list;	/* Link in connection->writequeue */
	struct page *page;	/* Page holding the message data */
	int offset;		/* Start of the data not yet sent */
	int len;		/* Bytes ready to send, set when users drops to 0 */
	int end;		/* End of the space reserved so far */
	int users;		/* Reservations not yet committed */
	struct connection *con;	/* Connection this entry is queued on */
};
116 | |||
/* Local address; its ss_family selects IPv4 or IPv6 for every socket */
static struct sockaddr_storage dlm_local_addr;

/* Manage daemons */
static struct task_struct *recv_task;
static struct task_struct *send_task;

/*
 * Wait queues the callbacks wake with wake_up_interruptible().
 * NOTE(review): presumably the send/recv daemons sleep on these; their
 * use is not visible in this part of the file.
 */
static wait_queue_t lowcomms_send_waitq_head;
static wait_queue_head_t lowcomms_send_waitq;
static wait_queue_t lowcomms_recv_waitq_head;
static wait_queue_head_t lowcomms_recv_waitq;

/* An array of pointers to connections, indexed by NODEID */
static struct connection **connections;
static struct semaphore connections_lock;	/* Protects connections and conn_array_size */
static kmem_cache_t *con_cache;			/* Allocator for struct connection */
static int conn_array_size;			/* Number of slots in connections */
static atomic_t accepting;			/* Zero means queueing and sending are refused */

/* List of sockets that have reads pending */
static struct list_head read_sockets;
static spinlock_t read_sockets_lock;

/* List of sockets which have writes pending */
static struct list_head write_sockets;
static spinlock_t write_sockets_lock;

/* List of sockets which have connects pending */
static struct list_head state_sockets;
static spinlock_t state_sockets_lock;
146 | |||
147 | static struct connection *nodeid2con(int nodeid, gfp_t allocation) | ||
148 | { | ||
149 | struct connection *con = NULL; | ||
150 | |||
151 | down(&connections_lock); | ||
152 | if (nodeid >= conn_array_size) { | ||
153 | int new_size = nodeid + NODE_INCREMENT; | ||
154 | struct connection **new_conns; | ||
155 | |||
156 | new_conns = kmalloc(sizeof(struct connection *) * | ||
157 | new_size, allocation); | ||
158 | if (!new_conns) | ||
159 | goto finish; | ||
160 | |||
161 | memset(new_conns, 0, sizeof(struct connection *) * new_size); | ||
162 | memcpy(new_conns, connections, sizeof(struct connection *) * conn_array_size); | ||
163 | conn_array_size = new_size; | ||
164 | kfree(connections); | ||
165 | connections = new_conns; | ||
166 | |||
167 | } | ||
168 | |||
169 | con = connections[nodeid]; | ||
170 | if (con == NULL && allocation) { | ||
171 | con = kmem_cache_alloc(con_cache, allocation); | ||
172 | if (!con) | ||
173 | goto finish; | ||
174 | |||
175 | memset(con, 0, sizeof(*con)); | ||
176 | con->nodeid = nodeid; | ||
177 | init_rwsem(&con->sock_sem); | ||
178 | INIT_LIST_HEAD(&con->writequeue); | ||
179 | spin_lock_init(&con->writequeue_lock); | ||
180 | |||
181 | connections[nodeid] = con; | ||
182 | } | ||
183 | |||
184 | finish: | ||
185 | up(&connections_lock); | ||
186 | return con; | ||
187 | } | ||
188 | |||
189 | /* Data available on socket or listen socket received a connect */ | ||
190 | static void lowcomms_data_ready(struct sock *sk, int count_unused) | ||
191 | { | ||
192 | struct connection *con = sock2con(sk); | ||
193 | |||
194 | atomic_inc(&con->waiting_requests); | ||
195 | if (test_and_set_bit(CF_READ_PENDING, &con->flags)) | ||
196 | return; | ||
197 | |||
198 | spin_lock_bh(&read_sockets_lock); | ||
199 | list_add_tail(&con->read_list, &read_sockets); | ||
200 | spin_unlock_bh(&read_sockets_lock); | ||
201 | |||
202 | wake_up_interruptible(&lowcomms_recv_waitq); | ||
203 | } | ||
204 | |||
205 | static void lowcomms_write_space(struct sock *sk) | ||
206 | { | ||
207 | struct connection *con = sock2con(sk); | ||
208 | |||
209 | if (test_and_set_bit(CF_WRITE_PENDING, &con->flags)) | ||
210 | return; | ||
211 | |||
212 | spin_lock_bh(&write_sockets_lock); | ||
213 | list_add_tail(&con->write_list, &write_sockets); | ||
214 | spin_unlock_bh(&write_sockets_lock); | ||
215 | |||
216 | wake_up_interruptible(&lowcomms_send_waitq); | ||
217 | } | ||
218 | |||
219 | static inline void lowcomms_connect_sock(struct connection *con) | ||
220 | { | ||
221 | if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) | ||
222 | return; | ||
223 | if (!atomic_read(&accepting)) | ||
224 | return; | ||
225 | |||
226 | spin_lock_bh(&state_sockets_lock); | ||
227 | list_add_tail(&con->state_list, &state_sockets); | ||
228 | spin_unlock_bh(&state_sockets_lock); | ||
229 | |||
230 | wake_up_interruptible(&lowcomms_send_waitq); | ||
231 | } | ||
232 | |||
233 | static void lowcomms_state_change(struct sock *sk) | ||
234 | { | ||
235 | /* struct connection *con = sock2con(sk); */ | ||
236 | |||
237 | switch (sk->sk_state) { | ||
238 | case TCP_ESTABLISHED: | ||
239 | lowcomms_write_space(sk); | ||
240 | break; | ||
241 | |||
242 | case TCP_FIN_WAIT1: | ||
243 | case TCP_FIN_WAIT2: | ||
244 | case TCP_TIME_WAIT: | ||
245 | case TCP_CLOSE: | ||
246 | case TCP_CLOSE_WAIT: | ||
247 | case TCP_LAST_ACK: | ||
248 | case TCP_CLOSING: | ||
249 | /* FIXME: I think this causes more trouble than it solves. | ||
250 | lowcomms wil reconnect anyway when there is something to | ||
251 | send. This just attempts reconnection if a node goes down! | ||
252 | */ | ||
253 | /* lowcomms_connect_sock(con); */ | ||
254 | break; | ||
255 | |||
256 | default: | ||
257 | printk("dlm: lowcomms_state_change: state=%d\n", sk->sk_state); | ||
258 | break; | ||
259 | } | ||
260 | } | ||
261 | |||
262 | /* Make a socket active */ | ||
263 | static int add_sock(struct socket *sock, struct connection *con) | ||
264 | { | ||
265 | con->sock = sock; | ||
266 | |||
267 | /* Install a data_ready callback */ | ||
268 | con->sock->sk->sk_data_ready = lowcomms_data_ready; | ||
269 | con->sock->sk->sk_write_space = lowcomms_write_space; | ||
270 | con->sock->sk->sk_state_change = lowcomms_state_change; | ||
271 | |||
272 | return 0; | ||
273 | } | ||
274 | |||
275 | /* Add the port number to an IP6 or 4 sockaddr and return the address | ||
276 | length */ | ||
277 | static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, | ||
278 | int *addr_len) | ||
279 | { | ||
280 | saddr->ss_family = dlm_local_addr.ss_family; | ||
281 | if (saddr->ss_family == AF_INET) { | ||
282 | struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; | ||
283 | in4_addr->sin_port = cpu_to_be16(port); | ||
284 | *addr_len = sizeof(struct sockaddr_in); | ||
285 | } | ||
286 | else { | ||
287 | struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; | ||
288 | in6_addr->sin6_port = cpu_to_be16(port); | ||
289 | *addr_len = sizeof(struct sockaddr_in6); | ||
290 | } | ||
291 | } | ||
292 | |||
293 | /* Close a remote connection and tidy up */ | ||
294 | static void close_connection(struct connection *con, int and_other) | ||
295 | { | ||
296 | down_write(&con->sock_sem); | ||
297 | |||
298 | if (con->sock) { | ||
299 | sock_release(con->sock); | ||
300 | con->sock = NULL; | ||
301 | } | ||
302 | if (con->othercon && and_other) { | ||
303 | /* Argh! recursion in kernel code! | ||
304 | Actually, this isn't a list so it | ||
305 | will only re-enter once. | ||
306 | */ | ||
307 | close_connection(con->othercon, FALSE); | ||
308 | } | ||
309 | if (con->rx_page) { | ||
310 | __free_page(con->rx_page); | ||
311 | con->rx_page = NULL; | ||
312 | } | ||
313 | con->retries = 0; | ||
314 | up_write(&con->sock_sem); | ||
315 | } | ||
316 | |||
/*
 * Data received from remote end.
 *
 * Reads as much as fits into the connection's receive page (managed as a
 * circular buffer through con->cb), passes the buffered data to
 * dlm_process_incoming_buffer() and consumes however many bytes that
 * call reports.
 *
 * Returns 0 on the normal and "call me again" paths.  On the close path
 * it returns the non-positive result of sock_recvmsg() or the negative
 * result of dlm_process_incoming_buffer().
 */
static int receive_from_sock(struct connection *con)
{
	int ret = 0;
	struct msghdr msg;
	struct iovec iov[2];
	mm_segment_t fs;
	unsigned len;
	int r;
	int call_again_soon = 0;

	down_read(&con->sock_sem);

	if (con->sock == NULL)
		goto out;
	if (con->rx_page == NULL) {
		/*
		 * This doesn't need to be atomic, but I think it should
		 * improve performance if it is.
		 */
		con->rx_page = alloc_page(GFP_ATOMIC);
		if (con->rx_page == NULL)
			goto out_resched;
		CBUF_INIT(&con->cb, PAGE_CACHE_SIZE);
	}

	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_iovlen = 1;
	msg.msg_iov = iov;
	msg.msg_name = NULL;
	msg.msg_namelen = 0;
	msg.msg_flags = 0;

	/*
	 * iov[0] is the bit of the circular buffer between the current end
	 * point (cb.base + cb.len) and the end of the buffer.
	 */
	iov[0].iov_len = con->cb.base - CBUF_DATA(&con->cb);
	iov[0].iov_base = page_address(con->rx_page) + CBUF_DATA(&con->cb);
	iov[1].iov_len = 0;

	/*
	 * iov[1] is the bit of the circular buffer between the start of the
	 * buffer and the start of the currently used section (cb.base)
	 */
	if (CBUF_DATA(&con->cb) >= con->cb.base) {
		/* Free space wraps: fill to the end of the page, then the front */
		iov[0].iov_len = PAGE_CACHE_SIZE - CBUF_DATA(&con->cb);
		iov[1].iov_len = con->cb.base;
		iov[1].iov_base = page_address(con->rx_page);
		msg.msg_iovlen = 2;
	}
	len = iov[0].iov_len + iov[1].iov_len;

	/* The iovecs point at kernel memory, so widen the address limit */
	fs = get_fs();
	set_fs(get_ds());
	r = ret = sock_recvmsg(con->sock, &msg, len,
			       MSG_DONTWAIT | MSG_NOSIGNAL);
	set_fs(fs);

	if (ret <= 0)
		goto out_close;
	/* The free space was filled completely, so come back for more */
	if (ret == len)
		call_again_soon = 1;
	CBUF_ADD(&con->cb, ret);
	ret = dlm_process_incoming_buffer(con->nodeid,
					  page_address(con->rx_page),
					  con->cb.base, con->cb.len,
					  PAGE_CACHE_SIZE);
	if (ret == -EBADMSG) {
		printk(KERN_INFO "dlm: lowcomms: addr=%p, base=%u, len=%u, "
		       "iov_len=%u, iov_base[0]=%p, read=%d\n",
		       page_address(con->rx_page), con->cb.base, con->cb.len,
		       len, iov[0].iov_base, r);
	}
	if (ret < 0)
		goto out_close;
	/* ret is the number of bytes dlm_process_incoming_buffer() used */
	CBUF_EAT(&con->cb, ret);

	/* Nothing buffered and no immediate re-read planned: drop the page */
	if (CBUF_EMPTY(&con->cb) && !call_again_soon) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

 out:
	if (call_again_soon)
		goto out_resched;
	up_read(&con->sock_sem);
	ret = 0;
	goto out_ret;

 out_resched:
	/* Put ourselves back on the read list, then yield the CPU */
	lowcomms_data_ready(con->sock->sk, 0);
	up_read(&con->sock_sem);
	ret = 0;
	schedule();
	goto out_ret;

 out_close:
	up_read(&con->sock_sem);
	/* -EAGAIN only means "no data yet"; "othercon" holders are not closed here */
	if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) {
		close_connection(con, FALSE);
		/* Reconnect when there is something to send */
	}

 out_ret:
	return ret;
}
425 | |||
426 | /* Listening socket is busy, accept a connection */ | ||
427 | static int accept_from_sock(struct connection *con) | ||
428 | { | ||
429 | int result; | ||
430 | struct sockaddr_storage peeraddr; | ||
431 | struct socket *newsock; | ||
432 | int len; | ||
433 | int nodeid; | ||
434 | struct connection *newcon; | ||
435 | |||
436 | memset(&peeraddr, 0, sizeof(peeraddr)); | ||
437 | result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &newsock); | ||
438 | if (result < 0) | ||
439 | return -ENOMEM; | ||
440 | |||
441 | down_read(&con->sock_sem); | ||
442 | |||
443 | result = -ENOTCONN; | ||
444 | if (con->sock == NULL) | ||
445 | goto accept_err; | ||
446 | |||
447 | newsock->type = con->sock->type; | ||
448 | newsock->ops = con->sock->ops; | ||
449 | |||
450 | result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK); | ||
451 | if (result < 0) | ||
452 | goto accept_err; | ||
453 | |||
454 | /* Get the connected socket's peer */ | ||
455 | memset(&peeraddr, 0, sizeof(peeraddr)); | ||
456 | if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, | ||
457 | &len, 2)) { | ||
458 | result = -ECONNABORTED; | ||
459 | goto accept_err; | ||
460 | } | ||
461 | |||
462 | /* Get the new node's NODEID */ | ||
463 | make_sockaddr(&peeraddr, 0, &len); | ||
464 | if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) { | ||
465 | printk("dlm: connect from non cluster node\n"); | ||
466 | sock_release(newsock); | ||
467 | up_read(&con->sock_sem); | ||
468 | return -1; | ||
469 | } | ||
470 | |||
471 | log_print("got connection from %d", nodeid); | ||
472 | |||
473 | /* Check to see if we already have a connection to this node. This | ||
474 | * could happen if the two nodes initiate a connection at roughly | ||
475 | * the same time and the connections cross on the wire. | ||
476 | * TEMPORARY FIX: | ||
477 | * In this case we store the incoming one in "othercon" | ||
478 | */ | ||
479 | newcon = nodeid2con(nodeid, GFP_KERNEL); | ||
480 | if (!newcon) { | ||
481 | result = -ENOMEM; | ||
482 | goto accept_err; | ||
483 | } | ||
484 | down_write(&newcon->sock_sem); | ||
485 | if (newcon->sock) { | ||
486 | struct connection *othercon = newcon->othercon; | ||
487 | |||
488 | if (!othercon) { | ||
489 | othercon = kmem_cache_alloc(con_cache, GFP_KERNEL); | ||
490 | if (!othercon) { | ||
491 | printk("dlm: failed to allocate incoming socket\n"); | ||
492 | up_write(&newcon->sock_sem); | ||
493 | result = -ENOMEM; | ||
494 | goto accept_err; | ||
495 | } | ||
496 | memset(othercon, 0, sizeof(*othercon)); | ||
497 | othercon->nodeid = nodeid; | ||
498 | othercon->rx_action = receive_from_sock; | ||
499 | init_rwsem(&othercon->sock_sem); | ||
500 | set_bit(CF_IS_OTHERCON, &othercon->flags); | ||
501 | newcon->othercon = othercon; | ||
502 | } | ||
503 | othercon->sock = newsock; | ||
504 | newsock->sk->sk_user_data = othercon; | ||
505 | add_sock(newsock, othercon); | ||
506 | } | ||
507 | else { | ||
508 | newsock->sk->sk_user_data = newcon; | ||
509 | newcon->rx_action = receive_from_sock; | ||
510 | add_sock(newsock, newcon); | ||
511 | |||
512 | } | ||
513 | |||
514 | up_write(&newcon->sock_sem); | ||
515 | |||
516 | /* | ||
517 | * Add it to the active queue in case we got data | ||
518 | * beween processing the accept adding the socket | ||
519 | * to the read_sockets list | ||
520 | */ | ||
521 | lowcomms_data_ready(newsock->sk, 0); | ||
522 | up_read(&con->sock_sem); | ||
523 | |||
524 | return 0; | ||
525 | |||
526 | accept_err: | ||
527 | up_read(&con->sock_sem); | ||
528 | sock_release(newsock); | ||
529 | |||
530 | if (result != -EAGAIN) | ||
531 | printk("dlm: error accepting connection from node: %d\n", result); | ||
532 | return result; | ||
533 | } | ||
534 | |||
535 | /* Connect a new socket to its peer */ | ||
536 | static int connect_to_sock(struct connection *con) | ||
537 | { | ||
538 | int result = -EHOSTUNREACH; | ||
539 | struct sockaddr_storage saddr; | ||
540 | int addr_len; | ||
541 | struct socket *sock; | ||
542 | |||
543 | if (con->nodeid == 0) { | ||
544 | log_print("attempt to connect sock 0 foiled"); | ||
545 | return 0; | ||
546 | } | ||
547 | |||
548 | down_write(&con->sock_sem); | ||
549 | if (con->retries++ > MAX_CONNECT_RETRIES) | ||
550 | goto out; | ||
551 | |||
552 | /* Some odd races can cause double-connects, ignore them */ | ||
553 | if (con->sock) { | ||
554 | result = 0; | ||
555 | goto out; | ||
556 | } | ||
557 | |||
558 | /* Create a socket to communicate with */ | ||
559 | result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock); | ||
560 | if (result < 0) | ||
561 | goto out_err; | ||
562 | |||
563 | memset(&saddr, 0, sizeof(saddr)); | ||
564 | if (dlm_nodeid_to_addr(con->nodeid, &saddr)) | ||
565 | goto out_err; | ||
566 | |||
567 | sock->sk->sk_user_data = con; | ||
568 | con->rx_action = receive_from_sock; | ||
569 | |||
570 | make_sockaddr(&saddr, dlm_config.tcp_port, &addr_len); | ||
571 | |||
572 | add_sock(sock, con); | ||
573 | |||
574 | log_print("connecting to %d", con->nodeid); | ||
575 | result = | ||
576 | sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, | ||
577 | O_NONBLOCK); | ||
578 | if (result == -EINPROGRESS) | ||
579 | result = 0; | ||
580 | if (result != 0) | ||
581 | goto out_err; | ||
582 | |||
583 | out: | ||
584 | up_write(&con->sock_sem); | ||
585 | /* | ||
586 | * Returning an error here means we've given up trying to connect to | ||
587 | * a remote node, otherwise we return 0 and reschedule the connetion | ||
588 | * attempt | ||
589 | */ | ||
590 | return result; | ||
591 | |||
592 | out_err: | ||
593 | if (con->sock) { | ||
594 | sock_release(con->sock); | ||
595 | con->sock = NULL; | ||
596 | } | ||
597 | /* | ||
598 | * Some errors are fatal and this list might need adjusting. For other | ||
599 | * errors we try again until the max number of retries is reached. | ||
600 | */ | ||
601 | if (result != -EHOSTUNREACH && result != -ENETUNREACH && | ||
602 | result != -ENETDOWN && result != EINVAL | ||
603 | && result != -EPROTONOSUPPORT) { | ||
604 | lowcomms_connect_sock(con); | ||
605 | result = 0; | ||
606 | } | ||
607 | goto out; | ||
608 | } | ||
609 | |||
610 | static struct socket *create_listen_sock(struct connection *con, struct sockaddr_storage *saddr) | ||
611 | { | ||
612 | struct socket *sock = NULL; | ||
613 | mm_segment_t fs; | ||
614 | int result = 0; | ||
615 | int one = 1; | ||
616 | int addr_len; | ||
617 | |||
618 | if (dlm_local_addr.ss_family == AF_INET) | ||
619 | addr_len = sizeof(struct sockaddr_in); | ||
620 | else | ||
621 | addr_len = sizeof(struct sockaddr_in6); | ||
622 | |||
623 | /* Create a socket to communicate with */ | ||
624 | result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock); | ||
625 | if (result < 0) { | ||
626 | printk("dlm: Can't create listening comms socket\n"); | ||
627 | goto create_out; | ||
628 | } | ||
629 | |||
630 | fs = get_fs(); | ||
631 | set_fs(get_ds()); | ||
632 | result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one)); | ||
633 | set_fs(fs); | ||
634 | if (result < 0) { | ||
635 | printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",result); | ||
636 | } | ||
637 | sock->sk->sk_user_data = con; | ||
638 | con->rx_action = accept_from_sock; | ||
639 | con->sock = sock; | ||
640 | |||
641 | /* Bind to our port */ | ||
642 | make_sockaddr(saddr, dlm_config.tcp_port, &addr_len); | ||
643 | result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len); | ||
644 | if (result < 0) { | ||
645 | printk("dlm: Can't bind to port %d\n", dlm_config.tcp_port); | ||
646 | sock_release(sock); | ||
647 | sock = NULL; | ||
648 | con->sock = NULL; | ||
649 | goto create_out; | ||
650 | } | ||
651 | |||
652 | fs = get_fs(); | ||
653 | set_fs(get_ds()); | ||
654 | |||
655 | result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&one, sizeof(one)); | ||
656 | set_fs(fs); | ||
657 | if (result < 0) { | ||
658 | printk("dlm: Set keepalive failed: %d\n", result); | ||
659 | } | ||
660 | |||
661 | result = sock->ops->listen(sock, 5); | ||
662 | if (result < 0) { | ||
663 | printk("dlm: Can't listen on port %d\n", dlm_config.tcp_port); | ||
664 | sock_release(sock); | ||
665 | sock = NULL; | ||
666 | goto create_out; | ||
667 | } | ||
668 | |||
669 | create_out: | ||
670 | return sock; | ||
671 | } | ||
672 | |||
673 | |||
674 | /* Listen on all interfaces */ | ||
675 | static int listen_for_all(void) | ||
676 | { | ||
677 | struct socket *sock = NULL; | ||
678 | struct connection *con = nodeid2con(0, GFP_KERNEL); | ||
679 | int result = -EINVAL; | ||
680 | |||
681 | /* We don't support multi-homed hosts */ | ||
682 | memset(con, 0, sizeof(*con)); | ||
683 | init_rwsem(&con->sock_sem); | ||
684 | spin_lock_init(&con->writequeue_lock); | ||
685 | INIT_LIST_HEAD(&con->writequeue); | ||
686 | set_bit(CF_IS_OTHERCON, &con->flags); | ||
687 | |||
688 | sock = create_listen_sock(con, &dlm_local_addr); | ||
689 | if (sock) { | ||
690 | add_sock(sock, con); | ||
691 | result = 0; | ||
692 | } | ||
693 | else { | ||
694 | result = -EADDRINUSE; | ||
695 | } | ||
696 | |||
697 | return result; | ||
698 | } | ||
699 | |||
700 | |||
701 | |||
702 | static struct writequeue_entry *new_writequeue_entry(struct connection *con, | ||
703 | gfp_t allocation) | ||
704 | { | ||
705 | struct writequeue_entry *entry; | ||
706 | |||
707 | entry = kmalloc(sizeof(struct writequeue_entry), allocation); | ||
708 | if (!entry) | ||
709 | return NULL; | ||
710 | |||
711 | entry->page = alloc_page(allocation); | ||
712 | if (!entry->page) { | ||
713 | kfree(entry); | ||
714 | return NULL; | ||
715 | } | ||
716 | |||
717 | entry->offset = 0; | ||
718 | entry->len = 0; | ||
719 | entry->end = 0; | ||
720 | entry->users = 0; | ||
721 | entry->con = con; | ||
722 | |||
723 | return entry; | ||
724 | } | ||
725 | |||
726 | void *dlm_lowcomms_get_buffer(int nodeid, int len, | ||
727 | gfp_t allocation, char **ppc) | ||
728 | { | ||
729 | struct connection *con; | ||
730 | struct writequeue_entry *e; | ||
731 | int offset = 0; | ||
732 | int users = 0; | ||
733 | |||
734 | if (!atomic_read(&accepting)) | ||
735 | return NULL; | ||
736 | |||
737 | con = nodeid2con(nodeid, allocation); | ||
738 | if (!con) | ||
739 | return NULL; | ||
740 | |||
741 | spin_lock(&con->writequeue_lock); | ||
742 | e = list_entry(con->writequeue.prev, struct writequeue_entry, list); | ||
743 | if (((struct list_head *) e == &con->writequeue) || | ||
744 | (PAGE_CACHE_SIZE - e->end < len)) { | ||
745 | e = NULL; | ||
746 | } else { | ||
747 | offset = e->end; | ||
748 | e->end += len; | ||
749 | users = e->users++; | ||
750 | } | ||
751 | spin_unlock(&con->writequeue_lock); | ||
752 | |||
753 | if (e) { | ||
754 | got_one: | ||
755 | if (users == 0) | ||
756 | kmap(e->page); | ||
757 | *ppc = page_address(e->page) + offset; | ||
758 | return e; | ||
759 | } | ||
760 | |||
761 | e = new_writequeue_entry(con, allocation); | ||
762 | if (e) { | ||
763 | spin_lock(&con->writequeue_lock); | ||
764 | offset = e->end; | ||
765 | e->end += len; | ||
766 | users = e->users++; | ||
767 | list_add_tail(&e->list, &con->writequeue); | ||
768 | spin_unlock(&con->writequeue_lock); | ||
769 | goto got_one; | ||
770 | } | ||
771 | return NULL; | ||
772 | } | ||
773 | |||
774 | void dlm_lowcomms_commit_buffer(void *mh) | ||
775 | { | ||
776 | struct writequeue_entry *e = (struct writequeue_entry *)mh; | ||
777 | struct connection *con = e->con; | ||
778 | int users; | ||
779 | |||
780 | if (!atomic_read(&accepting)) | ||
781 | return; | ||
782 | |||
783 | spin_lock(&con->writequeue_lock); | ||
784 | users = --e->users; | ||
785 | if (users) | ||
786 | goto out; | ||
787 | e->len = e->end - e->offset; | ||
788 | kunmap(e->page); | ||
789 | spin_unlock(&con->writequeue_lock); | ||
790 | |||
791 | if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) { | ||
792 | spin_lock_bh(&write_sockets_lock); | ||
793 | list_add_tail(&con->write_list, &write_sockets); | ||
794 | spin_unlock_bh(&write_sockets_lock); | ||
795 | |||
796 | wake_up_interruptible(&lowcomms_send_waitq); | ||
797 | } | ||
798 | return; | ||
799 | |||
800 | out: | ||
801 | spin_unlock(&con->writequeue_lock); | ||
802 | return; | ||
803 | } | ||
804 | |||
805 | static void free_entry(struct writequeue_entry *e) | ||
806 | { | ||
807 | __free_page(e->page); | ||
808 | kfree(e); | ||
809 | } | ||
810 | |||
811 | /* Send a message */ | ||
812 | static int send_to_sock(struct connection *con) | ||
813 | { | ||
814 | int ret = 0; | ||
815 | ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int); | ||
816 | const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; | ||
817 | struct writequeue_entry *e; | ||
818 | int len, offset; | ||
819 | |||
820 | down_read(&con->sock_sem); | ||
821 | if (con->sock == NULL) | ||
822 | goto out_connect; | ||
823 | |||
824 | sendpage = con->sock->ops->sendpage; | ||
825 | |||
826 | spin_lock(&con->writequeue_lock); | ||
827 | for (;;) { | ||
828 | e = list_entry(con->writequeue.next, struct writequeue_entry, | ||
829 | list); | ||
830 | if ((struct list_head *) e == &con->writequeue) | ||
831 | break; | ||
832 | |||
833 | len = e->len; | ||
834 | offset = e->offset; | ||
835 | BUG_ON(len == 0 && e->users == 0); | ||
836 | spin_unlock(&con->writequeue_lock); | ||
837 | |||
838 | ret = 0; | ||
839 | if (len) { | ||
840 | ret = sendpage(con->sock, e->page, offset, len, | ||
841 | msg_flags); | ||
842 | if (ret == -EAGAIN || ret == 0) | ||
843 | goto out; | ||
844 | if (ret <= 0) | ||
845 | goto send_error; | ||
846 | } | ||
847 | else { | ||
848 | /* Don't starve people filling buffers */ | ||
849 | schedule(); | ||
850 | } | ||
851 | |||
852 | spin_lock(&con->writequeue_lock); | ||
853 | e->offset += ret; | ||
854 | e->len -= ret; | ||
855 | |||
856 | if (e->len == 0 && e->users == 0) { | ||
857 | list_del(&e->list); | ||
858 | free_entry(e); | ||
859 | continue; | ||
860 | } | ||
861 | } | ||
862 | spin_unlock(&con->writequeue_lock); | ||
863 | out: | ||
864 | up_read(&con->sock_sem); | ||
865 | return ret; | ||
866 | |||
867 | send_error: | ||
868 | up_read(&con->sock_sem); | ||
869 | close_connection(con, FALSE); | ||
870 | lowcomms_connect_sock(con); | ||
871 | return ret; | ||
872 | |||
873 | out_connect: | ||
874 | up_read(&con->sock_sem); | ||
875 | lowcomms_connect_sock(con); | ||
876 | return 0; | ||
877 | } | ||
878 | |||
879 | static void clean_one_writequeue(struct connection *con) | ||
880 | { | ||
881 | struct list_head *list; | ||
882 | struct list_head *temp; | ||
883 | |||
884 | spin_lock(&con->writequeue_lock); | ||
885 | list_for_each_safe(list, temp, &con->writequeue) { | ||
886 | struct writequeue_entry *e = | ||
887 | list_entry(list, struct writequeue_entry, list); | ||
888 | list_del(&e->list); | ||
889 | free_entry(e); | ||
890 | } | ||
891 | spin_unlock(&con->writequeue_lock); | ||
892 | } | ||
893 | |||
894 | /* Called from recovery when it knows that a node has | ||
895 | left the cluster */ | ||
896 | int dlm_lowcomms_close(int nodeid) | ||
897 | { | ||
898 | struct connection *con; | ||
899 | |||
900 | if (!connections) | ||
901 | goto out; | ||
902 | |||
903 | log_print("closing connection to node %d", nodeid); | ||
904 | con = nodeid2con(nodeid, 0); | ||
905 | if (con) { | ||
906 | clean_one_writequeue(con); | ||
907 | close_connection(con, TRUE); | ||
908 | atomic_set(&con->waiting_requests, 0); | ||
909 | } | ||
910 | return 0; | ||
911 | |||
912 | out: | ||
913 | return -1; | ||
914 | } | ||
915 | |||
916 | /* API send message call, may queue the request */ | ||
917 | /* N.B. This is the old interface - use the new one for new calls */ | ||
918 | int lowcomms_send_message(int nodeid, char *buf, int len, gfp_t allocation) | ||
919 | { | ||
920 | struct writequeue_entry *e; | ||
921 | char *b; | ||
922 | |||
923 | e = dlm_lowcomms_get_buffer(nodeid, len, allocation, &b); | ||
924 | if (e) { | ||
925 | memcpy(b, buf, len); | ||
926 | dlm_lowcomms_commit_buffer(e); | ||
927 | return 0; | ||
928 | } | ||
929 | return -ENOBUFS; | ||
930 | } | ||
931 | |||
932 | /* Look for activity on active sockets */ | ||
933 | static void process_sockets(void) | ||
934 | { | ||
935 | struct list_head *list; | ||
936 | struct list_head *temp; | ||
937 | int count = 0; | ||
938 | |||
939 | spin_lock_bh(&read_sockets_lock); | ||
940 | list_for_each_safe(list, temp, &read_sockets) { | ||
941 | |||
942 | struct connection *con = | ||
943 | list_entry(list, struct connection, read_list); | ||
944 | list_del(&con->read_list); | ||
945 | clear_bit(CF_READ_PENDING, &con->flags); | ||
946 | |||
947 | spin_unlock_bh(&read_sockets_lock); | ||
948 | |||
949 | /* This can reach zero if we are processing requests | ||
950 | * as they come in. | ||
951 | */ | ||
952 | if (atomic_read(&con->waiting_requests) == 0) { | ||
953 | spin_lock_bh(&read_sockets_lock); | ||
954 | continue; | ||
955 | } | ||
956 | |||
957 | do { | ||
958 | con->rx_action(con); | ||
959 | |||
960 | /* Don't starve out everyone else */ | ||
961 | if (++count >= MAX_RX_MSG_COUNT) { | ||
962 | schedule(); | ||
963 | count = 0; | ||
964 | } | ||
965 | |||
966 | } while (!atomic_dec_and_test(&con->waiting_requests) && | ||
967 | !kthread_should_stop()); | ||
968 | |||
969 | spin_lock_bh(&read_sockets_lock); | ||
970 | } | ||
971 | spin_unlock_bh(&read_sockets_lock); | ||
972 | } | ||
973 | |||
974 | /* Try to send any messages that are pending | ||
975 | */ | ||
976 | static void process_output_queue(void) | ||
977 | { | ||
978 | struct list_head *list; | ||
979 | struct list_head *temp; | ||
980 | int ret; | ||
981 | |||
982 | spin_lock_bh(&write_sockets_lock); | ||
983 | list_for_each_safe(list, temp, &write_sockets) { | ||
984 | struct connection *con = | ||
985 | list_entry(list, struct connection, write_list); | ||
986 | clear_bit(CF_WRITE_PENDING, &con->flags); | ||
987 | list_del(&con->write_list); | ||
988 | |||
989 | spin_unlock_bh(&write_sockets_lock); | ||
990 | |||
991 | ret = send_to_sock(con); | ||
992 | if (ret < 0) { | ||
993 | } | ||
994 | spin_lock_bh(&write_sockets_lock); | ||
995 | } | ||
996 | spin_unlock_bh(&write_sockets_lock); | ||
997 | } | ||
998 | |||
999 | static void process_state_queue(void) | ||
1000 | { | ||
1001 | struct list_head *list; | ||
1002 | struct list_head *temp; | ||
1003 | int ret; | ||
1004 | |||
1005 | spin_lock_bh(&state_sockets_lock); | ||
1006 | list_for_each_safe(list, temp, &state_sockets) { | ||
1007 | struct connection *con = | ||
1008 | list_entry(list, struct connection, state_list); | ||
1009 | list_del(&con->state_list); | ||
1010 | clear_bit(CF_CONNECT_PENDING, &con->flags); | ||
1011 | spin_unlock_bh(&state_sockets_lock); | ||
1012 | |||
1013 | ret = connect_to_sock(con); | ||
1014 | if (ret < 0) { | ||
1015 | } | ||
1016 | spin_lock_bh(&state_sockets_lock); | ||
1017 | } | ||
1018 | spin_unlock_bh(&state_sockets_lock); | ||
1019 | } | ||
1020 | |||
1021 | |||
1022 | /* Discard all entries on the write queues */ | ||
1023 | static void clean_writequeues(void) | ||
1024 | { | ||
1025 | int nodeid; | ||
1026 | |||
1027 | for (nodeid = 1; nodeid < conn_array_size; nodeid++) { | ||
1028 | struct connection *con = nodeid2con(nodeid, 0); | ||
1029 | |||
1030 | if (con) | ||
1031 | clean_one_writequeue(con); | ||
1032 | } | ||
1033 | } | ||
1034 | |||
1035 | static int read_list_empty(void) | ||
1036 | { | ||
1037 | int status; | ||
1038 | |||
1039 | spin_lock_bh(&read_sockets_lock); | ||
1040 | status = list_empty(&read_sockets); | ||
1041 | spin_unlock_bh(&read_sockets_lock); | ||
1042 | |||
1043 | return status; | ||
1044 | } | ||
1045 | |||
1046 | /* DLM Transport comms receive daemon */ | ||
1047 | static int dlm_recvd(void *data) | ||
1048 | { | ||
1049 | init_waitqueue_head(&lowcomms_recv_waitq); | ||
1050 | init_waitqueue_entry(&lowcomms_recv_waitq_head, current); | ||
1051 | add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head); | ||
1052 | |||
1053 | while (!kthread_should_stop()) { | ||
1054 | set_current_state(TASK_INTERRUPTIBLE); | ||
1055 | if (read_list_empty()) | ||
1056 | schedule(); | ||
1057 | set_current_state(TASK_RUNNING); | ||
1058 | |||
1059 | process_sockets(); | ||
1060 | } | ||
1061 | |||
1062 | return 0; | ||
1063 | } | ||
1064 | |||
1065 | static int write_and_state_lists_empty(void) | ||
1066 | { | ||
1067 | int status; | ||
1068 | |||
1069 | spin_lock_bh(&write_sockets_lock); | ||
1070 | status = list_empty(&write_sockets); | ||
1071 | spin_unlock_bh(&write_sockets_lock); | ||
1072 | |||
1073 | spin_lock_bh(&state_sockets_lock); | ||
1074 | if (list_empty(&state_sockets) == 0) | ||
1075 | status = 0; | ||
1076 | spin_unlock_bh(&state_sockets_lock); | ||
1077 | |||
1078 | return status; | ||
1079 | } | ||
1080 | |||
1081 | /* DLM Transport send daemon */ | ||
1082 | static int dlm_sendd(void *data) | ||
1083 | { | ||
1084 | init_waitqueue_head(&lowcomms_send_waitq); | ||
1085 | init_waitqueue_entry(&lowcomms_send_waitq_head, current); | ||
1086 | add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head); | ||
1087 | |||
1088 | while (!kthread_should_stop()) { | ||
1089 | set_current_state(TASK_INTERRUPTIBLE); | ||
1090 | if (write_and_state_lists_empty()) | ||
1091 | schedule(); | ||
1092 | set_current_state(TASK_RUNNING); | ||
1093 | |||
1094 | process_state_queue(); | ||
1095 | process_output_queue(); | ||
1096 | } | ||
1097 | |||
1098 | return 0; | ||
1099 | } | ||
1100 | |||
1101 | static void daemons_stop(void) | ||
1102 | { | ||
1103 | kthread_stop(recv_task); | ||
1104 | kthread_stop(send_task); | ||
1105 | } | ||
1106 | |||
1107 | static int daemons_start(void) | ||
1108 | { | ||
1109 | struct task_struct *p; | ||
1110 | int error; | ||
1111 | |||
1112 | p = kthread_run(dlm_recvd, NULL, "dlm_recvd"); | ||
1113 | error = IS_ERR(p); | ||
1114 | if (error) { | ||
1115 | log_print("can't start dlm_recvd %d", error); | ||
1116 | return error; | ||
1117 | } | ||
1118 | recv_task = p; | ||
1119 | |||
1120 | p = kthread_run(dlm_sendd, NULL, "dlm_sendd"); | ||
1121 | error = IS_ERR(p); | ||
1122 | if (error) { | ||
1123 | log_print("can't start dlm_sendd %d", error); | ||
1124 | kthread_stop(recv_task); | ||
1125 | return error; | ||
1126 | } | ||
1127 | send_task = p; | ||
1128 | |||
1129 | return 0; | ||
1130 | } | ||
1131 | |||
1132 | /* | ||
1133 | * Return the largest buffer size we can cope with. | ||
1134 | */ | ||
1135 | int lowcomms_max_buffer_size(void) | ||
1136 | { | ||
1137 | return PAGE_CACHE_SIZE; | ||
1138 | } | ||
1139 | |||
1140 | void dlm_lowcomms_stop(void) | ||
1141 | { | ||
1142 | int i; | ||
1143 | |||
1144 | atomic_set(&accepting, 0); | ||
1145 | |||
1146 | /* Set all the activity flags to prevent any | ||
1147 | socket activity. | ||
1148 | */ | ||
1149 | for (i = 0; i < conn_array_size; i++) { | ||
1150 | if (connections[i]) | ||
1151 | connections[i]->flags |= 0x7; | ||
1152 | } | ||
1153 | daemons_stop(); | ||
1154 | clean_writequeues(); | ||
1155 | |||
1156 | for (i = 0; i < conn_array_size; i++) { | ||
1157 | if (connections[i]) { | ||
1158 | close_connection(connections[i], TRUE); | ||
1159 | if (connections[i]->othercon) | ||
1160 | kmem_cache_free(con_cache, connections[i]->othercon); | ||
1161 | kmem_cache_free(con_cache, connections[i]); | ||
1162 | } | ||
1163 | } | ||
1164 | |||
1165 | kfree(connections); | ||
1166 | connections = NULL; | ||
1167 | |||
1168 | kmem_cache_destroy(con_cache); | ||
1169 | } | ||
1170 | |||
1171 | /* This is quite likely to sleep... */ | ||
1172 | int dlm_lowcomms_start(void) | ||
1173 | { | ||
1174 | int error = 0; | ||
1175 | |||
1176 | error = -ENOTCONN; | ||
1177 | |||
1178 | /* | ||
1179 | * Temporarily initialise the waitq head so that lowcomms_send_message | ||
1180 | * doesn't crash if it gets called before the thread is fully | ||
1181 | * initialised | ||
1182 | */ | ||
1183 | init_waitqueue_head(&lowcomms_send_waitq); | ||
1184 | |||
1185 | error = -ENOMEM; | ||
1186 | connections = kmalloc(sizeof(struct connection *) * | ||
1187 | NODE_INCREMENT, GFP_KERNEL); | ||
1188 | if (!connections) | ||
1189 | goto out; | ||
1190 | |||
1191 | memset(connections, 0, | ||
1192 | sizeof(struct connection *) * NODE_INCREMENT); | ||
1193 | |||
1194 | conn_array_size = NODE_INCREMENT; | ||
1195 | |||
1196 | if (dlm_our_addr(&dlm_local_addr, 0)) { | ||
1197 | log_print("no local IP address has been set"); | ||
1198 | goto fail_free_conn; | ||
1199 | } | ||
1200 | if (!dlm_our_addr(&dlm_local_addr, 1)) { | ||
1201 | log_print("This dlm comms module does not support multi-homed clustering"); | ||
1202 | goto fail_free_conn; | ||
1203 | } | ||
1204 | |||
1205 | con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection), | ||
1206 | __alignof__(struct connection), 0, NULL, NULL); | ||
1207 | if (!con_cache) | ||
1208 | goto fail_free_conn; | ||
1209 | |||
1210 | |||
1211 | /* Start listening */ | ||
1212 | error = listen_for_all(); | ||
1213 | if (error) | ||
1214 | goto fail_unlisten; | ||
1215 | |||
1216 | error = daemons_start(); | ||
1217 | if (error) | ||
1218 | goto fail_unlisten; | ||
1219 | |||
1220 | atomic_set(&accepting, 1); | ||
1221 | |||
1222 | return 0; | ||
1223 | |||
1224 | fail_unlisten: | ||
1225 | close_connection(connections[0], 0); | ||
1226 | kmem_cache_free(con_cache, connections[0]); | ||
1227 | kmem_cache_destroy(con_cache); | ||
1228 | |||
1229 | fail_free_conn: | ||
1230 | kfree(connections); | ||
1231 | |||
1232 | out: | ||
1233 | return error; | ||
1234 | } | ||
1235 | |||
1236 | int dlm_lowcomms_init(void) | ||
1237 | { | ||
1238 | INIT_LIST_HEAD(&read_sockets); | ||
1239 | INIT_LIST_HEAD(&write_sockets); | ||
1240 | INIT_LIST_HEAD(&state_sockets); | ||
1241 | |||
1242 | spin_lock_init(&read_sockets_lock); | ||
1243 | spin_lock_init(&write_sockets_lock); | ||
1244 | spin_lock_init(&state_sockets_lock); | ||
1245 | init_MUTEX(&connections_lock); | ||
1246 | |||
1247 | return 0; | ||
1248 | } | ||
1249 | |||
/*
 * Counterpart of dlm_lowcomms_init().  Intentionally empty: this function
 * performs no teardown of its own.
 */
void dlm_lowcomms_exit(void)
{
}
1253 | |||
1254 | /* | ||
1255 | * Overrides for Emacs so that we follow Linus's tabbing style. | ||
1256 | * Emacs will notice this stuff at the end of the file and automatically | ||
1257 | * adjust the settings for this buffer only. This must remain at the end | ||
1258 | * of the file. | ||
1259 | * --------------------------------------------------------------------------- | ||
1260 | * Local variables: | ||
1261 | * c-file-style: "linux" | ||
1262 | * End: | ||
1263 | */ | ||