author	Patrick Caulfield <pcaulfie@redhat.com>	2006-12-06 10:10:37 -0500
committer	Steven Whitehouse <swhiteho@redhat.com>	2006-12-07 09:25:13 -0500
commit	ac33d0710595579e3cfca42dde2257eb0b123f6d (patch)
tree	ebb3050be68aa49666ee03f51ffe2667f5b18c74 /fs/dlm/lowcomms-tcp.c
parent	34126f9f41901ca9d7d0031c2b11fc0d6c07b72d (diff)
[DLM] Clean up lowcomms
This fixes up most of the things pointed out by akpm and Pavel Machek
with comments below indicating why some things have been left:

Andrew Morton wrote:
>
>> +static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
>> +{
>> +	struct nodeinfo *ni;
>> +	int r;
>> +	int n;
>> +
>> +	down_read(&nodeinfo_lock);
>
> Given that this function can sleep, I wonder if `alloc' is useful.
>
> I see lots of callers passing in a literal "0" for `alloc'. That's in fact
> a secret (GFP_ATOMIC & ~__GFP_HIGH). I doubt if that's what you really
> meant. Particularly as the code could at least have used __GFP_WAIT (aka
> GFP_NOIO) which is much, much more reliable than "0". In fact "0" is the
> least reliable mode possible.
>
> IOW, this is all bollixed up.

When 0 is passed into nodeid2nodeinfo the function does not try to
allocate a new structure at all. It's an indication that the caller only
wants the nodeinfo struct for that nodeid if there actually is one in
existence. I've tidied the function itself so it's more obvious (and
tidier!).

>> +/* Data received from remote end */
>> +static int receive_from_sock(void)
>> +{
>> +	int ret = 0;
>> +	struct msghdr msg;
>> +	struct kvec iov[2];
>> +	unsigned len;
>> +	int r;
>> +	struct sctp_sndrcvinfo *sinfo;
>> +	struct cmsghdr *cmsg;
>> +	struct nodeinfo *ni;
>> +
>> +	/* These two are marginally too big for stack allocation, but this
>> +	 * function is (currently) only called by dlm_recvd so static should be
>> +	 * OK.
>> +	 */
>> +	static struct sockaddr_storage msgname;
>> +	static char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
>
> whoa. This is globally singly-threaded code??

Yes. It is only ever run in the context of dlm_recvd.

>> +static void initiate_association(int nodeid)
>> +{
>> +	struct sockaddr_storage rem_addr;
>> +	static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
>
> Another static buffer to worry about. Globally singly-threaded code?

Yes. Only ever called by dlm_sendd.

>> +
>> +/* Send a message */
>> +static int send_to_sock(struct nodeinfo *ni)
>> +{
>> +	int ret = 0;
>> +	struct writequeue_entry *e;
>> +	int len, offset;
>> +	struct msghdr outmsg;
>> +	static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
>
> Singly-threaded?

Yep.

>> +static void dealloc_nodeinfo(void)
>> +{
>> +	int i;
>> +
>> +	for (i=1; i<=max_nodeid; i++) {
>> +		struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
>> +		if (ni) {
>> +			idr_remove(&nodeinfo_idr, i);
>
> Didn't that need locking?

No. It's only ever called at DLM shutdown after all the other threads
have been stopped.

>> +static int write_list_empty(void)
>> +{
>> +	int status;
>> +
>> +	spin_lock_bh(&write_nodes_lock);
>> +	status = list_empty(&write_nodes);
>> +	spin_unlock_bh(&write_nodes_lock);
>> +
>> +	return status;
>> +}
>
> This function's return value is meaningless. As soon as the lock gets
> dropped, the return value can get out of sync with reality.
>
> Looking at the caller, this _might_ happen to be OK, but it's a nasty and
> dangerous thing. Really the locking should be moved into the caller.

It's just an optimisation to allow the caller to schedule if there is
no work to do. If something arrives immediately afterwards then it will
get picked up when the process re-awakes (and it will be woken by that
arrival).

The 'accepting' atomic has gone completely. As Andrew pointed out, it
didn't really achieve much anyway. I suspect it was a plaster over some
other startup or shutdown bug to be honest.
Signed-off-by: Patrick Caulfield <pcaulfie@redhat.com>
Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
Cc: Andrew Morton <akpm@osdl.org>
Cc: Pavel Machek <pavel@ucw.cz>
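
The nodeid2nodeinfo() point above is easier to see in code. What follows is a minimal sketch of the lookup-or-allocate pattern being described, not the actual function from the SCTP lowcomms file: the struct nodeinfo layout is assumed, and the idr insertion, initialisation and error handling are elided.

/* Sketch only: assumes the nodeinfo_idr/nodeinfo_lock pair quoted in
 * the review above. */
static DECLARE_RWSEM(nodeinfo_lock);
static DEFINE_IDR(nodeinfo_idr);

static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
{
	struct nodeinfo *ni;

	down_read(&nodeinfo_lock);
	ni = idr_find(&nodeinfo_idr, nodeid);
	up_read(&nodeinfo_lock);

	/* alloc == 0 means "lookup only": the caller wants the nodeinfo
	 * for this nodeid only if one already exists. */
	if (ni || !alloc)
		return ni;

	down_write(&nodeinfo_lock);
	/* Re-check under the write lock before allocating. */
	ni = idr_find(&nodeinfo_idr, nodeid);
	if (!ni) {
		ni = kmalloc(sizeof(struct nodeinfo), alloc);
		/* ... idr_pre_get()/idr_get_new_above() insertion and
		 * field initialisation elided ... */
	}
	up_write(&nodeinfo_lock);
	return ni;
}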
Diffstat (limited to 'fs/dlm/lowcomms-tcp.c')
-rw-r--r--	fs/dlm/lowcomms-tcp.c	| 342
1 file changed, 134 insertions(+), 208 deletions(-)
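
Similarly, the write_list_empty() discussion in the commit message boils down to the pattern sketched here, simplified from the quoted code and the dlm_sendd loop in the patch below (wait-queue setup omitted). The emptiness result is only advisory, which is safe because whoever queues new work also wakes the daemon:

static int write_list_empty(void)
{
	int status;

	/* The answer can be stale the moment the lock drops... */
	spin_lock_bh(&write_nodes_lock);
	status = list_empty(&write_nodes);
	spin_unlock_bh(&write_nodes_lock);

	return status;
}

/* ...but the caller only uses it to decide whether to sleep: */
while (!kthread_should_stop()) {
	set_current_state(TASK_INTERRUPTIBLE);
	if (write_list_empty())
		schedule();	/* re-woken when work arrives */
	set_current_state(TASK_RUNNING);

	process_output_queue();
}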
diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms-tcp.c
index 7289e59b4bd3..8f2791fc8447 100644
--- a/fs/dlm/lowcomms-tcp.c
+++ b/fs/dlm/lowcomms-tcp.c
@@ -54,44 +54,59 @@
 #include "config.h"
 
 struct cbuf {
-	unsigned base;
-	unsigned len;
-	unsigned mask;
+	unsigned int base;
+	unsigned int len;
+	unsigned int mask;
 };
 
-#ifndef FALSE
-#define FALSE 0
-#define TRUE 1
-#endif
 #define NODE_INCREMENT 32
+static void cbuf_add(struct cbuf *cb, int n)
+{
+	cb->len += n;
+}
 
-#define CBUF_INIT(cb, size) do { (cb)->base = (cb)->len = 0; (cb)->mask = ((size)-1); } while(0)
-#define CBUF_ADD(cb, n) do { (cb)->len += n; } while(0)
-#define CBUF_EMPTY(cb) ((cb)->len == 0)
-#define CBUF_MAY_ADD(cb, n) (((cb)->len + (n)) < ((cb)->mask + 1))
-#define CBUF_EAT(cb, n) do { (cb)->len -= (n); \
-			     (cb)->base += (n); (cb)->base &= (cb)->mask; } while(0)
-#define CBUF_DATA(cb) (((cb)->base + (cb)->len) & (cb)->mask)
+static int cbuf_data(struct cbuf *cb)
+{
+	return ((cb->base + cb->len) & cb->mask);
+}
+
+static void cbuf_init(struct cbuf *cb, int size)
+{
+	cb->base = cb->len = 0;
+	cb->mask = size-1;
+}
+
+static void cbuf_eat(struct cbuf *cb, int n)
+{
+	cb->len -= n;
+	cb->base += n;
+	cb->base &= cb->mask;
+}
+
+static bool cbuf_empty(struct cbuf *cb)
+{
+	return cb->len == 0;
+}
 
 /* Maximum number of incoming messages to process before
-   doing a schedule()
+   doing a cond_resched()
 */
 #define MAX_RX_MSG_COUNT 25
 
 struct connection {
 	struct socket *sock;	/* NULL if not connected */
 	uint32_t nodeid;	/* So we know who we are in the list */
 	struct rw_semaphore sock_sem; /* Stop connect races */
 	struct list_head read_list;   /* On this list when ready for reading */
 	struct list_head write_list;  /* On this list when ready for writing */
 	struct list_head state_list;  /* On this list when ready to connect */
 	unsigned long flags;	/* bit 1,2 = We are on the read/write lists */
 #define CF_READ_PENDING 1
 #define CF_WRITE_PENDING 2
 #define CF_CONNECT_PENDING 3
 #define CF_IS_OTHERCON 4
 	struct list_head writequeue;  /* List of outgoing writequeue_entries */
 	struct list_head listenlist;  /* List of allocated listening sockets */
 	spinlock_t writequeue_lock;
 	int (*rx_action) (struct connection *);	/* What to do when active */
 	struct page *rx_page;
@@ -121,28 +136,27 @@ static struct task_struct *recv_task;
 static struct task_struct *send_task;
 
 static wait_queue_t lowcomms_send_waitq_head;
-static wait_queue_head_t lowcomms_send_waitq;
+static DECLARE_WAIT_QUEUE_HEAD(lowcomms_send_waitq);
 static wait_queue_t lowcomms_recv_waitq_head;
-static wait_queue_head_t lowcomms_recv_waitq;
+static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_waitq);
 
 /* An array of pointers to connections, indexed by NODEID */
 static struct connection **connections;
-static struct semaphore connections_lock;
+static DECLARE_MUTEX(connections_lock);
 static kmem_cache_t *con_cache;
 static int conn_array_size;
-static atomic_t accepting;
 
 /* List of sockets that have reads pending */
-static struct list_head read_sockets;
-static spinlock_t read_sockets_lock;
+static LIST_HEAD(read_sockets);
+static DEFINE_SPINLOCK(read_sockets_lock);
 
 /* List of sockets which have writes pending */
-static struct list_head write_sockets;
-static spinlock_t write_sockets_lock;
+static LIST_HEAD(write_sockets);
+static DEFINE_SPINLOCK(write_sockets_lock);
 
 /* List of sockets which have connects pending */
-static struct list_head state_sockets;
-static spinlock_t state_sockets_lock;
+static LIST_HEAD(state_sockets);
+static DEFINE_SPINLOCK(state_sockets_lock);
 
 static struct connection *nodeid2con(int nodeid, gfp_t allocation)
 {
@@ -153,12 +167,11 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
 		int new_size = nodeid + NODE_INCREMENT;
 		struct connection **new_conns;
 
-		new_conns = kmalloc(sizeof(struct connection *) *
+		new_conns = kzalloc(sizeof(struct connection *) *
 				    new_size, allocation);
 		if (!new_conns)
 			goto finish;
 
-		memset(new_conns, 0, sizeof(struct connection *) * new_size);
 		memcpy(new_conns, connections, sizeof(struct connection *) * conn_array_size);
 		conn_array_size = new_size;
 		kfree(connections);
@@ -168,11 +181,10 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
 
 	con = connections[nodeid];
 	if (con == NULL && allocation) {
-		con = kmem_cache_alloc(con_cache, allocation);
+		con = kmem_cache_zalloc(con_cache, allocation);
 		if (!con)
 			goto finish;
 
-		memset(con, 0, sizeof(*con));
 		con->nodeid = nodeid;
 		init_rwsem(&con->sock_sem);
 		INIT_LIST_HEAD(&con->writequeue);
@@ -181,7 +193,7 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
 		connections[nodeid] = con;
 	}
 
- finish:
+finish:
 	up(&connections_lock);
 	return con;
 }
@@ -220,8 +232,6 @@ static inline void lowcomms_connect_sock(struct connection *con)
 {
 	if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
 		return;
-	if (!atomic_read(&accepting))
-		return;
 
 	spin_lock_bh(&state_sockets_lock);
 	list_add_tail(&con->state_list, &state_sockets);
@@ -232,31 +242,8 @@ static inline void lowcomms_connect_sock(struct connection *con)
 
 static void lowcomms_state_change(struct sock *sk)
 {
-/*	struct connection *con = sock2con(sk); */
-
-	switch (sk->sk_state) {
-	case TCP_ESTABLISHED:
+	if (sk->sk_state == TCP_ESTABLISHED)
 		lowcomms_write_space(sk);
-		break;
-
-	case TCP_FIN_WAIT1:
-	case TCP_FIN_WAIT2:
-	case TCP_TIME_WAIT:
-	case TCP_CLOSE:
-	case TCP_CLOSE_WAIT:
-	case TCP_LAST_ACK:
-	case TCP_CLOSING:
-		/* FIXME: I think this causes more trouble than it solves.
-		   lowcomms wil reconnect anyway when there is something to
-		   send. This just attempts reconnection if a node goes down!
-		*/
-		/* lowcomms_connect_sock(con); */
-		break;
-
-	default:
-		printk("dlm: lowcomms_state_change: state=%d\n", sk->sk_state);
-		break;
-	}
 }
 
 /* Make a socket active */
@@ -277,13 +264,12 @@ static int add_sock(struct socket *sock, struct connection *con)
 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
 			  int *addr_len)
 {
 	saddr->ss_family = dlm_local_addr.ss_family;
 	if (saddr->ss_family == AF_INET) {
 		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
 		in4_addr->sin_port = cpu_to_be16(port);
 		*addr_len = sizeof(struct sockaddr_in);
-	}
-	else {
+	} else {
 		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
 		in6_addr->sin6_port = cpu_to_be16(port);
 		*addr_len = sizeof(struct sockaddr_in6);
@@ -291,7 +277,7 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
 }
 
 /* Close a remote connection and tidy up */
-static void close_connection(struct connection *con, int and_other)
+static void close_connection(struct connection *con, bool and_other)
 {
 	down_write(&con->sock_sem);
 
@@ -300,11 +286,8 @@ static void close_connection(struct connection *con, int and_other)
 		con->sock = NULL;
 	}
 	if (con->othercon && and_other) {
-		/* Argh! recursion in kernel code!
-		   Actually, this isn't a list so it
-		   will only re-enter once.
-		*/
-		close_connection(con->othercon, FALSE);
+		/* Will only re-enter once. */
+		close_connection(con->othercon, false);
 	}
 	if (con->rx_page) {
 		__free_page(con->rx_page);
@@ -337,7 +320,7 @@ static int receive_from_sock(struct connection *con)
 		con->rx_page = alloc_page(GFP_ATOMIC);
 		if (con->rx_page == NULL)
 			goto out_resched;
-		CBUF_INIT(&con->cb, PAGE_CACHE_SIZE);
+		cbuf_init(&con->cb, PAGE_CACHE_SIZE);
 	}
 
 	msg.msg_control = NULL;
@@ -352,16 +335,16 @@ static int receive_from_sock(struct connection *con)
 	 * iov[0] is the bit of the circular buffer between the current end
 	 * point (cb.base + cb.len) and the end of the buffer.
 	 */
-	iov[0].iov_len = con->cb.base - CBUF_DATA(&con->cb);
-	iov[0].iov_base = page_address(con->rx_page) + CBUF_DATA(&con->cb);
+	iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
+	iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
 	iov[1].iov_len = 0;
 
 	/*
 	 * iov[1] is the bit of the circular buffer between the start of the
 	 * buffer and the start of the currently used section (cb.base)
 	 */
-	if (CBUF_DATA(&con->cb) >= con->cb.base) {
-		iov[0].iov_len = PAGE_CACHE_SIZE - CBUF_DATA(&con->cb);
+	if (cbuf_data(&con->cb) >= con->cb.base) {
+		iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
 		iov[1].iov_len = con->cb.base;
 		iov[1].iov_base = page_address(con->rx_page);
 		msg.msg_iovlen = 2;
@@ -378,7 +361,7 @@ static int receive_from_sock(struct connection *con)
 		goto out_close;
 	if (ret == len)
 		call_again_soon = 1;
-	CBUF_ADD(&con->cb, ret);
+	cbuf_add(&con->cb, ret);
 	ret = dlm_process_incoming_buffer(con->nodeid,
 					  page_address(con->rx_page),
 					  con->cb.base, con->cb.len,
@@ -391,35 +374,32 @@ static int receive_from_sock(struct connection *con)
 	}
 	if (ret < 0)
 		goto out_close;
-	CBUF_EAT(&con->cb, ret);
+	cbuf_eat(&con->cb, ret);
 
-	if (CBUF_EMPTY(&con->cb) && !call_again_soon) {
+	if (cbuf_empty(&con->cb) && !call_again_soon) {
 		__free_page(con->rx_page);
 		con->rx_page = NULL;
 	}
 
- out:
+out:
 	if (call_again_soon)
 		goto out_resched;
 	up_read(&con->sock_sem);
-	ret = 0;
-	goto out_ret;
+	return 0;
 
- out_resched:
+out_resched:
 	lowcomms_data_ready(con->sock->sk, 0);
 	up_read(&con->sock_sem);
-	ret = 0;
-	schedule();
-	goto out_ret;
+	cond_resched();
+	return 0;
 
- out_close:
+out_close:
 	up_read(&con->sock_sem);
 	if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) {
-		close_connection(con, FALSE);
+		close_connection(con, false);
 		/* Reconnect when there is something to send */
 	}
 
- out_ret:
 	return ret;
 }
 
@@ -434,7 +414,8 @@ static int accept_from_sock(struct connection *con)
 	struct connection *newcon;
 
 	memset(&peeraddr, 0, sizeof(peeraddr));
-	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &newsock);
+	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
+				  IPPROTO_TCP, &newsock);
 	if (result < 0)
 		return -ENOMEM;
 
@@ -462,7 +443,7 @@ static int accept_from_sock(struct connection *con)
 	/* Get the new node's NODEID */
 	make_sockaddr(&peeraddr, 0, &len);
 	if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
-		printk("dlm: connect from non cluster node\n");
+		printk("dlm: connect from non cluster node\n");
 		sock_release(newsock);
 		up_read(&con->sock_sem);
 		return -1;
@@ -483,17 +464,16 @@ static int accept_from_sock(struct connection *con)
 	}
 	down_write(&newcon->sock_sem);
 	if (newcon->sock) {
 		struct connection *othercon = newcon->othercon;
 
 		if (!othercon) {
-			othercon = kmem_cache_alloc(con_cache, GFP_KERNEL);
+			othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL);
 			if (!othercon) {
 				printk("dlm: failed to allocate incoming socket\n");
 				up_write(&newcon->sock_sem);
 				result = -ENOMEM;
 				goto accept_err;
 			}
-			memset(othercon, 0, sizeof(*othercon));
 			othercon->nodeid = nodeid;
 			othercon->rx_action = receive_from_sock;
 			init_rwsem(&othercon->sock_sem);
@@ -523,7 +503,7 @@ static int accept_from_sock(struct connection *con)
 
 	return 0;
 
- accept_err:
+accept_err:
 	up_read(&con->sock_sem);
 	sock_release(newsock);
 
@@ -533,7 +513,7 @@ static int accept_from_sock(struct connection *con)
 }
 
 /* Connect a new socket to its peer */
-static int connect_to_sock(struct connection *con)
+static void connect_to_sock(struct connection *con)
 {
 	int result = -EHOSTUNREACH;
 	struct sockaddr_storage saddr;
@@ -542,7 +522,7 @@ static int connect_to_sock(struct connection *con)
 
 	if (con->nodeid == 0) {
 		log_print("attempt to connect sock 0 foiled");
-		return 0;
+		return;
 	}
 
 	down_write(&con->sock_sem);
@@ -556,13 +536,14 @@ static int connect_to_sock(struct connection *con)
 	}
 
 	/* Create a socket to communicate with */
-	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock);
+	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
+				  IPPROTO_TCP, &sock);
 	if (result < 0)
 		goto out_err;
 
 	memset(&saddr, 0, sizeof(saddr));
 	if (dlm_nodeid_to_addr(con->nodeid, &saddr))
 		goto out_err;
 
 	sock->sk->sk_user_data = con;
 	con->rx_action = receive_from_sock;
@@ -574,22 +555,13 @@ static int connect_to_sock(struct connection *con)
 	log_print("connecting to %d", con->nodeid);
 	result =
 		sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
 				   O_NONBLOCK);
 	if (result == -EINPROGRESS)
 		result = 0;
-	if (result != 0)
-		goto out_err;
-
- out:
-	up_write(&con->sock_sem);
-	/*
-	 * Returning an error here means we've given up trying to connect to
-	 * a remote node, otherwise we return 0 and reschedule the connetion
-	 * attempt
-	 */
-	return result;
+	if (result == 0)
+		goto out;
 
- out_err:
+out_err:
 	if (con->sock) {
 		sock_release(con->sock);
 		con->sock = NULL;
@@ -604,12 +576,15 @@ static int connect_to_sock(struct connection *con)
 		lowcomms_connect_sock(con);
 		result = 0;
 	}
-	goto out;
+out:
+	up_write(&con->sock_sem);
+	return;
 }
 
-static struct socket *create_listen_sock(struct connection *con, struct sockaddr_storage *saddr)
+static struct socket *create_listen_sock(struct connection *con,
+					 struct sockaddr_storage *saddr)
 {
 	struct socket *sock = NULL;
 	mm_segment_t fs;
 	int result = 0;
 	int one = 1;
@@ -629,10 +604,12 @@ static struct socket *create_listen_sock(struct connection *con, struct sockaddr
 
 	fs = get_fs();
 	set_fs(get_ds());
-	result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one));
+	result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+				 (char *)&one, sizeof(one));
 	set_fs(fs);
 	if (result < 0) {
-		printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",result);
+		printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",
+		       result);
 	}
 	sock->sk->sk_user_data = con;
 	con->rx_action = accept_from_sock;
@@ -652,7 +629,8 @@ static struct socket *create_listen_sock(struct connection *con, struct sockaddr
 	fs = get_fs();
 	set_fs(get_ds());
 
-	result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&one, sizeof(one));
+	result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
+				 (char *)&one, sizeof(one));
 	set_fs(fs);
 	if (result < 0) {
 		printk("dlm: Set keepalive failed: %d\n", result);
@@ -666,7 +644,7 @@ static struct socket *create_listen_sock(struct connection *con, struct sockaddr
 		goto create_out;
 	}
 
- create_out:
+create_out:
 	return sock;
 }
 
@@ -679,10 +657,6 @@ static int listen_for_all(void)
 	int result = -EINVAL;
 
 	/* We don't support multi-homed hosts */
-	memset(con, 0, sizeof(*con));
-	init_rwsem(&con->sock_sem);
-	spin_lock_init(&con->writequeue_lock);
-	INIT_LIST_HEAD(&con->writequeue);
 	set_bit(CF_IS_OTHERCON, &con->flags);
 
 	sock = create_listen_sock(con, &dlm_local_addr);
@@ -731,16 +705,12 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len,
 	int offset = 0;
 	int users = 0;
 
-	if (!atomic_read(&accepting))
-		return NULL;
-
 	con = nodeid2con(nodeid, allocation);
 	if (!con)
 		return NULL;
 
-	spin_lock(&con->writequeue_lock);
 	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
-	if (((struct list_head *) e == &con->writequeue) ||
+	if ((&e->list == &con->writequeue) ||
 	    (PAGE_CACHE_SIZE - e->end < len)) {
 		e = NULL;
 	} else {
@@ -751,7 +721,7 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len,
 	spin_unlock(&con->writequeue_lock);
 
 	if (e) {
-	got_one:
+got_one:
 		if (users == 0)
 			kmap(e->page);
 		*ppc = page_address(e->page) + offset;
@@ -777,10 +747,6 @@ void dlm_lowcomms_commit_buffer(void *mh)
 	struct connection *con = e->con;
 	int users;
 
-	if (!atomic_read(&accepting))
-		return;
-
-	spin_lock(&con->writequeue_lock);
 	users = --e->users;
 	if (users)
 		goto out;
@@ -797,7 +763,7 @@ void dlm_lowcomms_commit_buffer(void *mh)
 	}
 	return;
 
- out:
+out:
 	spin_unlock(&con->writequeue_lock);
 	return;
 }
@@ -809,7 +775,7 @@ static void free_entry(struct writequeue_entry *e)
 }
 
 /* Send a message */
-static int send_to_sock(struct connection *con)
+static void send_to_sock(struct connection *con)
 {
 	int ret = 0;
 	ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int);
@@ -846,7 +812,7 @@ static int send_to_sock(struct connection *con)
 		}
 		else {
 			/* Don't starve people filling buffers */
-			schedule();
+			cond_resched();
 		}
 
 		spin_lock(&con->writequeue_lock);
@@ -855,25 +821,26 @@ static int send_to_sock(struct connection *con)
 
 		if (e->len == 0 && e->users == 0) {
 			list_del(&e->list);
+			kunmap(e->page);
 			free_entry(e);
 			continue;
 		}
 	}
 	spin_unlock(&con->writequeue_lock);
- out:
+out:
 	up_read(&con->sock_sem);
-	return ret;
+	return;
 
- send_error:
+send_error:
 	up_read(&con->sock_sem);
-	close_connection(con, FALSE);
+	close_connection(con, false);
 	lowcomms_connect_sock(con);
-	return ret;
+	return;
 
- out_connect:
+out_connect:
 	up_read(&con->sock_sem);
 	lowcomms_connect_sock(con);
-	return 0;
+	return;
 }
 
 static void clean_one_writequeue(struct connection *con)
@@ -904,12 +871,12 @@ int dlm_lowcomms_close(int nodeid)
 	con = nodeid2con(nodeid, 0);
 	if (con) {
 		clean_one_writequeue(con);
-		close_connection(con, TRUE);
+		close_connection(con, true);
 		atomic_set(&con->waiting_requests, 0);
 	}
 	return 0;
 
- out:
+out:
 	return -1;
 }
 
@@ -940,7 +907,7 @@ static void process_sockets(void)
 	list_for_each_safe(list, temp, &read_sockets) {
 
 		struct connection *con =
-		    list_entry(list, struct connection, read_list);
+			list_entry(list, struct connection, read_list);
 		list_del(&con->read_list);
 		clear_bit(CF_READ_PENDING, &con->flags);
 
@@ -959,7 +926,7 @@ static void process_sockets(void)
 
 		/* Don't starve out everyone else */
 		if (++count >= MAX_RX_MSG_COUNT) {
-			schedule();
+			cond_resched();
 			count = 0;
 		}
 
@@ -977,20 +944,16 @@ static void process_output_queue(void)
 {
 	struct list_head *list;
 	struct list_head *temp;
-	int ret;
 
 	spin_lock_bh(&write_sockets_lock);
 	list_for_each_safe(list, temp, &write_sockets) {
 		struct connection *con =
 			list_entry(list, struct connection, write_list);
 		clear_bit(CF_WRITE_PENDING, &con->flags);
 		list_del(&con->write_list);
 
 		spin_unlock_bh(&write_sockets_lock);
-
-		ret = send_to_sock(con);
-		if (ret < 0) {
-		}
+		send_to_sock(con);
 		spin_lock_bh(&write_sockets_lock);
 	}
 	spin_unlock_bh(&write_sockets_lock);
@@ -1000,19 +963,16 @@ static void process_state_queue(void)
 {
 	struct list_head *list;
 	struct list_head *temp;
-	int ret;
 
 	spin_lock_bh(&state_sockets_lock);
 	list_for_each_safe(list, temp, &state_sockets) {
 		struct connection *con =
 			list_entry(list, struct connection, state_list);
 		list_del(&con->state_list);
 		clear_bit(CF_CONNECT_PENDING, &con->flags);
 		spin_unlock_bh(&state_sockets_lock);
 
-		ret = connect_to_sock(con);
-		if (ret < 0) {
-		}
+		connect_to_sock(con);
 		spin_lock_bh(&state_sockets_lock);
 	}
 	spin_unlock_bh(&state_sockets_lock);
@@ -1046,14 +1006,13 @@ static int read_list_empty(void)
 /* DLM Transport comms receive daemon */
 static int dlm_recvd(void *data)
 {
-	init_waitqueue_head(&lowcomms_recv_waitq);
 	init_waitqueue_entry(&lowcomms_recv_waitq_head, current);
 	add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head);
 
 	while (!kthread_should_stop()) {
 		set_current_state(TASK_INTERRUPTIBLE);
 		if (read_list_empty())
-			schedule();
+			cond_resched();
 		set_current_state(TASK_RUNNING);
 
 		process_sockets();
@@ -1081,14 +1040,13 @@ static int write_and_state_lists_empty(void)
 /* DLM Transport send daemon */
 static int dlm_sendd(void *data)
 {
-	init_waitqueue_head(&lowcomms_send_waitq);
 	init_waitqueue_entry(&lowcomms_send_waitq_head, current);
 	add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head);
 
 	while (!kthread_should_stop()) {
 		set_current_state(TASK_INTERRUPTIBLE);
 		if (write_and_state_lists_empty())
-			schedule();
+			cond_resched();
 		set_current_state(TASK_RUNNING);
 
 		process_state_queue();
@@ -1111,7 +1069,7 @@ static int daemons_start(void)
 
 	p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
 	error = IS_ERR(p);
-	if (error) {
+	if (error) {
 		log_print("can't start dlm_recvd %d", error);
 		return error;
 	}
@@ -1119,7 +1077,7 @@ static int daemons_start(void)
 
 	p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
 	error = IS_ERR(p);
-	if (error) {
+	if (error) {
 		log_print("can't start dlm_sendd %d", error);
 		kthread_stop(recv_task);
 		return error;
@@ -1141,21 +1099,20 @@ void dlm_lowcomms_stop(void)
 {
 	int i;
 
-	atomic_set(&accepting, 0);
-
-	/* Set all the activity flags to prevent any
+	/* Set all the flags to prevent any
 	   socket activity.
 	*/
 	for (i = 0; i < conn_array_size; i++) {
 		if (connections[i])
-			connections[i]->flags |= 0x7;
+			connections[i]->flags |= 0xFF;
 	}
+
 	daemons_stop();
 	clean_writequeues();
 
 	for (i = 0; i < conn_array_size; i++) {
 		if (connections[i]) {
-			close_connection(connections[i], TRUE);
+			close_connection(connections[i], true);
 			if (connections[i]->othercon)
 				kmem_cache_free(con_cache, connections[i]->othercon);
 			kmem_cache_free(con_cache, connections[i]);
@@ -1173,24 +1130,12 @@ int dlm_lowcomms_start(void)
 {
 	int error = 0;
 
-	error = -ENOTCONN;
-
-	/*
-	 * Temporarily initialise the waitq head so that lowcomms_send_message
-	 * doesn't crash if it gets called before the thread is fully
-	 * initialised
-	 */
-	init_waitqueue_head(&lowcomms_send_waitq);
-
 	error = -ENOMEM;
-	connections = kmalloc(sizeof(struct connection *) *
+	connections = kzalloc(sizeof(struct connection *) *
 			      NODE_INCREMENT, GFP_KERNEL);
 	if (!connections)
 		goto out;
 
-	memset(connections, 0,
-	       sizeof(struct connection *) * NODE_INCREMENT);
-
 	conn_array_size = NODE_INCREMENT;
 
 	if (dlm_our_addr(&dlm_local_addr, 0)) {
@@ -1203,7 +1148,8 @@ int dlm_lowcomms_start(void)
 	}
 
 	con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
-				      __alignof__(struct connection), 0, NULL, NULL);
+				      __alignof__(struct connection), 0,
+				      NULL, NULL);
 	if (!con_cache)
 		goto fail_free_conn;
 
@@ -1217,40 +1163,20 @@ int dlm_lowcomms_start(void)
 	if (error)
 		goto fail_unlisten;
 
-	atomic_set(&accepting, 1);
-
 	return 0;
 
- fail_unlisten:
-	close_connection(connections[0], 0);
+fail_unlisten:
+	close_connection(connections[0], false);
 	kmem_cache_free(con_cache, connections[0]);
 	kmem_cache_destroy(con_cache);
 
- fail_free_conn:
+fail_free_conn:
 	kfree(connections);
 
- out:
+out:
 	return error;
 }
 
-int dlm_lowcomms_init(void)
-{
-	INIT_LIST_HEAD(&read_sockets);
-	INIT_LIST_HEAD(&write_sockets);
-	INIT_LIST_HEAD(&state_sockets);
-
-	spin_lock_init(&read_sockets_lock);
-	spin_lock_init(&write_sockets_lock);
-	spin_lock_init(&state_sockets_lock);
-	init_MUTEX(&connections_lock);
-
-	return 0;
-}
-
-void dlm_lowcomms_exit(void)
-{
-}
-
 /*
  * Overrides for Emacs so that we follow Linus's tabbing style.
  * Emacs will notice this stuff at the end of the file and automatically