aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm/lowcomms-tcp.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/lowcomms-tcp.c')
-rw-r--r--fs/dlm/lowcomms-tcp.c342
1 files changed, 134 insertions, 208 deletions
diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms-tcp.c
index 7289e59b4bd3..8f2791fc8447 100644
--- a/fs/dlm/lowcomms-tcp.c
+++ b/fs/dlm/lowcomms-tcp.c
@@ -54,44 +54,59 @@
54#include "config.h" 54#include "config.h"
55 55
56struct cbuf { 56struct cbuf {
57 unsigned base; 57 unsigned int base;
58 unsigned len; 58 unsigned int len;
59 unsigned mask; 59 unsigned int mask;
60}; 60};
61 61
62#ifndef FALSE
63#define FALSE 0
64#define TRUE 1
65#endif
66#define NODE_INCREMENT 32 62#define NODE_INCREMENT 32
63static void cbuf_add(struct cbuf *cb, int n)
64{
65 cb->len += n;
66}
67 67
68#define CBUF_INIT(cb, size) do { (cb)->base = (cb)->len = 0; (cb)->mask = ((size)-1); } while(0) 68static int cbuf_data(struct cbuf *cb)
69#define CBUF_ADD(cb, n) do { (cb)->len += n; } while(0) 69{
70#define CBUF_EMPTY(cb) ((cb)->len == 0) 70 return ((cb->base + cb->len) & cb->mask);
71#define CBUF_MAY_ADD(cb, n) (((cb)->len + (n)) < ((cb)->mask + 1)) 71}
72#define CBUF_EAT(cb, n) do { (cb)->len -= (n); \ 72
73 (cb)->base += (n); (cb)->base &= (cb)->mask; } while(0) 73static void cbuf_init(struct cbuf *cb, int size)
74#define CBUF_DATA(cb) (((cb)->base + (cb)->len) & (cb)->mask) 74{
75 cb->base = cb->len = 0;
76 cb->mask = size-1;
77}
78
79static void cbuf_eat(struct cbuf *cb, int n)
80{
81 cb->len -= n;
82 cb->base += n;
83 cb->base &= cb->mask;
84}
85
86static bool cbuf_empty(struct cbuf *cb)
87{
88 return cb->len == 0;
89}
75 90
76/* Maximum number of incoming messages to process before 91/* Maximum number of incoming messages to process before
77 doing a schedule() 92 doing a cond_resched()
78*/ 93*/
79#define MAX_RX_MSG_COUNT 25 94#define MAX_RX_MSG_COUNT 25
80 95
81struct connection { 96struct connection {
82 struct socket *sock; /* NULL if not connected */ 97 struct socket *sock; /* NULL if not connected */
83 uint32_t nodeid; /* So we know who we are in the list */ 98 uint32_t nodeid; /* So we know who we are in the list */
84 struct rw_semaphore sock_sem; /* Stop connect races */ 99 struct rw_semaphore sock_sem; /* Stop connect races */
85 struct list_head read_list; /* On this list when ready for reading */ 100 struct list_head read_list; /* On this list when ready for reading */
86 struct list_head write_list; /* On this list when ready for writing */ 101 struct list_head write_list; /* On this list when ready for writing */
87 struct list_head state_list; /* On this list when ready to connect */ 102 struct list_head state_list; /* On this list when ready to connect */
88 unsigned long flags; /* bit 1,2 = We are on the read/write lists */ 103 unsigned long flags; /* bit 1,2 = We are on the read/write lists */
89#define CF_READ_PENDING 1 104#define CF_READ_PENDING 1
90#define CF_WRITE_PENDING 2 105#define CF_WRITE_PENDING 2
91#define CF_CONNECT_PENDING 3 106#define CF_CONNECT_PENDING 3
92#define CF_IS_OTHERCON 4 107#define CF_IS_OTHERCON 4
93 struct list_head writequeue; /* List of outgoing writequeue_entries */ 108 struct list_head writequeue; /* List of outgoing writequeue_entries */
94 struct list_head listenlist; /* List of allocated listening sockets */ 109 struct list_head listenlist; /* List of allocated listening sockets */
95 spinlock_t writequeue_lock; 110 spinlock_t writequeue_lock;
96 int (*rx_action) (struct connection *); /* What to do when active */ 111 int (*rx_action) (struct connection *); /* What to do when active */
97 struct page *rx_page; 112 struct page *rx_page;
@@ -121,28 +136,27 @@ static struct task_struct *recv_task;
121static struct task_struct *send_task; 136static struct task_struct *send_task;
122 137
123static wait_queue_t lowcomms_send_waitq_head; 138static wait_queue_t lowcomms_send_waitq_head;
124static wait_queue_head_t lowcomms_send_waitq; 139static DECLARE_WAIT_QUEUE_HEAD(lowcomms_send_waitq);
125static wait_queue_t lowcomms_recv_waitq_head; 140static wait_queue_t lowcomms_recv_waitq_head;
126static wait_queue_head_t lowcomms_recv_waitq; 141static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_waitq);
127 142
128/* An array of pointers to connections, indexed by NODEID */ 143/* An array of pointers to connections, indexed by NODEID */
129static struct connection **connections; 144static struct connection **connections;
130static struct semaphore connections_lock; 145static DECLARE_MUTEX(connections_lock);
131static kmem_cache_t *con_cache; 146static kmem_cache_t *con_cache;
132static int conn_array_size; 147static int conn_array_size;
133static atomic_t accepting;
134 148
135/* List of sockets that have reads pending */ 149/* List of sockets that have reads pending */
136static struct list_head read_sockets; 150static LIST_HEAD(read_sockets);
137static spinlock_t read_sockets_lock; 151static DEFINE_SPINLOCK(read_sockets_lock);
138 152
139/* List of sockets which have writes pending */ 153/* List of sockets which have writes pending */
140static struct list_head write_sockets; 154static LIST_HEAD(write_sockets);
141static spinlock_t write_sockets_lock; 155static DEFINE_SPINLOCK(write_sockets_lock);
142 156
143/* List of sockets which have connects pending */ 157/* List of sockets which have connects pending */
144static struct list_head state_sockets; 158static LIST_HEAD(state_sockets);
145static spinlock_t state_sockets_lock; 159static DEFINE_SPINLOCK(state_sockets_lock);
146 160
147static struct connection *nodeid2con(int nodeid, gfp_t allocation) 161static struct connection *nodeid2con(int nodeid, gfp_t allocation)
148{ 162{
@@ -153,12 +167,11 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
153 int new_size = nodeid + NODE_INCREMENT; 167 int new_size = nodeid + NODE_INCREMENT;
154 struct connection **new_conns; 168 struct connection **new_conns;
155 169
156 new_conns = kmalloc(sizeof(struct connection *) * 170 new_conns = kzalloc(sizeof(struct connection *) *
157 new_size, allocation); 171 new_size, allocation);
158 if (!new_conns) 172 if (!new_conns)
159 goto finish; 173 goto finish;
160 174
161 memset(new_conns, 0, sizeof(struct connection *) * new_size);
162 memcpy(new_conns, connections, sizeof(struct connection *) * conn_array_size); 175 memcpy(new_conns, connections, sizeof(struct connection *) * conn_array_size);
163 conn_array_size = new_size; 176 conn_array_size = new_size;
164 kfree(connections); 177 kfree(connections);
@@ -168,11 +181,10 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
168 181
169 con = connections[nodeid]; 182 con = connections[nodeid];
170 if (con == NULL && allocation) { 183 if (con == NULL && allocation) {
171 con = kmem_cache_alloc(con_cache, allocation); 184 con = kmem_cache_zalloc(con_cache, allocation);
172 if (!con) 185 if (!con)
173 goto finish; 186 goto finish;
174 187
175 memset(con, 0, sizeof(*con));
176 con->nodeid = nodeid; 188 con->nodeid = nodeid;
177 init_rwsem(&con->sock_sem); 189 init_rwsem(&con->sock_sem);
178 INIT_LIST_HEAD(&con->writequeue); 190 INIT_LIST_HEAD(&con->writequeue);
@@ -181,7 +193,7 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
181 connections[nodeid] = con; 193 connections[nodeid] = con;
182 } 194 }
183 195
184 finish: 196finish:
185 up(&connections_lock); 197 up(&connections_lock);
186 return con; 198 return con;
187} 199}
@@ -220,8 +232,6 @@ static inline void lowcomms_connect_sock(struct connection *con)
220{ 232{
221 if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) 233 if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
222 return; 234 return;
223 if (!atomic_read(&accepting))
224 return;
225 235
226 spin_lock_bh(&state_sockets_lock); 236 spin_lock_bh(&state_sockets_lock);
227 list_add_tail(&con->state_list, &state_sockets); 237 list_add_tail(&con->state_list, &state_sockets);
@@ -232,31 +242,8 @@ static inline void lowcomms_connect_sock(struct connection *con)
232 242
233static void lowcomms_state_change(struct sock *sk) 243static void lowcomms_state_change(struct sock *sk)
234{ 244{
235/* struct connection *con = sock2con(sk); */ 245 if (sk->sk_state == TCP_ESTABLISHED)
236
237 switch (sk->sk_state) {
238 case TCP_ESTABLISHED:
239 lowcomms_write_space(sk); 246 lowcomms_write_space(sk);
240 break;
241
242 case TCP_FIN_WAIT1:
243 case TCP_FIN_WAIT2:
244 case TCP_TIME_WAIT:
245 case TCP_CLOSE:
246 case TCP_CLOSE_WAIT:
247 case TCP_LAST_ACK:
248 case TCP_CLOSING:
249 /* FIXME: I think this causes more trouble than it solves.
250 lowcomms wil reconnect anyway when there is something to
251 send. This just attempts reconnection if a node goes down!
252 */
253 /* lowcomms_connect_sock(con); */
254 break;
255
256 default:
257 printk("dlm: lowcomms_state_change: state=%d\n", sk->sk_state);
258 break;
259 }
260} 247}
261 248
262/* Make a socket active */ 249/* Make a socket active */
@@ -277,13 +264,12 @@ static int add_sock(struct socket *sock, struct connection *con)
277static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, 264static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
278 int *addr_len) 265 int *addr_len)
279{ 266{
280 saddr->ss_family = dlm_local_addr.ss_family; 267 saddr->ss_family = dlm_local_addr.ss_family;
281 if (saddr->ss_family == AF_INET) { 268 if (saddr->ss_family == AF_INET) {
282 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; 269 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
283 in4_addr->sin_port = cpu_to_be16(port); 270 in4_addr->sin_port = cpu_to_be16(port);
284 *addr_len = sizeof(struct sockaddr_in); 271 *addr_len = sizeof(struct sockaddr_in);
285 } 272 } else {
286 else {
287 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; 273 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
288 in6_addr->sin6_port = cpu_to_be16(port); 274 in6_addr->sin6_port = cpu_to_be16(port);
289 *addr_len = sizeof(struct sockaddr_in6); 275 *addr_len = sizeof(struct sockaddr_in6);
@@ -291,7 +277,7 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
291} 277}
292 278
293/* Close a remote connection and tidy up */ 279/* Close a remote connection and tidy up */
294static void close_connection(struct connection *con, int and_other) 280static void close_connection(struct connection *con, bool and_other)
295{ 281{
296 down_write(&con->sock_sem); 282 down_write(&con->sock_sem);
297 283
@@ -300,11 +286,8 @@ static void close_connection(struct connection *con, int and_other)
300 con->sock = NULL; 286 con->sock = NULL;
301 } 287 }
302 if (con->othercon && and_other) { 288 if (con->othercon && and_other) {
303 /* Argh! recursion in kernel code! 289 /* Will only re-enter once. */
304 Actually, this isn't a list so it 290 close_connection(con->othercon, false);
305 will only re-enter once.
306 */
307 close_connection(con->othercon, FALSE);
308 } 291 }
309 if (con->rx_page) { 292 if (con->rx_page) {
310 __free_page(con->rx_page); 293 __free_page(con->rx_page);
@@ -337,7 +320,7 @@ static int receive_from_sock(struct connection *con)
337 con->rx_page = alloc_page(GFP_ATOMIC); 320 con->rx_page = alloc_page(GFP_ATOMIC);
338 if (con->rx_page == NULL) 321 if (con->rx_page == NULL)
339 goto out_resched; 322 goto out_resched;
340 CBUF_INIT(&con->cb, PAGE_CACHE_SIZE); 323 cbuf_init(&con->cb, PAGE_CACHE_SIZE);
341 } 324 }
342 325
343 msg.msg_control = NULL; 326 msg.msg_control = NULL;
@@ -352,16 +335,16 @@ static int receive_from_sock(struct connection *con)
352 * iov[0] is the bit of the circular buffer between the current end 335 * iov[0] is the bit of the circular buffer between the current end
353 * point (cb.base + cb.len) and the end of the buffer. 336 * point (cb.base + cb.len) and the end of the buffer.
354 */ 337 */
355 iov[0].iov_len = con->cb.base - CBUF_DATA(&con->cb); 338 iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
356 iov[0].iov_base = page_address(con->rx_page) + CBUF_DATA(&con->cb); 339 iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
357 iov[1].iov_len = 0; 340 iov[1].iov_len = 0;
358 341
359 /* 342 /*
360 * iov[1] is the bit of the circular buffer between the start of the 343 * iov[1] is the bit of the circular buffer between the start of the
361 * buffer and the start of the currently used section (cb.base) 344 * buffer and the start of the currently used section (cb.base)
362 */ 345 */
363 if (CBUF_DATA(&con->cb) >= con->cb.base) { 346 if (cbuf_data(&con->cb) >= con->cb.base) {
364 iov[0].iov_len = PAGE_CACHE_SIZE - CBUF_DATA(&con->cb); 347 iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
365 iov[1].iov_len = con->cb.base; 348 iov[1].iov_len = con->cb.base;
366 iov[1].iov_base = page_address(con->rx_page); 349 iov[1].iov_base = page_address(con->rx_page);
367 msg.msg_iovlen = 2; 350 msg.msg_iovlen = 2;
@@ -378,7 +361,7 @@ static int receive_from_sock(struct connection *con)
378 goto out_close; 361 goto out_close;
379 if (ret == len) 362 if (ret == len)
380 call_again_soon = 1; 363 call_again_soon = 1;
381 CBUF_ADD(&con->cb, ret); 364 cbuf_add(&con->cb, ret);
382 ret = dlm_process_incoming_buffer(con->nodeid, 365 ret = dlm_process_incoming_buffer(con->nodeid,
383 page_address(con->rx_page), 366 page_address(con->rx_page),
384 con->cb.base, con->cb.len, 367 con->cb.base, con->cb.len,
@@ -391,35 +374,32 @@ static int receive_from_sock(struct connection *con)
391 } 374 }
392 if (ret < 0) 375 if (ret < 0)
393 goto out_close; 376 goto out_close;
394 CBUF_EAT(&con->cb, ret); 377 cbuf_eat(&con->cb, ret);
395 378
396 if (CBUF_EMPTY(&con->cb) && !call_again_soon) { 379 if (cbuf_empty(&con->cb) && !call_again_soon) {
397 __free_page(con->rx_page); 380 __free_page(con->rx_page);
398 con->rx_page = NULL; 381 con->rx_page = NULL;
399 } 382 }
400 383
401 out: 384out:
402 if (call_again_soon) 385 if (call_again_soon)
403 goto out_resched; 386 goto out_resched;
404 up_read(&con->sock_sem); 387 up_read(&con->sock_sem);
405 ret = 0; 388 return 0;
406 goto out_ret;
407 389
408 out_resched: 390out_resched:
409 lowcomms_data_ready(con->sock->sk, 0); 391 lowcomms_data_ready(con->sock->sk, 0);
410 up_read(&con->sock_sem); 392 up_read(&con->sock_sem);
411 ret = 0; 393 cond_resched();
412 schedule(); 394 return 0;
413 goto out_ret;
414 395
415 out_close: 396out_close:
416 up_read(&con->sock_sem); 397 up_read(&con->sock_sem);
417 if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) { 398 if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) {
418 close_connection(con, FALSE); 399 close_connection(con, false);
419 /* Reconnect when there is something to send */ 400 /* Reconnect when there is something to send */
420 } 401 }
421 402
422 out_ret:
423 return ret; 403 return ret;
424} 404}
425 405
@@ -434,7 +414,8 @@ static int accept_from_sock(struct connection *con)
434 struct connection *newcon; 414 struct connection *newcon;
435 415
436 memset(&peeraddr, 0, sizeof(peeraddr)); 416 memset(&peeraddr, 0, sizeof(peeraddr));
437 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &newsock); 417 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
418 IPPROTO_TCP, &newsock);
438 if (result < 0) 419 if (result < 0)
439 return -ENOMEM; 420 return -ENOMEM;
440 421
@@ -462,7 +443,7 @@ static int accept_from_sock(struct connection *con)
462 /* Get the new node's NODEID */ 443 /* Get the new node's NODEID */
463 make_sockaddr(&peeraddr, 0, &len); 444 make_sockaddr(&peeraddr, 0, &len);
464 if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) { 445 if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
465 printk("dlm: connect from non cluster node\n"); 446 printk("dlm: connect from non cluster node\n");
466 sock_release(newsock); 447 sock_release(newsock);
467 up_read(&con->sock_sem); 448 up_read(&con->sock_sem);
468 return -1; 449 return -1;
@@ -483,17 +464,16 @@ static int accept_from_sock(struct connection *con)
483 } 464 }
484 down_write(&newcon->sock_sem); 465 down_write(&newcon->sock_sem);
485 if (newcon->sock) { 466 if (newcon->sock) {
486 struct connection *othercon = newcon->othercon; 467 struct connection *othercon = newcon->othercon;
487 468
488 if (!othercon) { 469 if (!othercon) {
489 othercon = kmem_cache_alloc(con_cache, GFP_KERNEL); 470 othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL);
490 if (!othercon) { 471 if (!othercon) {
491 printk("dlm: failed to allocate incoming socket\n"); 472 printk("dlm: failed to allocate incoming socket\n");
492 up_write(&newcon->sock_sem); 473 up_write(&newcon->sock_sem);
493 result = -ENOMEM; 474 result = -ENOMEM;
494 goto accept_err; 475 goto accept_err;
495 } 476 }
496 memset(othercon, 0, sizeof(*othercon));
497 othercon->nodeid = nodeid; 477 othercon->nodeid = nodeid;
498 othercon->rx_action = receive_from_sock; 478 othercon->rx_action = receive_from_sock;
499 init_rwsem(&othercon->sock_sem); 479 init_rwsem(&othercon->sock_sem);
@@ -523,7 +503,7 @@ static int accept_from_sock(struct connection *con)
523 503
524 return 0; 504 return 0;
525 505
526 accept_err: 506accept_err:
527 up_read(&con->sock_sem); 507 up_read(&con->sock_sem);
528 sock_release(newsock); 508 sock_release(newsock);
529 509
@@ -533,7 +513,7 @@ static int accept_from_sock(struct connection *con)
533} 513}
534 514
535/* Connect a new socket to its peer */ 515/* Connect a new socket to its peer */
536static int connect_to_sock(struct connection *con) 516static void connect_to_sock(struct connection *con)
537{ 517{
538 int result = -EHOSTUNREACH; 518 int result = -EHOSTUNREACH;
539 struct sockaddr_storage saddr; 519 struct sockaddr_storage saddr;
@@ -542,7 +522,7 @@ static int connect_to_sock(struct connection *con)
542 522
543 if (con->nodeid == 0) { 523 if (con->nodeid == 0) {
544 log_print("attempt to connect sock 0 foiled"); 524 log_print("attempt to connect sock 0 foiled");
545 return 0; 525 return;
546 } 526 }
547 527
548 down_write(&con->sock_sem); 528 down_write(&con->sock_sem);
@@ -556,13 +536,14 @@ static int connect_to_sock(struct connection *con)
556 } 536 }
557 537
558 /* Create a socket to communicate with */ 538 /* Create a socket to communicate with */
559 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock); 539 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
540 IPPROTO_TCP, &sock);
560 if (result < 0) 541 if (result < 0)
561 goto out_err; 542 goto out_err;
562 543
563 memset(&saddr, 0, sizeof(saddr)); 544 memset(&saddr, 0, sizeof(saddr));
564 if (dlm_nodeid_to_addr(con->nodeid, &saddr)) 545 if (dlm_nodeid_to_addr(con->nodeid, &saddr))
565 goto out_err; 546 goto out_err;
566 547
567 sock->sk->sk_user_data = con; 548 sock->sk->sk_user_data = con;
568 con->rx_action = receive_from_sock; 549 con->rx_action = receive_from_sock;
@@ -574,22 +555,13 @@ static int connect_to_sock(struct connection *con)
574 log_print("connecting to %d", con->nodeid); 555 log_print("connecting to %d", con->nodeid);
575 result = 556 result =
576 sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, 557 sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
577 O_NONBLOCK); 558 O_NONBLOCK);
578 if (result == -EINPROGRESS) 559 if (result == -EINPROGRESS)
579 result = 0; 560 result = 0;
580 if (result != 0) 561 if (result == 0)
581 goto out_err; 562 goto out;
582
583 out:
584 up_write(&con->sock_sem);
585 /*
586 * Returning an error here means we've given up trying to connect to
587 * a remote node, otherwise we return 0 and reschedule the connetion
588 * attempt
589 */
590 return result;
591 563
592 out_err: 564out_err:
593 if (con->sock) { 565 if (con->sock) {
594 sock_release(con->sock); 566 sock_release(con->sock);
595 con->sock = NULL; 567 con->sock = NULL;
@@ -604,12 +576,15 @@ static int connect_to_sock(struct connection *con)
604 lowcomms_connect_sock(con); 576 lowcomms_connect_sock(con);
605 result = 0; 577 result = 0;
606 } 578 }
607 goto out; 579out:
580 up_write(&con->sock_sem);
581 return;
608} 582}
609 583
610static struct socket *create_listen_sock(struct connection *con, struct sockaddr_storage *saddr) 584static struct socket *create_listen_sock(struct connection *con,
585 struct sockaddr_storage *saddr)
611{ 586{
612 struct socket *sock = NULL; 587 struct socket *sock = NULL;
613 mm_segment_t fs; 588 mm_segment_t fs;
614 int result = 0; 589 int result = 0;
615 int one = 1; 590 int one = 1;
@@ -629,10 +604,12 @@ static struct socket *create_listen_sock(struct connection *con, struct sockaddr
629 604
630 fs = get_fs(); 605 fs = get_fs();
631 set_fs(get_ds()); 606 set_fs(get_ds());
632 result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one)); 607 result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
608 (char *)&one, sizeof(one));
633 set_fs(fs); 609 set_fs(fs);
634 if (result < 0) { 610 if (result < 0) {
635 printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",result); 611 printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",
612 result);
636 } 613 }
637 sock->sk->sk_user_data = con; 614 sock->sk->sk_user_data = con;
638 con->rx_action = accept_from_sock; 615 con->rx_action = accept_from_sock;
@@ -652,7 +629,8 @@ static struct socket *create_listen_sock(struct connection *con, struct sockaddr
652 fs = get_fs(); 629 fs = get_fs();
653 set_fs(get_ds()); 630 set_fs(get_ds());
654 631
655 result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&one, sizeof(one)); 632 result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
633 (char *)&one, sizeof(one));
656 set_fs(fs); 634 set_fs(fs);
657 if (result < 0) { 635 if (result < 0) {
658 printk("dlm: Set keepalive failed: %d\n", result); 636 printk("dlm: Set keepalive failed: %d\n", result);
@@ -666,7 +644,7 @@ static struct socket *create_listen_sock(struct connection *con, struct sockaddr
666 goto create_out; 644 goto create_out;
667 } 645 }
668 646
669 create_out: 647create_out:
670 return sock; 648 return sock;
671} 649}
672 650
@@ -679,10 +657,6 @@ static int listen_for_all(void)
679 int result = -EINVAL; 657 int result = -EINVAL;
680 658
681 /* We don't support multi-homed hosts */ 659 /* We don't support multi-homed hosts */
682 memset(con, 0, sizeof(*con));
683 init_rwsem(&con->sock_sem);
684 spin_lock_init(&con->writequeue_lock);
685 INIT_LIST_HEAD(&con->writequeue);
686 set_bit(CF_IS_OTHERCON, &con->flags); 660 set_bit(CF_IS_OTHERCON, &con->flags);
687 661
688 sock = create_listen_sock(con, &dlm_local_addr); 662 sock = create_listen_sock(con, &dlm_local_addr);
@@ -731,16 +705,12 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len,
731 int offset = 0; 705 int offset = 0;
732 int users = 0; 706 int users = 0;
733 707
734 if (!atomic_read(&accepting))
735 return NULL;
736
737 con = nodeid2con(nodeid, allocation); 708 con = nodeid2con(nodeid, allocation);
738 if (!con) 709 if (!con)
739 return NULL; 710 return NULL;
740 711
741 spin_lock(&con->writequeue_lock);
742 e = list_entry(con->writequeue.prev, struct writequeue_entry, list); 712 e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
743 if (((struct list_head *) e == &con->writequeue) || 713 if ((&e->list == &con->writequeue) ||
744 (PAGE_CACHE_SIZE - e->end < len)) { 714 (PAGE_CACHE_SIZE - e->end < len)) {
745 e = NULL; 715 e = NULL;
746 } else { 716 } else {
@@ -751,7 +721,7 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len,
751 spin_unlock(&con->writequeue_lock); 721 spin_unlock(&con->writequeue_lock);
752 722
753 if (e) { 723 if (e) {
754 got_one: 724 got_one:
755 if (users == 0) 725 if (users == 0)
756 kmap(e->page); 726 kmap(e->page);
757 *ppc = page_address(e->page) + offset; 727 *ppc = page_address(e->page) + offset;
@@ -777,10 +747,6 @@ void dlm_lowcomms_commit_buffer(void *mh)
777 struct connection *con = e->con; 747 struct connection *con = e->con;
778 int users; 748 int users;
779 749
780 if (!atomic_read(&accepting))
781 return;
782
783 spin_lock(&con->writequeue_lock);
784 users = --e->users; 750 users = --e->users;
785 if (users) 751 if (users)
786 goto out; 752 goto out;
@@ -797,7 +763,7 @@ void dlm_lowcomms_commit_buffer(void *mh)
797 } 763 }
798 return; 764 return;
799 765
800 out: 766out:
801 spin_unlock(&con->writequeue_lock); 767 spin_unlock(&con->writequeue_lock);
802 return; 768 return;
803} 769}
@@ -809,7 +775,7 @@ static void free_entry(struct writequeue_entry *e)
809} 775}
810 776
811/* Send a message */ 777/* Send a message */
812static int send_to_sock(struct connection *con) 778static void send_to_sock(struct connection *con)
813{ 779{
814 int ret = 0; 780 int ret = 0;
815 ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int); 781 ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int);
@@ -846,7 +812,7 @@ static int send_to_sock(struct connection *con)
846 } 812 }
847 else { 813 else {
848 /* Don't starve people filling buffers */ 814 /* Don't starve people filling buffers */
849 schedule(); 815 cond_resched();
850 } 816 }
851 817
852 spin_lock(&con->writequeue_lock); 818 spin_lock(&con->writequeue_lock);
@@ -855,25 +821,26 @@ static int send_to_sock(struct connection *con)
855 821
856 if (e->len == 0 && e->users == 0) { 822 if (e->len == 0 && e->users == 0) {
857 list_del(&e->list); 823 list_del(&e->list);
824 kunmap(e->page);
858 free_entry(e); 825 free_entry(e);
859 continue; 826 continue;
860 } 827 }
861 } 828 }
862 spin_unlock(&con->writequeue_lock); 829 spin_unlock(&con->writequeue_lock);
863 out: 830out:
864 up_read(&con->sock_sem); 831 up_read(&con->sock_sem);
865 return ret; 832 return;
866 833
867 send_error: 834send_error:
868 up_read(&con->sock_sem); 835 up_read(&con->sock_sem);
869 close_connection(con, FALSE); 836 close_connection(con, false);
870 lowcomms_connect_sock(con); 837 lowcomms_connect_sock(con);
871 return ret; 838 return;
872 839
873 out_connect: 840out_connect:
874 up_read(&con->sock_sem); 841 up_read(&con->sock_sem);
875 lowcomms_connect_sock(con); 842 lowcomms_connect_sock(con);
876 return 0; 843 return;
877} 844}
878 845
879static void clean_one_writequeue(struct connection *con) 846static void clean_one_writequeue(struct connection *con)
@@ -904,12 +871,12 @@ int dlm_lowcomms_close(int nodeid)
904 con = nodeid2con(nodeid, 0); 871 con = nodeid2con(nodeid, 0);
905 if (con) { 872 if (con) {
906 clean_one_writequeue(con); 873 clean_one_writequeue(con);
907 close_connection(con, TRUE); 874 close_connection(con, true);
908 atomic_set(&con->waiting_requests, 0); 875 atomic_set(&con->waiting_requests, 0);
909 } 876 }
910 return 0; 877 return 0;
911 878
912 out: 879out:
913 return -1; 880 return -1;
914} 881}
915 882
@@ -940,7 +907,7 @@ static void process_sockets(void)
940 list_for_each_safe(list, temp, &read_sockets) { 907 list_for_each_safe(list, temp, &read_sockets) {
941 908
942 struct connection *con = 909 struct connection *con =
943 list_entry(list, struct connection, read_list); 910 list_entry(list, struct connection, read_list);
944 list_del(&con->read_list); 911 list_del(&con->read_list);
945 clear_bit(CF_READ_PENDING, &con->flags); 912 clear_bit(CF_READ_PENDING, &con->flags);
946 913
@@ -959,7 +926,7 @@ static void process_sockets(void)
959 926
960 /* Don't starve out everyone else */ 927 /* Don't starve out everyone else */
961 if (++count >= MAX_RX_MSG_COUNT) { 928 if (++count >= MAX_RX_MSG_COUNT) {
962 schedule(); 929 cond_resched();
963 count = 0; 930 count = 0;
964 } 931 }
965 932
@@ -977,20 +944,16 @@ static void process_output_queue(void)
977{ 944{
978 struct list_head *list; 945 struct list_head *list;
979 struct list_head *temp; 946 struct list_head *temp;
980 int ret;
981 947
982 spin_lock_bh(&write_sockets_lock); 948 spin_lock_bh(&write_sockets_lock);
983 list_for_each_safe(list, temp, &write_sockets) { 949 list_for_each_safe(list, temp, &write_sockets) {
984 struct connection *con = 950 struct connection *con =
985 list_entry(list, struct connection, write_list); 951 list_entry(list, struct connection, write_list);
986 clear_bit(CF_WRITE_PENDING, &con->flags); 952 clear_bit(CF_WRITE_PENDING, &con->flags);
987 list_del(&con->write_list); 953 list_del(&con->write_list);
988 954
989 spin_unlock_bh(&write_sockets_lock); 955 spin_unlock_bh(&write_sockets_lock);
990 956 send_to_sock(con);
991 ret = send_to_sock(con);
992 if (ret < 0) {
993 }
994 spin_lock_bh(&write_sockets_lock); 957 spin_lock_bh(&write_sockets_lock);
995 } 958 }
996 spin_unlock_bh(&write_sockets_lock); 959 spin_unlock_bh(&write_sockets_lock);
@@ -1000,19 +963,16 @@ static void process_state_queue(void)
1000{ 963{
1001 struct list_head *list; 964 struct list_head *list;
1002 struct list_head *temp; 965 struct list_head *temp;
1003 int ret;
1004 966
1005 spin_lock_bh(&state_sockets_lock); 967 spin_lock_bh(&state_sockets_lock);
1006 list_for_each_safe(list, temp, &state_sockets) { 968 list_for_each_safe(list, temp, &state_sockets) {
1007 struct connection *con = 969 struct connection *con =
1008 list_entry(list, struct connection, state_list); 970 list_entry(list, struct connection, state_list);
1009 list_del(&con->state_list); 971 list_del(&con->state_list);
1010 clear_bit(CF_CONNECT_PENDING, &con->flags); 972 clear_bit(CF_CONNECT_PENDING, &con->flags);
1011 spin_unlock_bh(&state_sockets_lock); 973 spin_unlock_bh(&state_sockets_lock);
1012 974
1013 ret = connect_to_sock(con); 975 connect_to_sock(con);
1014 if (ret < 0) {
1015 }
1016 spin_lock_bh(&state_sockets_lock); 976 spin_lock_bh(&state_sockets_lock);
1017 } 977 }
1018 spin_unlock_bh(&state_sockets_lock); 978 spin_unlock_bh(&state_sockets_lock);
@@ -1046,14 +1006,13 @@ static int read_list_empty(void)
1046/* DLM Transport comms receive daemon */ 1006/* DLM Transport comms receive daemon */
1047static int dlm_recvd(void *data) 1007static int dlm_recvd(void *data)
1048{ 1008{
1049 init_waitqueue_head(&lowcomms_recv_waitq);
1050 init_waitqueue_entry(&lowcomms_recv_waitq_head, current); 1009 init_waitqueue_entry(&lowcomms_recv_waitq_head, current);
1051 add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head); 1010 add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head);
1052 1011
1053 while (!kthread_should_stop()) { 1012 while (!kthread_should_stop()) {
1054 set_current_state(TASK_INTERRUPTIBLE); 1013 set_current_state(TASK_INTERRUPTIBLE);
1055 if (read_list_empty()) 1014 if (read_list_empty())
1056 schedule(); 1015 cond_resched();
1057 set_current_state(TASK_RUNNING); 1016 set_current_state(TASK_RUNNING);
1058 1017
1059 process_sockets(); 1018 process_sockets();
@@ -1081,14 +1040,13 @@ static int write_and_state_lists_empty(void)
1081/* DLM Transport send daemon */ 1040/* DLM Transport send daemon */
1082static int dlm_sendd(void *data) 1041static int dlm_sendd(void *data)
1083{ 1042{
1084 init_waitqueue_head(&lowcomms_send_waitq);
1085 init_waitqueue_entry(&lowcomms_send_waitq_head, current); 1043 init_waitqueue_entry(&lowcomms_send_waitq_head, current);
1086 add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head); 1044 add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head);
1087 1045
1088 while (!kthread_should_stop()) { 1046 while (!kthread_should_stop()) {
1089 set_current_state(TASK_INTERRUPTIBLE); 1047 set_current_state(TASK_INTERRUPTIBLE);
1090 if (write_and_state_lists_empty()) 1048 if (write_and_state_lists_empty())
1091 schedule(); 1049 cond_resched();
1092 set_current_state(TASK_RUNNING); 1050 set_current_state(TASK_RUNNING);
1093 1051
1094 process_state_queue(); 1052 process_state_queue();
@@ -1111,7 +1069,7 @@ static int daemons_start(void)
1111 1069
1112 p = kthread_run(dlm_recvd, NULL, "dlm_recvd"); 1070 p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
1113 error = IS_ERR(p); 1071 error = IS_ERR(p);
1114 if (error) { 1072 if (error) {
1115 log_print("can't start dlm_recvd %d", error); 1073 log_print("can't start dlm_recvd %d", error);
1116 return error; 1074 return error;
1117 } 1075 }
@@ -1119,7 +1077,7 @@ static int daemons_start(void)
1119 1077
1120 p = kthread_run(dlm_sendd, NULL, "dlm_sendd"); 1078 p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
1121 error = IS_ERR(p); 1079 error = IS_ERR(p);
1122 if (error) { 1080 if (error) {
1123 log_print("can't start dlm_sendd %d", error); 1081 log_print("can't start dlm_sendd %d", error);
1124 kthread_stop(recv_task); 1082 kthread_stop(recv_task);
1125 return error; 1083 return error;
@@ -1141,21 +1099,20 @@ void dlm_lowcomms_stop(void)
1141{ 1099{
1142 int i; 1100 int i;
1143 1101
1144 atomic_set(&accepting, 0); 1102 /* Set all the flags to prevent any
1145
1146 /* Set all the activity flags to prevent any
1147 socket activity. 1103 socket activity.
1148 */ 1104 */
1149 for (i = 0; i < conn_array_size; i++) { 1105 for (i = 0; i < conn_array_size; i++) {
1150 if (connections[i]) 1106 if (connections[i])
1151 connections[i]->flags |= 0x7; 1107 connections[i]->flags |= 0xFF;
1152 } 1108 }
1109
1153 daemons_stop(); 1110 daemons_stop();
1154 clean_writequeues(); 1111 clean_writequeues();
1155 1112
1156 for (i = 0; i < conn_array_size; i++) { 1113 for (i = 0; i < conn_array_size; i++) {
1157 if (connections[i]) { 1114 if (connections[i]) {
1158 close_connection(connections[i], TRUE); 1115 close_connection(connections[i], true);
1159 if (connections[i]->othercon) 1116 if (connections[i]->othercon)
1160 kmem_cache_free(con_cache, connections[i]->othercon); 1117 kmem_cache_free(con_cache, connections[i]->othercon);
1161 kmem_cache_free(con_cache, connections[i]); 1118 kmem_cache_free(con_cache, connections[i]);
@@ -1173,24 +1130,12 @@ int dlm_lowcomms_start(void)
1173{ 1130{
1174 int error = 0; 1131 int error = 0;
1175 1132
1176 error = -ENOTCONN;
1177
1178 /*
1179 * Temporarily initialise the waitq head so that lowcomms_send_message
1180 * doesn't crash if it gets called before the thread is fully
1181 * initialised
1182 */
1183 init_waitqueue_head(&lowcomms_send_waitq);
1184
1185 error = -ENOMEM; 1133 error = -ENOMEM;
1186 connections = kmalloc(sizeof(struct connection *) * 1134 connections = kzalloc(sizeof(struct connection *) *
1187 NODE_INCREMENT, GFP_KERNEL); 1135 NODE_INCREMENT, GFP_KERNEL);
1188 if (!connections) 1136 if (!connections)
1189 goto out; 1137 goto out;
1190 1138
1191 memset(connections, 0,
1192 sizeof(struct connection *) * NODE_INCREMENT);
1193
1194 conn_array_size = NODE_INCREMENT; 1139 conn_array_size = NODE_INCREMENT;
1195 1140
1196 if (dlm_our_addr(&dlm_local_addr, 0)) { 1141 if (dlm_our_addr(&dlm_local_addr, 0)) {
@@ -1203,7 +1148,8 @@ int dlm_lowcomms_start(void)
1203 } 1148 }
1204 1149
1205 con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection), 1150 con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
1206 __alignof__(struct connection), 0, NULL, NULL); 1151 __alignof__(struct connection), 0,
1152 NULL, NULL);
1207 if (!con_cache) 1153 if (!con_cache)
1208 goto fail_free_conn; 1154 goto fail_free_conn;
1209 1155
@@ -1217,40 +1163,20 @@ int dlm_lowcomms_start(void)
1217 if (error) 1163 if (error)
1218 goto fail_unlisten; 1164 goto fail_unlisten;
1219 1165
1220 atomic_set(&accepting, 1);
1221
1222 return 0; 1166 return 0;
1223 1167
1224 fail_unlisten: 1168fail_unlisten:
1225 close_connection(connections[0], 0); 1169 close_connection(connections[0], false);
1226 kmem_cache_free(con_cache, connections[0]); 1170 kmem_cache_free(con_cache, connections[0]);
1227 kmem_cache_destroy(con_cache); 1171 kmem_cache_destroy(con_cache);
1228 1172
1229 fail_free_conn: 1173fail_free_conn:
1230 kfree(connections); 1174 kfree(connections);
1231 1175
1232 out: 1176out:
1233 return error; 1177 return error;
1234} 1178}
1235 1179
1236int dlm_lowcomms_init(void)
1237{
1238 INIT_LIST_HEAD(&read_sockets);
1239 INIT_LIST_HEAD(&write_sockets);
1240 INIT_LIST_HEAD(&state_sockets);
1241
1242 spin_lock_init(&read_sockets_lock);
1243 spin_lock_init(&write_sockets_lock);
1244 spin_lock_init(&state_sockets_lock);
1245 init_MUTEX(&connections_lock);
1246
1247 return 0;
1248}
1249
1250void dlm_lowcomms_exit(void)
1251{
1252}
1253
1254/* 1180/*
1255 * Overrides for Emacs so that we follow Linus's tabbing style. 1181 * Overrides for Emacs so that we follow Linus's tabbing style.
1256 * Emacs will notice this stuff at the end of the file and automatically 1182 * Emacs will notice this stuff at the end of the file and automatically