aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm/lowcomms-sctp.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/lowcomms-sctp.c')
-rw-r--r--fs/dlm/lowcomms-sctp.c151
1 files changed, 67 insertions, 84 deletions
diff --git a/fs/dlm/lowcomms-sctp.c b/fs/dlm/lowcomms-sctp.c
index fe158d7a9285..dc83a9d979b5 100644
--- a/fs/dlm/lowcomms-sctp.c
+++ b/fs/dlm/lowcomms-sctp.c
@@ -72,6 +72,8 @@ struct nodeinfo {
72 struct list_head writequeue; /* outgoing writequeue_entries */ 72 struct list_head writequeue; /* outgoing writequeue_entries */
73 spinlock_t writequeue_lock; 73 spinlock_t writequeue_lock;
74 int nodeid; 74 int nodeid;
75 struct work_struct swork; /* Send workqueue */
76 struct work_struct lwork; /* Locking workqueue */
75}; 77};
76 78
77static DEFINE_IDR(nodeinfo_idr); 79static DEFINE_IDR(nodeinfo_idr);
@@ -96,6 +98,7 @@ struct connection {
96 atomic_t waiting_requests; 98 atomic_t waiting_requests;
97 struct cbuf cb; 99 struct cbuf cb;
98 int eagain_flag; 100 int eagain_flag;
101 struct work_struct work; /* Send workqueue */
99}; 102};
100 103
101/* An entry waiting to be sent */ 104/* An entry waiting to be sent */
@@ -137,19 +140,23 @@ static void cbuf_eat(struct cbuf *cb, int n)
137static LIST_HEAD(write_nodes); 140static LIST_HEAD(write_nodes);
138static DEFINE_SPINLOCK(write_nodes_lock); 141static DEFINE_SPINLOCK(write_nodes_lock);
139 142
143
140/* Maximum number of incoming messages to process before 144/* Maximum number of incoming messages to process before
141 * doing a schedule() 145 * doing a schedule()
142 */ 146 */
143#define MAX_RX_MSG_COUNT 25 147#define MAX_RX_MSG_COUNT 25
144 148
145/* Manage daemons */ 149/* Work queues */
146static struct task_struct *recv_task; 150static struct workqueue_struct *recv_workqueue;
147static struct task_struct *send_task; 151static struct workqueue_struct *send_workqueue;
148static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_wait); 152static struct workqueue_struct *lock_workqueue;
149 153
150/* The SCTP connection */ 154/* The SCTP connection */
151static struct connection sctp_con; 155static struct connection sctp_con;
152 156
157static void process_send_sockets(struct work_struct *work);
158static void process_recv_sockets(struct work_struct *work);
159static void process_lock_request(struct work_struct *work);
153 160
154static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr) 161static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
155{ 162{
@@ -222,6 +229,8 @@ static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
222 spin_lock_init(&ni->lock); 229 spin_lock_init(&ni->lock);
223 INIT_LIST_HEAD(&ni->writequeue); 230 INIT_LIST_HEAD(&ni->writequeue);
224 spin_lock_init(&ni->writequeue_lock); 231 spin_lock_init(&ni->writequeue_lock);
232 INIT_WORK(&ni->lwork, process_lock_request);
233 INIT_WORK(&ni->swork, process_send_sockets);
225 ni->nodeid = nodeid; 234 ni->nodeid = nodeid;
226 235
227 if (nodeid > max_nodeid) 236 if (nodeid > max_nodeid)
@@ -249,11 +258,8 @@ static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc)
249/* Data or notification available on socket */ 258/* Data or notification available on socket */
250static void lowcomms_data_ready(struct sock *sk, int count_unused) 259static void lowcomms_data_ready(struct sock *sk, int count_unused)
251{ 260{
252 atomic_inc(&sctp_con.waiting_requests);
253 if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags)) 261 if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags))
254 return; 262 queue_work(recv_workqueue, &sctp_con.work);
255
256 wake_up_interruptible(&lowcomms_recv_wait);
257} 263}
258 264
259 265
@@ -361,10 +367,10 @@ static void init_failed(void)
361 spin_lock_bh(&write_nodes_lock); 367 spin_lock_bh(&write_nodes_lock);
362 list_add_tail(&ni->write_list, &write_nodes); 368 list_add_tail(&ni->write_list, &write_nodes);
363 spin_unlock_bh(&write_nodes_lock); 369 spin_unlock_bh(&write_nodes_lock);
370 queue_work(send_workqueue, &ni->swork);
364 } 371 }
365 } 372 }
366 } 373 }
367 wake_up_process(send_task);
368} 374}
369 375
370/* Something happened to an association */ 376/* Something happened to an association */
@@ -446,8 +452,8 @@ static void process_sctp_notification(struct msghdr *msg, char *buf)
446 spin_lock_bh(&write_nodes_lock); 452 spin_lock_bh(&write_nodes_lock);
447 list_add_tail(&ni->write_list, &write_nodes); 453 list_add_tail(&ni->write_list, &write_nodes);
448 spin_unlock_bh(&write_nodes_lock); 454 spin_unlock_bh(&write_nodes_lock);
455 queue_work(send_workqueue, &ni->swork);
449 } 456 }
450 wake_up_process(send_task);
451 } 457 }
452 break; 458 break;
453 459
@@ -580,8 +586,8 @@ static int receive_from_sock(void)
580 spin_lock_bh(&write_nodes_lock); 586 spin_lock_bh(&write_nodes_lock);
581 list_add_tail(&ni->write_list, &write_nodes); 587 list_add_tail(&ni->write_list, &write_nodes);
582 spin_unlock_bh(&write_nodes_lock); 588 spin_unlock_bh(&write_nodes_lock);
589 queue_work(send_workqueue, &ni->swork);
583 } 590 }
584 wake_up_process(send_task);
585 } 591 }
586 } 592 }
587 593
@@ -590,6 +596,7 @@ static int receive_from_sock(void)
590 return 0; 596 return 0;
591 597
592 cbuf_add(&sctp_con.cb, ret); 598 cbuf_add(&sctp_con.cb, ret);
599 // PJC: TODO: Add to node's workqueue....can we ??
593 ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid), 600 ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
594 page_address(sctp_con.rx_page), 601 page_address(sctp_con.rx_page),
595 sctp_con.cb.base, sctp_con.cb.len, 602 sctp_con.cb.base, sctp_con.cb.len,
@@ -635,7 +642,7 @@ static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num)
635 642
636 if (result < 0) 643 if (result < 0)
637 log_print("Can't bind to port %d addr number %d", 644 log_print("Can't bind to port %d addr number %d",
638 dlm_config.tcp_port, num); 645 dlm_config.ci_tcp_port, num);
639 646
640 return result; 647 return result;
641} 648}
@@ -711,7 +718,7 @@ static int init_sock(void)
711 /* Bind to all interfaces. */ 718 /* Bind to all interfaces. */
712 for (i = 0; i < dlm_local_count; i++) { 719 for (i = 0; i < dlm_local_count; i++) {
713 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr)); 720 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
714 make_sockaddr(&localaddr, dlm_config.tcp_port, &addr_len); 721 make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
715 722
716 result = add_bind_addr(&localaddr, addr_len, num); 723 result = add_bind_addr(&localaddr, addr_len, num);
717 if (result) 724 if (result)
@@ -820,7 +827,8 @@ void dlm_lowcomms_commit_buffer(void *arg)
820 spin_lock_bh(&write_nodes_lock); 827 spin_lock_bh(&write_nodes_lock);
821 list_add_tail(&ni->write_list, &write_nodes); 828 list_add_tail(&ni->write_list, &write_nodes);
822 spin_unlock_bh(&write_nodes_lock); 829 spin_unlock_bh(&write_nodes_lock);
823 wake_up_process(send_task); 830
831 queue_work(send_workqueue, &ni->swork);
824 } 832 }
825 return; 833 return;
826 834
@@ -863,7 +871,7 @@ static void initiate_association(int nodeid)
863 return; 871 return;
864 } 872 }
865 873
866 make_sockaddr(&rem_addr, dlm_config.tcp_port, &addrlen); 874 make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);
867 875
868 outmessage.msg_name = &rem_addr; 876 outmessage.msg_name = &rem_addr;
869 outmessage.msg_namelen = addrlen; 877 outmessage.msg_namelen = addrlen;
@@ -1088,101 +1096,75 @@ int dlm_lowcomms_close(int nodeid)
1088 return 0; 1096 return 0;
1089} 1097}
1090 1098
1091static int write_list_empty(void) 1099// PJC: The work queue function for receiving.
1100static void process_recv_sockets(struct work_struct *work)
1092{ 1101{
1093 int status; 1102 if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
1094 1103 int ret;
1095 spin_lock_bh(&write_nodes_lock);
1096 status = list_empty(&write_nodes);
1097 spin_unlock_bh(&write_nodes_lock);
1098
1099 return status;
1100}
1101
1102static int dlm_recvd(void *data)
1103{
1104 DECLARE_WAITQUEUE(wait, current);
1105
1106 while (!kthread_should_stop()) {
1107 int count = 0; 1104 int count = 0;
1108 1105
1109 set_current_state(TASK_INTERRUPTIBLE); 1106 do {
1110 add_wait_queue(&lowcomms_recv_wait, &wait); 1107 ret = receive_from_sock();
1111 if (!test_bit(CF_READ_PENDING, &sctp_con.flags))
1112 cond_resched();
1113 remove_wait_queue(&lowcomms_recv_wait, &wait);
1114 set_current_state(TASK_RUNNING);
1115
1116 if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
1117 int ret;
1118
1119 do {
1120 ret = receive_from_sock();
1121 1108
1122 /* Don't starve out everyone else */ 1109 /* Don't starve out everyone else */
1123 if (++count >= MAX_RX_MSG_COUNT) { 1110 if (++count >= MAX_RX_MSG_COUNT) {
1124 cond_resched(); 1111 cond_resched();
1125 count = 0; 1112 count = 0;
1126 } 1113 }
1127 } while (!kthread_should_stop() && ret >=0); 1114 } while (!kthread_should_stop() && ret >=0);
1128 }
1129 cond_resched();
1130 } 1115 }
1131 1116 cond_resched();
1132 return 0;
1133} 1117}
1134 1118
1135static int dlm_sendd(void *data) 1119// PJC: the work queue function for sending
1120static void process_send_sockets(struct work_struct *work)
1136{ 1121{
1137 DECLARE_WAITQUEUE(wait, current); 1122 if (sctp_con.eagain_flag) {
1138 1123 sctp_con.eagain_flag = 0;
1139 add_wait_queue(sctp_con.sock->sk->sk_sleep, &wait); 1124 refill_write_queue();
1140
1141 while (!kthread_should_stop()) {
1142 set_current_state(TASK_INTERRUPTIBLE);
1143 if (write_list_empty())
1144 cond_resched();
1145 set_current_state(TASK_RUNNING);
1146
1147 if (sctp_con.eagain_flag) {
1148 sctp_con.eagain_flag = 0;
1149 refill_write_queue();
1150 }
1151 process_output_queue();
1152 } 1125 }
1126 process_output_queue();
1127}
1153 1128
1154 remove_wait_queue(sctp_con.sock->sk->sk_sleep, &wait); 1129// PJC: Process lock requests from a particular node.
1155 1130// TODO: can we optimise this out on UP ??
1156 return 0; 1131static void process_lock_request(struct work_struct *work)
1132{
1157} 1133}
1158 1134
1159static void daemons_stop(void) 1135static void daemons_stop(void)
1160{ 1136{
1161 kthread_stop(recv_task); 1137 destroy_workqueue(recv_workqueue);
1162 kthread_stop(send_task); 1138 destroy_workqueue(send_workqueue);
1139 destroy_workqueue(lock_workqueue);
1163} 1140}
1164 1141
1165static int daemons_start(void) 1142static int daemons_start(void)
1166{ 1143{
1167 struct task_struct *p;
1168 int error; 1144 int error;
1145 recv_workqueue = create_workqueue("dlm_recv");
1146 error = IS_ERR(recv_workqueue);
1147 if (error) {
1148 log_print("can't start dlm_recv %d", error);
1149 return error;
1150 }
1169 1151
1170 p = kthread_run(dlm_recvd, NULL, "dlm_recvd"); 1152 send_workqueue = create_singlethread_workqueue("dlm_send");
1171 error = IS_ERR(p); 1153 error = IS_ERR(send_workqueue);
1172 if (error) { 1154 if (error) {
1173 log_print("can't start dlm_recvd %d", error); 1155 log_print("can't start dlm_send %d", error);
1156 destroy_workqueue(recv_workqueue);
1174 return error; 1157 return error;
1175 } 1158 }
1176 recv_task = p;
1177 1159
1178 p = kthread_run(dlm_sendd, NULL, "dlm_sendd"); 1160 lock_workqueue = create_workqueue("dlm_rlock");
1179 error = IS_ERR(p); 1161 error = IS_ERR(lock_workqueue);
1180 if (error) { 1162 if (error) {
1181 log_print("can't start dlm_sendd %d", error); 1163 log_print("can't start dlm_rlock %d", error);
1182 kthread_stop(recv_task); 1164 destroy_workqueue(send_workqueue);
1165 destroy_workqueue(recv_workqueue);
1183 return error; 1166 return error;
1184 } 1167 }
1185 send_task = p;
1186 1168
1187 return 0; 1169 return 0;
1188} 1170}
@@ -1194,6 +1176,8 @@ int dlm_lowcomms_start(void)
1194{ 1176{
1195 int error; 1177 int error;
1196 1178
1179 INIT_WORK(&sctp_con.work, process_recv_sockets);
1180
1197 error = init_sock(); 1181 error = init_sock();
1198 if (error) 1182 if (error)
1199 goto fail_sock; 1183 goto fail_sock;
@@ -1224,4 +1208,3 @@ void dlm_lowcomms_stop(void)
1224 for (i = 0; i < dlm_local_count; i++) 1208 for (i = 0; i < dlm_local_count; i++)
1225 kfree(dlm_local_addr[i]); 1209 kfree(dlm_local_addr[i]);
1226} 1210}
1227