author     Patrick Caulfield <pcaulfie@redhat.com>   2007-01-15 09:33:34 -0500
committer  Steven Whitehouse <swhiteho@redhat.com>   2007-02-05 13:36:52 -0500
commit     1d6e8131cf0064ef5ab5f3411a82b800afbfadee (patch)
tree       d165b0210a2c12de8848b2b8bc30237183b611dc /fs
parent     03dc6a538e42bcc8d5dfabcee208b639db85a80c (diff)
[DLM] Use workqueues for dlm lowcomms
This patch converts the DLM TCP lowcomms to use workqueues rather than its own
daemon functions, simultaneously removing a lot of code and making it more
scalable on multi-processor machines.
Signed-off-by: Patrick Caulfield <pcaulfie@redhat.com>
Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
Diffstat (limited to 'fs')

-rw-r--r--  fs/dlm/lowcomms-sctp.c  145
-rw-r--r--  fs/dlm/lowcomms-tcp.c   254

2 files changed, 115 insertions(+), 284 deletions(-)
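The shape of the conversion is the same in both files: a dedicated kthread that slept on a wait queue and was woken from the socket callbacks becomes a work item that the callback queues directly. Below is a minimal sketch of that pattern, assuming the post-2.6.20 work_struct API this patch builds on; it is illustrative only, not DLM code, and all demo_* names are hypothetical.

```c
#include <linux/workqueue.h>
#include <linux/bitops.h>

#define DEMO_RX_PENDING 0

static struct workqueue_struct *demo_wq;
static unsigned long demo_flags;

/* Runs in process context on a workqueue thread; the old daemon's
 * wait-queue/schedule() boilerplate disappears entirely. */
static void demo_rx_work(struct work_struct *work)
{
        if (test_and_clear_bit(DEMO_RX_PENDING, &demo_flags)) {
                /* drain the socket here */
        }
}

static DECLARE_WORK(demo_work, demo_rx_work);

/* Called from the socket's sk_data_ready callback (softirq context);
 * queue_work() is safe to call from atomic context. */
static void demo_data_ready(void)
{
        /* The bit keeps redundant wakeups cheap; queue_work() is also
         * a no-op if the item is already pending. */
        if (!test_and_set_bit(DEMO_RX_PENDING, &demo_flags))
                queue_work(demo_wq, &demo_work);
}

static int demo_start(void)
{
        demo_wq = create_workqueue("demo_recv");
        if (!demo_wq)
                return -ENOMEM;
        return 0;
}

static void demo_stop(void)
{
        /* flushes any pending demo_work before freeing the threads */
        destroy_workqueue(demo_wq);
}
```

The scalability gain falls out of the structure: create_workqueue() spawns one worker thread per CPU, so independent work items can run in parallel, where the old code funnelled everything through a single dlm_recvd and a single dlm_sendd.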
diff --git a/fs/dlm/lowcomms-sctp.c b/fs/dlm/lowcomms-sctp.c
index 5aeadadd8afd..dc83a9d979b5 100644
--- a/fs/dlm/lowcomms-sctp.c
+++ b/fs/dlm/lowcomms-sctp.c
@@ -72,6 +72,8 @@ struct nodeinfo {
         struct list_head writequeue; /* outgoing writequeue_entries */
         spinlock_t writequeue_lock;
         int nodeid;
+        struct work_struct swork; /* Send workqueue */
+        struct work_struct lwork; /* Locking workqueue */
 };
 
 static DEFINE_IDR(nodeinfo_idr);
@@ -96,6 +98,7 @@ struct connection {
         atomic_t waiting_requests;
         struct cbuf cb;
         int eagain_flag;
+        struct work_struct work; /* Send workqueue */
 };
 
 /* An entry waiting to be sent */
@@ -137,19 +140,23 @@ static void cbuf_eat(struct cbuf *cb, int n)
 static LIST_HEAD(write_nodes);
 static DEFINE_SPINLOCK(write_nodes_lock);
 
+
 /* Maximum number of incoming messages to process before
  * doing a schedule()
  */
 #define MAX_RX_MSG_COUNT 25
 
-/* Manage daemons */
-static struct task_struct *recv_task;
-static struct task_struct *send_task;
-static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_wait);
+/* Work queues */
+static struct workqueue_struct *recv_workqueue;
+static struct workqueue_struct *send_workqueue;
+static struct workqueue_struct *lock_workqueue;
 
 /* The SCTP connection */
 static struct connection sctp_con;
 
+static void process_send_sockets(struct work_struct *work);
+static void process_recv_sockets(struct work_struct *work);
+static void process_lock_request(struct work_struct *work);
 
 static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
 {
@@ -222,6 +229,8 @@ static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
                 spin_lock_init(&ni->lock);
                 INIT_LIST_HEAD(&ni->writequeue);
                 spin_lock_init(&ni->writequeue_lock);
+                INIT_WORK(&ni->lwork, process_lock_request);
+                INIT_WORK(&ni->swork, process_send_sockets);
                 ni->nodeid = nodeid;
 
                 if (nodeid > max_nodeid)
@@ -249,11 +258,8 @@ static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc)
 /* Data or notification available on socket */
 static void lowcomms_data_ready(struct sock *sk, int count_unused)
 {
-        atomic_inc(&sctp_con.waiting_requests);
         if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags))
-                return;
-
-        wake_up_interruptible(&lowcomms_recv_wait);
+                queue_work(recv_workqueue, &sctp_con.work);
 }
 
 
@@ -361,10 +367,10 @@ static void init_failed(void)
                                 spin_lock_bh(&write_nodes_lock);
                                 list_add_tail(&ni->write_list, &write_nodes);
                                 spin_unlock_bh(&write_nodes_lock);
+                                queue_work(send_workqueue, &ni->swork);
                         }
                 }
         }
-        wake_up_process(send_task);
 }
 
 /* Something happened to an association */
@@ -446,8 +452,8 @@ static void process_sctp_notification(struct msghdr *msg, char *buf)
                                 spin_lock_bh(&write_nodes_lock);
                                 list_add_tail(&ni->write_list, &write_nodes);
                                 spin_unlock_bh(&write_nodes_lock);
+                                queue_work(send_workqueue, &ni->swork);
                         }
-                        wake_up_process(send_task);
                 }
                 break;
 
@@ -580,8 +586,8 @@ static int receive_from_sock(void)
                         spin_lock_bh(&write_nodes_lock);
                         list_add_tail(&ni->write_list, &write_nodes);
                         spin_unlock_bh(&write_nodes_lock);
+                        queue_work(send_workqueue, &ni->swork);
                 }
-                wake_up_process(send_task);
         }
 }
 
@@ -590,6 +596,7 @@ static int receive_from_sock(void)
                 return 0;
 
         cbuf_add(&sctp_con.cb, ret);
+        // PJC: TODO: Add to node's workqueue....can we ??
         ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
                                           page_address(sctp_con.rx_page),
                                           sctp_con.cb.base, sctp_con.cb.len,
@@ -820,7 +827,8 @@ void dlm_lowcomms_commit_buffer(void *arg)
                 spin_lock_bh(&write_nodes_lock);
                 list_add_tail(&ni->write_list, &write_nodes);
                 spin_unlock_bh(&write_nodes_lock);
-                wake_up_process(send_task);
+
+                queue_work(send_workqueue, &ni->swork);
         }
         return;
 
@@ -1088,101 +1096,75 @@ int dlm_lowcomms_close(int nodeid)
         return 0;
 }
 
-static int write_list_empty(void)
+// PJC: The work queue function for receiving.
+static void process_recv_sockets(struct work_struct *work)
 {
-        int status;
+        if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
+                int ret;
+                int count = 0;
 
-        spin_lock_bh(&write_nodes_lock);
-        status = list_empty(&write_nodes);
-        spin_unlock_bh(&write_nodes_lock);
-
-        return status;
+                do {
+                        ret = receive_from_sock();
+
+                        /* Don't starve out everyone else */
+                        if (++count >= MAX_RX_MSG_COUNT) {
+                                cond_resched();
+                                count = 0;
+                        }
+                } while (!kthread_should_stop() && ret >=0);
+        }
+        cond_resched();
 }
 
-static int dlm_recvd(void *data)
+// PJC: the work queue function for sending
+static void process_send_sockets(struct work_struct *work)
 {
-        DECLARE_WAITQUEUE(wait, current);
-
-        while (!kthread_should_stop()) {
-                int count = 0;
-
-                set_current_state(TASK_INTERRUPTIBLE);
-                add_wait_queue(&lowcomms_recv_wait, &wait);
-                if (!test_bit(CF_READ_PENDING, &sctp_con.flags))
-                        schedule();
-                remove_wait_queue(&lowcomms_recv_wait, &wait);
-                set_current_state(TASK_RUNNING);
-
-                if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
-                        int ret;
-
-                        do {
-                                ret = receive_from_sock();
-
-                                /* Don't starve out everyone else */
-                                if (++count >= MAX_RX_MSG_COUNT) {
-                                        cond_resched();
-                                        count = 0;
-                                }
-                        } while (!kthread_should_stop() && ret >=0);
-                }
-                cond_resched();
+        if (sctp_con.eagain_flag) {
+                sctp_con.eagain_flag = 0;
+                refill_write_queue();
         }
-
-        return 0;
+        process_output_queue();
 }
 
-static int dlm_sendd(void *data)
+// PJC: Process lock requests from a particular node.
+// TODO: can we optimise this out on UP ??
+static void process_lock_request(struct work_struct *work)
 {
-        DECLARE_WAITQUEUE(wait, current);
-
-        add_wait_queue(sctp_con.sock->sk->sk_sleep, &wait);
-
-        while (!kthread_should_stop()) {
-                set_current_state(TASK_INTERRUPTIBLE);
-                if (write_list_empty())
-                        schedule();
-                set_current_state(TASK_RUNNING);
-
-                if (sctp_con.eagain_flag) {
-                        sctp_con.eagain_flag = 0;
-                        refill_write_queue();
-                }
-                process_output_queue();
-        }
-
-        remove_wait_queue(sctp_con.sock->sk->sk_sleep, &wait);
-
-        return 0;
 }
 
 static void daemons_stop(void)
 {
-        kthread_stop(recv_task);
-        kthread_stop(send_task);
+        destroy_workqueue(recv_workqueue);
+        destroy_workqueue(send_workqueue);
+        destroy_workqueue(lock_workqueue);
 }
 
 static int daemons_start(void)
 {
-        struct task_struct *p;
         int error;
+        recv_workqueue = create_workqueue("dlm_recv");
+        error = IS_ERR(recv_workqueue);
+        if (error) {
+                log_print("can't start dlm_recv %d", error);
+                return error;
+        }
 
-        p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
-        error = IS_ERR(p);
+        send_workqueue = create_singlethread_workqueue("dlm_send");
+        error = IS_ERR(send_workqueue);
         if (error) {
-                log_print("can't start dlm_recvd %d", error);
+                log_print("can't start dlm_send %d", error);
+                destroy_workqueue(recv_workqueue);
                 return error;
         }
-        recv_task = p;
 
-        p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
-        error = IS_ERR(p);
+        lock_workqueue = create_workqueue("dlm_rlock");
+        error = IS_ERR(lock_workqueue);
         if (error) {
-                log_print("can't start dlm_sendd %d", error);
-                kthread_stop(recv_task);
+                log_print("can't start dlm_rlock %d", error);
+                destroy_workqueue(send_workqueue);
+                destroy_workqueue(recv_workqueue);
                 return error;
         }
-        send_task = p;
 
         return 0;
 }
@@ -1194,6 +1176,8 @@ int dlm_lowcomms_start(void)
 {
         int error;
 
+        INIT_WORK(&sctp_con.work, process_recv_sockets);
+
         error = init_sock();
         if (error)
                 goto fail_sock;
@@ -1224,4 +1208,3 @@ void dlm_lowcomms_stop(void)
         for (i = 0; i < dlm_local_count; i++)
                 kfree(dlm_local_addr[i]);
 }
-
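Note how the SCTP side above hangs an swork and lwork off each struct nodeinfo rather than keeping one global pending list. The sketch below isolates that idiom, under the same caveat that the demo_* names are hypothetical rather than DLM identifiers: embedding a work_struct in the per-node structure lets the handler recover its owner with container_of(), so a multi-threaded workqueue can service several nodes at once.

```c
#include <linux/workqueue.h>
#include <linux/kernel.h>

struct demo_node {
        int nodeid;
        struct work_struct swork;  /* queued when this node has data to send */
};

static struct workqueue_struct *demo_send_wq;

static void demo_send_work(struct work_struct *work)
{
        /* Recover the owning node from the embedded work item. */
        struct demo_node *node = container_of(work, struct demo_node, swork);

        pr_debug("draining writequeue for node %d\n", node->nodeid);
        /* write this node's queued entries to the socket here */
}

static void demo_node_init(struct demo_node *node, int nodeid)
{
        node->nodeid = nodeid;
        INIT_WORK(&node->swork, demo_send_work);
}

/* Wherever the old code did wake_up_process(send_task): */
static void demo_node_kick(struct demo_node *node)
{
        queue_work(demo_send_wq, &node->swork);
}
```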
diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms-tcp.c
index b4fb5783ef8a..86e5f81da7cb 100644
--- a/fs/dlm/lowcomms-tcp.c
+++ b/fs/dlm/lowcomms-tcp.c
@@ -115,6 +115,8 @@ struct connection {
         atomic_t waiting_requests;
 #define MAX_CONNECT_RETRIES 3
         struct connection *othercon;
+        struct work_struct rwork; /* Receive workqueue */
+        struct work_struct swork; /* Send workqueue */
 };
 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
 
@@ -131,14 +133,9 @@ struct writequeue_entry {
 
 static struct sockaddr_storage dlm_local_addr;
 
-/* Manage daemons */
-static struct task_struct *recv_task;
-static struct task_struct *send_task;
-
-static wait_queue_t lowcomms_send_waitq_head;
-static DECLARE_WAIT_QUEUE_HEAD(lowcomms_send_waitq);
-static wait_queue_t lowcomms_recv_waitq_head;
-static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_waitq);
+/* Work queues */
+static struct workqueue_struct *recv_workqueue;
+static struct workqueue_struct *send_workqueue;
 
 /* An array of pointers to connections, indexed by NODEID */
 static struct connection **connections;
@@ -146,17 +143,8 @@ static DECLARE_MUTEX(connections_lock);
 static struct kmem_cache *con_cache;
 static int conn_array_size;
 
-/* List of sockets that have reads pending */
-static LIST_HEAD(read_sockets);
-static DEFINE_SPINLOCK(read_sockets_lock);
-
-/* List of sockets which have writes pending */
-static LIST_HEAD(write_sockets);
-static DEFINE_SPINLOCK(write_sockets_lock);
-
-/* List of sockets which have connects pending */
-static LIST_HEAD(state_sockets);
-static DEFINE_SPINLOCK(state_sockets_lock);
+static void process_recv_sockets(struct work_struct *work);
+static void process_send_sockets(struct work_struct *work);
 
 static struct connection *nodeid2con(int nodeid, gfp_t allocation)
 {
@@ -189,6 +177,8 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
                 init_rwsem(&con->sock_sem);
                 INIT_LIST_HEAD(&con->writequeue);
                 spin_lock_init(&con->writequeue_lock);
+                INIT_WORK(&con->swork, process_send_sockets);
+                INIT_WORK(&con->rwork, process_recv_sockets);
 
                 connections[nodeid] = con;
         }
@@ -203,41 +193,22 @@ static void lowcomms_data_ready(struct sock *sk, int count_unused)
 {
         struct connection *con = sock2con(sk);
 
-        atomic_inc(&con->waiting_requests);
-        if (test_and_set_bit(CF_READ_PENDING, &con->flags))
-                return;
-
-        spin_lock_bh(&read_sockets_lock);
-        list_add_tail(&con->read_list, &read_sockets);
-        spin_unlock_bh(&read_sockets_lock);
-
-        wake_up_interruptible(&lowcomms_recv_waitq);
+        if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
+                queue_work(recv_workqueue, &con->rwork);
 }
 
 static void lowcomms_write_space(struct sock *sk)
 {
         struct connection *con = sock2con(sk);
 
-        if (test_and_set_bit(CF_WRITE_PENDING, &con->flags))
-                return;
-
-        spin_lock_bh(&write_sockets_lock);
-        list_add_tail(&con->write_list, &write_sockets);
-        spin_unlock_bh(&write_sockets_lock);
-
-        wake_up_interruptible(&lowcomms_send_waitq);
+        if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
+                queue_work(send_workqueue, &con->swork);
 }
 
 static inline void lowcomms_connect_sock(struct connection *con)
 {
-        if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
-                return;
-
-        spin_lock_bh(&state_sockets_lock);
-        list_add_tail(&con->state_list, &state_sockets);
-        spin_unlock_bh(&state_sockets_lock);
-
-        wake_up_interruptible(&lowcomms_send_waitq);
+        if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
+                queue_work(send_workqueue, &con->swork);
 }
 
 static void lowcomms_state_change(struct sock *sk)
@@ -388,7 +359,8 @@ out:
         return 0;
 
 out_resched:
-        lowcomms_data_ready(con->sock->sk, 0);
+        if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
+                queue_work(recv_workqueue, &con->rwork);
         up_read(&con->sock_sem);
         cond_resched();
         return 0;
@@ -477,6 +449,8 @@ static int accept_from_sock(struct connection *con)
                         othercon->nodeid = nodeid;
                         othercon->rx_action = receive_from_sock;
                         init_rwsem(&othercon->sock_sem);
+                        INIT_WORK(&othercon->swork, process_send_sockets);
+                        INIT_WORK(&othercon->rwork, process_recv_sockets);
                         set_bit(CF_IS_OTHERCON, &othercon->flags);
                         newcon->othercon = othercon;
                 }
@@ -498,7 +472,8 @@ static int accept_from_sock(struct connection *con)
          * beween processing the accept adding the socket
          * to the read_sockets list
          */
-        lowcomms_data_ready(newsock->sk, 0);
+        if (!test_and_set_bit(CF_READ_PENDING, &newcon->flags))
+                queue_work(recv_workqueue, &newcon->rwork);
         up_read(&con->sock_sem);
 
         return 0;
@@ -757,12 +732,8 @@ void dlm_lowcomms_commit_buffer(void *mh)
         kunmap(e->page);
         spin_unlock(&con->writequeue_lock);
 
-        if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) {
-                spin_lock_bh(&write_sockets_lock);
-                list_add_tail(&con->write_list, &write_sockets);
-                spin_unlock_bh(&write_sockets_lock);
-
-                wake_up_interruptible(&lowcomms_send_waitq);
+        if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
+                queue_work(send_workqueue, &con->swork);
         }
         return;
 
@@ -803,6 +774,7 @@ static void send_to_sock(struct connection *con)
                 offset = e->offset;
                 BUG_ON(len == 0 && e->users == 0);
                 spin_unlock(&con->writequeue_lock);
+                kmap(e->page);
 
                 ret = 0;
                 if (len) {
@@ -884,85 +856,29 @@ out:
 }
 
 /* Look for activity on active sockets */
-static void process_sockets(void)
+static void process_recv_sockets(struct work_struct *work)
 {
-        struct list_head *list;
-        struct list_head *temp;
-        int count = 0;
-
-        spin_lock_bh(&read_sockets_lock);
-        list_for_each_safe(list, temp, &read_sockets) {
-
-                struct connection *con =
-                        list_entry(list, struct connection, read_list);
-                list_del(&con->read_list);
-                clear_bit(CF_READ_PENDING, &con->flags);
-
-                spin_unlock_bh(&read_sockets_lock);
+        struct connection *con = container_of(work, struct connection, rwork);
+        int err;
 
-                /* This can reach zero if we are processing requests
-                 * as they come in.
-                 */
-                if (atomic_read(&con->waiting_requests) == 0) {
-                        spin_lock_bh(&read_sockets_lock);
-                        continue;
-                }
-
-                do {
-                        con->rx_action(con);
-
-                        /* Don't starve out everyone else */
-                        if (++count >= MAX_RX_MSG_COUNT) {
-                                cond_resched();
-                                count = 0;
-                        }
-
-                } while (!atomic_dec_and_test(&con->waiting_requests) &&
-                         !kthread_should_stop());
-
-                spin_lock_bh(&read_sockets_lock);
-        }
-        spin_unlock_bh(&read_sockets_lock);
+        clear_bit(CF_READ_PENDING, &con->flags);
+        do {
+                err = con->rx_action(con);
+        } while (!err);
 }
 
-/* Try to send any messages that are pending
- */
-static void process_output_queue(void)
-{
-        struct list_head *list;
-        struct list_head *temp;
-
-        spin_lock_bh(&write_sockets_lock);
-        list_for_each_safe(list, temp, &write_sockets) {
-                struct connection *con =
-                        list_entry(list, struct connection, write_list);
-                clear_bit(CF_WRITE_PENDING, &con->flags);
-                list_del(&con->write_list);
-
-                spin_unlock_bh(&write_sockets_lock);
-                send_to_sock(con);
-                spin_lock_bh(&write_sockets_lock);
-        }
-        spin_unlock_bh(&write_sockets_lock);
-}
 
-static void process_state_queue(void)
+static void process_send_sockets(struct work_struct *work)
 {
-        struct list_head *list;
-        struct list_head *temp;
-
-        spin_lock_bh(&state_sockets_lock);
-        list_for_each_safe(list, temp, &state_sockets) {
-                struct connection *con =
-                        list_entry(list, struct connection, state_list);
-                list_del(&con->state_list);
-                clear_bit(CF_CONNECT_PENDING, &con->flags);
-                spin_unlock_bh(&state_sockets_lock);
+        struct connection *con = container_of(work, struct connection, swork);
 
+        if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
                 connect_to_sock(con);
-                spin_lock_bh(&state_sockets_lock);
         }
-        spin_unlock_bh(&state_sockets_lock);
+
+        if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags)) {
+                send_to_sock(con);
+        }
 }
 
 
@@ -979,97 +895,29 @@ static void clean_writequeues(void)
         }
 }
 
-static int read_list_empty(void)
-{
-        int status;
-
-        spin_lock_bh(&read_sockets_lock);
-        status = list_empty(&read_sockets);
-        spin_unlock_bh(&read_sockets_lock);
-
-        return status;
-}
-
-/* DLM Transport comms receive daemon */
-static int dlm_recvd(void *data)
+static void work_stop(void)
 {
-        init_waitqueue_entry(&lowcomms_recv_waitq_head, current);
-        add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head);
-
-        while (!kthread_should_stop()) {
-                set_current_state(TASK_INTERRUPTIBLE);
-                if (read_list_empty())
-                        schedule();
-                set_current_state(TASK_RUNNING);
-
-                process_sockets();
-        }
-
-        return 0;
+        destroy_workqueue(recv_workqueue);
+        destroy_workqueue(send_workqueue);
 }
 
-static int write_and_state_lists_empty(void)
+static int work_start(void)
 {
-        int status;
-
-        spin_lock_bh(&write_sockets_lock);
-        status = list_empty(&write_sockets);
-        spin_unlock_bh(&write_sockets_lock);
-
-        spin_lock_bh(&state_sockets_lock);
-        if (list_empty(&state_sockets) == 0)
-                status = 0;
-        spin_unlock_bh(&state_sockets_lock);
-
-        return status;
-}
-
-/* DLM Transport send daemon */
-static int dlm_sendd(void *data)
-{
-        init_waitqueue_entry(&lowcomms_send_waitq_head, current);
-        add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head);
-
-        while (!kthread_should_stop()) {
-                set_current_state(TASK_INTERRUPTIBLE);
-                if (write_and_state_lists_empty())
-                        schedule();
-                set_current_state(TASK_RUNNING);
-
-                process_state_queue();
-                process_output_queue();
-        }
-
-        return 0;
-}
-
-static void daemons_stop(void)
-{
-        kthread_stop(recv_task);
-        kthread_stop(send_task);
-}
-
-static int daemons_start(void)
-{
-        struct task_struct *p;
         int error;
-
-        p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
-        error = IS_ERR(p);
+        recv_workqueue = create_workqueue("dlm_recv");
+        error = IS_ERR(recv_workqueue);
         if (error) {
-                log_print("can't start dlm_recvd %d", error);
+                log_print("can't start dlm_recv %d", error);
                 return error;
         }
-        recv_task = p;
 
-        p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
-        error = IS_ERR(p);
+        send_workqueue = create_singlethread_workqueue("dlm_send");
+        error = IS_ERR(send_workqueue);
         if (error) {
-                log_print("can't start dlm_sendd %d", error);
-                kthread_stop(recv_task);
+                log_print("can't start dlm_send %d", error);
+                destroy_workqueue(recv_workqueue);
                 return error;
         }
-        send_task = p;
 
         return 0;
 }
@@ -1086,7 +934,7 @@ void dlm_lowcomms_stop(void)
                 connections[i]->flags |= 0xFF;
         }
 
-        daemons_stop();
+        work_stop();
         clean_writequeues();
 
         for (i = 0; i < conn_array_size; i++) {
@@ -1138,7 +986,7 @@ int dlm_lowcomms_start(void)
         if (error)
                 goto fail_unlisten;
 
-        error = work_start();
+        error = work_start();
         if (error)
                 goto fail_unlisten;
 
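One last pattern worth isolating is the unwind ordering in work_start()/work_stop(): a later failure tears down the queues created before it, and destroy_workqueue() flushes outstanding work, which is why dlm_lowcomms_stop() first poisons every connection's flags to stop new work being queued. A sketch of the same shape, again with hypothetical demo_* names; note that it checks for a NULL return, the usual failure convention for create_workqueue(), whereas the patch itself tests the result with IS_ERR().

```c
#include <linux/workqueue.h>

static struct workqueue_struct *demo_recv_wq;
static struct workqueue_struct *demo_send_wq;

static int demo_work_start(void)
{
        /* one worker thread per CPU for receive work */
        demo_recv_wq = create_workqueue("demo_recv");
        if (!demo_recv_wq)
                return -ENOMEM;

        /* a single thread keeps writes to one socket ordered */
        demo_send_wq = create_singlethread_workqueue("demo_send");
        if (!demo_send_wq) {
                destroy_workqueue(demo_recv_wq);  /* unwind the earlier step */
                return -ENOMEM;
        }
        return 0;
}

static void demo_work_stop(void)
{
        /* flushes pending work; callers must stop queueing first */
        destroy_workqueue(demo_recv_wq);
        destroy_workqueue(demo_send_wq);
}
```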