aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPatrick Caulfield <pcaulfie@redhat.com>2006-12-06 10:10:37 -0500
committerSteven Whitehouse <swhiteho@redhat.com>2006-12-07 09:25:13 -0500
commitac33d0710595579e3cfca42dde2257eb0b123f6d (patch)
treeebb3050be68aa49666ee03f51ffe2667f5b18c74
parent34126f9f41901ca9d7d0031c2b11fc0d6c07b72d (diff)
[DLM] Clean up lowcomms
This fixes up most of the things pointed out by akpm and Pavel Machek with comments below indicating why some things have been left: Andrew Morton wrote: > >> +static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc) >> +{ >> + struct nodeinfo *ni; >> + int r; >> + int n; >> + >> + down_read(&nodeinfo_lock); > > Given that this function can sleep, I wonder if `alloc' is useful. > > I see lots of callers passing in a literal "0" for `alloc'. That's in fact > a secret (GFP_ATOMIC & ~__GFP_HIGH). I doubt if that's what you really > meant. Particularly as the code could at least have used __GFP_WAIT (aka > GFP_NOIO) which is much, much more reliable than "0". In fact "0" is the > least reliable mode possible. > > IOW, this is all bollixed up. When 0 is passed into nodeid2nodeinfo the function does not try to allocate a new structure at all. it's an indication that the caller only wants the nodeinfo struct for that nodeid if there actually is one in existance. I've tidied the function itself so it's more obvious, (and tidier!) >> +/* Data received from remote end */ >> +static int receive_from_sock(void) >> +{ >> + int ret = 0; >> + struct msghdr msg; >> + struct kvec iov[2]; >> + unsigned len; >> + int r; >> + struct sctp_sndrcvinfo *sinfo; >> + struct cmsghdr *cmsg; >> + struct nodeinfo *ni; >> + >> + /* These two are marginally too big for stack allocation, but this >> + * function is (currently) only called by dlm_recvd so static should be >> + * OK. >> + */ >> + static struct sockaddr_storage msgname; >> + static char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; > > whoa. This is globally singly-threaded code?? Yes. it is only ever run in the context of dlm_recvd. >> >> +static void initiate_association(int nodeid) >> +{ >> + struct sockaddr_storage rem_addr; >> + static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; > > Another static buffer to worry about. Globally singly-threaded code? Yes. Only ever called by dlm_sendd. >> + >> +/* Send a message */ >> +static int send_to_sock(struct nodeinfo *ni) >> +{ >> + int ret = 0; >> + struct writequeue_entry *e; >> + int len, offset; >> + struct msghdr outmsg; >> + static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; > > Singly-threaded? Yep. >> >> +static void dealloc_nodeinfo(void) >> +{ >> + int i; >> + >> + for (i=1; i<=max_nodeid; i++) { >> + struct nodeinfo *ni = nodeid2nodeinfo(i, 0); >> + if (ni) { >> + idr_remove(&nodeinfo_idr, i); > > Didn't that need locking? Not. it's only ever called at DLM shutdown after all the other threads have been stopped. >> >> +static int write_list_empty(void) >> +{ >> + int status; >> + >> + spin_lock_bh(&write_nodes_lock); >> + status = list_empty(&write_nodes); >> + spin_unlock_bh(&write_nodes_lock); >> + >> + return status; >> +} > > This function's return value is meaningless. As soon as the lock gets > dropped, the return value can get out of sync with reality. > > Looking at the caller, this _might_ happen to be OK, but it's a nasty and > dangerous thing. Really the locking should be moved into the caller. It's just an optimisation to allow the caller to schedule if there is no work to do. if something arrives immediately afterwards then it will get picked up when the process re-awakes (and it will be woken by that arrival). The 'accepting' atomic has gone completely. as Andrew pointed out it didn't really achieve much anyway. I suspect it was a plaster over some other startup or shutdown bug to be honest. Signed-off-by: Patrick Caulfield <pcaulfie@redhat.com> Signed-off-by: Steven Whitehouse <swhiteho@redhat.com> Cc: Andrew Morton <akpm@osdl.org> Cc: Pavel Machek <pavel@ucw.cz>
-rw-r--r--fs/dlm/lowcomms-sctp.c264
-rw-r--r--fs/dlm/lowcomms-tcp.c342
-rw-r--r--fs/dlm/lowcomms.h2
-rw-r--r--fs/dlm/main.c10
4 files changed, 261 insertions, 357 deletions
diff --git a/fs/dlm/lowcomms-sctp.c b/fs/dlm/lowcomms-sctp.c
index 6da6b14d5a61..fe158d7a9285 100644
--- a/fs/dlm/lowcomms-sctp.c
+++ b/fs/dlm/lowcomms-sctp.c
@@ -2,7 +2,7 @@
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. 5** Copyright (C) 2004-2006 Red Hat, Inc. All rights reserved.
6** 6**
7** This copyrighted material is made available to anyone wishing to use, 7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions 8** modify, copy, or redistribute it subject to the terms and conditions
@@ -75,13 +75,13 @@ struct nodeinfo {
75}; 75};
76 76
77static DEFINE_IDR(nodeinfo_idr); 77static DEFINE_IDR(nodeinfo_idr);
78static struct rw_semaphore nodeinfo_lock; 78static DECLARE_RWSEM(nodeinfo_lock);
79static int max_nodeid; 79static int max_nodeid;
80 80
81struct cbuf { 81struct cbuf {
82 unsigned base; 82 unsigned int base;
83 unsigned len; 83 unsigned int len;
84 unsigned mask; 84 unsigned int mask;
85}; 85};
86 86
87/* Just the one of these, now. But this struct keeps 87/* Just the one of these, now. But this struct keeps
@@ -90,9 +90,9 @@ struct cbuf {
90#define CF_READ_PENDING 1 90#define CF_READ_PENDING 1
91 91
92struct connection { 92struct connection {
93 struct socket *sock; 93 struct socket *sock;
94 unsigned long flags; 94 unsigned long flags;
95 struct page *rx_page; 95 struct page *rx_page;
96 atomic_t waiting_requests; 96 atomic_t waiting_requests;
97 struct cbuf cb; 97 struct cbuf cb;
98 int eagain_flag; 98 int eagain_flag;
@@ -102,36 +102,40 @@ struct connection {
102 102
103struct writequeue_entry { 103struct writequeue_entry {
104 struct list_head list; 104 struct list_head list;
105 struct page *page; 105 struct page *page;
106 int offset; 106 int offset;
107 int len; 107 int len;
108 int end; 108 int end;
109 int users; 109 int users;
110 struct nodeinfo *ni; 110 struct nodeinfo *ni;
111}; 111};
112 112
113#define CBUF_ADD(cb, n) do { (cb)->len += n; } while(0) 113static void cbuf_add(struct cbuf *cb, int n)
114#define CBUF_EMPTY(cb) ((cb)->len == 0) 114{
115#define CBUF_MAY_ADD(cb, n) (((cb)->len + (n)) < ((cb)->mask + 1)) 115 cb->len += n;
116#define CBUF_DATA(cb) (((cb)->base + (cb)->len) & (cb)->mask) 116}
117 117
118#define CBUF_INIT(cb, size) \ 118static int cbuf_data(struct cbuf *cb)
119do { \ 119{
120 (cb)->base = (cb)->len = 0; \ 120 return ((cb->base + cb->len) & cb->mask);
121 (cb)->mask = ((size)-1); \ 121}
122} while(0)
123 122
124#define CBUF_EAT(cb, n) \ 123static void cbuf_init(struct cbuf *cb, int size)
125do { \ 124{
126 (cb)->len -= (n); \ 125 cb->base = cb->len = 0;
127 (cb)->base += (n); \ 126 cb->mask = size-1;
128 (cb)->base &= (cb)->mask; \ 127}
129} while(0)
130 128
129static void cbuf_eat(struct cbuf *cb, int n)
130{
131 cb->len -= n;
132 cb->base += n;
133 cb->base &= cb->mask;
134}
131 135
132/* List of nodes which have writes pending */ 136/* List of nodes which have writes pending */
133static struct list_head write_nodes; 137static LIST_HEAD(write_nodes);
134static spinlock_t write_nodes_lock; 138static DEFINE_SPINLOCK(write_nodes_lock);
135 139
136/* Maximum number of incoming messages to process before 140/* Maximum number of incoming messages to process before
137 * doing a schedule() 141 * doing a schedule()
@@ -141,8 +145,7 @@ static spinlock_t write_nodes_lock;
141/* Manage daemons */ 145/* Manage daemons */
142static struct task_struct *recv_task; 146static struct task_struct *recv_task;
143static struct task_struct *send_task; 147static struct task_struct *send_task;
144static wait_queue_head_t lowcomms_recv_wait; 148static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_wait);
145static atomic_t accepting;
146 149
147/* The SCTP connection */ 150/* The SCTP connection */
148static struct connection sctp_con; 151static struct connection sctp_con;
@@ -161,11 +164,11 @@ static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
161 return error; 164 return error;
162 165
163 if (dlm_local_addr[0]->ss_family == AF_INET) { 166 if (dlm_local_addr[0]->ss_family == AF_INET) {
164 struct sockaddr_in *in4 = (struct sockaddr_in *) &addr; 167 struct sockaddr_in *in4 = (struct sockaddr_in *) &addr;
165 struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr; 168 struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
166 ret4->sin_addr.s_addr = in4->sin_addr.s_addr; 169 ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
167 } else { 170 } else {
168 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr; 171 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr;
169 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr; 172 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
170 memcpy(&ret6->sin6_addr, &in6->sin6_addr, 173 memcpy(&ret6->sin6_addr, &in6->sin6_addr,
171 sizeof(in6->sin6_addr)); 174 sizeof(in6->sin6_addr));
@@ -174,6 +177,8 @@ static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
174 return 0; 177 return 0;
175} 178}
176 179
180/* If alloc is 0 here we will not attempt to allocate a new
181 nodeinfo struct */
177static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc) 182static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
178{ 183{
179 struct nodeinfo *ni; 184 struct nodeinfo *ni;
@@ -184,44 +189,45 @@ static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
184 ni = idr_find(&nodeinfo_idr, nodeid); 189 ni = idr_find(&nodeinfo_idr, nodeid);
185 up_read(&nodeinfo_lock); 190 up_read(&nodeinfo_lock);
186 191
187 if (!ni && alloc) { 192 if (ni || !alloc)
188 down_write(&nodeinfo_lock); 193 return ni;
189 194
190 ni = idr_find(&nodeinfo_idr, nodeid); 195 down_write(&nodeinfo_lock);
191 if (ni)
192 goto out_up;
193 196
194 r = idr_pre_get(&nodeinfo_idr, alloc); 197 ni = idr_find(&nodeinfo_idr, nodeid);
195 if (!r) 198 if (ni)
196 goto out_up; 199 goto out_up;
197 200
198 ni = kmalloc(sizeof(struct nodeinfo), alloc); 201 r = idr_pre_get(&nodeinfo_idr, alloc);
199 if (!ni) 202 if (!r)
200 goto out_up; 203 goto out_up;
201 204
202 r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n); 205 ni = kmalloc(sizeof(struct nodeinfo), alloc);
203 if (r) { 206 if (!ni)
204 kfree(ni); 207 goto out_up;
205 ni = NULL; 208
206 goto out_up; 209 r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n);
207 } 210 if (r) {
208 if (n != nodeid) { 211 kfree(ni);
209 idr_remove(&nodeinfo_idr, n); 212 ni = NULL;
210 kfree(ni); 213 goto out_up;
211 ni = NULL;
212 goto out_up;
213 }
214 memset(ni, 0, sizeof(struct nodeinfo));
215 spin_lock_init(&ni->lock);
216 INIT_LIST_HEAD(&ni->writequeue);
217 spin_lock_init(&ni->writequeue_lock);
218 ni->nodeid = nodeid;
219
220 if (nodeid > max_nodeid)
221 max_nodeid = nodeid;
222 out_up:
223 up_write(&nodeinfo_lock);
224 } 214 }
215 if (n != nodeid) {
216 idr_remove(&nodeinfo_idr, n);
217 kfree(ni);
218 ni = NULL;
219 goto out_up;
220 }
221 memset(ni, 0, sizeof(struct nodeinfo));
222 spin_lock_init(&ni->lock);
223 INIT_LIST_HEAD(&ni->writequeue);
224 spin_lock_init(&ni->writequeue_lock);
225 ni->nodeid = nodeid;
226
227 if (nodeid > max_nodeid)
228 max_nodeid = nodeid;
229out_up:
230 up_write(&nodeinfo_lock);
225 231
226 return ni; 232 return ni;
227} 233}
@@ -279,13 +285,13 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
279 in4_addr->sin_port = cpu_to_be16(port); 285 in4_addr->sin_port = cpu_to_be16(port);
280 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); 286 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
281 memset(in4_addr+1, 0, sizeof(struct sockaddr_storage) - 287 memset(in4_addr+1, 0, sizeof(struct sockaddr_storage) -
282 sizeof(struct sockaddr_in)); 288 sizeof(struct sockaddr_in));
283 *addr_len = sizeof(struct sockaddr_in); 289 *addr_len = sizeof(struct sockaddr_in);
284 } else { 290 } else {
285 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; 291 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
286 in6_addr->sin6_port = cpu_to_be16(port); 292 in6_addr->sin6_port = cpu_to_be16(port);
287 memset(in6_addr+1, 0, sizeof(struct sockaddr_storage) - 293 memset(in6_addr+1, 0, sizeof(struct sockaddr_storage) -
288 sizeof(struct sockaddr_in6)); 294 sizeof(struct sockaddr_in6));
289 *addr_len = sizeof(struct sockaddr_in6); 295 *addr_len = sizeof(struct sockaddr_in6);
290 } 296 }
291} 297}
@@ -324,7 +330,7 @@ static void send_shutdown(sctp_assoc_t associd)
324 cmsg->cmsg_type = SCTP_SNDRCV; 330 cmsg->cmsg_type = SCTP_SNDRCV;
325 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); 331 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
326 outmessage.msg_controllen = cmsg->cmsg_len; 332 outmessage.msg_controllen = cmsg->cmsg_len;
327 sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg); 333 sinfo = CMSG_DATA(cmsg);
328 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); 334 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
329 335
330 sinfo->sinfo_flags |= MSG_EOF; 336 sinfo->sinfo_flags |= MSG_EOF;
@@ -387,7 +393,7 @@ static void process_sctp_notification(struct msghdr *msg, char *buf)
387 393
388 if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) { 394 if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
389 log_print("COMM_UP for invalid assoc ID %d", 395 log_print("COMM_UP for invalid assoc ID %d",
390 (int)sn->sn_assoc_change.sac_assoc_id); 396 (int)sn->sn_assoc_change.sac_assoc_id);
391 init_failed(); 397 init_failed();
392 return; 398 return;
393 } 399 }
@@ -398,15 +404,18 @@ static void process_sctp_notification(struct msghdr *msg, char *buf)
398 fs = get_fs(); 404 fs = get_fs();
399 set_fs(get_ds()); 405 set_fs(get_ds());
400 ret = sctp_con.sock->ops->getsockopt(sctp_con.sock, 406 ret = sctp_con.sock->ops->getsockopt(sctp_con.sock,
401 IPPROTO_SCTP, SCTP_PRIMARY_ADDR, 407 IPPROTO_SCTP,
402 (char*)&prim, &prim_len); 408 SCTP_PRIMARY_ADDR,
409 (char*)&prim,
410 &prim_len);
403 set_fs(fs); 411 set_fs(fs);
404 if (ret < 0) { 412 if (ret < 0) {
405 struct nodeinfo *ni; 413 struct nodeinfo *ni;
406 414
407 log_print("getsockopt/sctp_primary_addr on " 415 log_print("getsockopt/sctp_primary_addr on "
408 "new assoc %d failed : %d", 416 "new assoc %d failed : %d",
409 (int)sn->sn_assoc_change.sac_assoc_id, ret); 417 (int)sn->sn_assoc_change.sac_assoc_id,
418 ret);
410 419
411 /* Retry INIT later */ 420 /* Retry INIT later */
412 ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id); 421 ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
@@ -426,12 +435,10 @@ static void process_sctp_notification(struct msghdr *msg, char *buf)
426 return; 435 return;
427 436
428 /* Save the assoc ID */ 437 /* Save the assoc ID */
429 spin_lock(&ni->lock);
430 ni->assoc_id = sn->sn_assoc_change.sac_assoc_id; 438 ni->assoc_id = sn->sn_assoc_change.sac_assoc_id;
431 spin_unlock(&ni->lock);
432 439
433 log_print("got new/restarted association %d nodeid %d", 440 log_print("got new/restarted association %d nodeid %d",
434 (int)sn->sn_assoc_change.sac_assoc_id, nodeid); 441 (int)sn->sn_assoc_change.sac_assoc_id, nodeid);
435 442
436 /* Send any pending writes */ 443 /* Send any pending writes */
437 clear_bit(NI_INIT_PENDING, &ni->flags); 444 clear_bit(NI_INIT_PENDING, &ni->flags);
@@ -507,13 +514,12 @@ static int receive_from_sock(void)
507 sctp_con.rx_page = alloc_page(GFP_ATOMIC); 514 sctp_con.rx_page = alloc_page(GFP_ATOMIC);
508 if (sctp_con.rx_page == NULL) 515 if (sctp_con.rx_page == NULL)
509 goto out_resched; 516 goto out_resched;
510 CBUF_INIT(&sctp_con.cb, PAGE_CACHE_SIZE); 517 cbuf_init(&sctp_con.cb, PAGE_CACHE_SIZE);
511 } 518 }
512 519
513 memset(&incmsg, 0, sizeof(incmsg)); 520 memset(&incmsg, 0, sizeof(incmsg));
514 memset(&msgname, 0, sizeof(msgname)); 521 memset(&msgname, 0, sizeof(msgname));
515 522
516 memset(incmsg, 0, sizeof(incmsg));
517 msg.msg_name = &msgname; 523 msg.msg_name = &msgname;
518 msg.msg_namelen = sizeof(msgname); 524 msg.msg_namelen = sizeof(msgname);
519 msg.msg_flags = 0; 525 msg.msg_flags = 0;
@@ -532,17 +538,17 @@ static int receive_from_sock(void)
532 * iov[0] is the bit of the circular buffer between the current end 538 * iov[0] is the bit of the circular buffer between the current end
533 * point (cb.base + cb.len) and the end of the buffer. 539 * point (cb.base + cb.len) and the end of the buffer.
534 */ 540 */
535 iov[0].iov_len = sctp_con.cb.base - CBUF_DATA(&sctp_con.cb); 541 iov[0].iov_len = sctp_con.cb.base - cbuf_data(&sctp_con.cb);
536 iov[0].iov_base = page_address(sctp_con.rx_page) + 542 iov[0].iov_base = page_address(sctp_con.rx_page) +
537 CBUF_DATA(&sctp_con.cb); 543 cbuf_data(&sctp_con.cb);
538 iov[1].iov_len = 0; 544 iov[1].iov_len = 0;
539 545
540 /* 546 /*
541 * iov[1] is the bit of the circular buffer between the start of the 547 * iov[1] is the bit of the circular buffer between the start of the
542 * buffer and the start of the currently used section (cb.base) 548 * buffer and the start of the currently used section (cb.base)
543 */ 549 */
544 if (CBUF_DATA(&sctp_con.cb) >= sctp_con.cb.base) { 550 if (cbuf_data(&sctp_con.cb) >= sctp_con.cb.base) {
545 iov[0].iov_len = PAGE_CACHE_SIZE - CBUF_DATA(&sctp_con.cb); 551 iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&sctp_con.cb);
546 iov[1].iov_len = sctp_con.cb.base; 552 iov[1].iov_len = sctp_con.cb.base;
547 iov[1].iov_base = page_address(sctp_con.rx_page); 553 iov[1].iov_base = page_address(sctp_con.rx_page);
548 msg.msg_iovlen = 2; 554 msg.msg_iovlen = 2;
@@ -557,7 +563,7 @@ static int receive_from_sock(void)
557 msg.msg_control = incmsg; 563 msg.msg_control = incmsg;
558 msg.msg_controllen = sizeof(incmsg); 564 msg.msg_controllen = sizeof(incmsg);
559 cmsg = CMSG_FIRSTHDR(&msg); 565 cmsg = CMSG_FIRSTHDR(&msg);
560 sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg); 566 sinfo = CMSG_DATA(cmsg);
561 567
562 if (msg.msg_flags & MSG_NOTIFICATION) { 568 if (msg.msg_flags & MSG_NOTIFICATION) {
563 process_sctp_notification(&msg, page_address(sctp_con.rx_page)); 569 process_sctp_notification(&msg, page_address(sctp_con.rx_page));
@@ -583,29 +589,29 @@ static int receive_from_sock(void)
583 if (r == 1) 589 if (r == 1)
584 return 0; 590 return 0;
585 591
586 CBUF_ADD(&sctp_con.cb, ret); 592 cbuf_add(&sctp_con.cb, ret);
587 ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid), 593 ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
588 page_address(sctp_con.rx_page), 594 page_address(sctp_con.rx_page),
589 sctp_con.cb.base, sctp_con.cb.len, 595 sctp_con.cb.base, sctp_con.cb.len,
590 PAGE_CACHE_SIZE); 596 PAGE_CACHE_SIZE);
591 if (ret < 0) 597 if (ret < 0)
592 goto out_close; 598 goto out_close;
593 CBUF_EAT(&sctp_con.cb, ret); 599 cbuf_eat(&sctp_con.cb, ret);
594 600
595 out: 601out:
596 ret = 0; 602 ret = 0;
597 goto out_ret; 603 goto out_ret;
598 604
599 out_resched: 605out_resched:
600 lowcomms_data_ready(sctp_con.sock->sk, 0); 606 lowcomms_data_ready(sctp_con.sock->sk, 0);
601 ret = 0; 607 ret = 0;
602 schedule(); 608 cond_resched();
603 goto out_ret; 609 goto out_ret;
604 610
605 out_close: 611out_close:
606 if (ret != -EAGAIN) 612 if (ret != -EAGAIN)
607 log_print("error reading from sctp socket: %d", ret); 613 log_print("error reading from sctp socket: %d", ret);
608 out_ret: 614out_ret:
609 return ret; 615 return ret;
610} 616}
611 617
@@ -619,10 +625,12 @@ static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num)
619 set_fs(get_ds()); 625 set_fs(get_ds());
620 if (num == 1) 626 if (num == 1)
621 result = sctp_con.sock->ops->bind(sctp_con.sock, 627 result = sctp_con.sock->ops->bind(sctp_con.sock,
622 (struct sockaddr *) addr, addr_len); 628 (struct sockaddr *) addr,
629 addr_len);
623 else 630 else
624 result = sctp_con.sock->ops->setsockopt(sctp_con.sock, SOL_SCTP, 631 result = sctp_con.sock->ops->setsockopt(sctp_con.sock, SOL_SCTP,
625 SCTP_SOCKOPT_BINDX_ADD, (char *)addr, addr_len); 632 SCTP_SOCKOPT_BINDX_ADD,
633 (char *)addr, addr_len);
626 set_fs(fs); 634 set_fs(fs);
627 635
628 if (result < 0) 636 if (result < 0)
@@ -719,10 +727,10 @@ static int init_sock(void)
719 727
720 return 0; 728 return 0;
721 729
722 create_delsock: 730create_delsock:
723 sock_release(sock); 731 sock_release(sock);
724 sctp_con.sock = NULL; 732 sctp_con.sock = NULL;
725 out: 733out:
726 return result; 734 return result;
727} 735}
728 736
@@ -756,16 +764,13 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
756 int users = 0; 764 int users = 0;
757 struct nodeinfo *ni; 765 struct nodeinfo *ni;
758 766
759 if (!atomic_read(&accepting))
760 return NULL;
761
762 ni = nodeid2nodeinfo(nodeid, allocation); 767 ni = nodeid2nodeinfo(nodeid, allocation);
763 if (!ni) 768 if (!ni)
764 return NULL; 769 return NULL;
765 770
766 spin_lock(&ni->writequeue_lock); 771 spin_lock(&ni->writequeue_lock);
767 e = list_entry(ni->writequeue.prev, struct writequeue_entry, list); 772 e = list_entry(ni->writequeue.prev, struct writequeue_entry, list);
768 if (((struct list_head *) e == &ni->writequeue) || 773 if ((&e->list == &ni->writequeue) ||
769 (PAGE_CACHE_SIZE - e->end < len)) { 774 (PAGE_CACHE_SIZE - e->end < len)) {
770 e = NULL; 775 e = NULL;
771 } else { 776 } else {
@@ -776,7 +781,7 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
776 spin_unlock(&ni->writequeue_lock); 781 spin_unlock(&ni->writequeue_lock);
777 782
778 if (e) { 783 if (e) {
779 got_one: 784 got_one:
780 if (users == 0) 785 if (users == 0)
781 kmap(e->page); 786 kmap(e->page);
782 *ppc = page_address(e->page) + offset; 787 *ppc = page_address(e->page) + offset;
@@ -803,9 +808,6 @@ void dlm_lowcomms_commit_buffer(void *arg)
803 int users; 808 int users;
804 struct nodeinfo *ni = e->ni; 809 struct nodeinfo *ni = e->ni;
805 810
806 if (!atomic_read(&accepting))
807 return;
808
809 spin_lock(&ni->writequeue_lock); 811 spin_lock(&ni->writequeue_lock);
810 users = --e->users; 812 users = --e->users;
811 if (users) 813 if (users)
@@ -822,7 +824,7 @@ void dlm_lowcomms_commit_buffer(void *arg)
822 } 824 }
823 return; 825 return;
824 826
825 out: 827out:
826 spin_unlock(&ni->writequeue_lock); 828 spin_unlock(&ni->writequeue_lock);
827 return; 829 return;
828} 830}
@@ -878,7 +880,7 @@ static void initiate_association(int nodeid)
878 cmsg->cmsg_level = IPPROTO_SCTP; 880 cmsg->cmsg_level = IPPROTO_SCTP;
879 cmsg->cmsg_type = SCTP_SNDRCV; 881 cmsg->cmsg_type = SCTP_SNDRCV;
880 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); 882 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
881 sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg); 883 sinfo = CMSG_DATA(cmsg);
882 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); 884 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
883 sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid); 885 sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
884 886
@@ -892,7 +894,7 @@ static void initiate_association(int nodeid)
892} 894}
893 895
894/* Send a message */ 896/* Send a message */
895static int send_to_sock(struct nodeinfo *ni) 897static void send_to_sock(struct nodeinfo *ni)
896{ 898{
897 int ret = 0; 899 int ret = 0;
898 struct writequeue_entry *e; 900 struct writequeue_entry *e;
@@ -903,13 +905,13 @@ static int send_to_sock(struct nodeinfo *ni)
903 struct sctp_sndrcvinfo *sinfo; 905 struct sctp_sndrcvinfo *sinfo;
904 struct kvec iov; 906 struct kvec iov;
905 907
906 /* See if we need to init an association before we start 908 /* See if we need to init an association before we start
907 sending precious messages */ 909 sending precious messages */
908 spin_lock(&ni->lock); 910 spin_lock(&ni->lock);
909 if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) { 911 if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
910 spin_unlock(&ni->lock); 912 spin_unlock(&ni->lock);
911 initiate_association(ni->nodeid); 913 initiate_association(ni->nodeid);
912 return 0; 914 return;
913 } 915 }
914 spin_unlock(&ni->lock); 916 spin_unlock(&ni->lock);
915 917
@@ -923,7 +925,7 @@ static int send_to_sock(struct nodeinfo *ni)
923 cmsg->cmsg_level = IPPROTO_SCTP; 925 cmsg->cmsg_level = IPPROTO_SCTP;
924 cmsg->cmsg_type = SCTP_SNDRCV; 926 cmsg->cmsg_type = SCTP_SNDRCV;
925 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); 927 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
926 sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg); 928 sinfo = CMSG_DATA(cmsg);
927 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); 929 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
928 sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid); 930 sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
929 sinfo->sinfo_assoc_id = ni->assoc_id; 931 sinfo->sinfo_assoc_id = ni->assoc_id;
@@ -955,7 +957,7 @@ static int send_to_sock(struct nodeinfo *ni)
955 goto send_error; 957 goto send_error;
956 } else { 958 } else {
957 /* Don't starve people filling buffers */ 959 /* Don't starve people filling buffers */
958 schedule(); 960 cond_resched();
959 } 961 }
960 962
961 spin_lock(&ni->writequeue_lock); 963 spin_lock(&ni->writequeue_lock);
@@ -964,15 +966,16 @@ static int send_to_sock(struct nodeinfo *ni)
964 966
965 if (e->len == 0 && e->users == 0) { 967 if (e->len == 0 && e->users == 0) {
966 list_del(&e->list); 968 list_del(&e->list);
969 kunmap(e->page);
967 free_entry(e); 970 free_entry(e);
968 continue; 971 continue;
969 } 972 }
970 } 973 }
971 spin_unlock(&ni->writequeue_lock); 974 spin_unlock(&ni->writequeue_lock);
972 out: 975out:
973 return ret; 976 return;
974 977
975 send_error: 978send_error:
976 log_print("Error sending to node %d %d", ni->nodeid, ret); 979 log_print("Error sending to node %d %d", ni->nodeid, ret);
977 spin_lock(&ni->lock); 980 spin_lock(&ni->lock);
978 if (!test_and_set_bit(NI_INIT_PENDING, &ni->flags)) { 981 if (!test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
@@ -982,7 +985,7 @@ static int send_to_sock(struct nodeinfo *ni)
982 } else 985 } else
983 spin_unlock(&ni->lock); 986 spin_unlock(&ni->lock);
984 987
985 return ret; 988 return;
986} 989}
987 990
988/* Try to send any messages that are pending */ 991/* Try to send any messages that are pending */
@@ -994,7 +997,7 @@ static void process_output_queue(void)
994 spin_lock_bh(&write_nodes_lock); 997 spin_lock_bh(&write_nodes_lock);
995 list_for_each_safe(list, temp, &write_nodes) { 998 list_for_each_safe(list, temp, &write_nodes) {
996 struct nodeinfo *ni = 999 struct nodeinfo *ni =
997 list_entry(list, struct nodeinfo, write_list); 1000 list_entry(list, struct nodeinfo, write_list);
998 clear_bit(NI_WRITE_PENDING, &ni->flags); 1001 clear_bit(NI_WRITE_PENDING, &ni->flags);
999 list_del(&ni->write_list); 1002 list_del(&ni->write_list);
1000 1003
@@ -1106,7 +1109,7 @@ static int dlm_recvd(void *data)
1106 set_current_state(TASK_INTERRUPTIBLE); 1109 set_current_state(TASK_INTERRUPTIBLE);
1107 add_wait_queue(&lowcomms_recv_wait, &wait); 1110 add_wait_queue(&lowcomms_recv_wait, &wait);
1108 if (!test_bit(CF_READ_PENDING, &sctp_con.flags)) 1111 if (!test_bit(CF_READ_PENDING, &sctp_con.flags))
1109 schedule(); 1112 cond_resched();
1110 remove_wait_queue(&lowcomms_recv_wait, &wait); 1113 remove_wait_queue(&lowcomms_recv_wait, &wait);
1111 set_current_state(TASK_RUNNING); 1114 set_current_state(TASK_RUNNING);
1112 1115
@@ -1118,12 +1121,12 @@ static int dlm_recvd(void *data)
1118 1121
1119 /* Don't starve out everyone else */ 1122 /* Don't starve out everyone else */
1120 if (++count >= MAX_RX_MSG_COUNT) { 1123 if (++count >= MAX_RX_MSG_COUNT) {
1121 schedule(); 1124 cond_resched();
1122 count = 0; 1125 count = 0;
1123 } 1126 }
1124 } while (!kthread_should_stop() && ret >=0); 1127 } while (!kthread_should_stop() && ret >=0);
1125 } 1128 }
1126 schedule(); 1129 cond_resched();
1127 } 1130 }
1128 1131
1129 return 0; 1132 return 0;
@@ -1138,7 +1141,7 @@ static int dlm_sendd(void *data)
1138 while (!kthread_should_stop()) { 1141 while (!kthread_should_stop()) {
1139 set_current_state(TASK_INTERRUPTIBLE); 1142 set_current_state(TASK_INTERRUPTIBLE);
1140 if (write_list_empty()) 1143 if (write_list_empty())
1141 schedule(); 1144 cond_resched();
1142 set_current_state(TASK_RUNNING); 1145 set_current_state(TASK_RUNNING);
1143 1146
1144 if (sctp_con.eagain_flag) { 1147 if (sctp_con.eagain_flag) {
@@ -1166,7 +1169,7 @@ static int daemons_start(void)
1166 1169
1167 p = kthread_run(dlm_recvd, NULL, "dlm_recvd"); 1170 p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
1168 error = IS_ERR(p); 1171 error = IS_ERR(p);
1169 if (error) { 1172 if (error) {
1170 log_print("can't start dlm_recvd %d", error); 1173 log_print("can't start dlm_recvd %d", error);
1171 return error; 1174 return error;
1172 } 1175 }
@@ -1174,7 +1177,7 @@ static int daemons_start(void)
1174 1177
1175 p = kthread_run(dlm_sendd, NULL, "dlm_sendd"); 1178 p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
1176 error = IS_ERR(p); 1179 error = IS_ERR(p);
1177 if (error) { 1180 if (error) {
1178 log_print("can't start dlm_sendd %d", error); 1181 log_print("can't start dlm_sendd %d", error);
1179 kthread_stop(recv_task); 1182 kthread_stop(recv_task);
1180 return error; 1183 return error;
@@ -1197,43 +1200,28 @@ int dlm_lowcomms_start(void)
1197 error = daemons_start(); 1200 error = daemons_start();
1198 if (error) 1201 if (error)
1199 goto fail_sock; 1202 goto fail_sock;
1200 atomic_set(&accepting, 1);
1201 return 0; 1203 return 0;
1202 1204
1203 fail_sock: 1205fail_sock:
1204 close_connection(); 1206 close_connection();
1205 return error; 1207 return error;
1206} 1208}
1207 1209
1208/* Set all the activity flags to prevent any socket activity. */
1209
1210void dlm_lowcomms_stop(void) 1210void dlm_lowcomms_stop(void)
1211{ 1211{
1212 atomic_set(&accepting, 0); 1212 int i;
1213
1213 sctp_con.flags = 0x7; 1214 sctp_con.flags = 0x7;
1214 daemons_stop(); 1215 daemons_stop();
1215 clean_writequeues(); 1216 clean_writequeues();
1216 close_connection(); 1217 close_connection();
1217 dealloc_nodeinfo(); 1218 dealloc_nodeinfo();
1218 max_nodeid = 0; 1219 max_nodeid = 0;
1219}
1220 1220
1221int dlm_lowcomms_init(void) 1221 dlm_local_count = 0;
1222{ 1222 dlm_local_nodeid = 0;
1223 init_waitqueue_head(&lowcomms_recv_wait);
1224 spin_lock_init(&write_nodes_lock);
1225 INIT_LIST_HEAD(&write_nodes);
1226 init_rwsem(&nodeinfo_lock);
1227 return 0;
1228}
1229
1230void dlm_lowcomms_exit(void)
1231{
1232 int i;
1233 1223
1234 for (i = 0; i < dlm_local_count; i++) 1224 for (i = 0; i < dlm_local_count; i++)
1235 kfree(dlm_local_addr[i]); 1225 kfree(dlm_local_addr[i]);
1236 dlm_local_count = 0;
1237 dlm_local_nodeid = 0;
1238} 1226}
1239 1227
diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms-tcp.c
index 7289e59b4bd3..8f2791fc8447 100644
--- a/fs/dlm/lowcomms-tcp.c
+++ b/fs/dlm/lowcomms-tcp.c
@@ -54,44 +54,59 @@
54#include "config.h" 54#include "config.h"
55 55
56struct cbuf { 56struct cbuf {
57 unsigned base; 57 unsigned int base;
58 unsigned len; 58 unsigned int len;
59 unsigned mask; 59 unsigned int mask;
60}; 60};
61 61
62#ifndef FALSE
63#define FALSE 0
64#define TRUE 1
65#endif
66#define NODE_INCREMENT 32 62#define NODE_INCREMENT 32
63static void cbuf_add(struct cbuf *cb, int n)
64{
65 cb->len += n;
66}
67 67
68#define CBUF_INIT(cb, size) do { (cb)->base = (cb)->len = 0; (cb)->mask = ((size)-1); } while(0) 68static int cbuf_data(struct cbuf *cb)
69#define CBUF_ADD(cb, n) do { (cb)->len += n; } while(0) 69{
70#define CBUF_EMPTY(cb) ((cb)->len == 0) 70 return ((cb->base + cb->len) & cb->mask);
71#define CBUF_MAY_ADD(cb, n) (((cb)->len + (n)) < ((cb)->mask + 1)) 71}
72#define CBUF_EAT(cb, n) do { (cb)->len -= (n); \ 72
73 (cb)->base += (n); (cb)->base &= (cb)->mask; } while(0) 73static void cbuf_init(struct cbuf *cb, int size)
74#define CBUF_DATA(cb) (((cb)->base + (cb)->len) & (cb)->mask) 74{
75 cb->base = cb->len = 0;
76 cb->mask = size-1;
77}
78
79static void cbuf_eat(struct cbuf *cb, int n)
80{
81 cb->len -= n;
82 cb->base += n;
83 cb->base &= cb->mask;
84}
85
86static bool cbuf_empty(struct cbuf *cb)
87{
88 return cb->len == 0;
89}
75 90
76/* Maximum number of incoming messages to process before 91/* Maximum number of incoming messages to process before
77 doing a schedule() 92 doing a cond_resched()
78*/ 93*/
79#define MAX_RX_MSG_COUNT 25 94#define MAX_RX_MSG_COUNT 25
80 95
81struct connection { 96struct connection {
82 struct socket *sock; /* NULL if not connected */ 97 struct socket *sock; /* NULL if not connected */
83 uint32_t nodeid; /* So we know who we are in the list */ 98 uint32_t nodeid; /* So we know who we are in the list */
84 struct rw_semaphore sock_sem; /* Stop connect races */ 99 struct rw_semaphore sock_sem; /* Stop connect races */
85 struct list_head read_list; /* On this list when ready for reading */ 100 struct list_head read_list; /* On this list when ready for reading */
86 struct list_head write_list; /* On this list when ready for writing */ 101 struct list_head write_list; /* On this list when ready for writing */
87 struct list_head state_list; /* On this list when ready to connect */ 102 struct list_head state_list; /* On this list when ready to connect */
88 unsigned long flags; /* bit 1,2 = We are on the read/write lists */ 103 unsigned long flags; /* bit 1,2 = We are on the read/write lists */
89#define CF_READ_PENDING 1 104#define CF_READ_PENDING 1
90#define CF_WRITE_PENDING 2 105#define CF_WRITE_PENDING 2
91#define CF_CONNECT_PENDING 3 106#define CF_CONNECT_PENDING 3
92#define CF_IS_OTHERCON 4 107#define CF_IS_OTHERCON 4
93 struct list_head writequeue; /* List of outgoing writequeue_entries */ 108 struct list_head writequeue; /* List of outgoing writequeue_entries */
94 struct list_head listenlist; /* List of allocated listening sockets */ 109 struct list_head listenlist; /* List of allocated listening sockets */
95 spinlock_t writequeue_lock; 110 spinlock_t writequeue_lock;
96 int (*rx_action) (struct connection *); /* What to do when active */ 111 int (*rx_action) (struct connection *); /* What to do when active */
97 struct page *rx_page; 112 struct page *rx_page;
@@ -121,28 +136,27 @@ static struct task_struct *recv_task;
121static struct task_struct *send_task; 136static struct task_struct *send_task;
122 137
123static wait_queue_t lowcomms_send_waitq_head; 138static wait_queue_t lowcomms_send_waitq_head;
124static wait_queue_head_t lowcomms_send_waitq; 139static DECLARE_WAIT_QUEUE_HEAD(lowcomms_send_waitq);
125static wait_queue_t lowcomms_recv_waitq_head; 140static wait_queue_t lowcomms_recv_waitq_head;
126static wait_queue_head_t lowcomms_recv_waitq; 141static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_waitq);
127 142
128/* An array of pointers to connections, indexed by NODEID */ 143/* An array of pointers to connections, indexed by NODEID */
129static struct connection **connections; 144static struct connection **connections;
130static struct semaphore connections_lock; 145static DECLARE_MUTEX(connections_lock);
131static kmem_cache_t *con_cache; 146static kmem_cache_t *con_cache;
132static int conn_array_size; 147static int conn_array_size;
133static atomic_t accepting;
134 148
135/* List of sockets that have reads pending */ 149/* List of sockets that have reads pending */
136static struct list_head read_sockets; 150static LIST_HEAD(read_sockets);
137static spinlock_t read_sockets_lock; 151static DEFINE_SPINLOCK(read_sockets_lock);
138 152
139/* List of sockets which have writes pending */ 153/* List of sockets which have writes pending */
140static struct list_head write_sockets; 154static LIST_HEAD(write_sockets);
141static spinlock_t write_sockets_lock; 155static DEFINE_SPINLOCK(write_sockets_lock);
142 156
143/* List of sockets which have connects pending */ 157/* List of sockets which have connects pending */
144static struct list_head state_sockets; 158static LIST_HEAD(state_sockets);
145static spinlock_t state_sockets_lock; 159static DEFINE_SPINLOCK(state_sockets_lock);
146 160
147static struct connection *nodeid2con(int nodeid, gfp_t allocation) 161static struct connection *nodeid2con(int nodeid, gfp_t allocation)
148{ 162{
@@ -153,12 +167,11 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
153 int new_size = nodeid + NODE_INCREMENT; 167 int new_size = nodeid + NODE_INCREMENT;
154 struct connection **new_conns; 168 struct connection **new_conns;
155 169
156 new_conns = kmalloc(sizeof(struct connection *) * 170 new_conns = kzalloc(sizeof(struct connection *) *
157 new_size, allocation); 171 new_size, allocation);
158 if (!new_conns) 172 if (!new_conns)
159 goto finish; 173 goto finish;
160 174
161 memset(new_conns, 0, sizeof(struct connection *) * new_size);
162 memcpy(new_conns, connections, sizeof(struct connection *) * conn_array_size); 175 memcpy(new_conns, connections, sizeof(struct connection *) * conn_array_size);
163 conn_array_size = new_size; 176 conn_array_size = new_size;
164 kfree(connections); 177 kfree(connections);
@@ -168,11 +181,10 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
168 181
169 con = connections[nodeid]; 182 con = connections[nodeid];
170 if (con == NULL && allocation) { 183 if (con == NULL && allocation) {
171 con = kmem_cache_alloc(con_cache, allocation); 184 con = kmem_cache_zalloc(con_cache, allocation);
172 if (!con) 185 if (!con)
173 goto finish; 186 goto finish;
174 187
175 memset(con, 0, sizeof(*con));
176 con->nodeid = nodeid; 188 con->nodeid = nodeid;
177 init_rwsem(&con->sock_sem); 189 init_rwsem(&con->sock_sem);
178 INIT_LIST_HEAD(&con->writequeue); 190 INIT_LIST_HEAD(&con->writequeue);
@@ -181,7 +193,7 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
181 connections[nodeid] = con; 193 connections[nodeid] = con;
182 } 194 }
183 195
184 finish: 196finish:
185 up(&connections_lock); 197 up(&connections_lock);
186 return con; 198 return con;
187} 199}
@@ -220,8 +232,6 @@ static inline void lowcomms_connect_sock(struct connection *con)
220{ 232{
221 if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) 233 if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
222 return; 234 return;
223 if (!atomic_read(&accepting))
224 return;
225 235
226 spin_lock_bh(&state_sockets_lock); 236 spin_lock_bh(&state_sockets_lock);
227 list_add_tail(&con->state_list, &state_sockets); 237 list_add_tail(&con->state_list, &state_sockets);
@@ -232,31 +242,8 @@ static inline void lowcomms_connect_sock(struct connection *con)
232 242
233static void lowcomms_state_change(struct sock *sk) 243static void lowcomms_state_change(struct sock *sk)
234{ 244{
235/* struct connection *con = sock2con(sk); */ 245 if (sk->sk_state == TCP_ESTABLISHED)
236
237 switch (sk->sk_state) {
238 case TCP_ESTABLISHED:
239 lowcomms_write_space(sk); 246 lowcomms_write_space(sk);
240 break;
241
242 case TCP_FIN_WAIT1:
243 case TCP_FIN_WAIT2:
244 case TCP_TIME_WAIT:
245 case TCP_CLOSE:
246 case TCP_CLOSE_WAIT:
247 case TCP_LAST_ACK:
248 case TCP_CLOSING:
249 /* FIXME: I think this causes more trouble than it solves.
250 lowcomms wil reconnect anyway when there is something to
251 send. This just attempts reconnection if a node goes down!
252 */
253 /* lowcomms_connect_sock(con); */
254 break;
255
256 default:
257 printk("dlm: lowcomms_state_change: state=%d\n", sk->sk_state);
258 break;
259 }
260} 247}
261 248
262/* Make a socket active */ 249/* Make a socket active */
@@ -277,13 +264,12 @@ static int add_sock(struct socket *sock, struct connection *con)
277static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, 264static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
278 int *addr_len) 265 int *addr_len)
279{ 266{
280 saddr->ss_family = dlm_local_addr.ss_family; 267 saddr->ss_family = dlm_local_addr.ss_family;
281 if (saddr->ss_family == AF_INET) { 268 if (saddr->ss_family == AF_INET) {
282 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; 269 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
283 in4_addr->sin_port = cpu_to_be16(port); 270 in4_addr->sin_port = cpu_to_be16(port);
284 *addr_len = sizeof(struct sockaddr_in); 271 *addr_len = sizeof(struct sockaddr_in);
285 } 272 } else {
286 else {
287 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; 273 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
288 in6_addr->sin6_port = cpu_to_be16(port); 274 in6_addr->sin6_port = cpu_to_be16(port);
289 *addr_len = sizeof(struct sockaddr_in6); 275 *addr_len = sizeof(struct sockaddr_in6);
@@ -291,7 +277,7 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
291} 277}
292 278
293/* Close a remote connection and tidy up */ 279/* Close a remote connection and tidy up */
294static void close_connection(struct connection *con, int and_other) 280static void close_connection(struct connection *con, bool and_other)
295{ 281{
296 down_write(&con->sock_sem); 282 down_write(&con->sock_sem);
297 283
@@ -300,11 +286,8 @@ static void close_connection(struct connection *con, int and_other)
300 con->sock = NULL; 286 con->sock = NULL;
301 } 287 }
302 if (con->othercon && and_other) { 288 if (con->othercon && and_other) {
303 /* Argh! recursion in kernel code! 289 /* Will only re-enter once. */
304 Actually, this isn't a list so it 290 close_connection(con->othercon, false);
305 will only re-enter once.
306 */
307 close_connection(con->othercon, FALSE);
308 } 291 }
309 if (con->rx_page) { 292 if (con->rx_page) {
310 __free_page(con->rx_page); 293 __free_page(con->rx_page);
@@ -337,7 +320,7 @@ static int receive_from_sock(struct connection *con)
337 con->rx_page = alloc_page(GFP_ATOMIC); 320 con->rx_page = alloc_page(GFP_ATOMIC);
338 if (con->rx_page == NULL) 321 if (con->rx_page == NULL)
339 goto out_resched; 322 goto out_resched;
340 CBUF_INIT(&con->cb, PAGE_CACHE_SIZE); 323 cbuf_init(&con->cb, PAGE_CACHE_SIZE);
341 } 324 }
342 325
343 msg.msg_control = NULL; 326 msg.msg_control = NULL;
@@ -352,16 +335,16 @@ static int receive_from_sock(struct connection *con)
352 * iov[0] is the bit of the circular buffer between the current end 335 * iov[0] is the bit of the circular buffer between the current end
353 * point (cb.base + cb.len) and the end of the buffer. 336 * point (cb.base + cb.len) and the end of the buffer.
354 */ 337 */
355 iov[0].iov_len = con->cb.base - CBUF_DATA(&con->cb); 338 iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
356 iov[0].iov_base = page_address(con->rx_page) + CBUF_DATA(&con->cb); 339 iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
357 iov[1].iov_len = 0; 340 iov[1].iov_len = 0;
358 341
359 /* 342 /*
360 * iov[1] is the bit of the circular buffer between the start of the 343 * iov[1] is the bit of the circular buffer between the start of the
361 * buffer and the start of the currently used section (cb.base) 344 * buffer and the start of the currently used section (cb.base)
362 */ 345 */
363 if (CBUF_DATA(&con->cb) >= con->cb.base) { 346 if (cbuf_data(&con->cb) >= con->cb.base) {
364 iov[0].iov_len = PAGE_CACHE_SIZE - CBUF_DATA(&con->cb); 347 iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
365 iov[1].iov_len = con->cb.base; 348 iov[1].iov_len = con->cb.base;
366 iov[1].iov_base = page_address(con->rx_page); 349 iov[1].iov_base = page_address(con->rx_page);
367 msg.msg_iovlen = 2; 350 msg.msg_iovlen = 2;
@@ -378,7 +361,7 @@ static int receive_from_sock(struct connection *con)
378 goto out_close; 361 goto out_close;
379 if (ret == len) 362 if (ret == len)
380 call_again_soon = 1; 363 call_again_soon = 1;
381 CBUF_ADD(&con->cb, ret); 364 cbuf_add(&con->cb, ret);
382 ret = dlm_process_incoming_buffer(con->nodeid, 365 ret = dlm_process_incoming_buffer(con->nodeid,
383 page_address(con->rx_page), 366 page_address(con->rx_page),
384 con->cb.base, con->cb.len, 367 con->cb.base, con->cb.len,
@@ -391,35 +374,32 @@ static int receive_from_sock(struct connection *con)
391 } 374 }
392 if (ret < 0) 375 if (ret < 0)
393 goto out_close; 376 goto out_close;
394 CBUF_EAT(&con->cb, ret); 377 cbuf_eat(&con->cb, ret);
395 378
396 if (CBUF_EMPTY(&con->cb) && !call_again_soon) { 379 if (cbuf_empty(&con->cb) && !call_again_soon) {
397 __free_page(con->rx_page); 380 __free_page(con->rx_page);
398 con->rx_page = NULL; 381 con->rx_page = NULL;
399 } 382 }
400 383
401 out: 384out:
402 if (call_again_soon) 385 if (call_again_soon)
403 goto out_resched; 386 goto out_resched;
404 up_read(&con->sock_sem); 387 up_read(&con->sock_sem);
405 ret = 0; 388 return 0;
406 goto out_ret;
407 389
408 out_resched: 390out_resched:
409 lowcomms_data_ready(con->sock->sk, 0); 391 lowcomms_data_ready(con->sock->sk, 0);
410 up_read(&con->sock_sem); 392 up_read(&con->sock_sem);
411 ret = 0; 393 cond_resched();
412 schedule(); 394 return 0;
413 goto out_ret;
414 395
415 out_close: 396out_close:
416 up_read(&con->sock_sem); 397 up_read(&con->sock_sem);
417 if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) { 398 if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) {
418 close_connection(con, FALSE); 399 close_connection(con, false);
419 /* Reconnect when there is something to send */ 400 /* Reconnect when there is something to send */
420 } 401 }
421 402
422 out_ret:
423 return ret; 403 return ret;
424} 404}
425 405
@@ -434,7 +414,8 @@ static int accept_from_sock(struct connection *con)
434 struct connection *newcon; 414 struct connection *newcon;
435 415
436 memset(&peeraddr, 0, sizeof(peeraddr)); 416 memset(&peeraddr, 0, sizeof(peeraddr));
437 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &newsock); 417 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
418 IPPROTO_TCP, &newsock);
438 if (result < 0) 419 if (result < 0)
439 return -ENOMEM; 420 return -ENOMEM;
440 421
@@ -462,7 +443,7 @@ static int accept_from_sock(struct connection *con)
462 /* Get the new node's NODEID */ 443 /* Get the new node's NODEID */
463 make_sockaddr(&peeraddr, 0, &len); 444 make_sockaddr(&peeraddr, 0, &len);
464 if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) { 445 if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
465 printk("dlm: connect from non cluster node\n"); 446 printk("dlm: connect from non cluster node\n");
466 sock_release(newsock); 447 sock_release(newsock);
467 up_read(&con->sock_sem); 448 up_read(&con->sock_sem);
468 return -1; 449 return -1;
@@ -483,17 +464,16 @@ static int accept_from_sock(struct connection *con)
483 } 464 }
484 down_write(&newcon->sock_sem); 465 down_write(&newcon->sock_sem);
485 if (newcon->sock) { 466 if (newcon->sock) {
486 struct connection *othercon = newcon->othercon; 467 struct connection *othercon = newcon->othercon;
487 468
488 if (!othercon) { 469 if (!othercon) {
489 othercon = kmem_cache_alloc(con_cache, GFP_KERNEL); 470 othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL);
490 if (!othercon) { 471 if (!othercon) {
491 printk("dlm: failed to allocate incoming socket\n"); 472 printk("dlm: failed to allocate incoming socket\n");
492 up_write(&newcon->sock_sem); 473 up_write(&newcon->sock_sem);
493 result = -ENOMEM; 474 result = -ENOMEM;
494 goto accept_err; 475 goto accept_err;
495 } 476 }
496 memset(othercon, 0, sizeof(*othercon));
497 othercon->nodeid = nodeid; 477 othercon->nodeid = nodeid;
498 othercon->rx_action = receive_from_sock; 478 othercon->rx_action = receive_from_sock;
499 init_rwsem(&othercon->sock_sem); 479 init_rwsem(&othercon->sock_sem);
@@ -523,7 +503,7 @@ static int accept_from_sock(struct connection *con)
523 503
524 return 0; 504 return 0;
525 505
526 accept_err: 506accept_err:
527 up_read(&con->sock_sem); 507 up_read(&con->sock_sem);
528 sock_release(newsock); 508 sock_release(newsock);
529 509
@@ -533,7 +513,7 @@ static int accept_from_sock(struct connection *con)
533} 513}
534 514
535/* Connect a new socket to its peer */ 515/* Connect a new socket to its peer */
536static int connect_to_sock(struct connection *con) 516static void connect_to_sock(struct connection *con)
537{ 517{
538 int result = -EHOSTUNREACH; 518 int result = -EHOSTUNREACH;
539 struct sockaddr_storage saddr; 519 struct sockaddr_storage saddr;
@@ -542,7 +522,7 @@ static int connect_to_sock(struct connection *con)
542 522
543 if (con->nodeid == 0) { 523 if (con->nodeid == 0) {
544 log_print("attempt to connect sock 0 foiled"); 524 log_print("attempt to connect sock 0 foiled");
545 return 0; 525 return;
546 } 526 }
547 527
548 down_write(&con->sock_sem); 528 down_write(&con->sock_sem);
@@ -556,13 +536,14 @@ static int connect_to_sock(struct connection *con)
556 } 536 }
557 537
558 /* Create a socket to communicate with */ 538 /* Create a socket to communicate with */
559 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock); 539 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
540 IPPROTO_TCP, &sock);
560 if (result < 0) 541 if (result < 0)
561 goto out_err; 542 goto out_err;
562 543
563 memset(&saddr, 0, sizeof(saddr)); 544 memset(&saddr, 0, sizeof(saddr));
564 if (dlm_nodeid_to_addr(con->nodeid, &saddr)) 545 if (dlm_nodeid_to_addr(con->nodeid, &saddr))
565 goto out_err; 546 goto out_err;
566 547
567 sock->sk->sk_user_data = con; 548 sock->sk->sk_user_data = con;
568 con->rx_action = receive_from_sock; 549 con->rx_action = receive_from_sock;
@@ -574,22 +555,13 @@ static int connect_to_sock(struct connection *con)
574 log_print("connecting to %d", con->nodeid); 555 log_print("connecting to %d", con->nodeid);
575 result = 556 result =
576 sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, 557 sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
577 O_NONBLOCK); 558 O_NONBLOCK);
578 if (result == -EINPROGRESS) 559 if (result == -EINPROGRESS)
579 result = 0; 560 result = 0;
580 if (result != 0) 561 if (result == 0)
581 goto out_err; 562 goto out;
582
583 out:
584 up_write(&con->sock_sem);
585 /*
586 * Returning an error here means we've given up trying to connect to
587 * a remote node, otherwise we return 0 and reschedule the connetion
588 * attempt
589 */
590 return result;
591 563
592 out_err: 564out_err:
593 if (con->sock) { 565 if (con->sock) {
594 sock_release(con->sock); 566 sock_release(con->sock);
595 con->sock = NULL; 567 con->sock = NULL;
@@ -604,12 +576,15 @@ static int connect_to_sock(struct connection *con)
604 lowcomms_connect_sock(con); 576 lowcomms_connect_sock(con);
605 result = 0; 577 result = 0;
606 } 578 }
607 goto out; 579out:
580 up_write(&con->sock_sem);
581 return;
608} 582}
609 583
610static struct socket *create_listen_sock(struct connection *con, struct sockaddr_storage *saddr) 584static struct socket *create_listen_sock(struct connection *con,
585 struct sockaddr_storage *saddr)
611{ 586{
612 struct socket *sock = NULL; 587 struct socket *sock = NULL;
613 mm_segment_t fs; 588 mm_segment_t fs;
614 int result = 0; 589 int result = 0;
615 int one = 1; 590 int one = 1;
@@ -629,10 +604,12 @@ static struct socket *create_listen_sock(struct connection *con, struct sockaddr
629 604
630 fs = get_fs(); 605 fs = get_fs();
631 set_fs(get_ds()); 606 set_fs(get_ds());
632 result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one)); 607 result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
608 (char *)&one, sizeof(one));
633 set_fs(fs); 609 set_fs(fs);
634 if (result < 0) { 610 if (result < 0) {
635 printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",result); 611 printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",
612 result);
636 } 613 }
637 sock->sk->sk_user_data = con; 614 sock->sk->sk_user_data = con;
638 con->rx_action = accept_from_sock; 615 con->rx_action = accept_from_sock;
@@ -652,7 +629,8 @@ static struct socket *create_listen_sock(struct connection *con, struct sockaddr
652 fs = get_fs(); 629 fs = get_fs();
653 set_fs(get_ds()); 630 set_fs(get_ds());
654 631
655 result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&one, sizeof(one)); 632 result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
633 (char *)&one, sizeof(one));
656 set_fs(fs); 634 set_fs(fs);
657 if (result < 0) { 635 if (result < 0) {
658 printk("dlm: Set keepalive failed: %d\n", result); 636 printk("dlm: Set keepalive failed: %d\n", result);
@@ -666,7 +644,7 @@ static struct socket *create_listen_sock(struct connection *con, struct sockaddr
666 goto create_out; 644 goto create_out;
667 } 645 }
668 646
669 create_out: 647create_out:
670 return sock; 648 return sock;
671} 649}
672 650
@@ -679,10 +657,6 @@ static int listen_for_all(void)
679 int result = -EINVAL; 657 int result = -EINVAL;
680 658
681 /* We don't support multi-homed hosts */ 659 /* We don't support multi-homed hosts */
682 memset(con, 0, sizeof(*con));
683 init_rwsem(&con->sock_sem);
684 spin_lock_init(&con->writequeue_lock);
685 INIT_LIST_HEAD(&con->writequeue);
686 set_bit(CF_IS_OTHERCON, &con->flags); 660 set_bit(CF_IS_OTHERCON, &con->flags);
687 661
688 sock = create_listen_sock(con, &dlm_local_addr); 662 sock = create_listen_sock(con, &dlm_local_addr);
@@ -731,16 +705,12 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len,
731 int offset = 0; 705 int offset = 0;
732 int users = 0; 706 int users = 0;
733 707
734 if (!atomic_read(&accepting))
735 return NULL;
736
737 con = nodeid2con(nodeid, allocation); 708 con = nodeid2con(nodeid, allocation);
738 if (!con) 709 if (!con)
739 return NULL; 710 return NULL;
740 711
741 spin_lock(&con->writequeue_lock);
742 e = list_entry(con->writequeue.prev, struct writequeue_entry, list); 712 e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
743 if (((struct list_head *) e == &con->writequeue) || 713 if ((&e->list == &con->writequeue) ||
744 (PAGE_CACHE_SIZE - e->end < len)) { 714 (PAGE_CACHE_SIZE - e->end < len)) {
745 e = NULL; 715 e = NULL;
746 } else { 716 } else {
@@ -751,7 +721,7 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len,
751 spin_unlock(&con->writequeue_lock); 721 spin_unlock(&con->writequeue_lock);
752 722
753 if (e) { 723 if (e) {
754 got_one: 724 got_one:
755 if (users == 0) 725 if (users == 0)
756 kmap(e->page); 726 kmap(e->page);
757 *ppc = page_address(e->page) + offset; 727 *ppc = page_address(e->page) + offset;
@@ -777,10 +747,6 @@ void dlm_lowcomms_commit_buffer(void *mh)
777 struct connection *con = e->con; 747 struct connection *con = e->con;
778 int users; 748 int users;
779 749
780 if (!atomic_read(&accepting))
781 return;
782
783 spin_lock(&con->writequeue_lock);
784 users = --e->users; 750 users = --e->users;
785 if (users) 751 if (users)
786 goto out; 752 goto out;
@@ -797,7 +763,7 @@ void dlm_lowcomms_commit_buffer(void *mh)
797 } 763 }
798 return; 764 return;
799 765
800 out: 766out:
801 spin_unlock(&con->writequeue_lock); 767 spin_unlock(&con->writequeue_lock);
802 return; 768 return;
803} 769}
@@ -809,7 +775,7 @@ static void free_entry(struct writequeue_entry *e)
809} 775}
810 776
811/* Send a message */ 777/* Send a message */
812static int send_to_sock(struct connection *con) 778static void send_to_sock(struct connection *con)
813{ 779{
814 int ret = 0; 780 int ret = 0;
815 ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int); 781 ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int);
@@ -846,7 +812,7 @@ static int send_to_sock(struct connection *con)
846 } 812 }
847 else { 813 else {
848 /* Don't starve people filling buffers */ 814 /* Don't starve people filling buffers */
849 schedule(); 815 cond_resched();
850 } 816 }
851 817
852 spin_lock(&con->writequeue_lock); 818 spin_lock(&con->writequeue_lock);
@@ -855,25 +821,26 @@ static int send_to_sock(struct connection *con)
855 821
856 if (e->len == 0 && e->users == 0) { 822 if (e->len == 0 && e->users == 0) {
857 list_del(&e->list); 823 list_del(&e->list);
824 kunmap(e->page);
858 free_entry(e); 825 free_entry(e);
859 continue; 826 continue;
860 } 827 }
861 } 828 }
862 spin_unlock(&con->writequeue_lock); 829 spin_unlock(&con->writequeue_lock);
863 out: 830out:
864 up_read(&con->sock_sem); 831 up_read(&con->sock_sem);
865 return ret; 832 return;
866 833
867 send_error: 834send_error:
868 up_read(&con->sock_sem); 835 up_read(&con->sock_sem);
869 close_connection(con, FALSE); 836 close_connection(con, false);
870 lowcomms_connect_sock(con); 837 lowcomms_connect_sock(con);
871 return ret; 838 return;
872 839
873 out_connect: 840out_connect:
874 up_read(&con->sock_sem); 841 up_read(&con->sock_sem);
875 lowcomms_connect_sock(con); 842 lowcomms_connect_sock(con);
876 return 0; 843 return;
877} 844}
878 845
879static void clean_one_writequeue(struct connection *con) 846static void clean_one_writequeue(struct connection *con)
@@ -904,12 +871,12 @@ int dlm_lowcomms_close(int nodeid)
904 con = nodeid2con(nodeid, 0); 871 con = nodeid2con(nodeid, 0);
905 if (con) { 872 if (con) {
906 clean_one_writequeue(con); 873 clean_one_writequeue(con);
907 close_connection(con, TRUE); 874 close_connection(con, true);
908 atomic_set(&con->waiting_requests, 0); 875 atomic_set(&con->waiting_requests, 0);
909 } 876 }
910 return 0; 877 return 0;
911 878
912 out: 879out:
913 return -1; 880 return -1;
914} 881}
915 882
@@ -940,7 +907,7 @@ static void process_sockets(void)
940 list_for_each_safe(list, temp, &read_sockets) { 907 list_for_each_safe(list, temp, &read_sockets) {
941 908
942 struct connection *con = 909 struct connection *con =
943 list_entry(list, struct connection, read_list); 910 list_entry(list, struct connection, read_list);
944 list_del(&con->read_list); 911 list_del(&con->read_list);
945 clear_bit(CF_READ_PENDING, &con->flags); 912 clear_bit(CF_READ_PENDING, &con->flags);
946 913
@@ -959,7 +926,7 @@ static void process_sockets(void)
959 926
960 /* Don't starve out everyone else */ 927 /* Don't starve out everyone else */
961 if (++count >= MAX_RX_MSG_COUNT) { 928 if (++count >= MAX_RX_MSG_COUNT) {
962 schedule(); 929 cond_resched();
963 count = 0; 930 count = 0;
964 } 931 }
965 932
@@ -977,20 +944,16 @@ static void process_output_queue(void)
977{ 944{
978 struct list_head *list; 945 struct list_head *list;
979 struct list_head *temp; 946 struct list_head *temp;
980 int ret;
981 947
982 spin_lock_bh(&write_sockets_lock); 948 spin_lock_bh(&write_sockets_lock);
983 list_for_each_safe(list, temp, &write_sockets) { 949 list_for_each_safe(list, temp, &write_sockets) {
984 struct connection *con = 950 struct connection *con =
985 list_entry(list, struct connection, write_list); 951 list_entry(list, struct connection, write_list);
986 clear_bit(CF_WRITE_PENDING, &con->flags); 952 clear_bit(CF_WRITE_PENDING, &con->flags);
987 list_del(&con->write_list); 953 list_del(&con->write_list);
988 954
989 spin_unlock_bh(&write_sockets_lock); 955 spin_unlock_bh(&write_sockets_lock);
990 956 send_to_sock(con);
991 ret = send_to_sock(con);
992 if (ret < 0) {
993 }
994 spin_lock_bh(&write_sockets_lock); 957 spin_lock_bh(&write_sockets_lock);
995 } 958 }
996 spin_unlock_bh(&write_sockets_lock); 959 spin_unlock_bh(&write_sockets_lock);
@@ -1000,19 +963,16 @@ static void process_state_queue(void)
1000{ 963{
1001 struct list_head *list; 964 struct list_head *list;
1002 struct list_head *temp; 965 struct list_head *temp;
1003 int ret;
1004 966
1005 spin_lock_bh(&state_sockets_lock); 967 spin_lock_bh(&state_sockets_lock);
1006 list_for_each_safe(list, temp, &state_sockets) { 968 list_for_each_safe(list, temp, &state_sockets) {
1007 struct connection *con = 969 struct connection *con =
1008 list_entry(list, struct connection, state_list); 970 list_entry(list, struct connection, state_list);
1009 list_del(&con->state_list); 971 list_del(&con->state_list);
1010 clear_bit(CF_CONNECT_PENDING, &con->flags); 972 clear_bit(CF_CONNECT_PENDING, &con->flags);
1011 spin_unlock_bh(&state_sockets_lock); 973 spin_unlock_bh(&state_sockets_lock);
1012 974
1013 ret = connect_to_sock(con); 975 connect_to_sock(con);
1014 if (ret < 0) {
1015 }
1016 spin_lock_bh(&state_sockets_lock); 976 spin_lock_bh(&state_sockets_lock);
1017 } 977 }
1018 spin_unlock_bh(&state_sockets_lock); 978 spin_unlock_bh(&state_sockets_lock);
@@ -1046,14 +1006,13 @@ static int read_list_empty(void)
1046/* DLM Transport comms receive daemon */ 1006/* DLM Transport comms receive daemon */
1047static int dlm_recvd(void *data) 1007static int dlm_recvd(void *data)
1048{ 1008{
1049 init_waitqueue_head(&lowcomms_recv_waitq);
1050 init_waitqueue_entry(&lowcomms_recv_waitq_head, current); 1009 init_waitqueue_entry(&lowcomms_recv_waitq_head, current);
1051 add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head); 1010 add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head);
1052 1011
1053 while (!kthread_should_stop()) { 1012 while (!kthread_should_stop()) {
1054 set_current_state(TASK_INTERRUPTIBLE); 1013 set_current_state(TASK_INTERRUPTIBLE);
1055 if (read_list_empty()) 1014 if (read_list_empty())
1056 schedule(); 1015 cond_resched();
1057 set_current_state(TASK_RUNNING); 1016 set_current_state(TASK_RUNNING);
1058 1017
1059 process_sockets(); 1018 process_sockets();
@@ -1081,14 +1040,13 @@ static int write_and_state_lists_empty(void)
1081/* DLM Transport send daemon */ 1040/* DLM Transport send daemon */
1082static int dlm_sendd(void *data) 1041static int dlm_sendd(void *data)
1083{ 1042{
1084 init_waitqueue_head(&lowcomms_send_waitq);
1085 init_waitqueue_entry(&lowcomms_send_waitq_head, current); 1043 init_waitqueue_entry(&lowcomms_send_waitq_head, current);
1086 add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head); 1044 add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head);
1087 1045
1088 while (!kthread_should_stop()) { 1046 while (!kthread_should_stop()) {
1089 set_current_state(TASK_INTERRUPTIBLE); 1047 set_current_state(TASK_INTERRUPTIBLE);
1090 if (write_and_state_lists_empty()) 1048 if (write_and_state_lists_empty())
1091 schedule(); 1049 cond_resched();
1092 set_current_state(TASK_RUNNING); 1050 set_current_state(TASK_RUNNING);
1093 1051
1094 process_state_queue(); 1052 process_state_queue();
@@ -1111,7 +1069,7 @@ static int daemons_start(void)
1111 1069
1112 p = kthread_run(dlm_recvd, NULL, "dlm_recvd"); 1070 p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
1113 error = IS_ERR(p); 1071 error = IS_ERR(p);
1114 if (error) { 1072 if (error) {
1115 log_print("can't start dlm_recvd %d", error); 1073 log_print("can't start dlm_recvd %d", error);
1116 return error; 1074 return error;
1117 } 1075 }
@@ -1119,7 +1077,7 @@ static int daemons_start(void)
1119 1077
1120 p = kthread_run(dlm_sendd, NULL, "dlm_sendd"); 1078 p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
1121 error = IS_ERR(p); 1079 error = IS_ERR(p);
1122 if (error) { 1080 if (error) {
1123 log_print("can't start dlm_sendd %d", error); 1081 log_print("can't start dlm_sendd %d", error);
1124 kthread_stop(recv_task); 1082 kthread_stop(recv_task);
1125 return error; 1083 return error;
@@ -1141,21 +1099,20 @@ void dlm_lowcomms_stop(void)
1141{ 1099{
1142 int i; 1100 int i;
1143 1101
1144 atomic_set(&accepting, 0); 1102 /* Set all the flags to prevent any
1145
1146 /* Set all the activity flags to prevent any
1147 socket activity. 1103 socket activity.
1148 */ 1104 */
1149 for (i = 0; i < conn_array_size; i++) { 1105 for (i = 0; i < conn_array_size; i++) {
1150 if (connections[i]) 1106 if (connections[i])
1151 connections[i]->flags |= 0x7; 1107 connections[i]->flags |= 0xFF;
1152 } 1108 }
1109
1153 daemons_stop(); 1110 daemons_stop();
1154 clean_writequeues(); 1111 clean_writequeues();
1155 1112
1156 for (i = 0; i < conn_array_size; i++) { 1113 for (i = 0; i < conn_array_size; i++) {
1157 if (connections[i]) { 1114 if (connections[i]) {
1158 close_connection(connections[i], TRUE); 1115 close_connection(connections[i], true);
1159 if (connections[i]->othercon) 1116 if (connections[i]->othercon)
1160 kmem_cache_free(con_cache, connections[i]->othercon); 1117 kmem_cache_free(con_cache, connections[i]->othercon);
1161 kmem_cache_free(con_cache, connections[i]); 1118 kmem_cache_free(con_cache, connections[i]);
@@ -1173,24 +1130,12 @@ int dlm_lowcomms_start(void)
1173{ 1130{
1174 int error = 0; 1131 int error = 0;
1175 1132
1176 error = -ENOTCONN;
1177
1178 /*
1179 * Temporarily initialise the waitq head so that lowcomms_send_message
1180 * doesn't crash if it gets called before the thread is fully
1181 * initialised
1182 */
1183 init_waitqueue_head(&lowcomms_send_waitq);
1184
1185 error = -ENOMEM; 1133 error = -ENOMEM;
1186 connections = kmalloc(sizeof(struct connection *) * 1134 connections = kzalloc(sizeof(struct connection *) *
1187 NODE_INCREMENT, GFP_KERNEL); 1135 NODE_INCREMENT, GFP_KERNEL);
1188 if (!connections) 1136 if (!connections)
1189 goto out; 1137 goto out;
1190 1138
1191 memset(connections, 0,
1192 sizeof(struct connection *) * NODE_INCREMENT);
1193
1194 conn_array_size = NODE_INCREMENT; 1139 conn_array_size = NODE_INCREMENT;
1195 1140
1196 if (dlm_our_addr(&dlm_local_addr, 0)) { 1141 if (dlm_our_addr(&dlm_local_addr, 0)) {
@@ -1203,7 +1148,8 @@ int dlm_lowcomms_start(void)
1203 } 1148 }
1204 1149
1205 con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection), 1150 con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
1206 __alignof__(struct connection), 0, NULL, NULL); 1151 __alignof__(struct connection), 0,
1152 NULL, NULL);
1207 if (!con_cache) 1153 if (!con_cache)
1208 goto fail_free_conn; 1154 goto fail_free_conn;
1209 1155
@@ -1217,40 +1163,20 @@ int dlm_lowcomms_start(void)
1217 if (error) 1163 if (error)
1218 goto fail_unlisten; 1164 goto fail_unlisten;
1219 1165
1220 atomic_set(&accepting, 1);
1221
1222 return 0; 1166 return 0;
1223 1167
1224 fail_unlisten: 1168fail_unlisten:
1225 close_connection(connections[0], 0); 1169 close_connection(connections[0], false);
1226 kmem_cache_free(con_cache, connections[0]); 1170 kmem_cache_free(con_cache, connections[0]);
1227 kmem_cache_destroy(con_cache); 1171 kmem_cache_destroy(con_cache);
1228 1172
1229 fail_free_conn: 1173fail_free_conn:
1230 kfree(connections); 1174 kfree(connections);
1231 1175
1232 out: 1176out:
1233 return error; 1177 return error;
1234} 1178}
1235 1179
1236int dlm_lowcomms_init(void)
1237{
1238 INIT_LIST_HEAD(&read_sockets);
1239 INIT_LIST_HEAD(&write_sockets);
1240 INIT_LIST_HEAD(&state_sockets);
1241
1242 spin_lock_init(&read_sockets_lock);
1243 spin_lock_init(&write_sockets_lock);
1244 spin_lock_init(&state_sockets_lock);
1245 init_MUTEX(&connections_lock);
1246
1247 return 0;
1248}
1249
1250void dlm_lowcomms_exit(void)
1251{
1252}
1253
1254/* 1180/*
1255 * Overrides for Emacs so that we follow Linus's tabbing style. 1181 * Overrides for Emacs so that we follow Linus's tabbing style.
1256 * Emacs will notice this stuff at the end of the file and automatically 1182 * Emacs will notice this stuff at the end of the file and automatically
diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h
index 2d045e0daae1..a9a9618c0d3f 100644
--- a/fs/dlm/lowcomms.h
+++ b/fs/dlm/lowcomms.h
@@ -14,8 +14,6 @@
14#ifndef __LOWCOMMS_DOT_H__ 14#ifndef __LOWCOMMS_DOT_H__
15#define __LOWCOMMS_DOT_H__ 15#define __LOWCOMMS_DOT_H__
16 16
17int dlm_lowcomms_init(void);
18void dlm_lowcomms_exit(void);
19int dlm_lowcomms_start(void); 17int dlm_lowcomms_start(void);
20void dlm_lowcomms_stop(void); 18void dlm_lowcomms_stop(void);
21int dlm_lowcomms_close(int nodeid); 19int dlm_lowcomms_close(int nodeid);
diff --git a/fs/dlm/main.c b/fs/dlm/main.c
index a8da8dc36b2e..162fbae58fe5 100644
--- a/fs/dlm/main.c
+++ b/fs/dlm/main.c
@@ -16,7 +16,6 @@
16#include "lock.h" 16#include "lock.h"
17#include "user.h" 17#include "user.h"
18#include "memory.h" 18#include "memory.h"
19#include "lowcomms.h"
20#include "config.h" 19#include "config.h"
21 20
22#ifdef CONFIG_DLM_DEBUG 21#ifdef CONFIG_DLM_DEBUG
@@ -47,20 +46,14 @@ static int __init init_dlm(void)
47 if (error) 46 if (error)
48 goto out_config; 47 goto out_config;
49 48
50 error = dlm_lowcomms_init();
51 if (error)
52 goto out_debug;
53
54 error = dlm_user_init(); 49 error = dlm_user_init();
55 if (error) 50 if (error)
56 goto out_lowcomms; 51 goto out_debug;
57 52
58 printk("DLM (built %s %s) installed\n", __DATE__, __TIME__); 53 printk("DLM (built %s %s) installed\n", __DATE__, __TIME__);
59 54
60 return 0; 55 return 0;
61 56
62 out_lowcomms:
63 dlm_lowcomms_exit();
64 out_debug: 57 out_debug:
65 dlm_unregister_debugfs(); 58 dlm_unregister_debugfs();
66 out_config: 59 out_config:
@@ -76,7 +69,6 @@ static int __init init_dlm(void)
76static void __exit exit_dlm(void) 69static void __exit exit_dlm(void)
77{ 70{
78 dlm_user_exit(); 71 dlm_user_exit();
79 dlm_lowcomms_exit();
80 dlm_config_exit(); 72 dlm_config_exit();
81 dlm_memory_exit(); 73 dlm_memory_exit();
82 dlm_lockspace_exit(); 74 dlm_lockspace_exit();