aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm/lowcomms-tcp.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/lowcomms-tcp.c')
-rw-r--r--fs/dlm/lowcomms-tcp.c254
1 files changed, 51 insertions, 203 deletions
diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms-tcp.c
index b4fb5783ef8a..86e5f81da7cb 100644
--- a/fs/dlm/lowcomms-tcp.c
+++ b/fs/dlm/lowcomms-tcp.c
@@ -115,6 +115,8 @@ struct connection {
115 atomic_t waiting_requests; 115 atomic_t waiting_requests;
116#define MAX_CONNECT_RETRIES 3 116#define MAX_CONNECT_RETRIES 3
117 struct connection *othercon; 117 struct connection *othercon;
118 struct work_struct rwork; /* Receive workqueue */
119 struct work_struct swork; /* Send workqueue */
118}; 120};
119#define sock2con(x) ((struct connection *)(x)->sk_user_data) 121#define sock2con(x) ((struct connection *)(x)->sk_user_data)
120 122
@@ -131,14 +133,9 @@ struct writequeue_entry {
131 133
132static struct sockaddr_storage dlm_local_addr; 134static struct sockaddr_storage dlm_local_addr;
133 135
134/* Manage daemons */ 136/* Work queues */
135static struct task_struct *recv_task; 137static struct workqueue_struct *recv_workqueue;
136static struct task_struct *send_task; 138static struct workqueue_struct *send_workqueue;
137
138static wait_queue_t lowcomms_send_waitq_head;
139static DECLARE_WAIT_QUEUE_HEAD(lowcomms_send_waitq);
140static wait_queue_t lowcomms_recv_waitq_head;
141static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_waitq);
142 139
143/* An array of pointers to connections, indexed by NODEID */ 140/* An array of pointers to connections, indexed by NODEID */
144static struct connection **connections; 141static struct connection **connections;
@@ -146,17 +143,8 @@ static DECLARE_MUTEX(connections_lock);
146static struct kmem_cache *con_cache; 143static struct kmem_cache *con_cache;
147static int conn_array_size; 144static int conn_array_size;
148 145
149/* List of sockets that have reads pending */ 146static void process_recv_sockets(struct work_struct *work);
150static LIST_HEAD(read_sockets); 147static void process_send_sockets(struct work_struct *work);
151static DEFINE_SPINLOCK(read_sockets_lock);
152
153/* List of sockets which have writes pending */
154static LIST_HEAD(write_sockets);
155static DEFINE_SPINLOCK(write_sockets_lock);
156
157/* List of sockets which have connects pending */
158static LIST_HEAD(state_sockets);
159static DEFINE_SPINLOCK(state_sockets_lock);
160 148
161static struct connection *nodeid2con(int nodeid, gfp_t allocation) 149static struct connection *nodeid2con(int nodeid, gfp_t allocation)
162{ 150{
@@ -189,6 +177,8 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
189 init_rwsem(&con->sock_sem); 177 init_rwsem(&con->sock_sem);
190 INIT_LIST_HEAD(&con->writequeue); 178 INIT_LIST_HEAD(&con->writequeue);
191 spin_lock_init(&con->writequeue_lock); 179 spin_lock_init(&con->writequeue_lock);
180 INIT_WORK(&con->swork, process_send_sockets);
181 INIT_WORK(&con->rwork, process_recv_sockets);
192 182
193 connections[nodeid] = con; 183 connections[nodeid] = con;
194 } 184 }
@@ -203,41 +193,22 @@ static void lowcomms_data_ready(struct sock *sk, int count_unused)
203{ 193{
204 struct connection *con = sock2con(sk); 194 struct connection *con = sock2con(sk);
205 195
206 atomic_inc(&con->waiting_requests); 196 if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
207 if (test_and_set_bit(CF_READ_PENDING, &con->flags)) 197 queue_work(recv_workqueue, &con->rwork);
208 return;
209
210 spin_lock_bh(&read_sockets_lock);
211 list_add_tail(&con->read_list, &read_sockets);
212 spin_unlock_bh(&read_sockets_lock);
213
214 wake_up_interruptible(&lowcomms_recv_waitq);
215} 198}
216 199
217static void lowcomms_write_space(struct sock *sk) 200static void lowcomms_write_space(struct sock *sk)
218{ 201{
219 struct connection *con = sock2con(sk); 202 struct connection *con = sock2con(sk);
220 203
221 if (test_and_set_bit(CF_WRITE_PENDING, &con->flags)) 204 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
222 return; 205 queue_work(send_workqueue, &con->swork);
223
224 spin_lock_bh(&write_sockets_lock);
225 list_add_tail(&con->write_list, &write_sockets);
226 spin_unlock_bh(&write_sockets_lock);
227
228 wake_up_interruptible(&lowcomms_send_waitq);
229} 206}
230 207
231static inline void lowcomms_connect_sock(struct connection *con) 208static inline void lowcomms_connect_sock(struct connection *con)
232{ 209{
233 if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) 210 if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
234 return; 211 queue_work(send_workqueue, &con->swork);
235
236 spin_lock_bh(&state_sockets_lock);
237 list_add_tail(&con->state_list, &state_sockets);
238 spin_unlock_bh(&state_sockets_lock);
239
240 wake_up_interruptible(&lowcomms_send_waitq);
241} 212}
242 213
243static void lowcomms_state_change(struct sock *sk) 214static void lowcomms_state_change(struct sock *sk)
@@ -388,7 +359,8 @@ out:
388 return 0; 359 return 0;
389 360
390out_resched: 361out_resched:
391 lowcomms_data_ready(con->sock->sk, 0); 362 if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
363 queue_work(recv_workqueue, &con->rwork);
392 up_read(&con->sock_sem); 364 up_read(&con->sock_sem);
393 cond_resched(); 365 cond_resched();
394 return 0; 366 return 0;
@@ -477,6 +449,8 @@ static int accept_from_sock(struct connection *con)
477 othercon->nodeid = nodeid; 449 othercon->nodeid = nodeid;
478 othercon->rx_action = receive_from_sock; 450 othercon->rx_action = receive_from_sock;
479 init_rwsem(&othercon->sock_sem); 451 init_rwsem(&othercon->sock_sem);
452 INIT_WORK(&othercon->swork, process_send_sockets);
453 INIT_WORK(&othercon->rwork, process_recv_sockets);
480 set_bit(CF_IS_OTHERCON, &othercon->flags); 454 set_bit(CF_IS_OTHERCON, &othercon->flags);
481 newcon->othercon = othercon; 455 newcon->othercon = othercon;
482 } 456 }
@@ -498,7 +472,8 @@ static int accept_from_sock(struct connection *con)
498 * beween processing the accept adding the socket 472 * beween processing the accept adding the socket
499 * to the read_sockets list 473 * to the read_sockets list
500 */ 474 */
501 lowcomms_data_ready(newsock->sk, 0); 475 if (!test_and_set_bit(CF_READ_PENDING, &newcon->flags))
476 queue_work(recv_workqueue, &newcon->rwork);
502 up_read(&con->sock_sem); 477 up_read(&con->sock_sem);
503 478
504 return 0; 479 return 0;
@@ -757,12 +732,8 @@ void dlm_lowcomms_commit_buffer(void *mh)
757 kunmap(e->page); 732 kunmap(e->page);
758 spin_unlock(&con->writequeue_lock); 733 spin_unlock(&con->writequeue_lock);
759 734
760 if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) { 735 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
761 spin_lock_bh(&write_sockets_lock); 736 queue_work(send_workqueue, &con->swork);
762 list_add_tail(&con->write_list, &write_sockets);
763 spin_unlock_bh(&write_sockets_lock);
764
765 wake_up_interruptible(&lowcomms_send_waitq);
766 } 737 }
767 return; 738 return;
768 739
@@ -803,6 +774,7 @@ static void send_to_sock(struct connection *con)
803 offset = e->offset; 774 offset = e->offset;
804 BUG_ON(len == 0 && e->users == 0); 775 BUG_ON(len == 0 && e->users == 0);
805 spin_unlock(&con->writequeue_lock); 776 spin_unlock(&con->writequeue_lock);
777 kmap(e->page);
806 778
807 ret = 0; 779 ret = 0;
808 if (len) { 780 if (len) {
@@ -884,85 +856,29 @@ out:
884} 856}
885 857
886/* Look for activity on active sockets */ 858/* Look for activity on active sockets */
887static void process_sockets(void) 859static void process_recv_sockets(struct work_struct *work)
888{ 860{
889 struct list_head *list; 861 struct connection *con = container_of(work, struct connection, rwork);
890 struct list_head *temp; 862 int err;
891 int count = 0;
892
893 spin_lock_bh(&read_sockets_lock);
894 list_for_each_safe(list, temp, &read_sockets) {
895
896 struct connection *con =
897 list_entry(list, struct connection, read_list);
898 list_del(&con->read_list);
899 clear_bit(CF_READ_PENDING, &con->flags);
900
901 spin_unlock_bh(&read_sockets_lock);
902 863
903 /* This can reach zero if we are processing requests 864 clear_bit(CF_READ_PENDING, &con->flags);
904 * as they come in. 865 do {
905 */ 866 err = con->rx_action(con);
906 if (atomic_read(&con->waiting_requests) == 0) { 867 } while (!err);
907 spin_lock_bh(&read_sockets_lock);
908 continue;
909 }
910
911 do {
912 con->rx_action(con);
913
914 /* Don't starve out everyone else */
915 if (++count >= MAX_RX_MSG_COUNT) {
916 cond_resched();
917 count = 0;
918 }
919
920 } while (!atomic_dec_and_test(&con->waiting_requests) &&
921 !kthread_should_stop());
922
923 spin_lock_bh(&read_sockets_lock);
924 }
925 spin_unlock_bh(&read_sockets_lock);
926} 868}
927 869
928/* Try to send any messages that are pending
929 */
930static void process_output_queue(void)
931{
932 struct list_head *list;
933 struct list_head *temp;
934
935 spin_lock_bh(&write_sockets_lock);
936 list_for_each_safe(list, temp, &write_sockets) {
937 struct connection *con =
938 list_entry(list, struct connection, write_list);
939 clear_bit(CF_WRITE_PENDING, &con->flags);
940 list_del(&con->write_list);
941
942 spin_unlock_bh(&write_sockets_lock);
943 send_to_sock(con);
944 spin_lock_bh(&write_sockets_lock);
945 }
946 spin_unlock_bh(&write_sockets_lock);
947}
948 870
949static void process_state_queue(void) 871static void process_send_sockets(struct work_struct *work)
950{ 872{
951 struct list_head *list; 873 struct connection *con = container_of(work, struct connection, swork);
952 struct list_head *temp;
953
954 spin_lock_bh(&state_sockets_lock);
955 list_for_each_safe(list, temp, &state_sockets) {
956 struct connection *con =
957 list_entry(list, struct connection, state_list);
958 list_del(&con->state_list);
959 clear_bit(CF_CONNECT_PENDING, &con->flags);
960 spin_unlock_bh(&state_sockets_lock);
961 874
875 if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
962 connect_to_sock(con); 876 connect_to_sock(con);
963 spin_lock_bh(&state_sockets_lock);
964 } 877 }
965 spin_unlock_bh(&state_sockets_lock); 878
879 if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags)) {
880 send_to_sock(con);
881 }
966} 882}
967 883
968 884
@@ -979,97 +895,29 @@ static void clean_writequeues(void)
979 } 895 }
980} 896}
981 897
982static int read_list_empty(void) 898static void work_stop(void)
983{
984 int status;
985
986 spin_lock_bh(&read_sockets_lock);
987 status = list_empty(&read_sockets);
988 spin_unlock_bh(&read_sockets_lock);
989
990 return status;
991}
992
993/* DLM Transport comms receive daemon */
994static int dlm_recvd(void *data)
995{ 899{
996 init_waitqueue_entry(&lowcomms_recv_waitq_head, current); 900 destroy_workqueue(recv_workqueue);
997 add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head); 901 destroy_workqueue(send_workqueue);
998
999 while (!kthread_should_stop()) {
1000 set_current_state(TASK_INTERRUPTIBLE);
1001 if (read_list_empty())
1002 schedule();
1003 set_current_state(TASK_RUNNING);
1004
1005 process_sockets();
1006 }
1007
1008 return 0;
1009} 902}
1010 903
1011static int write_and_state_lists_empty(void) 904static int work_start(void)
1012{ 905{
1013 int status;
1014
1015 spin_lock_bh(&write_sockets_lock);
1016 status = list_empty(&write_sockets);
1017 spin_unlock_bh(&write_sockets_lock);
1018
1019 spin_lock_bh(&state_sockets_lock);
1020 if (list_empty(&state_sockets) == 0)
1021 status = 0;
1022 spin_unlock_bh(&state_sockets_lock);
1023
1024 return status;
1025}
1026
1027/* DLM Transport send daemon */
1028static int dlm_sendd(void *data)
1029{
1030 init_waitqueue_entry(&lowcomms_send_waitq_head, current);
1031 add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head);
1032
1033 while (!kthread_should_stop()) {
1034 set_current_state(TASK_INTERRUPTIBLE);
1035 if (write_and_state_lists_empty())
1036 schedule();
1037 set_current_state(TASK_RUNNING);
1038
1039 process_state_queue();
1040 process_output_queue();
1041 }
1042
1043 return 0;
1044}
1045
1046static void daemons_stop(void)
1047{
1048 kthread_stop(recv_task);
1049 kthread_stop(send_task);
1050}
1051
1052static int daemons_start(void)
1053{
1054 struct task_struct *p;
1055 int error; 906 int error;
1056 907 recv_workqueue = create_workqueue("dlm_recv");
1057 p = kthread_run(dlm_recvd, NULL, "dlm_recvd"); 908 error = IS_ERR(recv_workqueue);
1058 error = IS_ERR(p);
1059 if (error) { 909 if (error) {
1060 log_print("can't start dlm_recvd %d", error); 910 log_print("can't start dlm_recv %d", error);
1061 return error; 911 return error;
1062 } 912 }
1063 recv_task = p;
1064 913
1065 p = kthread_run(dlm_sendd, NULL, "dlm_sendd"); 914 send_workqueue = create_singlethread_workqueue("dlm_send");
1066 error = IS_ERR(p); 915 error = IS_ERR(send_workqueue);
1067 if (error) { 916 if (error) {
1068 log_print("can't start dlm_sendd %d", error); 917 log_print("can't start dlm_send %d", error);
1069 kthread_stop(recv_task); 918 destroy_workqueue(recv_workqueue);
1070 return error; 919 return error;
1071 } 920 }
1072 send_task = p;
1073 921
1074 return 0; 922 return 0;
1075} 923}
@@ -1086,7 +934,7 @@ void dlm_lowcomms_stop(void)
1086 connections[i]->flags |= 0xFF; 934 connections[i]->flags |= 0xFF;
1087 } 935 }
1088 936
1089 daemons_stop(); 937 work_stop();
1090 clean_writequeues(); 938 clean_writequeues();
1091 939
1092 for (i = 0; i < conn_array_size; i++) { 940 for (i = 0; i < conn_array_size; i++) {
@@ -1138,7 +986,7 @@ int dlm_lowcomms_start(void)
1138 if (error) 986 if (error)
1139 goto fail_unlisten; 987 goto fail_unlisten;
1140 988
1141 error = daemons_start(); 989 error = work_start();
1142 if (error) 990 if (error)
1143 goto fail_unlisten; 991 goto fail_unlisten;
1144 992