aboutsummaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
authorPatrick Caulfield <pcaulfie@redhat.com>2007-01-15 09:33:34 -0500
committerSteven Whitehouse <swhiteho@redhat.com>2007-02-05 13:36:52 -0500
commit1d6e8131cf0064ef5ab5f3411a82b800afbfadee (patch)
treed165b0210a2c12de8848b2b8bc30237183b611dc /fs
parent03dc6a538e42bcc8d5dfabcee208b639db85a80c (diff)
[DLM] Use workqueues for dlm lowcomms
This patch converts the DLM TCP lowcomms to use workqueues rather than using its own daemon functions. Simultaneously removing a lot of code and making it more scalable on multi-processor machines. Signed-Off-By: Patrick Caulfield <pcaulfie@redhat.com> Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
Diffstat (limited to 'fs')
-rw-r--r--fs/dlm/lowcomms-sctp.c145
-rw-r--r--fs/dlm/lowcomms-tcp.c254
2 files changed, 115 insertions, 284 deletions
diff --git a/fs/dlm/lowcomms-sctp.c b/fs/dlm/lowcomms-sctp.c
index 5aeadadd8afd..dc83a9d979b5 100644
--- a/fs/dlm/lowcomms-sctp.c
+++ b/fs/dlm/lowcomms-sctp.c
@@ -72,6 +72,8 @@ struct nodeinfo {
72 struct list_head writequeue; /* outgoing writequeue_entries */ 72 struct list_head writequeue; /* outgoing writequeue_entries */
73 spinlock_t writequeue_lock; 73 spinlock_t writequeue_lock;
74 int nodeid; 74 int nodeid;
75 struct work_struct swork; /* Send workqueue */
76 struct work_struct lwork; /* Locking workqueue */
75}; 77};
76 78
77static DEFINE_IDR(nodeinfo_idr); 79static DEFINE_IDR(nodeinfo_idr);
@@ -96,6 +98,7 @@ struct connection {
96 atomic_t waiting_requests; 98 atomic_t waiting_requests;
97 struct cbuf cb; 99 struct cbuf cb;
98 int eagain_flag; 100 int eagain_flag;
101 struct work_struct work; /* Send workqueue */
99}; 102};
100 103
101/* An entry waiting to be sent */ 104/* An entry waiting to be sent */
@@ -137,19 +140,23 @@ static void cbuf_eat(struct cbuf *cb, int n)
137static LIST_HEAD(write_nodes); 140static LIST_HEAD(write_nodes);
138static DEFINE_SPINLOCK(write_nodes_lock); 141static DEFINE_SPINLOCK(write_nodes_lock);
139 142
143
140/* Maximum number of incoming messages to process before 144/* Maximum number of incoming messages to process before
141 * doing a schedule() 145 * doing a schedule()
142 */ 146 */
143#define MAX_RX_MSG_COUNT 25 147#define MAX_RX_MSG_COUNT 25
144 148
145/* Manage daemons */ 149/* Work queues */
146static struct task_struct *recv_task; 150static struct workqueue_struct *recv_workqueue;
147static struct task_struct *send_task; 151static struct workqueue_struct *send_workqueue;
148static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_wait); 152static struct workqueue_struct *lock_workqueue;
149 153
150/* The SCTP connection */ 154/* The SCTP connection */
151static struct connection sctp_con; 155static struct connection sctp_con;
152 156
157static void process_send_sockets(struct work_struct *work);
158static void process_recv_sockets(struct work_struct *work);
159static void process_lock_request(struct work_struct *work);
153 160
154static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr) 161static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
155{ 162{
@@ -222,6 +229,8 @@ static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
222 spin_lock_init(&ni->lock); 229 spin_lock_init(&ni->lock);
223 INIT_LIST_HEAD(&ni->writequeue); 230 INIT_LIST_HEAD(&ni->writequeue);
224 spin_lock_init(&ni->writequeue_lock); 231 spin_lock_init(&ni->writequeue_lock);
232 INIT_WORK(&ni->lwork, process_lock_request);
233 INIT_WORK(&ni->swork, process_send_sockets);
225 ni->nodeid = nodeid; 234 ni->nodeid = nodeid;
226 235
227 if (nodeid > max_nodeid) 236 if (nodeid > max_nodeid)
@@ -249,11 +258,8 @@ static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc)
249/* Data or notification available on socket */ 258/* Data or notification available on socket */
250static void lowcomms_data_ready(struct sock *sk, int count_unused) 259static void lowcomms_data_ready(struct sock *sk, int count_unused)
251{ 260{
252 atomic_inc(&sctp_con.waiting_requests);
253 if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags)) 261 if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags))
254 return; 262 queue_work(recv_workqueue, &sctp_con.work);
255
256 wake_up_interruptible(&lowcomms_recv_wait);
257} 263}
258 264
259 265
@@ -361,10 +367,10 @@ static void init_failed(void)
361 spin_lock_bh(&write_nodes_lock); 367 spin_lock_bh(&write_nodes_lock);
362 list_add_tail(&ni->write_list, &write_nodes); 368 list_add_tail(&ni->write_list, &write_nodes);
363 spin_unlock_bh(&write_nodes_lock); 369 spin_unlock_bh(&write_nodes_lock);
370 queue_work(send_workqueue, &ni->swork);
364 } 371 }
365 } 372 }
366 } 373 }
367 wake_up_process(send_task);
368} 374}
369 375
370/* Something happened to an association */ 376/* Something happened to an association */
@@ -446,8 +452,8 @@ static void process_sctp_notification(struct msghdr *msg, char *buf)
446 spin_lock_bh(&write_nodes_lock); 452 spin_lock_bh(&write_nodes_lock);
447 list_add_tail(&ni->write_list, &write_nodes); 453 list_add_tail(&ni->write_list, &write_nodes);
448 spin_unlock_bh(&write_nodes_lock); 454 spin_unlock_bh(&write_nodes_lock);
455 queue_work(send_workqueue, &ni->swork);
449 } 456 }
450 wake_up_process(send_task);
451 } 457 }
452 break; 458 break;
453 459
@@ -580,8 +586,8 @@ static int receive_from_sock(void)
580 spin_lock_bh(&write_nodes_lock); 586 spin_lock_bh(&write_nodes_lock);
581 list_add_tail(&ni->write_list, &write_nodes); 587 list_add_tail(&ni->write_list, &write_nodes);
582 spin_unlock_bh(&write_nodes_lock); 588 spin_unlock_bh(&write_nodes_lock);
589 queue_work(send_workqueue, &ni->swork);
583 } 590 }
584 wake_up_process(send_task);
585 } 591 }
586 } 592 }
587 593
@@ -590,6 +596,7 @@ static int receive_from_sock(void)
590 return 0; 596 return 0;
591 597
592 cbuf_add(&sctp_con.cb, ret); 598 cbuf_add(&sctp_con.cb, ret);
599 // PJC: TODO: Add to node's workqueue....can we ??
593 ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid), 600 ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
594 page_address(sctp_con.rx_page), 601 page_address(sctp_con.rx_page),
595 sctp_con.cb.base, sctp_con.cb.len, 602 sctp_con.cb.base, sctp_con.cb.len,
@@ -820,7 +827,8 @@ void dlm_lowcomms_commit_buffer(void *arg)
820 spin_lock_bh(&write_nodes_lock); 827 spin_lock_bh(&write_nodes_lock);
821 list_add_tail(&ni->write_list, &write_nodes); 828 list_add_tail(&ni->write_list, &write_nodes);
822 spin_unlock_bh(&write_nodes_lock); 829 spin_unlock_bh(&write_nodes_lock);
823 wake_up_process(send_task); 830
831 queue_work(send_workqueue, &ni->swork);
824 } 832 }
825 return; 833 return;
826 834
@@ -1088,101 +1096,75 @@ int dlm_lowcomms_close(int nodeid)
1088 return 0; 1096 return 0;
1089} 1097}
1090 1098
1091static int write_list_empty(void) 1099// PJC: The work queue function for receiving.
1100static void process_recv_sockets(struct work_struct *work)
1092{ 1101{
1093 int status; 1102 if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
1103 int ret;
1104 int count = 0;
1094 1105
1095 spin_lock_bh(&write_nodes_lock); 1106 do {
1096 status = list_empty(&write_nodes); 1107 ret = receive_from_sock();
1097 spin_unlock_bh(&write_nodes_lock);
1098 1108
1099 return status; 1109 /* Don't starve out everyone else */
1110 if (++count >= MAX_RX_MSG_COUNT) {
1111 cond_resched();
1112 count = 0;
1113 }
1114 } while (!kthread_should_stop() && ret >=0);
1115 }
1116 cond_resched();
1100} 1117}
1101 1118
1102static int dlm_recvd(void *data) 1119// PJC: the work queue function for sending
1120static void process_send_sockets(struct work_struct *work)
1103{ 1121{
1104 DECLARE_WAITQUEUE(wait, current); 1122 if (sctp_con.eagain_flag) {
1105 1123 sctp_con.eagain_flag = 0;
1106 while (!kthread_should_stop()) { 1124 refill_write_queue();
1107 int count = 0;
1108
1109 set_current_state(TASK_INTERRUPTIBLE);
1110 add_wait_queue(&lowcomms_recv_wait, &wait);
1111 if (!test_bit(CF_READ_PENDING, &sctp_con.flags))
1112 schedule();
1113 remove_wait_queue(&lowcomms_recv_wait, &wait);
1114 set_current_state(TASK_RUNNING);
1115
1116 if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
1117 int ret;
1118
1119 do {
1120 ret = receive_from_sock();
1121
1122 /* Don't starve out everyone else */
1123 if (++count >= MAX_RX_MSG_COUNT) {
1124 cond_resched();
1125 count = 0;
1126 }
1127 } while (!kthread_should_stop() && ret >=0);
1128 }
1129 cond_resched();
1130 } 1125 }
1131 1126 process_output_queue();
1132 return 0;
1133} 1127}
1134 1128
1135static int dlm_sendd(void *data) 1129// PJC: Process lock requests from a particular node.
1130// TODO: can we optimise this out on UP ??
1131static void process_lock_request(struct work_struct *work)
1136{ 1132{
1137 DECLARE_WAITQUEUE(wait, current);
1138
1139 add_wait_queue(sctp_con.sock->sk->sk_sleep, &wait);
1140
1141 while (!kthread_should_stop()) {
1142 set_current_state(TASK_INTERRUPTIBLE);
1143 if (write_list_empty())
1144 schedule();
1145 set_current_state(TASK_RUNNING);
1146
1147 if (sctp_con.eagain_flag) {
1148 sctp_con.eagain_flag = 0;
1149 refill_write_queue();
1150 }
1151 process_output_queue();
1152 }
1153
1154 remove_wait_queue(sctp_con.sock->sk->sk_sleep, &wait);
1155
1156 return 0;
1157} 1133}
1158 1134
1159static void daemons_stop(void) 1135static void daemons_stop(void)
1160{ 1136{
1161 kthread_stop(recv_task); 1137 destroy_workqueue(recv_workqueue);
1162 kthread_stop(send_task); 1138 destroy_workqueue(send_workqueue);
1139 destroy_workqueue(lock_workqueue);
1163} 1140}
1164 1141
1165static int daemons_start(void) 1142static int daemons_start(void)
1166{ 1143{
1167 struct task_struct *p;
1168 int error; 1144 int error;
1145 recv_workqueue = create_workqueue("dlm_recv");
1146 error = IS_ERR(recv_workqueue);
1147 if (error) {
1148 log_print("can't start dlm_recv %d", error);
1149 return error;
1150 }
1169 1151
1170 p = kthread_run(dlm_recvd, NULL, "dlm_recvd"); 1152 send_workqueue = create_singlethread_workqueue("dlm_send");
1171 error = IS_ERR(p); 1153 error = IS_ERR(send_workqueue);
1172 if (error) { 1154 if (error) {
1173 log_print("can't start dlm_recvd %d", error); 1155 log_print("can't start dlm_send %d", error);
1156 destroy_workqueue(recv_workqueue);
1174 return error; 1157 return error;
1175 } 1158 }
1176 recv_task = p;
1177 1159
1178 p = kthread_run(dlm_sendd, NULL, "dlm_sendd"); 1160 lock_workqueue = create_workqueue("dlm_rlock");
1179 error = IS_ERR(p); 1161 error = IS_ERR(lock_workqueue);
1180 if (error) { 1162 if (error) {
1181 log_print("can't start dlm_sendd %d", error); 1163 log_print("can't start dlm_rlock %d", error);
1182 kthread_stop(recv_task); 1164 destroy_workqueue(send_workqueue);
1165 destroy_workqueue(recv_workqueue);
1183 return error; 1166 return error;
1184 } 1167 }
1185 send_task = p;
1186 1168
1187 return 0; 1169 return 0;
1188} 1170}
@@ -1194,6 +1176,8 @@ int dlm_lowcomms_start(void)
1194{ 1176{
1195 int error; 1177 int error;
1196 1178
1179 INIT_WORK(&sctp_con.work, process_recv_sockets);
1180
1197 error = init_sock(); 1181 error = init_sock();
1198 if (error) 1182 if (error)
1199 goto fail_sock; 1183 goto fail_sock;
@@ -1224,4 +1208,3 @@ void dlm_lowcomms_stop(void)
1224 for (i = 0; i < dlm_local_count; i++) 1208 for (i = 0; i < dlm_local_count; i++)
1225 kfree(dlm_local_addr[i]); 1209 kfree(dlm_local_addr[i]);
1226} 1210}
1227
diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms-tcp.c
index b4fb5783ef8a..86e5f81da7cb 100644
--- a/fs/dlm/lowcomms-tcp.c
+++ b/fs/dlm/lowcomms-tcp.c
@@ -115,6 +115,8 @@ struct connection {
115 atomic_t waiting_requests; 115 atomic_t waiting_requests;
116#define MAX_CONNECT_RETRIES 3 116#define MAX_CONNECT_RETRIES 3
117 struct connection *othercon; 117 struct connection *othercon;
118 struct work_struct rwork; /* Receive workqueue */
119 struct work_struct swork; /* Send workqueue */
118}; 120};
119#define sock2con(x) ((struct connection *)(x)->sk_user_data) 121#define sock2con(x) ((struct connection *)(x)->sk_user_data)
120 122
@@ -131,14 +133,9 @@ struct writequeue_entry {
131 133
132static struct sockaddr_storage dlm_local_addr; 134static struct sockaddr_storage dlm_local_addr;
133 135
134/* Manage daemons */ 136/* Work queues */
135static struct task_struct *recv_task; 137static struct workqueue_struct *recv_workqueue;
136static struct task_struct *send_task; 138static struct workqueue_struct *send_workqueue;
137
138static wait_queue_t lowcomms_send_waitq_head;
139static DECLARE_WAIT_QUEUE_HEAD(lowcomms_send_waitq);
140static wait_queue_t lowcomms_recv_waitq_head;
141static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_waitq);
142 139
143/* An array of pointers to connections, indexed by NODEID */ 140/* An array of pointers to connections, indexed by NODEID */
144static struct connection **connections; 141static struct connection **connections;
@@ -146,17 +143,8 @@ static DECLARE_MUTEX(connections_lock);
146static struct kmem_cache *con_cache; 143static struct kmem_cache *con_cache;
147static int conn_array_size; 144static int conn_array_size;
148 145
149/* List of sockets that have reads pending */ 146static void process_recv_sockets(struct work_struct *work);
150static LIST_HEAD(read_sockets); 147static void process_send_sockets(struct work_struct *work);
151static DEFINE_SPINLOCK(read_sockets_lock);
152
153/* List of sockets which have writes pending */
154static LIST_HEAD(write_sockets);
155static DEFINE_SPINLOCK(write_sockets_lock);
156
157/* List of sockets which have connects pending */
158static LIST_HEAD(state_sockets);
159static DEFINE_SPINLOCK(state_sockets_lock);
160 148
161static struct connection *nodeid2con(int nodeid, gfp_t allocation) 149static struct connection *nodeid2con(int nodeid, gfp_t allocation)
162{ 150{
@@ -189,6 +177,8 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
189 init_rwsem(&con->sock_sem); 177 init_rwsem(&con->sock_sem);
190 INIT_LIST_HEAD(&con->writequeue); 178 INIT_LIST_HEAD(&con->writequeue);
191 spin_lock_init(&con->writequeue_lock); 179 spin_lock_init(&con->writequeue_lock);
180 INIT_WORK(&con->swork, process_send_sockets);
181 INIT_WORK(&con->rwork, process_recv_sockets);
192 182
193 connections[nodeid] = con; 183 connections[nodeid] = con;
194 } 184 }
@@ -203,41 +193,22 @@ static void lowcomms_data_ready(struct sock *sk, int count_unused)
203{ 193{
204 struct connection *con = sock2con(sk); 194 struct connection *con = sock2con(sk);
205 195
206 atomic_inc(&con->waiting_requests); 196 if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
207 if (test_and_set_bit(CF_READ_PENDING, &con->flags)) 197 queue_work(recv_workqueue, &con->rwork);
208 return;
209
210 spin_lock_bh(&read_sockets_lock);
211 list_add_tail(&con->read_list, &read_sockets);
212 spin_unlock_bh(&read_sockets_lock);
213
214 wake_up_interruptible(&lowcomms_recv_waitq);
215} 198}
216 199
217static void lowcomms_write_space(struct sock *sk) 200static void lowcomms_write_space(struct sock *sk)
218{ 201{
219 struct connection *con = sock2con(sk); 202 struct connection *con = sock2con(sk);
220 203
221 if (test_and_set_bit(CF_WRITE_PENDING, &con->flags)) 204 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
222 return; 205 queue_work(send_workqueue, &con->swork);
223
224 spin_lock_bh(&write_sockets_lock);
225 list_add_tail(&con->write_list, &write_sockets);
226 spin_unlock_bh(&write_sockets_lock);
227
228 wake_up_interruptible(&lowcomms_send_waitq);
229} 206}
230 207
231static inline void lowcomms_connect_sock(struct connection *con) 208static inline void lowcomms_connect_sock(struct connection *con)
232{ 209{
233 if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) 210 if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
234 return; 211 queue_work(send_workqueue, &con->swork);
235
236 spin_lock_bh(&state_sockets_lock);
237 list_add_tail(&con->state_list, &state_sockets);
238 spin_unlock_bh(&state_sockets_lock);
239
240 wake_up_interruptible(&lowcomms_send_waitq);
241} 212}
242 213
243static void lowcomms_state_change(struct sock *sk) 214static void lowcomms_state_change(struct sock *sk)
@@ -388,7 +359,8 @@ out:
388 return 0; 359 return 0;
389 360
390out_resched: 361out_resched:
391 lowcomms_data_ready(con->sock->sk, 0); 362 if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
363 queue_work(recv_workqueue, &con->rwork);
392 up_read(&con->sock_sem); 364 up_read(&con->sock_sem);
393 cond_resched(); 365 cond_resched();
394 return 0; 366 return 0;
@@ -477,6 +449,8 @@ static int accept_from_sock(struct connection *con)
477 othercon->nodeid = nodeid; 449 othercon->nodeid = nodeid;
478 othercon->rx_action = receive_from_sock; 450 othercon->rx_action = receive_from_sock;
479 init_rwsem(&othercon->sock_sem); 451 init_rwsem(&othercon->sock_sem);
452 INIT_WORK(&othercon->swork, process_send_sockets);
453 INIT_WORK(&othercon->rwork, process_recv_sockets);
480 set_bit(CF_IS_OTHERCON, &othercon->flags); 454 set_bit(CF_IS_OTHERCON, &othercon->flags);
481 newcon->othercon = othercon; 455 newcon->othercon = othercon;
482 } 456 }
@@ -498,7 +472,8 @@ static int accept_from_sock(struct connection *con)
498 * beween processing the accept adding the socket 472 * beween processing the accept adding the socket
499 * to the read_sockets list 473 * to the read_sockets list
500 */ 474 */
501 lowcomms_data_ready(newsock->sk, 0); 475 if (!test_and_set_bit(CF_READ_PENDING, &newcon->flags))
476 queue_work(recv_workqueue, &newcon->rwork);
502 up_read(&con->sock_sem); 477 up_read(&con->sock_sem);
503 478
504 return 0; 479 return 0;
@@ -757,12 +732,8 @@ void dlm_lowcomms_commit_buffer(void *mh)
757 kunmap(e->page); 732 kunmap(e->page);
758 spin_unlock(&con->writequeue_lock); 733 spin_unlock(&con->writequeue_lock);
759 734
760 if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) { 735 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
761 spin_lock_bh(&write_sockets_lock); 736 queue_work(send_workqueue, &con->swork);
762 list_add_tail(&con->write_list, &write_sockets);
763 spin_unlock_bh(&write_sockets_lock);
764
765 wake_up_interruptible(&lowcomms_send_waitq);
766 } 737 }
767 return; 738 return;
768 739
@@ -803,6 +774,7 @@ static void send_to_sock(struct connection *con)
803 offset = e->offset; 774 offset = e->offset;
804 BUG_ON(len == 0 && e->users == 0); 775 BUG_ON(len == 0 && e->users == 0);
805 spin_unlock(&con->writequeue_lock); 776 spin_unlock(&con->writequeue_lock);
777 kmap(e->page);
806 778
807 ret = 0; 779 ret = 0;
808 if (len) { 780 if (len) {
@@ -884,85 +856,29 @@ out:
884} 856}
885 857
886/* Look for activity on active sockets */ 858/* Look for activity on active sockets */
887static void process_sockets(void) 859static void process_recv_sockets(struct work_struct *work)
888{ 860{
889 struct list_head *list; 861 struct connection *con = container_of(work, struct connection, rwork);
890 struct list_head *temp; 862 int err;
891 int count = 0;
892
893 spin_lock_bh(&read_sockets_lock);
894 list_for_each_safe(list, temp, &read_sockets) {
895
896 struct connection *con =
897 list_entry(list, struct connection, read_list);
898 list_del(&con->read_list);
899 clear_bit(CF_READ_PENDING, &con->flags);
900
901 spin_unlock_bh(&read_sockets_lock);
902 863
903 /* This can reach zero if we are processing requests 864 clear_bit(CF_READ_PENDING, &con->flags);
904 * as they come in. 865 do {
905 */ 866 err = con->rx_action(con);
906 if (atomic_read(&con->waiting_requests) == 0) { 867 } while (!err);
907 spin_lock_bh(&read_sockets_lock);
908 continue;
909 }
910
911 do {
912 con->rx_action(con);
913
914 /* Don't starve out everyone else */
915 if (++count >= MAX_RX_MSG_COUNT) {
916 cond_resched();
917 count = 0;
918 }
919
920 } while (!atomic_dec_and_test(&con->waiting_requests) &&
921 !kthread_should_stop());
922
923 spin_lock_bh(&read_sockets_lock);
924 }
925 spin_unlock_bh(&read_sockets_lock);
926} 868}
927 869
928/* Try to send any messages that are pending
929 */
930static void process_output_queue(void)
931{
932 struct list_head *list;
933 struct list_head *temp;
934
935 spin_lock_bh(&write_sockets_lock);
936 list_for_each_safe(list, temp, &write_sockets) {
937 struct connection *con =
938 list_entry(list, struct connection, write_list);
939 clear_bit(CF_WRITE_PENDING, &con->flags);
940 list_del(&con->write_list);
941
942 spin_unlock_bh(&write_sockets_lock);
943 send_to_sock(con);
944 spin_lock_bh(&write_sockets_lock);
945 }
946 spin_unlock_bh(&write_sockets_lock);
947}
948 870
949static void process_state_queue(void) 871static void process_send_sockets(struct work_struct *work)
950{ 872{
951 struct list_head *list; 873 struct connection *con = container_of(work, struct connection, swork);
952 struct list_head *temp;
953
954 spin_lock_bh(&state_sockets_lock);
955 list_for_each_safe(list, temp, &state_sockets) {
956 struct connection *con =
957 list_entry(list, struct connection, state_list);
958 list_del(&con->state_list);
959 clear_bit(CF_CONNECT_PENDING, &con->flags);
960 spin_unlock_bh(&state_sockets_lock);
961 874
875 if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
962 connect_to_sock(con); 876 connect_to_sock(con);
963 spin_lock_bh(&state_sockets_lock);
964 } 877 }
965 spin_unlock_bh(&state_sockets_lock); 878
879 if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags)) {
880 send_to_sock(con);
881 }
966} 882}
967 883
968 884
@@ -979,97 +895,29 @@ static void clean_writequeues(void)
979 } 895 }
980} 896}
981 897
982static int read_list_empty(void) 898static void work_stop(void)
983{
984 int status;
985
986 spin_lock_bh(&read_sockets_lock);
987 status = list_empty(&read_sockets);
988 spin_unlock_bh(&read_sockets_lock);
989
990 return status;
991}
992
993/* DLM Transport comms receive daemon */
994static int dlm_recvd(void *data)
995{ 899{
996 init_waitqueue_entry(&lowcomms_recv_waitq_head, current); 900 destroy_workqueue(recv_workqueue);
997 add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head); 901 destroy_workqueue(send_workqueue);
998
999 while (!kthread_should_stop()) {
1000 set_current_state(TASK_INTERRUPTIBLE);
1001 if (read_list_empty())
1002 schedule();
1003 set_current_state(TASK_RUNNING);
1004
1005 process_sockets();
1006 }
1007
1008 return 0;
1009} 902}
1010 903
1011static int write_and_state_lists_empty(void) 904static int work_start(void)
1012{ 905{
1013 int status;
1014
1015 spin_lock_bh(&write_sockets_lock);
1016 status = list_empty(&write_sockets);
1017 spin_unlock_bh(&write_sockets_lock);
1018
1019 spin_lock_bh(&state_sockets_lock);
1020 if (list_empty(&state_sockets) == 0)
1021 status = 0;
1022 spin_unlock_bh(&state_sockets_lock);
1023
1024 return status;
1025}
1026
1027/* DLM Transport send daemon */
1028static int dlm_sendd(void *data)
1029{
1030 init_waitqueue_entry(&lowcomms_send_waitq_head, current);
1031 add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head);
1032
1033 while (!kthread_should_stop()) {
1034 set_current_state(TASK_INTERRUPTIBLE);
1035 if (write_and_state_lists_empty())
1036 schedule();
1037 set_current_state(TASK_RUNNING);
1038
1039 process_state_queue();
1040 process_output_queue();
1041 }
1042
1043 return 0;
1044}
1045
1046static void daemons_stop(void)
1047{
1048 kthread_stop(recv_task);
1049 kthread_stop(send_task);
1050}
1051
1052static int daemons_start(void)
1053{
1054 struct task_struct *p;
1055 int error; 906 int error;
1056 907 recv_workqueue = create_workqueue("dlm_recv");
1057 p = kthread_run(dlm_recvd, NULL, "dlm_recvd"); 908 error = IS_ERR(recv_workqueue);
1058 error = IS_ERR(p);
1059 if (error) { 909 if (error) {
1060 log_print("can't start dlm_recvd %d", error); 910 log_print("can't start dlm_recv %d", error);
1061 return error; 911 return error;
1062 } 912 }
1063 recv_task = p;
1064 913
1065 p = kthread_run(dlm_sendd, NULL, "dlm_sendd"); 914 send_workqueue = create_singlethread_workqueue("dlm_send");
1066 error = IS_ERR(p); 915 error = IS_ERR(send_workqueue);
1067 if (error) { 916 if (error) {
1068 log_print("can't start dlm_sendd %d", error); 917 log_print("can't start dlm_send %d", error);
1069 kthread_stop(recv_task); 918 destroy_workqueue(recv_workqueue);
1070 return error; 919 return error;
1071 } 920 }
1072 send_task = p;
1073 921
1074 return 0; 922 return 0;
1075} 923}
@@ -1086,7 +934,7 @@ void dlm_lowcomms_stop(void)
1086 connections[i]->flags |= 0xFF; 934 connections[i]->flags |= 0xFF;
1087 } 935 }
1088 936
1089 daemons_stop(); 937 work_stop();
1090 clean_writequeues(); 938 clean_writequeues();
1091 939
1092 for (i = 0; i < conn_array_size; i++) { 940 for (i = 0; i < conn_array_size; i++) {
@@ -1138,7 +986,7 @@ int dlm_lowcomms_start(void)
1138 if (error) 986 if (error)
1139 goto fail_unlisten; 987 goto fail_unlisten;
1140 988
1141 error = daemons_start(); 989 error = work_start();
1142 if (error) 990 if (error)
1143 goto fail_unlisten; 991 goto fail_unlisten;
1144 992