diff options
author | Patrick Caulfield <pcaulfie@redhat.com> | 2007-01-15 09:33:34 -0500 |
---|---|---|
committer | Steven Whitehouse <swhiteho@redhat.com> | 2007-02-05 13:36:52 -0500 |
commit | 1d6e8131cf0064ef5ab5f3411a82b800afbfadee (patch) | |
tree | d165b0210a2c12de8848b2b8bc30237183b611dc /fs/dlm/lowcomms-tcp.c | |
parent | 03dc6a538e42bcc8d5dfabcee208b639db85a80c (diff) |
[DLM] Use workqueues for dlm lowcomms
This patch converts the DLM TCP lowcomms to use workqueues rather than using its
own daemon functions. Simultaneously removing a lot of code and making it more
scalable on multi-processor machines.
Signed-Off-By: Patrick Caulfield <pcaulfie@redhat.com>
Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
Diffstat (limited to 'fs/dlm/lowcomms-tcp.c')
-rw-r--r-- | fs/dlm/lowcomms-tcp.c | 254 |
1 files changed, 51 insertions, 203 deletions
diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms-tcp.c index b4fb5783ef8a..86e5f81da7cb 100644 --- a/fs/dlm/lowcomms-tcp.c +++ b/fs/dlm/lowcomms-tcp.c | |||
@@ -115,6 +115,8 @@ struct connection { | |||
115 | atomic_t waiting_requests; | 115 | atomic_t waiting_requests; |
116 | #define MAX_CONNECT_RETRIES 3 | 116 | #define MAX_CONNECT_RETRIES 3 |
117 | struct connection *othercon; | 117 | struct connection *othercon; |
118 | struct work_struct rwork; /* Receive workqueue */ | ||
119 | struct work_struct swork; /* Send workqueue */ | ||
118 | }; | 120 | }; |
119 | #define sock2con(x) ((struct connection *)(x)->sk_user_data) | 121 | #define sock2con(x) ((struct connection *)(x)->sk_user_data) |
120 | 122 | ||
@@ -131,14 +133,9 @@ struct writequeue_entry { | |||
131 | 133 | ||
132 | static struct sockaddr_storage dlm_local_addr; | 134 | static struct sockaddr_storage dlm_local_addr; |
133 | 135 | ||
134 | /* Manage daemons */ | 136 | /* Work queues */ |
135 | static struct task_struct *recv_task; | 137 | static struct workqueue_struct *recv_workqueue; |
136 | static struct task_struct *send_task; | 138 | static struct workqueue_struct *send_workqueue; |
137 | |||
138 | static wait_queue_t lowcomms_send_waitq_head; | ||
139 | static DECLARE_WAIT_QUEUE_HEAD(lowcomms_send_waitq); | ||
140 | static wait_queue_t lowcomms_recv_waitq_head; | ||
141 | static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_waitq); | ||
142 | 139 | ||
143 | /* An array of pointers to connections, indexed by NODEID */ | 140 | /* An array of pointers to connections, indexed by NODEID */ |
144 | static struct connection **connections; | 141 | static struct connection **connections; |
@@ -146,17 +143,8 @@ static DECLARE_MUTEX(connections_lock); | |||
146 | static struct kmem_cache *con_cache; | 143 | static struct kmem_cache *con_cache; |
147 | static int conn_array_size; | 144 | static int conn_array_size; |
148 | 145 | ||
149 | /* List of sockets that have reads pending */ | 146 | static void process_recv_sockets(struct work_struct *work); |
150 | static LIST_HEAD(read_sockets); | 147 | static void process_send_sockets(struct work_struct *work); |
151 | static DEFINE_SPINLOCK(read_sockets_lock); | ||
152 | |||
153 | /* List of sockets which have writes pending */ | ||
154 | static LIST_HEAD(write_sockets); | ||
155 | static DEFINE_SPINLOCK(write_sockets_lock); | ||
156 | |||
157 | /* List of sockets which have connects pending */ | ||
158 | static LIST_HEAD(state_sockets); | ||
159 | static DEFINE_SPINLOCK(state_sockets_lock); | ||
160 | 148 | ||
161 | static struct connection *nodeid2con(int nodeid, gfp_t allocation) | 149 | static struct connection *nodeid2con(int nodeid, gfp_t allocation) |
162 | { | 150 | { |
@@ -189,6 +177,8 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation) | |||
189 | init_rwsem(&con->sock_sem); | 177 | init_rwsem(&con->sock_sem); |
190 | INIT_LIST_HEAD(&con->writequeue); | 178 | INIT_LIST_HEAD(&con->writequeue); |
191 | spin_lock_init(&con->writequeue_lock); | 179 | spin_lock_init(&con->writequeue_lock); |
180 | INIT_WORK(&con->swork, process_send_sockets); | ||
181 | INIT_WORK(&con->rwork, process_recv_sockets); | ||
192 | 182 | ||
193 | connections[nodeid] = con; | 183 | connections[nodeid] = con; |
194 | } | 184 | } |
@@ -203,41 +193,22 @@ static void lowcomms_data_ready(struct sock *sk, int count_unused) | |||
203 | { | 193 | { |
204 | struct connection *con = sock2con(sk); | 194 | struct connection *con = sock2con(sk); |
205 | 195 | ||
206 | atomic_inc(&con->waiting_requests); | 196 | if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) |
207 | if (test_and_set_bit(CF_READ_PENDING, &con->flags)) | 197 | queue_work(recv_workqueue, &con->rwork); |
208 | return; | ||
209 | |||
210 | spin_lock_bh(&read_sockets_lock); | ||
211 | list_add_tail(&con->read_list, &read_sockets); | ||
212 | spin_unlock_bh(&read_sockets_lock); | ||
213 | |||
214 | wake_up_interruptible(&lowcomms_recv_waitq); | ||
215 | } | 198 | } |
216 | 199 | ||
217 | static void lowcomms_write_space(struct sock *sk) | 200 | static void lowcomms_write_space(struct sock *sk) |
218 | { | 201 | { |
219 | struct connection *con = sock2con(sk); | 202 | struct connection *con = sock2con(sk); |
220 | 203 | ||
221 | if (test_and_set_bit(CF_WRITE_PENDING, &con->flags)) | 204 | if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) |
222 | return; | 205 | queue_work(send_workqueue, &con->swork); |
223 | |||
224 | spin_lock_bh(&write_sockets_lock); | ||
225 | list_add_tail(&con->write_list, &write_sockets); | ||
226 | spin_unlock_bh(&write_sockets_lock); | ||
227 | |||
228 | wake_up_interruptible(&lowcomms_send_waitq); | ||
229 | } | 206 | } |
230 | 207 | ||
231 | static inline void lowcomms_connect_sock(struct connection *con) | 208 | static inline void lowcomms_connect_sock(struct connection *con) |
232 | { | 209 | { |
233 | if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) | 210 | if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) |
234 | return; | 211 | queue_work(send_workqueue, &con->swork); |
235 | |||
236 | spin_lock_bh(&state_sockets_lock); | ||
237 | list_add_tail(&con->state_list, &state_sockets); | ||
238 | spin_unlock_bh(&state_sockets_lock); | ||
239 | |||
240 | wake_up_interruptible(&lowcomms_send_waitq); | ||
241 | } | 212 | } |
242 | 213 | ||
243 | static void lowcomms_state_change(struct sock *sk) | 214 | static void lowcomms_state_change(struct sock *sk) |
@@ -388,7 +359,8 @@ out: | |||
388 | return 0; | 359 | return 0; |
389 | 360 | ||
390 | out_resched: | 361 | out_resched: |
391 | lowcomms_data_ready(con->sock->sk, 0); | 362 | if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) |
363 | queue_work(recv_workqueue, &con->rwork); | ||
392 | up_read(&con->sock_sem); | 364 | up_read(&con->sock_sem); |
393 | cond_resched(); | 365 | cond_resched(); |
394 | return 0; | 366 | return 0; |
@@ -477,6 +449,8 @@ static int accept_from_sock(struct connection *con) | |||
477 | othercon->nodeid = nodeid; | 449 | othercon->nodeid = nodeid; |
478 | othercon->rx_action = receive_from_sock; | 450 | othercon->rx_action = receive_from_sock; |
479 | init_rwsem(&othercon->sock_sem); | 451 | init_rwsem(&othercon->sock_sem); |
452 | INIT_WORK(&othercon->swork, process_send_sockets); | ||
453 | INIT_WORK(&othercon->rwork, process_recv_sockets); | ||
480 | set_bit(CF_IS_OTHERCON, &othercon->flags); | 454 | set_bit(CF_IS_OTHERCON, &othercon->flags); |
481 | newcon->othercon = othercon; | 455 | newcon->othercon = othercon; |
482 | } | 456 | } |
@@ -498,7 +472,8 @@ static int accept_from_sock(struct connection *con) | |||
498 | * beween processing the accept adding the socket | 472 | * beween processing the accept adding the socket |
499 | * to the read_sockets list | 473 | * to the read_sockets list |
500 | */ | 474 | */ |
501 | lowcomms_data_ready(newsock->sk, 0); | 475 | if (!test_and_set_bit(CF_READ_PENDING, &newcon->flags)) |
476 | queue_work(recv_workqueue, &newcon->rwork); | ||
502 | up_read(&con->sock_sem); | 477 | up_read(&con->sock_sem); |
503 | 478 | ||
504 | return 0; | 479 | return 0; |
@@ -757,12 +732,8 @@ void dlm_lowcomms_commit_buffer(void *mh) | |||
757 | kunmap(e->page); | 732 | kunmap(e->page); |
758 | spin_unlock(&con->writequeue_lock); | 733 | spin_unlock(&con->writequeue_lock); |
759 | 734 | ||
760 | if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) { | 735 | if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) { |
761 | spin_lock_bh(&write_sockets_lock); | 736 | queue_work(send_workqueue, &con->swork); |
762 | list_add_tail(&con->write_list, &write_sockets); | ||
763 | spin_unlock_bh(&write_sockets_lock); | ||
764 | |||
765 | wake_up_interruptible(&lowcomms_send_waitq); | ||
766 | } | 737 | } |
767 | return; | 738 | return; |
768 | 739 | ||
@@ -803,6 +774,7 @@ static void send_to_sock(struct connection *con) | |||
803 | offset = e->offset; | 774 | offset = e->offset; |
804 | BUG_ON(len == 0 && e->users == 0); | 775 | BUG_ON(len == 0 && e->users == 0); |
805 | spin_unlock(&con->writequeue_lock); | 776 | spin_unlock(&con->writequeue_lock); |
777 | kmap(e->page); | ||
806 | 778 | ||
807 | ret = 0; | 779 | ret = 0; |
808 | if (len) { | 780 | if (len) { |
@@ -884,85 +856,29 @@ out: | |||
884 | } | 856 | } |
885 | 857 | ||
886 | /* Look for activity on active sockets */ | 858 | /* Look for activity on active sockets */ |
887 | static void process_sockets(void) | 859 | static void process_recv_sockets(struct work_struct *work) |
888 | { | 860 | { |
889 | struct list_head *list; | 861 | struct connection *con = container_of(work, struct connection, rwork); |
890 | struct list_head *temp; | 862 | int err; |
891 | int count = 0; | ||
892 | |||
893 | spin_lock_bh(&read_sockets_lock); | ||
894 | list_for_each_safe(list, temp, &read_sockets) { | ||
895 | |||
896 | struct connection *con = | ||
897 | list_entry(list, struct connection, read_list); | ||
898 | list_del(&con->read_list); | ||
899 | clear_bit(CF_READ_PENDING, &con->flags); | ||
900 | |||
901 | spin_unlock_bh(&read_sockets_lock); | ||
902 | 863 | ||
903 | /* This can reach zero if we are processing requests | 864 | clear_bit(CF_READ_PENDING, &con->flags); |
904 | * as they come in. | 865 | do { |
905 | */ | 866 | err = con->rx_action(con); |
906 | if (atomic_read(&con->waiting_requests) == 0) { | 867 | } while (!err); |
907 | spin_lock_bh(&read_sockets_lock); | ||
908 | continue; | ||
909 | } | ||
910 | |||
911 | do { | ||
912 | con->rx_action(con); | ||
913 | |||
914 | /* Don't starve out everyone else */ | ||
915 | if (++count >= MAX_RX_MSG_COUNT) { | ||
916 | cond_resched(); | ||
917 | count = 0; | ||
918 | } | ||
919 | |||
920 | } while (!atomic_dec_and_test(&con->waiting_requests) && | ||
921 | !kthread_should_stop()); | ||
922 | |||
923 | spin_lock_bh(&read_sockets_lock); | ||
924 | } | ||
925 | spin_unlock_bh(&read_sockets_lock); | ||
926 | } | 868 | } |
927 | 869 | ||
928 | /* Try to send any messages that are pending | ||
929 | */ | ||
930 | static void process_output_queue(void) | ||
931 | { | ||
932 | struct list_head *list; | ||
933 | struct list_head *temp; | ||
934 | |||
935 | spin_lock_bh(&write_sockets_lock); | ||
936 | list_for_each_safe(list, temp, &write_sockets) { | ||
937 | struct connection *con = | ||
938 | list_entry(list, struct connection, write_list); | ||
939 | clear_bit(CF_WRITE_PENDING, &con->flags); | ||
940 | list_del(&con->write_list); | ||
941 | |||
942 | spin_unlock_bh(&write_sockets_lock); | ||
943 | send_to_sock(con); | ||
944 | spin_lock_bh(&write_sockets_lock); | ||
945 | } | ||
946 | spin_unlock_bh(&write_sockets_lock); | ||
947 | } | ||
948 | 870 | ||
949 | static void process_state_queue(void) | 871 | static void process_send_sockets(struct work_struct *work) |
950 | { | 872 | { |
951 | struct list_head *list; | 873 | struct connection *con = container_of(work, struct connection, swork); |
952 | struct list_head *temp; | ||
953 | |||
954 | spin_lock_bh(&state_sockets_lock); | ||
955 | list_for_each_safe(list, temp, &state_sockets) { | ||
956 | struct connection *con = | ||
957 | list_entry(list, struct connection, state_list); | ||
958 | list_del(&con->state_list); | ||
959 | clear_bit(CF_CONNECT_PENDING, &con->flags); | ||
960 | spin_unlock_bh(&state_sockets_lock); | ||
961 | 874 | ||
875 | if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { | ||
962 | connect_to_sock(con); | 876 | connect_to_sock(con); |
963 | spin_lock_bh(&state_sockets_lock); | ||
964 | } | 877 | } |
965 | spin_unlock_bh(&state_sockets_lock); | 878 | |
879 | if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags)) { | ||
880 | send_to_sock(con); | ||
881 | } | ||
966 | } | 882 | } |
967 | 883 | ||
968 | 884 | ||
@@ -979,97 +895,29 @@ static void clean_writequeues(void) | |||
979 | } | 895 | } |
980 | } | 896 | } |
981 | 897 | ||
982 | static int read_list_empty(void) | 898 | static void work_stop(void) |
983 | { | ||
984 | int status; | ||
985 | |||
986 | spin_lock_bh(&read_sockets_lock); | ||
987 | status = list_empty(&read_sockets); | ||
988 | spin_unlock_bh(&read_sockets_lock); | ||
989 | |||
990 | return status; | ||
991 | } | ||
992 | |||
993 | /* DLM Transport comms receive daemon */ | ||
994 | static int dlm_recvd(void *data) | ||
995 | { | 899 | { |
996 | init_waitqueue_entry(&lowcomms_recv_waitq_head, current); | 900 | destroy_workqueue(recv_workqueue); |
997 | add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head); | 901 | destroy_workqueue(send_workqueue); |
998 | |||
999 | while (!kthread_should_stop()) { | ||
1000 | set_current_state(TASK_INTERRUPTIBLE); | ||
1001 | if (read_list_empty()) | ||
1002 | schedule(); | ||
1003 | set_current_state(TASK_RUNNING); | ||
1004 | |||
1005 | process_sockets(); | ||
1006 | } | ||
1007 | |||
1008 | return 0; | ||
1009 | } | 902 | } |
1010 | 903 | ||
1011 | static int write_and_state_lists_empty(void) | 904 | static int work_start(void) |
1012 | { | 905 | { |
1013 | int status; | ||
1014 | |||
1015 | spin_lock_bh(&write_sockets_lock); | ||
1016 | status = list_empty(&write_sockets); | ||
1017 | spin_unlock_bh(&write_sockets_lock); | ||
1018 | |||
1019 | spin_lock_bh(&state_sockets_lock); | ||
1020 | if (list_empty(&state_sockets) == 0) | ||
1021 | status = 0; | ||
1022 | spin_unlock_bh(&state_sockets_lock); | ||
1023 | |||
1024 | return status; | ||
1025 | } | ||
1026 | |||
1027 | /* DLM Transport send daemon */ | ||
1028 | static int dlm_sendd(void *data) | ||
1029 | { | ||
1030 | init_waitqueue_entry(&lowcomms_send_waitq_head, current); | ||
1031 | add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head); | ||
1032 | |||
1033 | while (!kthread_should_stop()) { | ||
1034 | set_current_state(TASK_INTERRUPTIBLE); | ||
1035 | if (write_and_state_lists_empty()) | ||
1036 | schedule(); | ||
1037 | set_current_state(TASK_RUNNING); | ||
1038 | |||
1039 | process_state_queue(); | ||
1040 | process_output_queue(); | ||
1041 | } | ||
1042 | |||
1043 | return 0; | ||
1044 | } | ||
1045 | |||
1046 | static void daemons_stop(void) | ||
1047 | { | ||
1048 | kthread_stop(recv_task); | ||
1049 | kthread_stop(send_task); | ||
1050 | } | ||
1051 | |||
1052 | static int daemons_start(void) | ||
1053 | { | ||
1054 | struct task_struct *p; | ||
1055 | int error; | 906 | int error; |
1056 | 907 | recv_workqueue = create_workqueue("dlm_recv"); | |
1057 | p = kthread_run(dlm_recvd, NULL, "dlm_recvd"); | 908 | error = IS_ERR(recv_workqueue); |
1058 | error = IS_ERR(p); | ||
1059 | if (error) { | 909 | if (error) { |
1060 | log_print("can't start dlm_recvd %d", error); | 910 | log_print("can't start dlm_recv %d", error); |
1061 | return error; | 911 | return error; |
1062 | } | 912 | } |
1063 | recv_task = p; | ||
1064 | 913 | ||
1065 | p = kthread_run(dlm_sendd, NULL, "dlm_sendd"); | 914 | send_workqueue = create_singlethread_workqueue("dlm_send"); |
1066 | error = IS_ERR(p); | 915 | error = IS_ERR(send_workqueue); |
1067 | if (error) { | 916 | if (error) { |
1068 | log_print("can't start dlm_sendd %d", error); | 917 | log_print("can't start dlm_send %d", error); |
1069 | kthread_stop(recv_task); | 918 | destroy_workqueue(recv_workqueue); |
1070 | return error; | 919 | return error; |
1071 | } | 920 | } |
1072 | send_task = p; | ||
1073 | 921 | ||
1074 | return 0; | 922 | return 0; |
1075 | } | 923 | } |
@@ -1086,7 +934,7 @@ void dlm_lowcomms_stop(void) | |||
1086 | connections[i]->flags |= 0xFF; | 934 | connections[i]->flags |= 0xFF; |
1087 | } | 935 | } |
1088 | 936 | ||
1089 | daemons_stop(); | 937 | work_stop(); |
1090 | clean_writequeues(); | 938 | clean_writequeues(); |
1091 | 939 | ||
1092 | for (i = 0; i < conn_array_size; i++) { | 940 | for (i = 0; i < conn_array_size; i++) { |
@@ -1138,7 +986,7 @@ int dlm_lowcomms_start(void) | |||
1138 | if (error) | 986 | if (error) |
1139 | goto fail_unlisten; | 987 | goto fail_unlisten; |
1140 | 988 | ||
1141 | error = daemons_start(); | 989 | error = work_start(); |
1142 | if (error) | 990 | if (error) |
1143 | goto fail_unlisten; | 991 | goto fail_unlisten; |
1144 | 992 | ||