about summary refs log tree commit diff stats
path: root/fs/dlm/lowcomms-tcp.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/lowcomms-tcp.c')
-rw-r--r-- fs/dlm/lowcomms-tcp.c 361
1 files changed, 95 insertions, 266 deletions
diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms-tcp.c
index 9be3a440c42a..f1efd17b2614 100644
--- a/fs/dlm/lowcomms-tcp.c
+++ b/fs/dlm/lowcomms-tcp.c
@@ -2,7 +2,7 @@
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2006 Red Hat, Inc. All rights reserved. 5** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved.
6** 6**
7** This copyrighted material is made available to anyone wishing to use, 7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions 8** modify, copy, or redistribute it subject to the terms and conditions
@@ -96,10 +96,7 @@ static bool cbuf_empty(struct cbuf *cb)
96struct connection { 96struct connection {
97 struct socket *sock; /* NULL if not connected */ 97 struct socket *sock; /* NULL if not connected */
98 uint32_t nodeid; /* So we know who we are in the list */ 98 uint32_t nodeid; /* So we know who we are in the list */
99 struct rw_semaphore sock_sem; /* Stop connect races */ 99 struct mutex sock_mutex;
100 struct list_head read_list; /* On this list when ready for reading */
101 struct list_head write_list; /* On this list when ready for writing */
102 struct list_head state_list; /* On this list when ready to connect */
103 unsigned long flags; /* bit 1,2 = We are on the read/write lists */ 100 unsigned long flags; /* bit 1,2 = We are on the read/write lists */
104#define CF_READ_PENDING 1 101#define CF_READ_PENDING 1
105#define CF_WRITE_PENDING 2 102#define CF_WRITE_PENDING 2
@@ -112,9 +109,10 @@ struct connection {
112 struct page *rx_page; 109 struct page *rx_page;
113 struct cbuf cb; 110 struct cbuf cb;
114 int retries; 111 int retries;
115 atomic_t waiting_requests;
116#define MAX_CONNECT_RETRIES 3 112#define MAX_CONNECT_RETRIES 3
117 struct connection *othercon; 113 struct connection *othercon;
114 struct work_struct rwork; /* Receive workqueue */
115 struct work_struct swork; /* Send workqueue */
118}; 116};
119#define sock2con(x) ((struct connection *)(x)->sk_user_data) 117#define sock2con(x) ((struct connection *)(x)->sk_user_data)
120 118
@@ -131,14 +129,9 @@ struct writequeue_entry {
131 129
132static struct sockaddr_storage dlm_local_addr; 130static struct sockaddr_storage dlm_local_addr;
133 131
134/* Manage daemons */ 132/* Work queues */
135static struct task_struct *recv_task; 133static struct workqueue_struct *recv_workqueue;
136static struct task_struct *send_task; 134static struct workqueue_struct *send_workqueue;
137
138static wait_queue_t lowcomms_send_waitq_head;
139static DECLARE_WAIT_QUEUE_HEAD(lowcomms_send_waitq);
140static wait_queue_t lowcomms_recv_waitq_head;
141static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_waitq);
142 135
143/* An array of pointers to connections, indexed by NODEID */ 136/* An array of pointers to connections, indexed by NODEID */
144static struct connection **connections; 137static struct connection **connections;
@@ -146,17 +139,8 @@ static DECLARE_MUTEX(connections_lock);
146static struct kmem_cache *con_cache; 139static struct kmem_cache *con_cache;
147static int conn_array_size; 140static int conn_array_size;
148 141
149/* List of sockets that have reads pending */ 142static void process_recv_sockets(struct work_struct *work);
150static LIST_HEAD(read_sockets); 143static void process_send_sockets(struct work_struct *work);
151static DEFINE_SPINLOCK(read_sockets_lock);
152
153/* List of sockets which have writes pending */
154static LIST_HEAD(write_sockets);
155static DEFINE_SPINLOCK(write_sockets_lock);
156
157/* List of sockets which have connects pending */
158static LIST_HEAD(state_sockets);
159static DEFINE_SPINLOCK(state_sockets_lock);
160 144
161static struct connection *nodeid2con(int nodeid, gfp_t allocation) 145static struct connection *nodeid2con(int nodeid, gfp_t allocation)
162{ 146{
@@ -186,9 +170,11 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
186 goto finish; 170 goto finish;
187 171
188 con->nodeid = nodeid; 172 con->nodeid = nodeid;
189 init_rwsem(&con->sock_sem); 173 mutex_init(&con->sock_mutex);
190 INIT_LIST_HEAD(&con->writequeue); 174 INIT_LIST_HEAD(&con->writequeue);
191 spin_lock_init(&con->writequeue_lock); 175 spin_lock_init(&con->writequeue_lock);
176 INIT_WORK(&con->swork, process_send_sockets);
177 INIT_WORK(&con->rwork, process_recv_sockets);
192 178
193 connections[nodeid] = con; 179 connections[nodeid] = con;
194 } 180 }
@@ -203,41 +189,22 @@ static void lowcomms_data_ready(struct sock *sk, int count_unused)
203{ 189{
204 struct connection *con = sock2con(sk); 190 struct connection *con = sock2con(sk);
205 191
206 atomic_inc(&con->waiting_requests); 192 if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
207 if (test_and_set_bit(CF_READ_PENDING, &con->flags)) 193 queue_work(recv_workqueue, &con->rwork);
208 return;
209
210 spin_lock_bh(&read_sockets_lock);
211 list_add_tail(&con->read_list, &read_sockets);
212 spin_unlock_bh(&read_sockets_lock);
213
214 wake_up_interruptible(&lowcomms_recv_waitq);
215} 194}
216 195
217static void lowcomms_write_space(struct sock *sk) 196static void lowcomms_write_space(struct sock *sk)
218{ 197{
219 struct connection *con = sock2con(sk); 198 struct connection *con = sock2con(sk);
220 199
221 if (test_and_set_bit(CF_WRITE_PENDING, &con->flags)) 200 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
222 return; 201 queue_work(send_workqueue, &con->swork);
223
224 spin_lock_bh(&write_sockets_lock);
225 list_add_tail(&con->write_list, &write_sockets);
226 spin_unlock_bh(&write_sockets_lock);
227
228 wake_up_interruptible(&lowcomms_send_waitq);
229} 202}
230 203
231static inline void lowcomms_connect_sock(struct connection *con) 204static inline void lowcomms_connect_sock(struct connection *con)
232{ 205{
233 if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) 206 if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
234 return; 207 queue_work(send_workqueue, &con->swork);
235
236 spin_lock_bh(&state_sockets_lock);
237 list_add_tail(&con->state_list, &state_sockets);
238 spin_unlock_bh(&state_sockets_lock);
239
240 wake_up_interruptible(&lowcomms_send_waitq);
241} 208}
242 209
243static void lowcomms_state_change(struct sock *sk) 210static void lowcomms_state_change(struct sock *sk)
@@ -279,7 +246,7 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
279/* Close a remote connection and tidy up */ 246/* Close a remote connection and tidy up */
280static void close_connection(struct connection *con, bool and_other) 247static void close_connection(struct connection *con, bool and_other)
281{ 248{
282 down_write(&con->sock_sem); 249 mutex_lock(&con->sock_mutex);
283 250
284 if (con->sock) { 251 if (con->sock) {
285 sock_release(con->sock); 252 sock_release(con->sock);
@@ -294,7 +261,7 @@ static void close_connection(struct connection *con, bool and_other)
294 con->rx_page = NULL; 261 con->rx_page = NULL;
295 } 262 }
296 con->retries = 0; 263 con->retries = 0;
297 up_write(&con->sock_sem); 264 mutex_unlock(&con->sock_mutex);
298} 265}
299 266
300/* Data received from remote end */ 267/* Data received from remote end */
@@ -308,10 +275,13 @@ static int receive_from_sock(struct connection *con)
308 int r; 275 int r;
309 int call_again_soon = 0; 276 int call_again_soon = 0;
310 277
311 down_read(&con->sock_sem); 278 mutex_lock(&con->sock_mutex);
279
280 if (con->sock == NULL) {
281 ret = -EAGAIN;
282 goto out_close;
283 }
312 284
313 if (con->sock == NULL)
314 goto out;
315 if (con->rx_page == NULL) { 285 if (con->rx_page == NULL) {
316 /* 286 /*
317 * This doesn't need to be atomic, but I think it should 287 * This doesn't need to be atomic, but I think it should
@@ -359,6 +329,9 @@ static int receive_from_sock(struct connection *con)
359 329
360 if (ret <= 0) 330 if (ret <= 0)
361 goto out_close; 331 goto out_close;
332 if (ret == -EAGAIN)
333 goto out_resched;
334
362 if (ret == len) 335 if (ret == len)
363 call_again_soon = 1; 336 call_again_soon = 1;
364 cbuf_add(&con->cb, ret); 337 cbuf_add(&con->cb, ret);
@@ -381,24 +354,26 @@ static int receive_from_sock(struct connection *con)
381 con->rx_page = NULL; 354 con->rx_page = NULL;
382 } 355 }
383 356
384out:
385 if (call_again_soon) 357 if (call_again_soon)
386 goto out_resched; 358 goto out_resched;
387 up_read(&con->sock_sem); 359 mutex_unlock(&con->sock_mutex);
388 return 0; 360 return 0;
389 361
390out_resched: 362out_resched:
391 lowcomms_data_ready(con->sock->sk, 0); 363 if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
392 up_read(&con->sock_sem); 364 queue_work(recv_workqueue, &con->rwork);
393 cond_resched(); 365 mutex_unlock(&con->sock_mutex);
394 return 0; 366 return -EAGAIN;
395 367
396out_close: 368out_close:
397 up_read(&con->sock_sem); 369 mutex_unlock(&con->sock_mutex);
398 if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) { 370 if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) {
399 close_connection(con, false); 371 close_connection(con, false);
400 /* Reconnect when there is something to send */ 372 /* Reconnect when there is something to send */
401 } 373 }
374 /* Don't return success if we really got EOF */
375 if (ret == 0)
376 ret = -EAGAIN;
402 377
403 return ret; 378 return ret;
404} 379}
@@ -412,6 +387,7 @@ static int accept_from_sock(struct connection *con)
412 int len; 387 int len;
413 int nodeid; 388 int nodeid;
414 struct connection *newcon; 389 struct connection *newcon;
390 struct connection *addcon;
415 391
416 memset(&peeraddr, 0, sizeof(peeraddr)); 392 memset(&peeraddr, 0, sizeof(peeraddr));
417 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, 393 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
@@ -419,7 +395,7 @@ static int accept_from_sock(struct connection *con)
419 if (result < 0) 395 if (result < 0)
420 return -ENOMEM; 396 return -ENOMEM;
421 397
422 down_read(&con->sock_sem); 398 mutex_lock_nested(&con->sock_mutex, 0);
423 399
424 result = -ENOTCONN; 400 result = -ENOTCONN;
425 if (con->sock == NULL) 401 if (con->sock == NULL)
@@ -445,7 +421,7 @@ static int accept_from_sock(struct connection *con)
445 if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) { 421 if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
446 printk("dlm: connect from non cluster node\n"); 422 printk("dlm: connect from non cluster node\n");
447 sock_release(newsock); 423 sock_release(newsock);
448 up_read(&con->sock_sem); 424 mutex_unlock(&con->sock_mutex);
449 return -1; 425 return -1;
450 } 426 }
451 427
@@ -462,7 +438,7 @@ static int accept_from_sock(struct connection *con)
462 result = -ENOMEM; 438 result = -ENOMEM;
463 goto accept_err; 439 goto accept_err;
464 } 440 }
465 down_write(&newcon->sock_sem); 441 mutex_lock_nested(&newcon->sock_mutex, 1);
466 if (newcon->sock) { 442 if (newcon->sock) {
467 struct connection *othercon = newcon->othercon; 443 struct connection *othercon = newcon->othercon;
468 444
@@ -470,41 +446,45 @@ static int accept_from_sock(struct connection *con)
470 othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL); 446 othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL);
471 if (!othercon) { 447 if (!othercon) {
472 printk("dlm: failed to allocate incoming socket\n"); 448 printk("dlm: failed to allocate incoming socket\n");
473 up_write(&newcon->sock_sem); 449 mutex_unlock(&newcon->sock_mutex);
474 result = -ENOMEM; 450 result = -ENOMEM;
475 goto accept_err; 451 goto accept_err;
476 } 452 }
477 othercon->nodeid = nodeid; 453 othercon->nodeid = nodeid;
478 othercon->rx_action = receive_from_sock; 454 othercon->rx_action = receive_from_sock;
479 init_rwsem(&othercon->sock_sem); 455 mutex_init(&othercon->sock_mutex);
456 INIT_WORK(&othercon->swork, process_send_sockets);
457 INIT_WORK(&othercon->rwork, process_recv_sockets);
480 set_bit(CF_IS_OTHERCON, &othercon->flags); 458 set_bit(CF_IS_OTHERCON, &othercon->flags);
481 newcon->othercon = othercon; 459 newcon->othercon = othercon;
482 } 460 }
483 othercon->sock = newsock; 461 othercon->sock = newsock;
484 newsock->sk->sk_user_data = othercon; 462 newsock->sk->sk_user_data = othercon;
485 add_sock(newsock, othercon); 463 add_sock(newsock, othercon);
464 addcon = othercon;
486 } 465 }
487 else { 466 else {
488 newsock->sk->sk_user_data = newcon; 467 newsock->sk->sk_user_data = newcon;
489 newcon->rx_action = receive_from_sock; 468 newcon->rx_action = receive_from_sock;
490 add_sock(newsock, newcon); 469 add_sock(newsock, newcon);
491 470 addcon = newcon;
492 } 471 }
493 472
494 up_write(&newcon->sock_sem); 473 mutex_unlock(&newcon->sock_mutex);
495 474
496 /* 475 /*
497 * Add it to the active queue in case we got data 476 * Add it to the active queue in case we got data
498 * between processing the accept and adding the socket 478 * between processing the accept and adding the socket
499 * to the read_sockets list 478 * to the read_sockets list
500 */ 479 */
501 lowcomms_data_ready(newsock->sk, 0); 480 if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
502 up_read(&con->sock_sem); 481 queue_work(recv_workqueue, &addcon->rwork);
482 mutex_unlock(&con->sock_mutex);
503 483
504 return 0; 484 return 0;
505 485
506accept_err: 486accept_err:
507 up_read(&con->sock_sem); 487 mutex_unlock(&con->sock_mutex);
508 sock_release(newsock); 488 sock_release(newsock);
509 489
510 if (result != -EAGAIN) 490 if (result != -EAGAIN)
@@ -525,7 +505,7 @@ static void connect_to_sock(struct connection *con)
525 return; 505 return;
526 } 506 }
527 507
528 down_write(&con->sock_sem); 508 mutex_lock(&con->sock_mutex);
529 if (con->retries++ > MAX_CONNECT_RETRIES) 509 if (con->retries++ > MAX_CONNECT_RETRIES)
530 goto out; 510 goto out;
531 511
@@ -548,7 +528,7 @@ static void connect_to_sock(struct connection *con)
548 sock->sk->sk_user_data = con; 528 sock->sk->sk_user_data = con;
549 con->rx_action = receive_from_sock; 529 con->rx_action = receive_from_sock;
550 530
551 make_sockaddr(&saddr, dlm_config.tcp_port, &addr_len); 531 make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
552 532
553 add_sock(sock, con); 533 add_sock(sock, con);
554 534
@@ -577,7 +557,7 @@ out_err:
577 result = 0; 557 result = 0;
578 } 558 }
579out: 559out:
580 up_write(&con->sock_sem); 560 mutex_unlock(&con->sock_mutex);
581 return; 561 return;
582} 562}
583 563
@@ -616,10 +596,10 @@ static struct socket *create_listen_sock(struct connection *con,
616 con->sock = sock; 596 con->sock = sock;
617 597
618 /* Bind to our port */ 598 /* Bind to our port */
619 make_sockaddr(saddr, dlm_config.tcp_port, &addr_len); 599 make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
620 result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len); 600 result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
621 if (result < 0) { 601 if (result < 0) {
622 printk("dlm: Can't bind to port %d\n", dlm_config.tcp_port); 602 printk("dlm: Can't bind to port %d\n", dlm_config.ci_tcp_port);
623 sock_release(sock); 603 sock_release(sock);
624 sock = NULL; 604 sock = NULL;
625 con->sock = NULL; 605 con->sock = NULL;
@@ -638,7 +618,7 @@ static struct socket *create_listen_sock(struct connection *con,
638 618
639 result = sock->ops->listen(sock, 5); 619 result = sock->ops->listen(sock, 5);
640 if (result < 0) { 620 if (result < 0) {
641 printk("dlm: Can't listen on port %d\n", dlm_config.tcp_port); 621 printk("dlm: Can't listen on port %d\n", dlm_config.ci_tcp_port);
642 sock_release(sock); 622 sock_release(sock);
643 sock = NULL; 623 sock = NULL;
644 goto create_out; 624 goto create_out;
@@ -709,6 +689,7 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len,
709 if (!con) 689 if (!con)
710 return NULL; 690 return NULL;
711 691
692 spin_lock(&con->writequeue_lock);
712 e = list_entry(con->writequeue.prev, struct writequeue_entry, list); 693 e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
713 if ((&e->list == &con->writequeue) || 694 if ((&e->list == &con->writequeue) ||
714 (PAGE_CACHE_SIZE - e->end < len)) { 695 (PAGE_CACHE_SIZE - e->end < len)) {
@@ -747,6 +728,7 @@ void dlm_lowcomms_commit_buffer(void *mh)
747 struct connection *con = e->con; 728 struct connection *con = e->con;
748 int users; 729 int users;
749 730
731 spin_lock(&con->writequeue_lock);
750 users = --e->users; 732 users = --e->users;
751 if (users) 733 if (users)
752 goto out; 734 goto out;
@@ -754,12 +736,8 @@ void dlm_lowcomms_commit_buffer(void *mh)
754 kunmap(e->page); 736 kunmap(e->page);
755 spin_unlock(&con->writequeue_lock); 737 spin_unlock(&con->writequeue_lock);
756 738
757 if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) { 739 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
758 spin_lock_bh(&write_sockets_lock); 740 queue_work(send_workqueue, &con->swork);
759 list_add_tail(&con->write_list, &write_sockets);
760 spin_unlock_bh(&write_sockets_lock);
761
762 wake_up_interruptible(&lowcomms_send_waitq);
763 } 741 }
764 return; 742 return;
765 743
@@ -783,7 +761,7 @@ static void send_to_sock(struct connection *con)
783 struct writequeue_entry *e; 761 struct writequeue_entry *e;
784 int len, offset; 762 int len, offset;
785 763
786 down_read(&con->sock_sem); 764 mutex_lock(&con->sock_mutex);
787 if (con->sock == NULL) 765 if (con->sock == NULL)
788 goto out_connect; 766 goto out_connect;
789 767
@@ -800,6 +778,7 @@ static void send_to_sock(struct connection *con)
800 offset = e->offset; 778 offset = e->offset;
801 BUG_ON(len == 0 && e->users == 0); 779 BUG_ON(len == 0 && e->users == 0);
802 spin_unlock(&con->writequeue_lock); 780 spin_unlock(&con->writequeue_lock);
781 kmap(e->page);
803 782
804 ret = 0; 783 ret = 0;
805 if (len) { 784 if (len) {
@@ -828,18 +807,18 @@ static void send_to_sock(struct connection *con)
828 } 807 }
829 spin_unlock(&con->writequeue_lock); 808 spin_unlock(&con->writequeue_lock);
830out: 809out:
831 up_read(&con->sock_sem); 810 mutex_unlock(&con->sock_mutex);
832 return; 811 return;
833 812
834send_error: 813send_error:
835 up_read(&con->sock_sem); 814 mutex_unlock(&con->sock_mutex);
836 close_connection(con, false); 815 close_connection(con, false);
837 lowcomms_connect_sock(con); 816 lowcomms_connect_sock(con);
838 return; 817 return;
839 818
840out_connect: 819out_connect:
841 up_read(&con->sock_sem); 820 mutex_unlock(&con->sock_mutex);
842 lowcomms_connect_sock(con); 821 connect_to_sock(con);
843 return; 822 return;
844} 823}
845 824
@@ -872,7 +851,6 @@ int dlm_lowcomms_close(int nodeid)
872 if (con) { 851 if (con) {
873 clean_one_writequeue(con); 852 clean_one_writequeue(con);
874 close_connection(con, true); 853 close_connection(con, true);
875 atomic_set(&con->waiting_requests, 0);
876 } 854 }
877 return 0; 855 return 0;
878 856
@@ -880,102 +858,29 @@ out:
880 return -1; 858 return -1;
881} 859}
882 860
883/* API send message call, may queue the request */
884/* N.B. This is the old interface - use the new one for new calls */
885int lowcomms_send_message(int nodeid, char *buf, int len, gfp_t allocation)
886{
887 struct writequeue_entry *e;
888 char *b;
889
890 e = dlm_lowcomms_get_buffer(nodeid, len, allocation, &b);
891 if (e) {
892 memcpy(b, buf, len);
893 dlm_lowcomms_commit_buffer(e);
894 return 0;
895 }
896 return -ENOBUFS;
897}
898
899/* Look for activity on active sockets */ 861/* Look for activity on active sockets */
900static void process_sockets(void) 862static void process_recv_sockets(struct work_struct *work)
901{ 863{
902 struct list_head *list; 864 struct connection *con = container_of(work, struct connection, rwork);
903 struct list_head *temp; 865 int err;
904 int count = 0;
905
906 spin_lock_bh(&read_sockets_lock);
907 list_for_each_safe(list, temp, &read_sockets) {
908 866
909 struct connection *con = 867 clear_bit(CF_READ_PENDING, &con->flags);
910 list_entry(list, struct connection, read_list); 868 do {
911 list_del(&con->read_list); 869 err = con->rx_action(con);
912 clear_bit(CF_READ_PENDING, &con->flags); 870 } while (!err);
913
914 spin_unlock_bh(&read_sockets_lock);
915
916 /* This can reach zero if we are processing requests
917 * as they come in.
918 */
919 if (atomic_read(&con->waiting_requests) == 0) {
920 spin_lock_bh(&read_sockets_lock);
921 continue;
922 }
923
924 do {
925 con->rx_action(con);
926
927 /* Don't starve out everyone else */
928 if (++count >= MAX_RX_MSG_COUNT) {
929 cond_resched();
930 count = 0;
931 }
932
933 } while (!atomic_dec_and_test(&con->waiting_requests) &&
934 !kthread_should_stop());
935
936 spin_lock_bh(&read_sockets_lock);
937 }
938 spin_unlock_bh(&read_sockets_lock);
939} 871}
940 872
941/* Try to send any messages that are pending
942 */
943static void process_output_queue(void)
944{
945 struct list_head *list;
946 struct list_head *temp;
947
948 spin_lock_bh(&write_sockets_lock);
949 list_for_each_safe(list, temp, &write_sockets) {
950 struct connection *con =
951 list_entry(list, struct connection, write_list);
952 clear_bit(CF_WRITE_PENDING, &con->flags);
953 list_del(&con->write_list);
954
955 spin_unlock_bh(&write_sockets_lock);
956 send_to_sock(con);
957 spin_lock_bh(&write_sockets_lock);
958 }
959 spin_unlock_bh(&write_sockets_lock);
960}
961 873
962static void process_state_queue(void) 874static void process_send_sockets(struct work_struct *work)
963{ 875{
964 struct list_head *list; 876 struct connection *con = container_of(work, struct connection, swork);
965 struct list_head *temp;
966
967 spin_lock_bh(&state_sockets_lock);
968 list_for_each_safe(list, temp, &state_sockets) {
969 struct connection *con =
970 list_entry(list, struct connection, state_list);
971 list_del(&con->state_list);
972 clear_bit(CF_CONNECT_PENDING, &con->flags);
973 spin_unlock_bh(&state_sockets_lock);
974 877
878 if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
975 connect_to_sock(con); 879 connect_to_sock(con);
976 spin_lock_bh(&state_sockets_lock);
977 } 880 }
978 spin_unlock_bh(&state_sockets_lock); 881
882 clear_bit(CF_WRITE_PENDING, &con->flags);
883 send_to_sock(con);
979} 884}
980 885
981 886
@@ -992,109 +897,33 @@ static void clean_writequeues(void)
992 } 897 }
993} 898}
994 899
995static int read_list_empty(void) 900static void work_stop(void)
996{ 901{
997 int status; 902 destroy_workqueue(recv_workqueue);
998 903 destroy_workqueue(send_workqueue);
999 spin_lock_bh(&read_sockets_lock);
1000 status = list_empty(&read_sockets);
1001 spin_unlock_bh(&read_sockets_lock);
1002
1003 return status;
1004}
1005
1006/* DLM Transport comms receive daemon */
1007static int dlm_recvd(void *data)
1008{
1009 init_waitqueue_entry(&lowcomms_recv_waitq_head, current);
1010 add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head);
1011
1012 while (!kthread_should_stop()) {
1013 set_current_state(TASK_INTERRUPTIBLE);
1014 if (read_list_empty())
1015 cond_resched();
1016 set_current_state(TASK_RUNNING);
1017
1018 process_sockets();
1019 }
1020
1021 return 0;
1022} 904}
1023 905
1024static int write_and_state_lists_empty(void) 906static int work_start(void)
1025{ 907{
1026 int status;
1027
1028 spin_lock_bh(&write_sockets_lock);
1029 status = list_empty(&write_sockets);
1030 spin_unlock_bh(&write_sockets_lock);
1031
1032 spin_lock_bh(&state_sockets_lock);
1033 if (list_empty(&state_sockets) == 0)
1034 status = 0;
1035 spin_unlock_bh(&state_sockets_lock);
1036
1037 return status;
1038}
1039
1040/* DLM Transport send daemon */
1041static int dlm_sendd(void *data)
1042{
1043 init_waitqueue_entry(&lowcomms_send_waitq_head, current);
1044 add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head);
1045
1046 while (!kthread_should_stop()) {
1047 set_current_state(TASK_INTERRUPTIBLE);
1048 if (write_and_state_lists_empty())
1049 cond_resched();
1050 set_current_state(TASK_RUNNING);
1051
1052 process_state_queue();
1053 process_output_queue();
1054 }
1055
1056 return 0;
1057}
1058
1059static void daemons_stop(void)
1060{
1061 kthread_stop(recv_task);
1062 kthread_stop(send_task);
1063}
1064
1065static int daemons_start(void)
1066{
1067 struct task_struct *p;
1068 int error; 908 int error;
1069 909 recv_workqueue = create_workqueue("dlm_recv");
1070 p = kthread_run(dlm_recvd, NULL, "dlm_recvd"); 910 error = IS_ERR(recv_workqueue);
1071 error = IS_ERR(p);
1072 if (error) { 911 if (error) {
1073 log_print("can't start dlm_recvd %d", error); 912 log_print("can't start dlm_recv %d", error);
1074 return error; 913 return error;
1075 } 914 }
1076 recv_task = p;
1077 915
1078 p = kthread_run(dlm_sendd, NULL, "dlm_sendd"); 916 send_workqueue = create_singlethread_workqueue("dlm_send");
1079 error = IS_ERR(p); 917 error = IS_ERR(send_workqueue);
1080 if (error) { 918 if (error) {
1081 log_print("can't start dlm_sendd %d", error); 919 log_print("can't start dlm_send %d", error);
1082 kthread_stop(recv_task); 920 destroy_workqueue(recv_workqueue);
1083 return error; 921 return error;
1084 } 922 }
1085 send_task = p;
1086 923
1087 return 0; 924 return 0;
1088} 925}
1089 926
1090/*
1091 * Return the largest buffer size we can cope with.
1092 */
1093int lowcomms_max_buffer_size(void)
1094{
1095 return PAGE_CACHE_SIZE;
1096}
1097
1098void dlm_lowcomms_stop(void) 927void dlm_lowcomms_stop(void)
1099{ 928{
1100 int i; 929 int i;
@@ -1107,7 +936,7 @@ void dlm_lowcomms_stop(void)
1107 connections[i]->flags |= 0xFF; 936 connections[i]->flags |= 0xFF;
1108 } 937 }
1109 938
1110 daemons_stop(); 939 work_stop();
1111 clean_writequeues(); 940 clean_writequeues();
1112 941
1113 for (i = 0; i < conn_array_size; i++) { 942 for (i = 0; i < conn_array_size; i++) {
@@ -1159,7 +988,7 @@ int dlm_lowcomms_start(void)
1159 if (error) 988 if (error)
1160 goto fail_unlisten; 989 goto fail_unlisten;
1161 990
1162 error = daemons_start(); 991 error = work_start();
1163 if (error) 992 if (error)
1164 goto fail_unlisten; 993 goto fail_unlisten;
1165 994