aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fs/dlm/lowcomms-sctp.c264
-rw-r--r--fs/dlm/lowcomms-tcp.c342
-rw-r--r--fs/dlm/lowcomms.h2
-rw-r--r--fs/dlm/main.c10
4 files changed, 261 insertions, 357 deletions
diff --git a/fs/dlm/lowcomms-sctp.c b/fs/dlm/lowcomms-sctp.c
index 6da6b14d5a61..fe158d7a9285 100644
--- a/fs/dlm/lowcomms-sctp.c
+++ b/fs/dlm/lowcomms-sctp.c
@@ -2,7 +2,7 @@
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. 5** Copyright (C) 2004-2006 Red Hat, Inc. All rights reserved.
6** 6**
7** This copyrighted material is made available to anyone wishing to use, 7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions 8** modify, copy, or redistribute it subject to the terms and conditions
@@ -75,13 +75,13 @@ struct nodeinfo {
75}; 75};
76 76
77static DEFINE_IDR(nodeinfo_idr); 77static DEFINE_IDR(nodeinfo_idr);
78static struct rw_semaphore nodeinfo_lock; 78static DECLARE_RWSEM(nodeinfo_lock);
79static int max_nodeid; 79static int max_nodeid;
80 80
81struct cbuf { 81struct cbuf {
82 unsigned base; 82 unsigned int base;
83 unsigned len; 83 unsigned int len;
84 unsigned mask; 84 unsigned int mask;
85}; 85};
86 86
87/* Just the one of these, now. But this struct keeps 87/* Just the one of these, now. But this struct keeps
@@ -90,9 +90,9 @@ struct cbuf {
90#define CF_READ_PENDING 1 90#define CF_READ_PENDING 1
91 91
92struct connection { 92struct connection {
93 struct socket *sock; 93 struct socket *sock;
94 unsigned long flags; 94 unsigned long flags;
95 struct page *rx_page; 95 struct page *rx_page;
96 atomic_t waiting_requests; 96 atomic_t waiting_requests;
97 struct cbuf cb; 97 struct cbuf cb;
98 int eagain_flag; 98 int eagain_flag;
@@ -102,36 +102,40 @@ struct connection {
102 102
103struct writequeue_entry { 103struct writequeue_entry {
104 struct list_head list; 104 struct list_head list;
105 struct page *page; 105 struct page *page;
106 int offset; 106 int offset;
107 int len; 107 int len;
108 int end; 108 int end;
109 int users; 109 int users;
110 struct nodeinfo *ni; 110 struct nodeinfo *ni;
111}; 111};
112 112
113#define CBUF_ADD(cb, n) do { (cb)->len += n; } while(0) 113static void cbuf_add(struct cbuf *cb, int n)
114#define CBUF_EMPTY(cb) ((cb)->len == 0) 114{
115#define CBUF_MAY_ADD(cb, n) (((cb)->len + (n)) < ((cb)->mask + 1)) 115 cb->len += n;
116#define CBUF_DATA(cb) (((cb)->base + (cb)->len) & (cb)->mask) 116}
117 117
118#define CBUF_INIT(cb, size) \ 118static int cbuf_data(struct cbuf *cb)
119do { \ 119{
120 (cb)->base = (cb)->len = 0; \ 120 return ((cb->base + cb->len) & cb->mask);
121 (cb)->mask = ((size)-1); \ 121}
122} while(0)
123 122
124#define CBUF_EAT(cb, n) \ 123static void cbuf_init(struct cbuf *cb, int size)
125do { \ 124{
126 (cb)->len -= (n); \ 125 cb->base = cb->len = 0;
127 (cb)->base += (n); \ 126 cb->mask = size-1;
128 (cb)->base &= (cb)->mask; \ 127}
129} while(0)
130 128
129static void cbuf_eat(struct cbuf *cb, int n)
130{
131 cb->len -= n;
132 cb->base += n;
133 cb->base &= cb->mask;
134}
131 135
132/* List of nodes which have writes pending */ 136/* List of nodes which have writes pending */
133static struct list_head write_nodes; 137static LIST_HEAD(write_nodes);
134static spinlock_t write_nodes_lock; 138static DEFINE_SPINLOCK(write_nodes_lock);
135 139
136/* Maximum number of incoming messages to process before 140/* Maximum number of incoming messages to process before
137 * doing a schedule() 141 * doing a schedule()
@@ -141,8 +145,7 @@ static spinlock_t write_nodes_lock;
141/* Manage daemons */ 145/* Manage daemons */
142static struct task_struct *recv_task; 146static struct task_struct *recv_task;
143static struct task_struct *send_task; 147static struct task_struct *send_task;
144static wait_queue_head_t lowcomms_recv_wait; 148static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_wait);
145static atomic_t accepting;
146 149
147/* The SCTP connection */ 150/* The SCTP connection */
148static struct connection sctp_con; 151static struct connection sctp_con;
@@ -161,11 +164,11 @@ static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
161 return error; 164 return error;
162 165
163 if (dlm_local_addr[0]->ss_family == AF_INET) { 166 if (dlm_local_addr[0]->ss_family == AF_INET) {
164 struct sockaddr_in *in4 = (struct sockaddr_in *) &addr; 167 struct sockaddr_in *in4 = (struct sockaddr_in *) &addr;
165 struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr; 168 struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
166 ret4->sin_addr.s_addr = in4->sin_addr.s_addr; 169 ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
167 } else { 170 } else {
168 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr; 171 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr;
169 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr; 172 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
170 memcpy(&ret6->sin6_addr, &in6->sin6_addr, 173 memcpy(&ret6->sin6_addr, &in6->sin6_addr,
171 sizeof(in6->sin6_addr)); 174 sizeof(in6->sin6_addr));
@@ -174,6 +177,8 @@ static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
174 return 0; 177 return 0;
175} 178}
176 179
180/* If alloc is 0 here we will not attempt to allocate a new
181 nodeinfo struct */
177static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc) 182static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
178{ 183{
179 struct nodeinfo *ni; 184 struct nodeinfo *ni;
@@ -184,44 +189,45 @@ static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
184 ni = idr_find(&nodeinfo_idr, nodeid); 189 ni = idr_find(&nodeinfo_idr, nodeid);
185 up_read(&nodeinfo_lock); 190 up_read(&nodeinfo_lock);
186 191
187 if (!ni && alloc) { 192 if (ni || !alloc)
188 down_write(&nodeinfo_lock); 193 return ni;
189 194
190 ni = idr_find(&nodeinfo_idr, nodeid); 195 down_write(&nodeinfo_lock);
191 if (ni)
192 goto out_up;
193 196
194 r = idr_pre_get(&nodeinfo_idr, alloc); 197 ni = idr_find(&nodeinfo_idr, nodeid);
195 if (!r) 198 if (ni)
196 goto out_up; 199 goto out_up;
197 200
198 ni = kmalloc(sizeof(struct nodeinfo), alloc); 201 r = idr_pre_get(&nodeinfo_idr, alloc);
199 if (!ni) 202 if (!r)
200 goto out_up; 203 goto out_up;
201 204
202 r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n); 205 ni = kmalloc(sizeof(struct nodeinfo), alloc);
203 if (r) { 206 if (!ni)
204 kfree(ni); 207 goto out_up;
205 ni = NULL; 208
206 goto out_up; 209 r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n);
207 } 210 if (r) {
208 if (n != nodeid) { 211 kfree(ni);
209 idr_remove(&nodeinfo_idr, n); 212 ni = NULL;
210 kfree(ni); 213 goto out_up;
211 ni = NULL;
212 goto out_up;
213 }
214 memset(ni, 0, sizeof(struct nodeinfo));
215 spin_lock_init(&ni->lock);
216 INIT_LIST_HEAD(&ni->writequeue);
217 spin_lock_init(&ni->writequeue_lock);
218 ni->nodeid = nodeid;
219
220 if (nodeid > max_nodeid)
221 max_nodeid = nodeid;
222 out_up:
223 up_write(&nodeinfo_lock);
224 } 214 }
215 if (n != nodeid) {
216 idr_remove(&nodeinfo_idr, n);
217 kfree(ni);
218 ni = NULL;
219 goto out_up;
220 }
221 memset(ni, 0, sizeof(struct nodeinfo));
222 spin_lock_init(&ni->lock);
223 INIT_LIST_HEAD(&ni->writequeue);
224 spin_lock_init(&ni->writequeue_lock);
225 ni->nodeid = nodeid;
226
227 if (nodeid > max_nodeid)
228 max_nodeid = nodeid;
229out_up:
230 up_write(&nodeinfo_lock);
225 231
226 return ni; 232 return ni;
227} 233}
@@ -279,13 +285,13 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
279 in4_addr->sin_port = cpu_to_be16(port); 285 in4_addr->sin_port = cpu_to_be16(port);
280 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); 286 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
281 memset(in4_addr+1, 0, sizeof(struct sockaddr_storage) - 287 memset(in4_addr+1, 0, sizeof(struct sockaddr_storage) -
282 sizeof(struct sockaddr_in)); 288 sizeof(struct sockaddr_in));
283 *addr_len = sizeof(struct sockaddr_in); 289 *addr_len = sizeof(struct sockaddr_in);
284 } else { 290 } else {
285 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; 291 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
286 in6_addr->sin6_port = cpu_to_be16(port); 292 in6_addr->sin6_port = cpu_to_be16(port);
287 memset(in6_addr+1, 0, sizeof(struct sockaddr_storage) - 293 memset(in6_addr+1, 0, sizeof(struct sockaddr_storage) -
288 sizeof(struct sockaddr_in6)); 294 sizeof(struct sockaddr_in6));
289 *addr_len = sizeof(struct sockaddr_in6); 295 *addr_len = sizeof(struct sockaddr_in6);
290 } 296 }
291} 297}
@@ -324,7 +330,7 @@ static void send_shutdown(sctp_assoc_t associd)
324 cmsg->cmsg_type = SCTP_SNDRCV; 330 cmsg->cmsg_type = SCTP_SNDRCV;
325 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); 331 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
326 outmessage.msg_controllen = cmsg->cmsg_len; 332 outmessage.msg_controllen = cmsg->cmsg_len;
327 sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg); 333 sinfo = CMSG_DATA(cmsg);
328 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); 334 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
329 335
330 sinfo->sinfo_flags |= MSG_EOF; 336 sinfo->sinfo_flags |= MSG_EOF;
@@ -387,7 +393,7 @@ static void process_sctp_notification(struct msghdr *msg, char *buf)
387 393
388 if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) { 394 if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
389 log_print("COMM_UP for invalid assoc ID %d", 395 log_print("COMM_UP for invalid assoc ID %d",
390 (int)sn->sn_assoc_change.sac_assoc_id); 396 (int)sn->sn_assoc_change.sac_assoc_id);
391 init_failed(); 397 init_failed();
392 return; 398 return;
393 } 399 }
@@ -398,15 +404,18 @@ static void process_sctp_notification(struct msghdr *msg, char *buf)
398 fs = get_fs(); 404 fs = get_fs();
399 set_fs(get_ds()); 405 set_fs(get_ds());
400 ret = sctp_con.sock->ops->getsockopt(sctp_con.sock, 406 ret = sctp_con.sock->ops->getsockopt(sctp_con.sock,
401 IPPROTO_SCTP, SCTP_PRIMARY_ADDR, 407 IPPROTO_SCTP,
402 (char*)&prim, &prim_len); 408 SCTP_PRIMARY_ADDR,
409 (char*)&prim,
410 &prim_len);
403 set_fs(fs); 411 set_fs(fs);
404 if (ret < 0) { 412 if (ret < 0) {
405 struct nodeinfo *ni; 413 struct nodeinfo *ni;
406 414
407 log_print("getsockopt/sctp_primary_addr on " 415 log_print("getsockopt/sctp_primary_addr on "
408 "new assoc %d failed : %d", 416 "new assoc %d failed : %d",
409 (int)sn->sn_assoc_change.sac_assoc_id, ret); 417 (int)sn->sn_assoc_change.sac_assoc_id,
418 ret);
410 419
411 /* Retry INIT later */ 420 /* Retry INIT later */
412 ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id); 421 ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
@@ -426,12 +435,10 @@ static void process_sctp_notification(struct msghdr *msg, char *buf)
426 return; 435 return;
427 436
428 /* Save the assoc ID */ 437 /* Save the assoc ID */
429 spin_lock(&ni->lock);
430 ni->assoc_id = sn->sn_assoc_change.sac_assoc_id; 438 ni->assoc_id = sn->sn_assoc_change.sac_assoc_id;
431 spin_unlock(&ni->lock);
432 439
433 log_print("got new/restarted association %d nodeid %d", 440 log_print("got new/restarted association %d nodeid %d",
434 (int)sn->sn_assoc_change.sac_assoc_id, nodeid); 441 (int)sn->sn_assoc_change.sac_assoc_id, nodeid);
435 442
436 /* Send any pending writes */ 443 /* Send any pending writes */
437 clear_bit(NI_INIT_PENDING, &ni->flags); 444 clear_bit(NI_INIT_PENDING, &ni->flags);
@@ -507,13 +514,12 @@ static int receive_from_sock(void)
507 sctp_con.rx_page = alloc_page(GFP_ATOMIC); 514 sctp_con.rx_page = alloc_page(GFP_ATOMIC);
508 if (sctp_con.rx_page == NULL) 515 if (sctp_con.rx_page == NULL)
509 goto out_resched; 516 goto out_resched;
510 CBUF_INIT(&sctp_con.cb, PAGE_CACHE_SIZE); 517 cbuf_init(&sctp_con.cb, PAGE_CACHE_SIZE);
511 } 518 }
512 519
513 memset(&incmsg, 0, sizeof(incmsg)); 520 memset(&incmsg, 0, sizeof(incmsg));
514 memset(&msgname, 0, sizeof(msgname)); 521 memset(&msgname, 0, sizeof(msgname));
515 522
516 memset(incmsg, 0, sizeof(incmsg));
517 msg.msg_name = &msgname; 523 msg.msg_name = &msgname;
518 msg.msg_namelen = sizeof(msgname); 524 msg.msg_namelen = sizeof(msgname);
519 msg.msg_flags = 0; 525 msg.msg_flags = 0;
@@ -532,17 +538,17 @@ static int receive_from_sock(void)
532 * iov[0] is the bit of the circular buffer between the current end 538 * iov[0] is the bit of the circular buffer between the current end
533 * point (cb.base + cb.len) and the end of the buffer. 539 * point (cb.base + cb.len) and the end of the buffer.
534 */ 540 */
535 iov[0].iov_len = sctp_con.cb.base - CBUF_DATA(&sctp_con.cb); 541 iov[0].iov_len = sctp_con.cb.base - cbuf_data(&sctp_con.cb);
536 iov[0].iov_base = page_address(sctp_con.rx_page) + 542 iov[0].iov_base = page_address(sctp_con.rx_page) +
537 CBUF_DATA(&sctp_con.cb); 543 cbuf_data(&sctp_con.cb);
538 iov[1].iov_len = 0; 544 iov[1].iov_len = 0;
539 545
540 /* 546 /*
541 * iov[1] is the bit of the circular buffer between the start of the 547 * iov[1] is the bit of the circular buffer between the start of the
542 * buffer and the start of the currently used section (cb.base) 548 * buffer and the start of the currently used section (cb.base)
543 */ 549 */
544 if (CBUF_DATA(&sctp_con.cb) >= sctp_con.cb.base) { 550 if (cbuf_data(&sctp_con.cb) >= sctp_con.cb.base) {
545 iov[0].iov_len = PAGE_CACHE_SIZE - CBUF_DATA(&sctp_con.cb); 551 iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&sctp_con.cb);
546 iov[1].iov_len = sctp_con.cb.base; 552 iov[1].iov_len = sctp_con.cb.base;
547 iov[1].iov_base = page_address(sctp_con.rx_page); 553 iov[1].iov_base = page_address(sctp_con.rx_page);
548 msg.msg_iovlen = 2; 554 msg.msg_iovlen = 2;
@@ -557,7 +563,7 @@ static int receive_from_sock(void)
557 msg.msg_control = incmsg; 563 msg.msg_control = incmsg;
558 msg.msg_controllen = sizeof(incmsg); 564 msg.msg_controllen = sizeof(incmsg);
559 cmsg = CMSG_FIRSTHDR(&msg); 565 cmsg = CMSG_FIRSTHDR(&msg);
560 sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg); 566 sinfo = CMSG_DATA(cmsg);
561 567
562 if (msg.msg_flags & MSG_NOTIFICATION) { 568 if (msg.msg_flags & MSG_NOTIFICATION) {
563 process_sctp_notification(&msg, page_address(sctp_con.rx_page)); 569 process_sctp_notification(&msg, page_address(sctp_con.rx_page));
@@ -583,29 +589,29 @@ static int receive_from_sock(void)
583 if (r == 1) 589 if (r == 1)
584 return 0; 590 return 0;
585 591
586 CBUF_ADD(&sctp_con.cb, ret); 592 cbuf_add(&sctp_con.cb, ret);
587 ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid), 593 ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
588 page_address(sctp_con.rx_page), 594 page_address(sctp_con.rx_page),
589 sctp_con.cb.base, sctp_con.cb.len, 595 sctp_con.cb.base, sctp_con.cb.len,
590 PAGE_CACHE_SIZE); 596 PAGE_CACHE_SIZE);
591 if (ret < 0) 597 if (ret < 0)
592 goto out_close; 598 goto out_close;
593 CBUF_EAT(&sctp_con.cb, ret); 599 cbuf_eat(&sctp_con.cb, ret);
594 600
595 out: 601out:
596 ret = 0; 602 ret = 0;
597 goto out_ret; 603 goto out_ret;
598 604
599 out_resched: 605out_resched:
600 lowcomms_data_ready(sctp_con.sock->sk, 0); 606 lowcomms_data_ready(sctp_con.sock->sk, 0);
601 ret = 0; 607 ret = 0;
602 schedule(); 608 cond_resched();
603 goto out_ret; 609 goto out_ret;
604 610
605 out_close: 611out_close:
606 if (ret != -EAGAIN) 612 if (ret != -EAGAIN)
607 log_print("error reading from sctp socket: %d", ret); 613 log_print("error reading from sctp socket: %d", ret);
608 out_ret: 614out_ret:
609 return ret; 615 return ret;
610} 616}
611 617
@@ -619,10 +625,12 @@ static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num)
619 set_fs(get_ds()); 625 set_fs(get_ds());
620 if (num == 1) 626 if (num == 1)
621 result = sctp_con.sock->ops->bind(sctp_con.sock, 627 result = sctp_con.sock->ops->bind(sctp_con.sock,
622 (struct sockaddr *) addr, addr_len); 628 (struct sockaddr *) addr,
629 addr_len);
623 else 630 else
624 result = sctp_con.sock->ops->setsockopt(sctp_con.sock, SOL_SCTP, 631 result = sctp_con.sock->ops->setsockopt(sctp_con.sock, SOL_SCTP,
625 SCTP_SOCKOPT_BINDX_ADD, (char *)addr, addr_len); 632 SCTP_SOCKOPT_BINDX_ADD,
633 (char *)addr, addr_len);
626 set_fs(fs); 634 set_fs(fs);
627 635
628 if (result < 0) 636 if (result < 0)
@@ -719,10 +727,10 @@ static int init_sock(void)
719 727
720 return 0; 728 return 0;
721 729
722 create_delsock: 730create_delsock:
723 sock_release(sock); 731 sock_release(sock);
724 sctp_con.sock = NULL; 732 sctp_con.sock = NULL;
725 out: 733out:
726 return result; 734 return result;
727} 735}
728 736
@@ -756,16 +764,13 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
756 int users = 0; 764 int users = 0;
757 struct nodeinfo *ni; 765 struct nodeinfo *ni;
758 766
759 if (!atomic_read(&accepting))
760 return NULL;
761
762 ni = nodeid2nodeinfo(nodeid, allocation); 767 ni = nodeid2nodeinfo(nodeid, allocation);
763 if (!ni) 768 if (!ni)
764 return NULL; 769 return NULL;
765 770
766 spin_lock(&ni->writequeue_lock); 771 spin_lock(&ni->writequeue_lock);
767 e = list_entry(ni->writequeue.prev, struct writequeue_entry, list); 772 e = list_entry(ni->writequeue.prev, struct writequeue_entry, list);
768 if (((struct list_head *) e == &ni->writequeue) || 773 if ((&e->list == &ni->writequeue) ||
769 (PAGE_CACHE_SIZE - e->end < len)) { 774 (PAGE_CACHE_SIZE - e->end < len)) {
770 e = NULL; 775 e = NULL;
771 } else { 776 } else {
@@ -776,7 +781,7 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
776 spin_unlock(&ni->writequeue_lock); 781 spin_unlock(&ni->writequeue_lock);
777 782
778 if (e) { 783 if (e) {
779 got_one: 784 got_one:
780 if (users == 0) 785 if (users == 0)
781 kmap(e->page); 786 kmap(e->page);
782 *ppc = page_address(e->page) + offset; 787 *ppc = page_address(e->page) + offset;
@@ -803,9 +808,6 @@ void dlm_lowcomms_commit_buffer(void *arg)
803 int users; 808 int users;
804 struct nodeinfo *ni = e->ni; 809 struct nodeinfo *ni = e->ni;
805 810
806 if (!atomic_read(&accepting))
807 return;
808
809 spin_lock(&ni->writequeue_lock); 811 spin_lock(&ni->writequeue_lock);
810 users = --e->users; 812 users = --e->users;
811 if (users) 813 if (users)
@@ -822,7 +824,7 @@ void dlm_lowcomms_commit_buffer(void *arg)
822 } 824 }
823 return; 825 return;
824 826
825 out: 827out:
826 spin_unlock(&ni->writequeue_lock); 828 spin_unlock(&ni->writequeue_lock);
827 return; 829 return;
828} 830}
@@ -878,7 +880,7 @@ static void initiate_association(int nodeid)
878 cmsg->cmsg_level = IPPROTO_SCTP; 880 cmsg->cmsg_level = IPPROTO_SCTP;
879 cmsg->cmsg_type = SCTP_SNDRCV; 881 cmsg->cmsg_type = SCTP_SNDRCV;
880 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); 882 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
881 sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg); 883 sinfo = CMSG_DATA(cmsg);
882 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); 884 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
883 sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid); 885 sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
884 886
@@ -892,7 +894,7 @@ static void initiate_association(int nodeid)
892} 894}
893 895
894/* Send a message */ 896/* Send a message */
895static int send_to_sock(struct nodeinfo *ni) 897static void send_to_sock(struct nodeinfo *ni)
896{ 898{
897 int ret = 0; 899 int ret = 0;
898 struct writequeue_entry *e; 900 struct writequeue_entry *e;
@@ -903,13 +905,13 @@ static int send_to_sock(struct nodeinfo *ni)
903 struct sctp_sndrcvinfo *sinfo; 905 struct sctp_sndrcvinfo *sinfo;
904 struct kvec iov; 906 struct kvec iov;
905 907
906 /* See if we need to init an association before we start 908 /* See if we need to init an association before we start
907 sending precious messages */ 909 sending precious messages */
908 spin_lock(&ni->lock); 910 spin_lock(&ni->lock);
909 if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) { 911 if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
910 spin_unlock(&ni->lock); 912 spin_unlock(&ni->lock);
911 initiate_association(ni->nodeid); 913 initiate_association(ni->nodeid);
912 return 0; 914 return;
913 } 915 }
914 spin_unlock(&ni->lock); 916 spin_unlock(&ni->lock);
915 917
@@ -923,7 +925,7 @@ static int send_to_sock(struct nodeinfo *ni)
923 cmsg->cmsg_level = IPPROTO_SCTP; 925 cmsg->cmsg_level = IPPROTO_SCTP;
924 cmsg->cmsg_type = SCTP_SNDRCV; 926 cmsg->cmsg_type = SCTP_SNDRCV;
925 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); 927 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
926 sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg); 928 sinfo = CMSG_DATA(cmsg);
927 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); 929 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
928 sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid); 930 sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
929 sinfo->sinfo_assoc_id = ni->assoc_id; 931 sinfo->sinfo_assoc_id = ni->assoc_id;
@@ -955,7 +957,7 @@ static int send_to_sock(struct nodeinfo *ni)
955 goto send_error; 957 goto send_error;
956 } else { 958 } else {
957 /* Don't starve people filling buffers */ 959 /* Don't starve people filling buffers */
958 schedule(); 960 cond_resched();
959 } 961 }
960 962
961 spin_lock(&ni->writequeue_lock); 963 spin_lock(&ni->writequeue_lock);
@@ -964,15 +966,16 @@ static int send_to_sock(struct nodeinfo *ni)
964 966
965 if (e->len == 0 && e->users == 0) { 967 if (e->len == 0 && e->users == 0) {
966 list_del(&e->list); 968 list_del(&e->list);
969 kunmap(e->page);
967 free_entry(e); 970 free_entry(e);
968 continue; 971 continue;
969 } 972 }
970 } 973 }
971 spin_unlock(&ni->writequeue_lock); 974 spin_unlock(&ni->writequeue_lock);
972 out: 975out:
973 return ret; 976 return;
974 977
975 send_error: 978send_error:
976 log_print("Error sending to node %d %d", ni->nodeid, ret); 979 log_print("Error sending to node %d %d", ni->nodeid, ret);
977 spin_lock(&ni->lock); 980 spin_lock(&ni->lock);
978 if (!test_and_set_bit(NI_INIT_PENDING, &ni->flags)) { 981 if (!test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
@@ -982,7 +985,7 @@ static int send_to_sock(struct nodeinfo *ni)
982 } else 985 } else
983 spin_unlock(&ni->lock); 986 spin_unlock(&ni->lock);
984 987
985 return ret; 988 return;
986} 989}
987 990
988/* Try to send any messages that are pending */ 991/* Try to send any messages that are pending */
@@ -994,7 +997,7 @@ static void process_output_queue(void)
994 spin_lock_bh(&write_nodes_lock); 997 spin_lock_bh(&write_nodes_lock);
995 list_for_each_safe(list, temp, &write_nodes) { 998 list_for_each_safe(list, temp, &write_nodes) {
996 struct nodeinfo *ni = 999 struct nodeinfo *ni =
997 list_entry(list, struct nodeinfo, write_list); 1000 list_entry(list, struct nodeinfo, write_list);
998 clear_bit(NI_WRITE_PENDING, &ni->flags); 1001 clear_bit(NI_WRITE_PENDING, &ni->flags);
999 list_del(&ni->write_list); 1002 list_del(&ni->write_list);
1000 1003
@@ -1106,7 +1109,7 @@ static int dlm_recvd(void *data)
1106 set_current_state(TASK_INTERRUPTIBLE); 1109 set_current_state(TASK_INTERRUPTIBLE);
1107 add_wait_queue(&lowcomms_recv_wait, &wait); 1110 add_wait_queue(&lowcomms_recv_wait, &wait);
1108 if (!test_bit(CF_READ_PENDING, &sctp_con.flags)) 1111 if (!test_bit(CF_READ_PENDING, &sctp_con.flags))
1109 schedule(); 1112 cond_resched();
1110 remove_wait_queue(&lowcomms_recv_wait, &wait); 1113 remove_wait_queue(&lowcomms_recv_wait, &wait);
1111 set_current_state(TASK_RUNNING); 1114 set_current_state(TASK_RUNNING);
1112 1115
@@ -1118,12 +1121,12 @@ static int dlm_recvd(void *data)
1118 1121
1119 /* Don't starve out everyone else */ 1122 /* Don't starve out everyone else */
1120 if (++count >= MAX_RX_MSG_COUNT) { 1123 if (++count >= MAX_RX_MSG_COUNT) {
1121 schedule(); 1124 cond_resched();
1122 count = 0; 1125 count = 0;
1123 } 1126 }
1124 } while (!kthread_should_stop() && ret >=0); 1127 } while (!kthread_should_stop() && ret >=0);
1125 } 1128 }
1126 schedule(); 1129 cond_resched();
1127 } 1130 }
1128 1131
1129 return 0; 1132 return 0;
@@ -1138,7 +1141,7 @@ static int dlm_sendd(void *data)
1138 while (!kthread_should_stop()) { 1141 while (!kthread_should_stop()) {
1139 set_current_state(TASK_INTERRUPTIBLE); 1142 set_current_state(TASK_INTERRUPTIBLE);
1140 if (write_list_empty()) 1143 if (write_list_empty())
1141 schedule(); 1144 cond_resched();
1142 set_current_state(TASK_RUNNING); 1145 set_current_state(TASK_RUNNING);
1143 1146
1144 if (sctp_con.eagain_flag) { 1147 if (sctp_con.eagain_flag) {
@@ -1166,7 +1169,7 @@ static int daemons_start(void)
1166 1169
1167 p = kthread_run(dlm_recvd, NULL, "dlm_recvd"); 1170 p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
1168 error = IS_ERR(p); 1171 error = IS_ERR(p);
1169 if (error) { 1172 if (error) {
1170 log_print("can't start dlm_recvd %d", error); 1173 log_print("can't start dlm_recvd %d", error);
1171 return error; 1174 return error;
1172 } 1175 }
@@ -1174,7 +1177,7 @@ static int daemons_start(void)
1174 1177
1175 p = kthread_run(dlm_sendd, NULL, "dlm_sendd"); 1178 p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
1176 error = IS_ERR(p); 1179 error = IS_ERR(p);
1177 if (error) { 1180 if (error) {
1178 log_print("can't start dlm_sendd %d", error); 1181 log_print("can't start dlm_sendd %d", error);
1179 kthread_stop(recv_task); 1182 kthread_stop(recv_task);
1180 return error; 1183 return error;
@@ -1197,43 +1200,28 @@ int dlm_lowcomms_start(void)
1197 error = daemons_start(); 1200 error = daemons_start();
1198 if (error) 1201 if (error)
1199 goto fail_sock; 1202 goto fail_sock;
1200 atomic_set(&accepting, 1);
1201 return 0; 1203 return 0;
1202 1204
1203 fail_sock: 1205fail_sock:
1204 close_connection(); 1206 close_connection();
1205 return error; 1207 return error;
1206} 1208}
1207 1209
1208/* Set all the activity flags to prevent any socket activity. */
1209
1210void dlm_lowcomms_stop(void) 1210void dlm_lowcomms_stop(void)
1211{ 1211{
1212 atomic_set(&accepting, 0); 1212 int i;
1213
1213 sctp_con.flags = 0x7; 1214 sctp_con.flags = 0x7;
1214 daemons_stop(); 1215 daemons_stop();
1215 clean_writequeues(); 1216 clean_writequeues();
1216 close_connection(); 1217 close_connection();
1217 dealloc_nodeinfo(); 1218 dealloc_nodeinfo();
1218 max_nodeid = 0; 1219 max_nodeid = 0;
1219}
1220 1220
1221int dlm_lowcomms_init(void) 1221 dlm_local_count = 0;
1222{ 1222 dlm_local_nodeid = 0;
1223 init_waitqueue_head(&lowcomms_recv_wait);
1224 spin_lock_init(&write_nodes_lock);
1225 INIT_LIST_HEAD(&write_nodes);
1226 init_rwsem(&nodeinfo_lock);
1227 return 0;
1228}
1229
1230void dlm_lowcomms_exit(void)
1231{
1232 int i;
1233 1223
1234 for (i = 0; i < dlm_local_count; i++) 1224 for (i = 0; i < dlm_local_count; i++)
1235 kfree(dlm_local_addr[i]); 1225 kfree(dlm_local_addr[i]);
1236 dlm_local_count = 0;
1237 dlm_local_nodeid = 0;
1238} 1226}
1239 1227
diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms-tcp.c
index 7289e59b4bd3..8f2791fc8447 100644
--- a/fs/dlm/lowcomms-tcp.c
+++ b/fs/dlm/lowcomms-tcp.c
@@ -54,44 +54,59 @@
54#include "config.h" 54#include "config.h"
55 55
56struct cbuf { 56struct cbuf {
57 unsigned base; 57 unsigned int base;
58 unsigned len; 58 unsigned int len;
59 unsigned mask; 59 unsigned int mask;
60}; 60};
61 61
62#ifndef FALSE
63#define FALSE 0
64#define TRUE 1
65#endif
66#define NODE_INCREMENT 32 62#define NODE_INCREMENT 32
63static void cbuf_add(struct cbuf *cb, int n)
64{
65 cb->len += n;
66}
67 67
68#define CBUF_INIT(cb, size) do { (cb)->base = (cb)->len = 0; (cb)->mask = ((size)-1); } while(0) 68static int cbuf_data(struct cbuf *cb)
69#define CBUF_ADD(cb, n) do { (cb)->len += n; } while(0) 69{
70#define CBUF_EMPTY(cb) ((cb)->len == 0) 70 return ((cb->base + cb->len) & cb->mask);
71#define CBUF_MAY_ADD(cb, n) (((cb)->len + (n)) < ((cb)->mask + 1)) 71}
72#define CBUF_EAT(cb, n) do { (cb)->len -= (n); \ 72
73 (cb)->base += (n); (cb)->base &= (cb)->mask; } while(0) 73static void cbuf_init(struct cbuf *cb, int size)
74#define CBUF_DATA(cb) (((cb)->base + (cb)->len) & (cb)->mask) 74{
75 cb->base = cb->len = 0;
76 cb->mask = size-1;
77}
78
79static void cbuf_eat(struct cbuf *cb, int n)
80{
81 cb->len -= n;
82 cb->base += n;
83 cb->base &= cb->mask;
84}
85
86static bool cbuf_empty(struct cbuf *cb)
87{
88 return cb->len == 0;
89}
75 90
76/* Maximum number of incoming messages to process before 91/* Maximum number of incoming messages to process before
77 doing a schedule() 92 doing a cond_resched()
78*/ 93*/
79#define MAX_RX_MSG_COUNT 25 94#define MAX_RX_MSG_COUNT 25
80 95
81struct connection { 96struct connection {
82 struct socket *sock; /* NULL if not connected */ 97 struct socket *sock; /* NULL if not connected */
83 uint32_t nodeid; /* So we know who we are in the list */ 98 uint32_t nodeid; /* So we know who we are in the list */
84 struct rw_semaphore sock_sem; /* Stop connect races */ 99 struct rw_semaphore sock_sem; /* Stop connect races */
85 struct list_head read_list; /* On this list when ready for reading */ 100 struct list_head read_list; /* On this list when ready for reading */
86 struct list_head write_list; /* On this list when ready for writing */ 101 struct list_head write_list; /* On this list when ready for writing */
87 struct list_head state_list; /* On this list when ready to connect */ 102 struct list_head state_list; /* On this list when ready to connect */
88 unsigned long flags; /* bit 1,2 = We are on the read/write lists */ 103 unsigned long flags; /* bit 1,2 = We are on the read/write lists */
89#define CF_READ_PENDING 1 104#define CF_READ_PENDING 1
90#define CF_WRITE_PENDING 2 105#define CF_WRITE_PENDING 2
91#define CF_CONNECT_PENDING 3 106#define CF_CONNECT_PENDING 3
92#define CF_IS_OTHERCON 4 107#define CF_IS_OTHERCON 4
93 struct list_head writequeue; /* List of outgoing writequeue_entries */ 108 struct list_head writequeue; /* List of outgoing writequeue_entries */
94 struct list_head listenlist; /* List of allocated listening sockets */ 109 struct list_head listenlist; /* List of allocated listening sockets */
95 spinlock_t writequeue_lock; 110 spinlock_t writequeue_lock;
96 int (*rx_action) (struct connection *); /* What to do when active */ 111 int (*rx_action) (struct connection *); /* What to do when active */
97 struct page *rx_page; 112 struct page *rx_page;
@@ -121,28 +136,27 @@ static struct task_struct *recv_task;
121static struct task_struct *send_task; 136static struct task_struct *send_task;
122 137
123static wait_queue_t lowcomms_send_waitq_head; 138static wait_queue_t lowcomms_send_waitq_head;
124static wait_queue_head_t lowcomms_send_waitq; 139static DECLARE_WAIT_QUEUE_HEAD(lowcomms_send_waitq);
125static wait_queue_t lowcomms_recv_waitq_head; 140static wait_queue_t lowcomms_recv_waitq_head;
126static wait_queue_head_t lowcomms_recv_waitq; 141static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_waitq);
127 142
128/* An array of pointers to connections, indexed by NODEID */ 143/* An array of pointers to connections, indexed by NODEID */
129static struct connection **connections; 144static struct connection **connections;
130static struct semaphore connections_lock; 145static DECLARE_MUTEX(connections_lock);
131static kmem_cache_t *con_cache; 146static kmem_cache_t *con_cache;
132static int conn_array_size; 147static int conn_array_size;
133static atomic_t accepting;
134 148
135/* List of sockets that have reads pending */ 149/* List of sockets that have reads pending */
136static struct list_head read_sockets; 150static LIST_HEAD(read_sockets);
137static spinlock_t read_sockets_lock; 151static DEFINE_SPINLOCK(read_sockets_lock);
138 152
139/* List of sockets which have writes pending */ 153/* List of sockets which have writes pending */
140static struct list_head write_sockets; 154static LIST_HEAD(write_sockets);
141static spinlock_t write_sockets_lock; 155static DEFINE_SPINLOCK(write_sockets_lock);
142 156
143/* List of sockets which have connects pending */ 157/* List of sockets which have connects pending */
144static struct list_head state_sockets; 158static LIST_HEAD(state_sockets);
145static spinlock_t state_sockets_lock; 159static DEFINE_SPINLOCK(state_sockets_lock);
146 160
147static struct connection *nodeid2con(int nodeid, gfp_t allocation) 161static struct connection *nodeid2con(int nodeid, gfp_t allocation)
148{ 162{
@@ -153,12 +167,11 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
153 int new_size = nodeid + NODE_INCREMENT; 167 int new_size = nodeid + NODE_INCREMENT;
154 struct connection **new_conns; 168 struct connection **new_conns;
155 169
156 new_conns = kmalloc(sizeof(struct connection *) * 170 new_conns = kzalloc(sizeof(struct connection *) *
157 new_size, allocation); 171 new_size, allocation);
158 if (!new_conns) 172 if (!new_conns)
159 goto finish; 173 goto finish;
160 174
161 memset(new_conns, 0, sizeof(struct connection *) * new_size);
162 memcpy(new_conns, connections, sizeof(struct connection *) * conn_array_size); 175 memcpy(new_conns, connections, sizeof(struct connection *) * conn_array_size);
163 conn_array_size = new_size; 176 conn_array_size = new_size;
164 kfree(connections); 177 kfree(connections);
@@ -168,11 +181,10 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
168 181
169 con = connections[nodeid]; 182 con = connections[nodeid];
170 if (con == NULL && allocation) { 183 if (con == NULL && allocation) {
171 con = kmem_cache_alloc(con_cache, allocation); 184 con = kmem_cache_zalloc(con_cache, allocation);
172 if (!con) 185 if (!con)
173 goto finish; 186 goto finish;
174 187
175 memset(con, 0, sizeof(*con));
176 con->nodeid = nodeid; 188 con->nodeid = nodeid;
177 init_rwsem(&con->sock_sem); 189 init_rwsem(&con->sock_sem);
178 INIT_LIST_HEAD(&con->writequeue); 190 INIT_LIST_HEAD(&con->writequeue);
@@ -181,7 +193,7 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
181 connections[nodeid] = con; 193 connections[nodeid] = con;
182 } 194 }
183 195
184 finish: 196finish:
185 up(&connections_lock); 197 up(&connections_lock);
186 return con; 198 return con;
187} 199}
@@ -220,8 +232,6 @@ static inline void lowcomms_connect_sock(struct connection *con)
220{ 232{
221 if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) 233 if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
222 return; 234 return;
223 if (!atomic_read(&accepting))
224 return;
225 235
226 spin_lock_bh(&state_sockets_lock); 236 spin_lock_bh(&state_sockets_lock);
227 list_add_tail(&con->state_list, &state_sockets); 237 list_add_tail(&con->state_list, &state_sockets);
@@ -232,31 +242,8 @@ static inline void lowcomms_connect_sock(struct connection *con)
232 242
233static void lowcomms_state_change(struct sock *sk) 243static void lowcomms_state_change(struct sock *sk)
234{ 244{
235/* struct connection *con = sock2con(sk); */ 245 if (sk->sk_state == TCP_ESTABLISHED)
236
237 switch (sk->sk_state) {
238 case TCP_ESTABLISHED:
239 lowcomms_write_space(sk); 246 lowcomms_write_space(sk);
240 break;
241
242 case TCP_FIN_WAIT1:
243 case TCP_FIN_WAIT2:
244 case TCP_TIME_WAIT:
245 case TCP_CLOSE:
246 case TCP_CLOSE_WAIT:
247 case TCP_LAST_ACK:
248 case TCP_CLOSING:
249 /* FIXME: I think this causes more trouble than it solves.
250 lowcomms wil reconnect anyway when there is something to
251 send. This just attempts reconnection if a node goes down!
252 */
253 /* lowcomms_connect_sock(con); */
254 break;
255
256 default:
257 printk("dlm: lowcomms_state_change: state=%d\n", sk->sk_state);
258 break;
259 }
260} 247}
261 248
262/* Make a socket active */ 249/* Make a socket active */
@@ -277,13 +264,12 @@ static int add_sock(struct socket *sock, struct connection *con)
277static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, 264static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
278 int *addr_len) 265 int *addr_len)
279{ 266{
280 saddr->ss_family = dlm_local_addr.ss_family; 267 saddr->ss_family = dlm_local_addr.ss_family;
281 if (saddr->ss_family == AF_INET) { 268 if (saddr->ss_family == AF_INET) {
282 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; 269 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
283 in4_addr->sin_port = cpu_to_be16(port); 270 in4_addr->sin_port = cpu_to_be16(port);
284 *addr_len = sizeof(struct sockaddr_in); 271 *addr_len = sizeof(struct sockaddr_in);
285 } 272 } else {
286 else {
287 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; 273 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
288 in6_addr->sin6_port = cpu_to_be16(port); 274 in6_addr->sin6_port = cpu_to_be16(port);
289 *addr_len = sizeof(struct sockaddr_in6); 275 *addr_len = sizeof(struct sockaddr_in6);
@@ -291,7 +277,7 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
291} 277}
292 278
293/* Close a remote connection and tidy up */ 279/* Close a remote connection and tidy up */
294static void close_connection(struct connection *con, int and_other) 280static void close_connection(struct connection *con, bool and_other)
295{ 281{
296 down_write(&con->sock_sem); 282 down_write(&con->sock_sem);
297 283
@@ -300,11 +286,8 @@ static void close_connection(struct connection *con, int and_other)
300 con->sock = NULL; 286 con->sock = NULL;
301 } 287 }
302 if (con->othercon && and_other) { 288 if (con->othercon && and_other) {
303 /* Argh! recursion in kernel code! 289 /* Will only re-enter once. */
304 Actually, this isn't a list so it 290 close_connection(con->othercon, false);
305 will only re-enter once.
306 */
307 close_connection(con->othercon, FALSE);
308 } 291 }
309 if (con->rx_page) { 292 if (con->rx_page) {
310 __free_page(con->rx_page); 293 __free_page(con->rx_page);
@@ -337,7 +320,7 @@ static int receive_from_sock(struct connection *con)
337 con->rx_page = alloc_page(GFP_ATOMIC); 320 con->rx_page = alloc_page(GFP_ATOMIC);
338 if (con->rx_page == NULL) 321 if (con->rx_page == NULL)
339 goto out_resched; 322 goto out_resched;
340 CBUF_INIT(&con->cb, PAGE_CACHE_SIZE); 323 cbuf_init(&con->cb, PAGE_CACHE_SIZE);
341 } 324 }
342 325
343 msg.msg_control = NULL; 326 msg.msg_control = NULL;
@@ -352,16 +335,16 @@ static int receive_from_sock(struct connection *con)
352 * iov[0] is the bit of the circular buffer between the current end 335 * iov[0] is the bit of the circular buffer between the current end
353 * point (cb.base + cb.len) and the end of the buffer. 336 * point (cb.base + cb.len) and the end of the buffer.
354 */ 337 */
355 iov[0].iov_len = con->cb.base - CBUF_DATA(&con->cb); 338 iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
356 iov[0].iov_base = page_address(con->rx_page) + CBUF_DATA(&con->cb); 339 iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
357 iov[1].iov_len = 0; 340 iov[1].iov_len = 0;
358 341
359 /* 342 /*
360 * iov[1] is the bit of the circular buffer between the start of the 343 * iov[1] is the bit of the circular buffer between the start of the
361 * buffer and the start of the currently used section (cb.base) 344 * buffer and the start of the currently used section (cb.base)
362 */ 345 */
363 if (CBUF_DATA(&con->cb) >= con->cb.base) { 346 if (cbuf_data(&con->cb) >= con->cb.base) {
364 iov[0].iov_len = PAGE_CACHE_SIZE - CBUF_DATA(&con->cb); 347 iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
365 iov[1].iov_len = con->cb.base; 348 iov[1].iov_len = con->cb.base;
366 iov[1].iov_base = page_address(con->rx_page); 349 iov[1].iov_base = page_address(con->rx_page);
367 msg.msg_iovlen = 2; 350 msg.msg_iovlen = 2;
@@ -378,7 +361,7 @@ static int receive_from_sock(struct connection *con)
378 goto out_close; 361 goto out_close;
379 if (ret == len) 362 if (ret == len)
380 call_again_soon = 1; 363 call_again_soon = 1;
381 CBUF_ADD(&con->cb, ret); 364 cbuf_add(&con->cb, ret);
382 ret = dlm_process_incoming_buffer(con->nodeid, 365 ret = dlm_process_incoming_buffer(con->nodeid,
383 page_address(con->rx_page), 366 page_address(con->rx_page),
384 con->cb.base, con->cb.len, 367 con->cb.base, con->cb.len,
@@ -391,35 +374,32 @@ static int receive_from_sock(struct connection *con)
391 } 374 }
392 if (ret < 0) 375 if (ret < 0)
393 goto out_close; 376 goto out_close;
394 CBUF_EAT(&con->cb, ret); 377 cbuf_eat(&con->cb, ret);
395 378
396 if (CBUF_EMPTY(&con->cb) && !call_again_soon) { 379 if (cbuf_empty(&con->cb) && !call_again_soon) {
397 __free_page(con->rx_page); 380 __free_page(con->rx_page);
398 con->rx_page = NULL; 381 con->rx_page = NULL;
399 } 382 }
400 383
401 out: 384out:
402 if (call_again_soon) 385 if (call_again_soon)
403 goto out_resched; 386 goto out_resched;
404 up_read(&con->sock_sem); 387 up_read(&con->sock_sem);
405 ret = 0; 388 return 0;
406 goto out_ret;
407 389
408 out_resched: 390out_resched:
409 lowcomms_data_ready(con->sock->sk, 0); 391 lowcomms_data_ready(con->sock->sk, 0);
410 up_read(&con->sock_sem); 392 up_read(&con->sock_sem);
411 ret = 0; 393 cond_resched();
412 schedule(); 394 return 0;
413 goto out_ret;
414 395
415 out_close: 396out_close:
416 up_read(&con->sock_sem); 397 up_read(&con->sock_sem);
417 if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) { 398 if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) {
418 close_connection(con, FALSE); 399 close_connection(con, false);
419 /* Reconnect when there is something to send */ 400 /* Reconnect when there is something to send */
420 } 401 }
421 402
422 out_ret:
423 return ret; 403 return ret;
424} 404}
425 405
@@ -434,7 +414,8 @@ static int accept_from_sock(struct connection *con)
434 struct connection *newcon; 414 struct connection *newcon;
435 415
436 memset(&peeraddr, 0, sizeof(peeraddr)); 416 memset(&peeraddr, 0, sizeof(peeraddr));
437 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &newsock); 417 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
418 IPPROTO_TCP, &newsock);
438 if (result < 0) 419 if (result < 0)
439 return -ENOMEM; 420 return -ENOMEM;
440 421
@@ -462,7 +443,7 @@ static int accept_from_sock(struct connection *con)
462 /* Get the new node's NODEID */ 443 /* Get the new node's NODEID */
463 make_sockaddr(&peeraddr, 0, &len); 444 make_sockaddr(&peeraddr, 0, &len);
464 if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) { 445 if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
465 printk("dlm: connect from non cluster node\n"); 446 printk("dlm: connect from non cluster node\n");
466 sock_release(newsock); 447 sock_release(newsock);
467 up_read(&con->sock_sem); 448 up_read(&con->sock_sem);
468 return -1; 449 return -1;
@@ -483,17 +464,16 @@ static int accept_from_sock(struct connection *con)
483 } 464 }
484 down_write(&newcon->sock_sem); 465 down_write(&newcon->sock_sem);
485 if (newcon->sock) { 466 if (newcon->sock) {
486 struct connection *othercon = newcon->othercon; 467 struct connection *othercon = newcon->othercon;
487 468
488 if (!othercon) { 469 if (!othercon) {
489 othercon = kmem_cache_alloc(con_cache, GFP_KERNEL); 470 othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL);
490 if (!othercon) { 471 if (!othercon) {
491 printk("dlm: failed to allocate incoming socket\n"); 472 printk("dlm: failed to allocate incoming socket\n");
492 up_write(&newcon->sock_sem); 473 up_write(&newcon->sock_sem);
493 result = -ENOMEM; 474 result = -ENOMEM;
494 goto accept_err; 475 goto accept_err;
495 } 476 }
496 memset(othercon, 0, sizeof(*othercon));
497 othercon->nodeid = nodeid; 477 othercon->nodeid = nodeid;
498 othercon->rx_action = receive_from_sock; 478 othercon->rx_action = receive_from_sock;
499 init_rwsem(&othercon->sock_sem); 479 init_rwsem(&othercon->sock_sem);
@@ -523,7 +503,7 @@ static int accept_from_sock(struct connection *con)
523 503
524 return 0; 504 return 0;
525 505
526 accept_err: 506accept_err:
527 up_read(&con->sock_sem); 507 up_read(&con->sock_sem);
528 sock_release(newsock); 508 sock_release(newsock);
529 509
@@ -533,7 +513,7 @@ static int accept_from_sock(struct connection *con)
533} 513}
534 514
535/* Connect a new socket to its peer */ 515/* Connect a new socket to its peer */
536static int connect_to_sock(struct connection *con) 516static void connect_to_sock(struct connection *con)
537{ 517{
538 int result = -EHOSTUNREACH; 518 int result = -EHOSTUNREACH;
539 struct sockaddr_storage saddr; 519 struct sockaddr_storage saddr;
@@ -542,7 +522,7 @@ static int connect_to_sock(struct connection *con)
542 522
543 if (con->nodeid == 0) { 523 if (con->nodeid == 0) {
544 log_print("attempt to connect sock 0 foiled"); 524 log_print("attempt to connect sock 0 foiled");
545 return 0; 525 return;
546 } 526 }
547 527
548 down_write(&con->sock_sem); 528 down_write(&con->sock_sem);
@@ -556,13 +536,14 @@ static int connect_to_sock(struct connection *con)
556 } 536 }
557 537
558 /* Create a socket to communicate with */ 538 /* Create a socket to communicate with */
559 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock); 539 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
540 IPPROTO_TCP, &sock);
560 if (result < 0) 541 if (result < 0)
561 goto out_err; 542 goto out_err;
562 543
563 memset(&saddr, 0, sizeof(saddr)); 544 memset(&saddr, 0, sizeof(saddr));
564 if (dlm_nodeid_to_addr(con->nodeid, &saddr)) 545 if (dlm_nodeid_to_addr(con->nodeid, &saddr))
565 goto out_err; 546 goto out_err;
566 547
567 sock->sk->sk_user_data = con; 548 sock->sk->sk_user_data = con;
568 con->rx_action = receive_from_sock; 549 con->rx_action = receive_from_sock;
@@ -574,22 +555,13 @@ static int connect_to_sock(struct connection *con)
574 log_print("connecting to %d", con->nodeid); 555 log_print("connecting to %d", con->nodeid);
575 result = 556 result =
576 sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, 557 sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
577 O_NONBLOCK); 558 O_NONBLOCK);
578 if (result == -EINPROGRESS) 559 if (result == -EINPROGRESS)
579 result = 0; 560 result = 0;
580 if (result != 0) 561 if (result == 0)
581 goto out_err; 562 goto out;
582
583 out:
584 up_write(&con->sock_sem);
585 /*
586 * Returning an error here means we've given up trying to connect to
587 * a remote node, otherwise we return 0 and reschedule the connetion
588 * attempt
589 */
590 return result;
591 563
592 out_err: 564out_err:
593 if (con->sock) { 565 if (con->sock) {
594 sock_release(con->sock); 566 sock_release(con->sock);
595 con->sock = NULL; 567 con->sock = NULL;
@@ -604,12 +576,15 @@ static int connect_to_sock(struct connection *con)
604 lowcomms_connect_sock(con); 576 lowcomms_connect_sock(con);
605 result = 0; 577 result = 0;
606 } 578 }
607 goto out; 579out:
580 up_write(&con->sock_sem);
581 return;
608} 582}
609 583
610static struct socket *create_listen_sock(struct connection *con, struct sockaddr_storage *saddr) 584static struct socket *create_listen_sock(struct connection *con,
585 struct sockaddr_storage *saddr)
611{ 586{
612 struct socket *sock = NULL; 587 struct socket *sock = NULL;
613 mm_segment_t fs; 588 mm_segment_t fs;
614 int result = 0; 589 int result = 0;
615 int one = 1; 590 int one = 1;
@@ -629,10 +604,12 @@ static struct socket *create_listen_sock(struct connection *con, struct sockaddr
629 604
630 fs = get_fs(); 605 fs = get_fs();
631 set_fs(get_ds()); 606 set_fs(get_ds());
632 result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one)); 607 result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
608 (char *)&one, sizeof(one));
633 set_fs(fs); 609 set_fs(fs);
634 if (result < 0) { 610 if (result < 0) {
635 printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",result); 611 printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",
612 result);
636 } 613 }
637 sock->sk->sk_user_data = con; 614 sock->sk->sk_user_data = con;
638 con->rx_action = accept_from_sock; 615 con->rx_action = accept_from_sock;
@@ -652,7 +629,8 @@ static struct socket *create_listen_sock(struct connection *con, struct sockaddr
652 fs = get_fs(); 629 fs = get_fs();
653 set_fs(get_ds()); 630 set_fs(get_ds());
654 631
655 result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&one, sizeof(one)); 632 result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
633 (char *)&one, sizeof(one));
656 set_fs(fs); 634 set_fs(fs);
657 if (result < 0) { 635 if (result < 0) {
658 printk("dlm: Set keepalive failed: %d\n", result); 636 printk("dlm: Set keepalive failed: %d\n", result);
@@ -666,7 +644,7 @@ static struct socket *create_listen_sock(struct connection *con, struct sockaddr
666 goto create_out; 644 goto create_out;
667 } 645 }
668 646
669 create_out: 647create_out:
670 return sock; 648 return sock;
671} 649}
672 650
@@ -679,10 +657,6 @@ static int listen_for_all(void)
679 int result = -EINVAL; 657 int result = -EINVAL;
680 658
681 /* We don't support multi-homed hosts */ 659 /* We don't support multi-homed hosts */
682 memset(con, 0, sizeof(*con));
683 init_rwsem(&con->sock_sem);
684 spin_lock_init(&con->writequeue_lock);
685 INIT_LIST_HEAD(&con->writequeue);
686 set_bit(CF_IS_OTHERCON, &con->flags); 660 set_bit(CF_IS_OTHERCON, &con->flags);
687 661
688 sock = create_listen_sock(con, &dlm_local_addr); 662 sock = create_listen_sock(con, &dlm_local_addr);
@@ -731,16 +705,12 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len,
731 int offset = 0; 705 int offset = 0;
732 int users = 0; 706 int users = 0;
733 707
734 if (!atomic_read(&accepting))
735 return NULL;
736
737 con = nodeid2con(nodeid, allocation); 708 con = nodeid2con(nodeid, allocation);
738 if (!con) 709 if (!con)
739 return NULL; 710 return NULL;
740 711
741 spin_lock(&con->writequeue_lock);
742 e = list_entry(con->writequeue.prev, struct writequeue_entry, list); 712 e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
743 if (((struct list_head *) e == &con->writequeue) || 713 if ((&e->list == &con->writequeue) ||
744 (PAGE_CACHE_SIZE - e->end < len)) { 714 (PAGE_CACHE_SIZE - e->end < len)) {
745 e = NULL; 715 e = NULL;
746 } else { 716 } else {
@@ -751,7 +721,7 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len,
751 spin_unlock(&con->writequeue_lock); 721 spin_unlock(&con->writequeue_lock);
752 722
753 if (e) { 723 if (e) {
754 got_one: 724 got_one:
755 if (users == 0) 725 if (users == 0)
756 kmap(e->page); 726 kmap(e->page);
757 *ppc = page_address(e->page) + offset; 727 *ppc = page_address(e->page) + offset;
@@ -777,10 +747,6 @@ void dlm_lowcomms_commit_buffer(void *mh)
777 struct connection *con = e->con; 747 struct connection *con = e->con;
778 int users; 748 int users;
779 749
780 if (!atomic_read(&accepting))
781 return;
782
783 spin_lock(&con->writequeue_lock);
784 users = --e->users; 750 users = --e->users;
785 if (users) 751 if (users)
786 goto out; 752 goto out;
@@ -797,7 +763,7 @@ void dlm_lowcomms_commit_buffer(void *mh)
797 } 763 }
798 return; 764 return;
799 765
800 out: 766out:
801 spin_unlock(&con->writequeue_lock); 767 spin_unlock(&con->writequeue_lock);
802 return; 768 return;
803} 769}
@@ -809,7 +775,7 @@ static void free_entry(struct writequeue_entry *e)
809} 775}
810 776
811/* Send a message */ 777/* Send a message */
812static int send_to_sock(struct connection *con) 778static void send_to_sock(struct connection *con)
813{ 779{
814 int ret = 0; 780 int ret = 0;
815 ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int); 781 ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int);
@@ -846,7 +812,7 @@ static int send_to_sock(struct connection *con)
846 } 812 }
847 else { 813 else {
848 /* Don't starve people filling buffers */ 814 /* Don't starve people filling buffers */
849 schedule(); 815 cond_resched();
850 } 816 }
851 817
852 spin_lock(&con->writequeue_lock); 818 spin_lock(&con->writequeue_lock);
@@ -855,25 +821,26 @@ static int send_to_sock(struct connection *con)
855 821
856 if (e->len == 0 && e->users == 0) { 822 if (e->len == 0 && e->users == 0) {
857 list_del(&e->list); 823 list_del(&e->list);
824 kunmap(e->page);
858 free_entry(e); 825 free_entry(e);
859 continue; 826 continue;
860 } 827 }
861 } 828 }
862 spin_unlock(&con->writequeue_lock); 829 spin_unlock(&con->writequeue_lock);
863 out: 830out:
864 up_read(&con->sock_sem); 831 up_read(&con->sock_sem);
865 return ret; 832 return;
866 833
867 send_error: 834send_error:
868 up_read(&con->sock_sem); 835 up_read(&con->sock_sem);
869 close_connection(con, FALSE); 836 close_connection(con, false);
870 lowcomms_connect_sock(con); 837 lowcomms_connect_sock(con);
871 return ret; 838 return;
872 839
873 out_connect: 840out_connect:
874 up_read(&con->sock_sem); 841 up_read(&con->sock_sem);
875 lowcomms_connect_sock(con); 842 lowcomms_connect_sock(con);
876 return 0; 843 return;
877} 844}
878 845
879static void clean_one_writequeue(struct connection *con) 846static void clean_one_writequeue(struct connection *con)
@@ -904,12 +871,12 @@ int dlm_lowcomms_close(int nodeid)
904 con = nodeid2con(nodeid, 0); 871 con = nodeid2con(nodeid, 0);
905 if (con) { 872 if (con) {
906 clean_one_writequeue(con); 873 clean_one_writequeue(con);
907 close_connection(con, TRUE); 874 close_connection(con, true);
908 atomic_set(&con->waiting_requests, 0); 875 atomic_set(&con->waiting_requests, 0);
909 } 876 }
910 return 0; 877 return 0;
911 878
912 out: 879out:
913 return -1; 880 return -1;
914} 881}
915 882
@@ -940,7 +907,7 @@ static void process_sockets(void)
940 list_for_each_safe(list, temp, &read_sockets) { 907 list_for_each_safe(list, temp, &read_sockets) {
941 908
942 struct connection *con = 909 struct connection *con =
943 list_entry(list, struct connection, read_list); 910 list_entry(list, struct connection, read_list);
944 list_del(&con->read_list); 911 list_del(&con->read_list);
945 clear_bit(CF_READ_PENDING, &con->flags); 912 clear_bit(CF_READ_PENDING, &con->flags);
946 913
@@ -959,7 +926,7 @@ static void process_sockets(void)
959 926
960 /* Don't starve out everyone else */ 927 /* Don't starve out everyone else */
961 if (++count >= MAX_RX_MSG_COUNT) { 928 if (++count >= MAX_RX_MSG_COUNT) {
962 schedule(); 929 cond_resched();
963 count = 0; 930 count = 0;
964 } 931 }
965 932
@@ -977,20 +944,16 @@ static void process_output_queue(void)
977{ 944{
978 struct list_head *list; 945 struct list_head *list;
979 struct list_head *temp; 946 struct list_head *temp;
980 int ret;
981 947
982 spin_lock_bh(&write_sockets_lock); 948 spin_lock_bh(&write_sockets_lock);
983 list_for_each_safe(list, temp, &write_sockets) { 949 list_for_each_safe(list, temp, &write_sockets) {
984 struct connection *con = 950 struct connection *con =
985 list_entry(list, struct connection, write_list); 951 list_entry(list, struct connection, write_list);
986 clear_bit(CF_WRITE_PENDING, &con->flags); 952 clear_bit(CF_WRITE_PENDING, &con->flags);
987 list_del(&con->write_list); 953 list_del(&con->write_list);
988 954
989 spin_unlock_bh(&write_sockets_lock); 955 spin_unlock_bh(&write_sockets_lock);
990 956 send_to_sock(con);
991 ret = send_to_sock(con);
992 if (ret < 0) {
993 }
994 spin_lock_bh(&write_sockets_lock); 957 spin_lock_bh(&write_sockets_lock);
995 } 958 }
996 spin_unlock_bh(&write_sockets_lock); 959 spin_unlock_bh(&write_sockets_lock);
@@ -1000,19 +963,16 @@ static void process_state_queue(void)
1000{ 963{
1001 struct list_head *list; 964 struct list_head *list;
1002 struct list_head *temp; 965 struct list_head *temp;
1003 int ret;
1004 966
1005 spin_lock_bh(&state_sockets_lock); 967 spin_lock_bh(&state_sockets_lock);
1006 list_for_each_safe(list, temp, &state_sockets) { 968 list_for_each_safe(list, temp, &state_sockets) {
1007 struct connection *con = 969 struct connection *con =
1008 list_entry(list, struct connection, state_list); 970 list_entry(list, struct connection, state_list);
1009 list_del(&con->state_list); 971 list_del(&con->state_list);
1010 clear_bit(CF_CONNECT_PENDING, &con->flags); 972 clear_bit(CF_CONNECT_PENDING, &con->flags);
1011 spin_unlock_bh(&state_sockets_lock); 973 spin_unlock_bh(&state_sockets_lock);
1012 974
1013 ret = connect_to_sock(con); 975 connect_to_sock(con);
1014 if (ret < 0) {
1015 }
1016 spin_lock_bh(&state_sockets_lock); 976 spin_lock_bh(&state_sockets_lock);
1017 } 977 }
1018 spin_unlock_bh(&state_sockets_lock); 978 spin_unlock_bh(&state_sockets_lock);
@@ -1046,14 +1006,13 @@ static int read_list_empty(void)
1046/* DLM Transport comms receive daemon */ 1006/* DLM Transport comms receive daemon */
1047static int dlm_recvd(void *data) 1007static int dlm_recvd(void *data)
1048{ 1008{
1049 init_waitqueue_head(&lowcomms_recv_waitq);
1050 init_waitqueue_entry(&lowcomms_recv_waitq_head, current); 1009 init_waitqueue_entry(&lowcomms_recv_waitq_head, current);
1051 add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head); 1010 add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head);
1052 1011
1053 while (!kthread_should_stop()) { 1012 while (!kthread_should_stop()) {
1054 set_current_state(TASK_INTERRUPTIBLE); 1013 set_current_state(TASK_INTERRUPTIBLE);
1055 if (read_list_empty()) 1014 if (read_list_empty())
1056 schedule(); 1015 cond_resched();
1057 set_current_state(TASK_RUNNING); 1016 set_current_state(TASK_RUNNING);
1058 1017
1059 process_sockets(); 1018 process_sockets();
@@ -1081,14 +1040,13 @@ static int write_and_state_lists_empty(void)
1081/* DLM Transport send daemon */ 1040/* DLM Transport send daemon */
1082static int dlm_sendd(void *data) 1041static int dlm_sendd(void *data)
1083{ 1042{
1084 init_waitqueue_head(&lowcomms_send_waitq);
1085 init_waitqueue_entry(&lowcomms_send_waitq_head, current); 1043 init_waitqueue_entry(&lowcomms_send_waitq_head, current);
1086 add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head); 1044 add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head);
1087 1045
1088 while (!kthread_should_stop()) { 1046 while (!kthread_should_stop()) {
1089 set_current_state(TASK_INTERRUPTIBLE); 1047 set_current_state(TASK_INTERRUPTIBLE);
1090 if (write_and_state_lists_empty()) 1048 if (write_and_state_lists_empty())
1091 schedule(); 1049 cond_resched();
1092 set_current_state(TASK_RUNNING); 1050 set_current_state(TASK_RUNNING);
1093 1051
1094 process_state_queue(); 1052 process_state_queue();
@@ -1111,7 +1069,7 @@ static int daemons_start(void)
1111 1069
1112 p = kthread_run(dlm_recvd, NULL, "dlm_recvd"); 1070 p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
1113 error = IS_ERR(p); 1071 error = IS_ERR(p);
1114 if (error) { 1072 if (error) {
1115 log_print("can't start dlm_recvd %d", error); 1073 log_print("can't start dlm_recvd %d", error);
1116 return error; 1074 return error;
1117 } 1075 }
@@ -1119,7 +1077,7 @@ static int daemons_start(void)
1119 1077
1120 p = kthread_run(dlm_sendd, NULL, "dlm_sendd"); 1078 p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
1121 error = IS_ERR(p); 1079 error = IS_ERR(p);
1122 if (error) { 1080 if (error) {
1123 log_print("can't start dlm_sendd %d", error); 1081 log_print("can't start dlm_sendd %d", error);
1124 kthread_stop(recv_task); 1082 kthread_stop(recv_task);
1125 return error; 1083 return error;
@@ -1141,21 +1099,20 @@ void dlm_lowcomms_stop(void)
1141{ 1099{
1142 int i; 1100 int i;
1143 1101
1144 atomic_set(&accepting, 0); 1102 /* Set all the flags to prevent any
1145
1146 /* Set all the activity flags to prevent any
1147 socket activity. 1103 socket activity.
1148 */ 1104 */
1149 for (i = 0; i < conn_array_size; i++) { 1105 for (i = 0; i < conn_array_size; i++) {
1150 if (connections[i]) 1106 if (connections[i])
1151 connections[i]->flags |= 0x7; 1107 connections[i]->flags |= 0xFF;
1152 } 1108 }
1109
1153 daemons_stop(); 1110 daemons_stop();
1154 clean_writequeues(); 1111 clean_writequeues();
1155 1112
1156 for (i = 0; i < conn_array_size; i++) { 1113 for (i = 0; i < conn_array_size; i++) {
1157 if (connections[i]) { 1114 if (connections[i]) {
1158 close_connection(connections[i], TRUE); 1115 close_connection(connections[i], true);
1159 if (connections[i]->othercon) 1116 if (connections[i]->othercon)
1160 kmem_cache_free(con_cache, connections[i]->othercon); 1117 kmem_cache_free(con_cache, connections[i]->othercon);
1161 kmem_cache_free(con_cache, connections[i]); 1118 kmem_cache_free(con_cache, connections[i]);
@@ -1173,24 +1130,12 @@ int dlm_lowcomms_start(void)
1173{ 1130{
1174 int error = 0; 1131 int error = 0;
1175 1132
1176 error = -ENOTCONN;
1177
1178 /*
1179 * Temporarily initialise the waitq head so that lowcomms_send_message
1180 * doesn't crash if it gets called before the thread is fully
1181 * initialised
1182 */
1183 init_waitqueue_head(&lowcomms_send_waitq);
1184
1185 error = -ENOMEM; 1133 error = -ENOMEM;
1186 connections = kmalloc(sizeof(struct connection *) * 1134 connections = kzalloc(sizeof(struct connection *) *
1187 NODE_INCREMENT, GFP_KERNEL); 1135 NODE_INCREMENT, GFP_KERNEL);
1188 if (!connections) 1136 if (!connections)
1189 goto out; 1137 goto out;
1190 1138
1191 memset(connections, 0,
1192 sizeof(struct connection *) * NODE_INCREMENT);
1193
1194 conn_array_size = NODE_INCREMENT; 1139 conn_array_size = NODE_INCREMENT;
1195 1140
1196 if (dlm_our_addr(&dlm_local_addr, 0)) { 1141 if (dlm_our_addr(&dlm_local_addr, 0)) {
@@ -1203,7 +1148,8 @@ int dlm_lowcomms_start(void)
1203 } 1148 }
1204 1149
1205 con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection), 1150 con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
1206 __alignof__(struct connection), 0, NULL, NULL); 1151 __alignof__(struct connection), 0,
1152 NULL, NULL);
1207 if (!con_cache) 1153 if (!con_cache)
1208 goto fail_free_conn; 1154 goto fail_free_conn;
1209 1155
@@ -1217,40 +1163,20 @@ int dlm_lowcomms_start(void)
1217 if (error) 1163 if (error)
1218 goto fail_unlisten; 1164 goto fail_unlisten;
1219 1165
1220 atomic_set(&accepting, 1);
1221
1222 return 0; 1166 return 0;
1223 1167
1224 fail_unlisten: 1168fail_unlisten:
1225 close_connection(connections[0], 0); 1169 close_connection(connections[0], false);
1226 kmem_cache_free(con_cache, connections[0]); 1170 kmem_cache_free(con_cache, connections[0]);
1227 kmem_cache_destroy(con_cache); 1171 kmem_cache_destroy(con_cache);
1228 1172
1229 fail_free_conn: 1173fail_free_conn:
1230 kfree(connections); 1174 kfree(connections);
1231 1175
1232 out: 1176out:
1233 return error; 1177 return error;
1234} 1178}
1235 1179
1236int dlm_lowcomms_init(void)
1237{
1238 INIT_LIST_HEAD(&read_sockets);
1239 INIT_LIST_HEAD(&write_sockets);
1240 INIT_LIST_HEAD(&state_sockets);
1241
1242 spin_lock_init(&read_sockets_lock);
1243 spin_lock_init(&write_sockets_lock);
1244 spin_lock_init(&state_sockets_lock);
1245 init_MUTEX(&connections_lock);
1246
1247 return 0;
1248}
1249
1250void dlm_lowcomms_exit(void)
1251{
1252}
1253
1254/* 1180/*
1255 * Overrides for Emacs so that we follow Linus's tabbing style. 1181 * Overrides for Emacs so that we follow Linus's tabbing style.
1256 * Emacs will notice this stuff at the end of the file and automatically 1182 * Emacs will notice this stuff at the end of the file and automatically
diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h
index 2d045e0daae1..a9a9618c0d3f 100644
--- a/fs/dlm/lowcomms.h
+++ b/fs/dlm/lowcomms.h
@@ -14,8 +14,6 @@
14#ifndef __LOWCOMMS_DOT_H__ 14#ifndef __LOWCOMMS_DOT_H__
15#define __LOWCOMMS_DOT_H__ 15#define __LOWCOMMS_DOT_H__
16 16
17int dlm_lowcomms_init(void);
18void dlm_lowcomms_exit(void);
19int dlm_lowcomms_start(void); 17int dlm_lowcomms_start(void);
20void dlm_lowcomms_stop(void); 18void dlm_lowcomms_stop(void);
21int dlm_lowcomms_close(int nodeid); 19int dlm_lowcomms_close(int nodeid);
diff --git a/fs/dlm/main.c b/fs/dlm/main.c
index a8da8dc36b2e..162fbae58fe5 100644
--- a/fs/dlm/main.c
+++ b/fs/dlm/main.c
@@ -16,7 +16,6 @@
16#include "lock.h" 16#include "lock.h"
17#include "user.h" 17#include "user.h"
18#include "memory.h" 18#include "memory.h"
19#include "lowcomms.h"
20#include "config.h" 19#include "config.h"
21 20
22#ifdef CONFIG_DLM_DEBUG 21#ifdef CONFIG_DLM_DEBUG
@@ -47,20 +46,14 @@ static int __init init_dlm(void)
47 if (error) 46 if (error)
48 goto out_config; 47 goto out_config;
49 48
50 error = dlm_lowcomms_init();
51 if (error)
52 goto out_debug;
53
54 error = dlm_user_init(); 49 error = dlm_user_init();
55 if (error) 50 if (error)
56 goto out_lowcomms; 51 goto out_debug;
57 52
58 printk("DLM (built %s %s) installed\n", __DATE__, __TIME__); 53 printk("DLM (built %s %s) installed\n", __DATE__, __TIME__);
59 54
60 return 0; 55 return 0;
61 56
62 out_lowcomms:
63 dlm_lowcomms_exit();
64 out_debug: 57 out_debug:
65 dlm_unregister_debugfs(); 58 dlm_unregister_debugfs();
66 out_config: 59 out_config:
@@ -76,7 +69,6 @@ static int __init init_dlm(void)
76static void __exit exit_dlm(void) 69static void __exit exit_dlm(void)
77{ 70{
78 dlm_user_exit(); 71 dlm_user_exit();
79 dlm_lowcomms_exit();
80 dlm_config_exit(); 72 dlm_config_exit();
81 dlm_memory_exit(); 73 dlm_memory_exit();
82 dlm_lockspace_exit(); 74 dlm_lockspace_exit();