diff options
author | Patrick Caulfield <pcaulfie@redhat.com> | 2007-01-15 09:33:34 -0500 |
---|---|---|
committer | Steven Whitehouse <swhiteho@redhat.com> | 2007-02-05 13:36:52 -0500 |
commit | 1d6e8131cf0064ef5ab5f3411a82b800afbfadee (patch) | |
tree | d165b0210a2c12de8848b2b8bc30237183b611dc /fs/dlm/lowcomms-sctp.c | |
parent | 03dc6a538e42bcc8d5dfabcee208b639db85a80c (diff) |
[DLM] Use workqueues for dlm lowcomms
This patch converts the DLM TCP lowcomms to use workqueues rather than using its
own daemon functions. Simultaneously removing a lot of code and making it more
scalable on multi-processor machines.
Signed-Off-By: Patrick Caulfield <pcaulfie@redhat.com>
Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
Diffstat (limited to 'fs/dlm/lowcomms-sctp.c')
-rw-r--r-- | fs/dlm/lowcomms-sctp.c | 145 |
1 files changed, 64 insertions, 81 deletions
diff --git a/fs/dlm/lowcomms-sctp.c b/fs/dlm/lowcomms-sctp.c index 5aeadadd8afd..dc83a9d979b5 100644 --- a/fs/dlm/lowcomms-sctp.c +++ b/fs/dlm/lowcomms-sctp.c | |||
@@ -72,6 +72,8 @@ struct nodeinfo { | |||
72 | struct list_head writequeue; /* outgoing writequeue_entries */ | 72 | struct list_head writequeue; /* outgoing writequeue_entries */ |
73 | spinlock_t writequeue_lock; | 73 | spinlock_t writequeue_lock; |
74 | int nodeid; | 74 | int nodeid; |
75 | struct work_struct swork; /* Send workqueue */ | ||
76 | struct work_struct lwork; /* Locking workqueue */ | ||
75 | }; | 77 | }; |
76 | 78 | ||
77 | static DEFINE_IDR(nodeinfo_idr); | 79 | static DEFINE_IDR(nodeinfo_idr); |
@@ -96,6 +98,7 @@ struct connection { | |||
96 | atomic_t waiting_requests; | 98 | atomic_t waiting_requests; |
97 | struct cbuf cb; | 99 | struct cbuf cb; |
98 | int eagain_flag; | 100 | int eagain_flag; |
101 | struct work_struct work; /* Send workqueue */ | ||
99 | }; | 102 | }; |
100 | 103 | ||
101 | /* An entry waiting to be sent */ | 104 | /* An entry waiting to be sent */ |
@@ -137,19 +140,23 @@ static void cbuf_eat(struct cbuf *cb, int n) | |||
137 | static LIST_HEAD(write_nodes); | 140 | static LIST_HEAD(write_nodes); |
138 | static DEFINE_SPINLOCK(write_nodes_lock); | 141 | static DEFINE_SPINLOCK(write_nodes_lock); |
139 | 142 | ||
143 | |||
140 | /* Maximum number of incoming messages to process before | 144 | /* Maximum number of incoming messages to process before |
141 | * doing a schedule() | 145 | * doing a schedule() |
142 | */ | 146 | */ |
143 | #define MAX_RX_MSG_COUNT 25 | 147 | #define MAX_RX_MSG_COUNT 25 |
144 | 148 | ||
145 | /* Manage daemons */ | 149 | /* Work queues */ |
146 | static struct task_struct *recv_task; | 150 | static struct workqueue_struct *recv_workqueue; |
147 | static struct task_struct *send_task; | 151 | static struct workqueue_struct *send_workqueue; |
148 | static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_wait); | 152 | static struct workqueue_struct *lock_workqueue; |
149 | 153 | ||
150 | /* The SCTP connection */ | 154 | /* The SCTP connection */ |
151 | static struct connection sctp_con; | 155 | static struct connection sctp_con; |
152 | 156 | ||
157 | static void process_send_sockets(struct work_struct *work); | ||
158 | static void process_recv_sockets(struct work_struct *work); | ||
159 | static void process_lock_request(struct work_struct *work); | ||
153 | 160 | ||
154 | static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr) | 161 | static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr) |
155 | { | 162 | { |
@@ -222,6 +229,8 @@ static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc) | |||
222 | spin_lock_init(&ni->lock); | 229 | spin_lock_init(&ni->lock); |
223 | INIT_LIST_HEAD(&ni->writequeue); | 230 | INIT_LIST_HEAD(&ni->writequeue); |
224 | spin_lock_init(&ni->writequeue_lock); | 231 | spin_lock_init(&ni->writequeue_lock); |
232 | INIT_WORK(&ni->lwork, process_lock_request); | ||
233 | INIT_WORK(&ni->swork, process_send_sockets); | ||
225 | ni->nodeid = nodeid; | 234 | ni->nodeid = nodeid; |
226 | 235 | ||
227 | if (nodeid > max_nodeid) | 236 | if (nodeid > max_nodeid) |
@@ -249,11 +258,8 @@ static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc) | |||
249 | /* Data or notification available on socket */ | 258 | /* Data or notification available on socket */ |
250 | static void lowcomms_data_ready(struct sock *sk, int count_unused) | 259 | static void lowcomms_data_ready(struct sock *sk, int count_unused) |
251 | { | 260 | { |
252 | atomic_inc(&sctp_con.waiting_requests); | ||
253 | if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags)) | 261 | if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags)) |
254 | return; | 262 | queue_work(recv_workqueue, &sctp_con.work); |
255 | |||
256 | wake_up_interruptible(&lowcomms_recv_wait); | ||
257 | } | 263 | } |
258 | 264 | ||
259 | 265 | ||
@@ -361,10 +367,10 @@ static void init_failed(void) | |||
361 | spin_lock_bh(&write_nodes_lock); | 367 | spin_lock_bh(&write_nodes_lock); |
362 | list_add_tail(&ni->write_list, &write_nodes); | 368 | list_add_tail(&ni->write_list, &write_nodes); |
363 | spin_unlock_bh(&write_nodes_lock); | 369 | spin_unlock_bh(&write_nodes_lock); |
370 | queue_work(send_workqueue, &ni->swork); | ||
364 | } | 371 | } |
365 | } | 372 | } |
366 | } | 373 | } |
367 | wake_up_process(send_task); | ||
368 | } | 374 | } |
369 | 375 | ||
370 | /* Something happened to an association */ | 376 | /* Something happened to an association */ |
@@ -446,8 +452,8 @@ static void process_sctp_notification(struct msghdr *msg, char *buf) | |||
446 | spin_lock_bh(&write_nodes_lock); | 452 | spin_lock_bh(&write_nodes_lock); |
447 | list_add_tail(&ni->write_list, &write_nodes); | 453 | list_add_tail(&ni->write_list, &write_nodes); |
448 | spin_unlock_bh(&write_nodes_lock); | 454 | spin_unlock_bh(&write_nodes_lock); |
455 | queue_work(send_workqueue, &ni->swork); | ||
449 | } | 456 | } |
450 | wake_up_process(send_task); | ||
451 | } | 457 | } |
452 | break; | 458 | break; |
453 | 459 | ||
@@ -580,8 +586,8 @@ static int receive_from_sock(void) | |||
580 | spin_lock_bh(&write_nodes_lock); | 586 | spin_lock_bh(&write_nodes_lock); |
581 | list_add_tail(&ni->write_list, &write_nodes); | 587 | list_add_tail(&ni->write_list, &write_nodes); |
582 | spin_unlock_bh(&write_nodes_lock); | 588 | spin_unlock_bh(&write_nodes_lock); |
589 | queue_work(send_workqueue, &ni->swork); | ||
583 | } | 590 | } |
584 | wake_up_process(send_task); | ||
585 | } | 591 | } |
586 | } | 592 | } |
587 | 593 | ||
@@ -590,6 +596,7 @@ static int receive_from_sock(void) | |||
590 | return 0; | 596 | return 0; |
591 | 597 | ||
592 | cbuf_add(&sctp_con.cb, ret); | 598 | cbuf_add(&sctp_con.cb, ret); |
599 | // PJC: TODO: Add to node's workqueue....can we ?? | ||
593 | ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid), | 600 | ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid), |
594 | page_address(sctp_con.rx_page), | 601 | page_address(sctp_con.rx_page), |
595 | sctp_con.cb.base, sctp_con.cb.len, | 602 | sctp_con.cb.base, sctp_con.cb.len, |
@@ -820,7 +827,8 @@ void dlm_lowcomms_commit_buffer(void *arg) | |||
820 | spin_lock_bh(&write_nodes_lock); | 827 | spin_lock_bh(&write_nodes_lock); |
821 | list_add_tail(&ni->write_list, &write_nodes); | 828 | list_add_tail(&ni->write_list, &write_nodes); |
822 | spin_unlock_bh(&write_nodes_lock); | 829 | spin_unlock_bh(&write_nodes_lock); |
823 | wake_up_process(send_task); | 830 | |
831 | queue_work(send_workqueue, &ni->swork); | ||
824 | } | 832 | } |
825 | return; | 833 | return; |
826 | 834 | ||
@@ -1088,101 +1096,75 @@ int dlm_lowcomms_close(int nodeid) | |||
1088 | return 0; | 1096 | return 0; |
1089 | } | 1097 | } |
1090 | 1098 | ||
1091 | static int write_list_empty(void) | 1099 | // PJC: The work queue function for receiving. |
1100 | static void process_recv_sockets(struct work_struct *work) | ||
1092 | { | 1101 | { |
1093 | int status; | 1102 | if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) { |
1103 | int ret; | ||
1104 | int count = 0; | ||
1094 | 1105 | ||
1095 | spin_lock_bh(&write_nodes_lock); | 1106 | do { |
1096 | status = list_empty(&write_nodes); | 1107 | ret = receive_from_sock(); |
1097 | spin_unlock_bh(&write_nodes_lock); | ||
1098 | 1108 | ||
1099 | return status; | 1109 | /* Don't starve out everyone else */ |
1110 | if (++count >= MAX_RX_MSG_COUNT) { | ||
1111 | cond_resched(); | ||
1112 | count = 0; | ||
1113 | } | ||
1114 | } while (!kthread_should_stop() && ret >=0); | ||
1115 | } | ||
1116 | cond_resched(); | ||
1100 | } | 1117 | } |
1101 | 1118 | ||
1102 | static int dlm_recvd(void *data) | 1119 | // PJC: the work queue function for sending |
1120 | static void process_send_sockets(struct work_struct *work) | ||
1103 | { | 1121 | { |
1104 | DECLARE_WAITQUEUE(wait, current); | 1122 | if (sctp_con.eagain_flag) { |
1105 | 1123 | sctp_con.eagain_flag = 0; | |
1106 | while (!kthread_should_stop()) { | 1124 | refill_write_queue(); |
1107 | int count = 0; | ||
1108 | |||
1109 | set_current_state(TASK_INTERRUPTIBLE); | ||
1110 | add_wait_queue(&lowcomms_recv_wait, &wait); | ||
1111 | if (!test_bit(CF_READ_PENDING, &sctp_con.flags)) | ||
1112 | schedule(); | ||
1113 | remove_wait_queue(&lowcomms_recv_wait, &wait); | ||
1114 | set_current_state(TASK_RUNNING); | ||
1115 | |||
1116 | if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) { | ||
1117 | int ret; | ||
1118 | |||
1119 | do { | ||
1120 | ret = receive_from_sock(); | ||
1121 | |||
1122 | /* Don't starve out everyone else */ | ||
1123 | if (++count >= MAX_RX_MSG_COUNT) { | ||
1124 | cond_resched(); | ||
1125 | count = 0; | ||
1126 | } | ||
1127 | } while (!kthread_should_stop() && ret >=0); | ||
1128 | } | ||
1129 | cond_resched(); | ||
1130 | } | 1125 | } |
1131 | 1126 | process_output_queue(); | |
1132 | return 0; | ||
1133 | } | 1127 | } |
1134 | 1128 | ||
1135 | static int dlm_sendd(void *data) | 1129 | // PJC: Process lock requests from a particular node. |
1130 | // TODO: can we optimise this out on UP ?? | ||
1131 | static void process_lock_request(struct work_struct *work) | ||
1136 | { | 1132 | { |
1137 | DECLARE_WAITQUEUE(wait, current); | ||
1138 | |||
1139 | add_wait_queue(sctp_con.sock->sk->sk_sleep, &wait); | ||
1140 | |||
1141 | while (!kthread_should_stop()) { | ||
1142 | set_current_state(TASK_INTERRUPTIBLE); | ||
1143 | if (write_list_empty()) | ||
1144 | schedule(); | ||
1145 | set_current_state(TASK_RUNNING); | ||
1146 | |||
1147 | if (sctp_con.eagain_flag) { | ||
1148 | sctp_con.eagain_flag = 0; | ||
1149 | refill_write_queue(); | ||
1150 | } | ||
1151 | process_output_queue(); | ||
1152 | } | ||
1153 | |||
1154 | remove_wait_queue(sctp_con.sock->sk->sk_sleep, &wait); | ||
1155 | |||
1156 | return 0; | ||
1157 | } | 1133 | } |
1158 | 1134 | ||
1159 | static void daemons_stop(void) | 1135 | static void daemons_stop(void) |
1160 | { | 1136 | { |
1161 | kthread_stop(recv_task); | 1137 | destroy_workqueue(recv_workqueue); |
1162 | kthread_stop(send_task); | 1138 | destroy_workqueue(send_workqueue); |
1139 | destroy_workqueue(lock_workqueue); | ||
1163 | } | 1140 | } |
1164 | 1141 | ||
1165 | static int daemons_start(void) | 1142 | static int daemons_start(void) |
1166 | { | 1143 | { |
1167 | struct task_struct *p; | ||
1168 | int error; | 1144 | int error; |
1145 | recv_workqueue = create_workqueue("dlm_recv"); | ||
1146 | error = IS_ERR(recv_workqueue); | ||
1147 | if (error) { | ||
1148 | log_print("can't start dlm_recv %d", error); | ||
1149 | return error; | ||
1150 | } | ||
1169 | 1151 | ||
1170 | p = kthread_run(dlm_recvd, NULL, "dlm_recvd"); | 1152 | send_workqueue = create_singlethread_workqueue("dlm_send"); |
1171 | error = IS_ERR(p); | 1153 | error = IS_ERR(send_workqueue); |
1172 | if (error) { | 1154 | if (error) { |
1173 | log_print("can't start dlm_recvd %d", error); | 1155 | log_print("can't start dlm_send %d", error); |
1156 | destroy_workqueue(recv_workqueue); | ||
1174 | return error; | 1157 | return error; |
1175 | } | 1158 | } |
1176 | recv_task = p; | ||
1177 | 1159 | ||
1178 | p = kthread_run(dlm_sendd, NULL, "dlm_sendd"); | 1160 | lock_workqueue = create_workqueue("dlm_rlock"); |
1179 | error = IS_ERR(p); | 1161 | error = IS_ERR(lock_workqueue); |
1180 | if (error) { | 1162 | if (error) { |
1181 | log_print("can't start dlm_sendd %d", error); | 1163 | log_print("can't start dlm_rlock %d", error); |
1182 | kthread_stop(recv_task); | 1164 | destroy_workqueue(send_workqueue); |
1165 | destroy_workqueue(recv_workqueue); | ||
1183 | return error; | 1166 | return error; |
1184 | } | 1167 | } |
1185 | send_task = p; | ||
1186 | 1168 | ||
1187 | return 0; | 1169 | return 0; |
1188 | } | 1170 | } |
@@ -1194,6 +1176,8 @@ int dlm_lowcomms_start(void) | |||
1194 | { | 1176 | { |
1195 | int error; | 1177 | int error; |
1196 | 1178 | ||
1179 | INIT_WORK(&sctp_con.work, process_recv_sockets); | ||
1180 | |||
1197 | error = init_sock(); | 1181 | error = init_sock(); |
1198 | if (error) | 1182 | if (error) |
1199 | goto fail_sock; | 1183 | goto fail_sock; |
@@ -1224,4 +1208,3 @@ void dlm_lowcomms_stop(void) | |||
1224 | for (i = 0; i < dlm_local_count; i++) | 1208 | for (i = 0; i < dlm_local_count; i++) |
1225 | kfree(dlm_local_addr[i]); | 1209 | kfree(dlm_local_addr[i]); |
1226 | } | 1210 | } |
1227 | |||