diff options
Diffstat (limited to 'fs/dlm/lowcomms-tcp.c')
-rw-r--r-- | fs/dlm/lowcomms-tcp.c | 361 |
1 files changed, 95 insertions, 266 deletions
diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms-tcp.c index 9be3a440c42a..f1efd17b2614 100644 --- a/fs/dlm/lowcomms-tcp.c +++ b/fs/dlm/lowcomms-tcp.c | |||
@@ -2,7 +2,7 @@ | |||
2 | ******************************************************************************* | 2 | ******************************************************************************* |
3 | ** | 3 | ** |
4 | ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. | 4 | ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. |
5 | ** Copyright (C) 2004-2006 Red Hat, Inc. All rights reserved. | 5 | ** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. |
6 | ** | 6 | ** |
7 | ** This copyrighted material is made available to anyone wishing to use, | 7 | ** This copyrighted material is made available to anyone wishing to use, |
8 | ** modify, copy, or redistribute it subject to the terms and conditions | 8 | ** modify, copy, or redistribute it subject to the terms and conditions |
@@ -96,10 +96,7 @@ static bool cbuf_empty(struct cbuf *cb) | |||
96 | struct connection { | 96 | struct connection { |
97 | struct socket *sock; /* NULL if not connected */ | 97 | struct socket *sock; /* NULL if not connected */ |
98 | uint32_t nodeid; /* So we know who we are in the list */ | 98 | uint32_t nodeid; /* So we know who we are in the list */ |
99 | struct rw_semaphore sock_sem; /* Stop connect races */ | 99 | struct mutex sock_mutex; |
100 | struct list_head read_list; /* On this list when ready for reading */ | ||
101 | struct list_head write_list; /* On this list when ready for writing */ | ||
102 | struct list_head state_list; /* On this list when ready to connect */ | ||
103 | unsigned long flags; /* bit 1,2 = We are on the read/write lists */ | 100 | unsigned long flags; /* bit 1,2 = We are on the read/write lists */ |
104 | #define CF_READ_PENDING 1 | 101 | #define CF_READ_PENDING 1 |
105 | #define CF_WRITE_PENDING 2 | 102 | #define CF_WRITE_PENDING 2 |
@@ -112,9 +109,10 @@ struct connection { | |||
112 | struct page *rx_page; | 109 | struct page *rx_page; |
113 | struct cbuf cb; | 110 | struct cbuf cb; |
114 | int retries; | 111 | int retries; |
115 | atomic_t waiting_requests; | ||
116 | #define MAX_CONNECT_RETRIES 3 | 112 | #define MAX_CONNECT_RETRIES 3 |
117 | struct connection *othercon; | 113 | struct connection *othercon; |
114 | struct work_struct rwork; /* Receive workqueue */ | ||
115 | struct work_struct swork; /* Send workqueue */ | ||
118 | }; | 116 | }; |
119 | #define sock2con(x) ((struct connection *)(x)->sk_user_data) | 117 | #define sock2con(x) ((struct connection *)(x)->sk_user_data) |
120 | 118 | ||
@@ -131,14 +129,9 @@ struct writequeue_entry { | |||
131 | 129 | ||
132 | static struct sockaddr_storage dlm_local_addr; | 130 | static struct sockaddr_storage dlm_local_addr; |
133 | 131 | ||
134 | /* Manage daemons */ | 132 | /* Work queues */ |
135 | static struct task_struct *recv_task; | 133 | static struct workqueue_struct *recv_workqueue; |
136 | static struct task_struct *send_task; | 134 | static struct workqueue_struct *send_workqueue; |
137 | |||
138 | static wait_queue_t lowcomms_send_waitq_head; | ||
139 | static DECLARE_WAIT_QUEUE_HEAD(lowcomms_send_waitq); | ||
140 | static wait_queue_t lowcomms_recv_waitq_head; | ||
141 | static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_waitq); | ||
142 | 135 | ||
143 | /* An array of pointers to connections, indexed by NODEID */ | 136 | /* An array of pointers to connections, indexed by NODEID */ |
144 | static struct connection **connections; | 137 | static struct connection **connections; |
@@ -146,17 +139,8 @@ static DECLARE_MUTEX(connections_lock); | |||
146 | static struct kmem_cache *con_cache; | 139 | static struct kmem_cache *con_cache; |
147 | static int conn_array_size; | 140 | static int conn_array_size; |
148 | 141 | ||
149 | /* List of sockets that have reads pending */ | 142 | static void process_recv_sockets(struct work_struct *work); |
150 | static LIST_HEAD(read_sockets); | 143 | static void process_send_sockets(struct work_struct *work); |
151 | static DEFINE_SPINLOCK(read_sockets_lock); | ||
152 | |||
153 | /* List of sockets which have writes pending */ | ||
154 | static LIST_HEAD(write_sockets); | ||
155 | static DEFINE_SPINLOCK(write_sockets_lock); | ||
156 | |||
157 | /* List of sockets which have connects pending */ | ||
158 | static LIST_HEAD(state_sockets); | ||
159 | static DEFINE_SPINLOCK(state_sockets_lock); | ||
160 | 144 | ||
161 | static struct connection *nodeid2con(int nodeid, gfp_t allocation) | 145 | static struct connection *nodeid2con(int nodeid, gfp_t allocation) |
162 | { | 146 | { |
@@ -186,9 +170,11 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation) | |||
186 | goto finish; | 170 | goto finish; |
187 | 171 | ||
188 | con->nodeid = nodeid; | 172 | con->nodeid = nodeid; |
189 | init_rwsem(&con->sock_sem); | 173 | mutex_init(&con->sock_mutex); |
190 | INIT_LIST_HEAD(&con->writequeue); | 174 | INIT_LIST_HEAD(&con->writequeue); |
191 | spin_lock_init(&con->writequeue_lock); | 175 | spin_lock_init(&con->writequeue_lock); |
176 | INIT_WORK(&con->swork, process_send_sockets); | ||
177 | INIT_WORK(&con->rwork, process_recv_sockets); | ||
192 | 178 | ||
193 | connections[nodeid] = con; | 179 | connections[nodeid] = con; |
194 | } | 180 | } |
@@ -203,41 +189,22 @@ static void lowcomms_data_ready(struct sock *sk, int count_unused) | |||
203 | { | 189 | { |
204 | struct connection *con = sock2con(sk); | 190 | struct connection *con = sock2con(sk); |
205 | 191 | ||
206 | atomic_inc(&con->waiting_requests); | 192 | if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) |
207 | if (test_and_set_bit(CF_READ_PENDING, &con->flags)) | 193 | queue_work(recv_workqueue, &con->rwork); |
208 | return; | ||
209 | |||
210 | spin_lock_bh(&read_sockets_lock); | ||
211 | list_add_tail(&con->read_list, &read_sockets); | ||
212 | spin_unlock_bh(&read_sockets_lock); | ||
213 | |||
214 | wake_up_interruptible(&lowcomms_recv_waitq); | ||
215 | } | 194 | } |
216 | 195 | ||
217 | static void lowcomms_write_space(struct sock *sk) | 196 | static void lowcomms_write_space(struct sock *sk) |
218 | { | 197 | { |
219 | struct connection *con = sock2con(sk); | 198 | struct connection *con = sock2con(sk); |
220 | 199 | ||
221 | if (test_and_set_bit(CF_WRITE_PENDING, &con->flags)) | 200 | if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) |
222 | return; | 201 | queue_work(send_workqueue, &con->swork); |
223 | |||
224 | spin_lock_bh(&write_sockets_lock); | ||
225 | list_add_tail(&con->write_list, &write_sockets); | ||
226 | spin_unlock_bh(&write_sockets_lock); | ||
227 | |||
228 | wake_up_interruptible(&lowcomms_send_waitq); | ||
229 | } | 202 | } |
230 | 203 | ||
231 | static inline void lowcomms_connect_sock(struct connection *con) | 204 | static inline void lowcomms_connect_sock(struct connection *con) |
232 | { | 205 | { |
233 | if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) | 206 | if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) |
234 | return; | 207 | queue_work(send_workqueue, &con->swork); |
235 | |||
236 | spin_lock_bh(&state_sockets_lock); | ||
237 | list_add_tail(&con->state_list, &state_sockets); | ||
238 | spin_unlock_bh(&state_sockets_lock); | ||
239 | |||
240 | wake_up_interruptible(&lowcomms_send_waitq); | ||
241 | } | 208 | } |
242 | 209 | ||
243 | static void lowcomms_state_change(struct sock *sk) | 210 | static void lowcomms_state_change(struct sock *sk) |
@@ -279,7 +246,7 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, | |||
279 | /* Close a remote connection and tidy up */ | 246 | /* Close a remote connection and tidy up */ |
280 | static void close_connection(struct connection *con, bool and_other) | 247 | static void close_connection(struct connection *con, bool and_other) |
281 | { | 248 | { |
282 | down_write(&con->sock_sem); | 249 | mutex_lock(&con->sock_mutex); |
283 | 250 | ||
284 | if (con->sock) { | 251 | if (con->sock) { |
285 | sock_release(con->sock); | 252 | sock_release(con->sock); |
@@ -294,7 +261,7 @@ static void close_connection(struct connection *con, bool and_other) | |||
294 | con->rx_page = NULL; | 261 | con->rx_page = NULL; |
295 | } | 262 | } |
296 | con->retries = 0; | 263 | con->retries = 0; |
297 | up_write(&con->sock_sem); | 264 | mutex_unlock(&con->sock_mutex); |
298 | } | 265 | } |
299 | 266 | ||
300 | /* Data received from remote end */ | 267 | /* Data received from remote end */ |
@@ -308,10 +275,13 @@ static int receive_from_sock(struct connection *con) | |||
308 | int r; | 275 | int r; |
309 | int call_again_soon = 0; | 276 | int call_again_soon = 0; |
310 | 277 | ||
311 | down_read(&con->sock_sem); | 278 | mutex_lock(&con->sock_mutex); |
279 | |||
280 | if (con->sock == NULL) { | ||
281 | ret = -EAGAIN; | ||
282 | goto out_close; | ||
283 | } | ||
312 | 284 | ||
313 | if (con->sock == NULL) | ||
314 | goto out; | ||
315 | if (con->rx_page == NULL) { | 285 | if (con->rx_page == NULL) { |
316 | /* | 286 | /* |
317 | * This doesn't need to be atomic, but I think it should | 287 | * This doesn't need to be atomic, but I think it should |
@@ -359,6 +329,9 @@ static int receive_from_sock(struct connection *con) | |||
359 | 329 | ||
360 | if (ret <= 0) | 330 | if (ret <= 0) |
361 | goto out_close; | 331 | goto out_close; |
332 | if (ret == -EAGAIN) | ||
333 | goto out_resched; | ||
334 | |||
362 | if (ret == len) | 335 | if (ret == len) |
363 | call_again_soon = 1; | 336 | call_again_soon = 1; |
364 | cbuf_add(&con->cb, ret); | 337 | cbuf_add(&con->cb, ret); |
@@ -381,24 +354,26 @@ static int receive_from_sock(struct connection *con) | |||
381 | con->rx_page = NULL; | 354 | con->rx_page = NULL; |
382 | } | 355 | } |
383 | 356 | ||
384 | out: | ||
385 | if (call_again_soon) | 357 | if (call_again_soon) |
386 | goto out_resched; | 358 | goto out_resched; |
387 | up_read(&con->sock_sem); | 359 | mutex_unlock(&con->sock_mutex); |
388 | return 0; | 360 | return 0; |
389 | 361 | ||
390 | out_resched: | 362 | out_resched: |
391 | lowcomms_data_ready(con->sock->sk, 0); | 363 | if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) |
392 | up_read(&con->sock_sem); | 364 | queue_work(recv_workqueue, &con->rwork); |
393 | cond_resched(); | 365 | mutex_unlock(&con->sock_mutex); |
394 | return 0; | 366 | return -EAGAIN; |
395 | 367 | ||
396 | out_close: | 368 | out_close: |
397 | up_read(&con->sock_sem); | 369 | mutex_unlock(&con->sock_mutex); |
398 | if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) { | 370 | if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) { |
399 | close_connection(con, false); | 371 | close_connection(con, false); |
400 | /* Reconnect when there is something to send */ | 372 | /* Reconnect when there is something to send */ |
401 | } | 373 | } |
374 | /* Don't return success if we really got EOF */ | ||
375 | if (ret == 0) | ||
376 | ret = -EAGAIN; | ||
402 | 377 | ||
403 | return ret; | 378 | return ret; |
404 | } | 379 | } |
@@ -412,6 +387,7 @@ static int accept_from_sock(struct connection *con) | |||
412 | int len; | 387 | int len; |
413 | int nodeid; | 388 | int nodeid; |
414 | struct connection *newcon; | 389 | struct connection *newcon; |
390 | struct connection *addcon; | ||
415 | 391 | ||
416 | memset(&peeraddr, 0, sizeof(peeraddr)); | 392 | memset(&peeraddr, 0, sizeof(peeraddr)); |
417 | result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, | 393 | result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, |
@@ -419,7 +395,7 @@ static int accept_from_sock(struct connection *con) | |||
419 | if (result < 0) | 395 | if (result < 0) |
420 | return -ENOMEM; | 396 | return -ENOMEM; |
421 | 397 | ||
422 | down_read(&con->sock_sem); | 398 | mutex_lock_nested(&con->sock_mutex, 0); |
423 | 399 | ||
424 | result = -ENOTCONN; | 400 | result = -ENOTCONN; |
425 | if (con->sock == NULL) | 401 | if (con->sock == NULL) |
@@ -445,7 +421,7 @@ static int accept_from_sock(struct connection *con) | |||
445 | if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) { | 421 | if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) { |
446 | printk("dlm: connect from non cluster node\n"); | 422 | printk("dlm: connect from non cluster node\n"); |
447 | sock_release(newsock); | 423 | sock_release(newsock); |
448 | up_read(&con->sock_sem); | 424 | mutex_unlock(&con->sock_mutex); |
449 | return -1; | 425 | return -1; |
450 | } | 426 | } |
451 | 427 | ||
@@ -462,7 +438,7 @@ static int accept_from_sock(struct connection *con) | |||
462 | result = -ENOMEM; | 438 | result = -ENOMEM; |
463 | goto accept_err; | 439 | goto accept_err; |
464 | } | 440 | } |
465 | down_write(&newcon->sock_sem); | 441 | mutex_lock_nested(&newcon->sock_mutex, 1); |
466 | if (newcon->sock) { | 442 | if (newcon->sock) { |
467 | struct connection *othercon = newcon->othercon; | 443 | struct connection *othercon = newcon->othercon; |
468 | 444 | ||
@@ -470,41 +446,45 @@ static int accept_from_sock(struct connection *con) | |||
470 | othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL); | 446 | othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL); |
471 | if (!othercon) { | 447 | if (!othercon) { |
472 | printk("dlm: failed to allocate incoming socket\n"); | 448 | printk("dlm: failed to allocate incoming socket\n"); |
473 | up_write(&newcon->sock_sem); | 449 | mutex_unlock(&newcon->sock_mutex); |
474 | result = -ENOMEM; | 450 | result = -ENOMEM; |
475 | goto accept_err; | 451 | goto accept_err; |
476 | } | 452 | } |
477 | othercon->nodeid = nodeid; | 453 | othercon->nodeid = nodeid; |
478 | othercon->rx_action = receive_from_sock; | 454 | othercon->rx_action = receive_from_sock; |
479 | init_rwsem(&othercon->sock_sem); | 455 | mutex_init(&othercon->sock_mutex); |
456 | INIT_WORK(&othercon->swork, process_send_sockets); | ||
457 | INIT_WORK(&othercon->rwork, process_recv_sockets); | ||
480 | set_bit(CF_IS_OTHERCON, &othercon->flags); | 458 | set_bit(CF_IS_OTHERCON, &othercon->flags); |
481 | newcon->othercon = othercon; | 459 | newcon->othercon = othercon; |
482 | } | 460 | } |
483 | othercon->sock = newsock; | 461 | othercon->sock = newsock; |
484 | newsock->sk->sk_user_data = othercon; | 462 | newsock->sk->sk_user_data = othercon; |
485 | add_sock(newsock, othercon); | 463 | add_sock(newsock, othercon); |
464 | addcon = othercon; | ||
486 | } | 465 | } |
487 | else { | 466 | else { |
488 | newsock->sk->sk_user_data = newcon; | 467 | newsock->sk->sk_user_data = newcon; |
489 | newcon->rx_action = receive_from_sock; | 468 | newcon->rx_action = receive_from_sock; |
490 | add_sock(newsock, newcon); | 469 | add_sock(newsock, newcon); |
491 | 470 | addcon = newcon; | |
492 | } | 471 | } |
493 | 472 | ||
494 | up_write(&newcon->sock_sem); | 473 | mutex_unlock(&newcon->sock_mutex); |
495 | 474 | ||
496 | /* | 475 | /* |
497 | * Add it to the active queue in case we got data | 476 | * Add it to the active queue in case we got data |
498 | * beween processing the accept adding the socket | 477 | * beween processing the accept adding the socket |
499 | * to the read_sockets list | 478 | * to the read_sockets list |
500 | */ | 479 | */ |
501 | lowcomms_data_ready(newsock->sk, 0); | 480 | if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags)) |
502 | up_read(&con->sock_sem); | 481 | queue_work(recv_workqueue, &addcon->rwork); |
482 | mutex_unlock(&con->sock_mutex); | ||
503 | 483 | ||
504 | return 0; | 484 | return 0; |
505 | 485 | ||
506 | accept_err: | 486 | accept_err: |
507 | up_read(&con->sock_sem); | 487 | mutex_unlock(&con->sock_mutex); |
508 | sock_release(newsock); | 488 | sock_release(newsock); |
509 | 489 | ||
510 | if (result != -EAGAIN) | 490 | if (result != -EAGAIN) |
@@ -525,7 +505,7 @@ static void connect_to_sock(struct connection *con) | |||
525 | return; | 505 | return; |
526 | } | 506 | } |
527 | 507 | ||
528 | down_write(&con->sock_sem); | 508 | mutex_lock(&con->sock_mutex); |
529 | if (con->retries++ > MAX_CONNECT_RETRIES) | 509 | if (con->retries++ > MAX_CONNECT_RETRIES) |
530 | goto out; | 510 | goto out; |
531 | 511 | ||
@@ -548,7 +528,7 @@ static void connect_to_sock(struct connection *con) | |||
548 | sock->sk->sk_user_data = con; | 528 | sock->sk->sk_user_data = con; |
549 | con->rx_action = receive_from_sock; | 529 | con->rx_action = receive_from_sock; |
550 | 530 | ||
551 | make_sockaddr(&saddr, dlm_config.tcp_port, &addr_len); | 531 | make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len); |
552 | 532 | ||
553 | add_sock(sock, con); | 533 | add_sock(sock, con); |
554 | 534 | ||
@@ -577,7 +557,7 @@ out_err: | |||
577 | result = 0; | 557 | result = 0; |
578 | } | 558 | } |
579 | out: | 559 | out: |
580 | up_write(&con->sock_sem); | 560 | mutex_unlock(&con->sock_mutex); |
581 | return; | 561 | return; |
582 | } | 562 | } |
583 | 563 | ||
@@ -616,10 +596,10 @@ static struct socket *create_listen_sock(struct connection *con, | |||
616 | con->sock = sock; | 596 | con->sock = sock; |
617 | 597 | ||
618 | /* Bind to our port */ | 598 | /* Bind to our port */ |
619 | make_sockaddr(saddr, dlm_config.tcp_port, &addr_len); | 599 | make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len); |
620 | result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len); | 600 | result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len); |
621 | if (result < 0) { | 601 | if (result < 0) { |
622 | printk("dlm: Can't bind to port %d\n", dlm_config.tcp_port); | 602 | printk("dlm: Can't bind to port %d\n", dlm_config.ci_tcp_port); |
623 | sock_release(sock); | 603 | sock_release(sock); |
624 | sock = NULL; | 604 | sock = NULL; |
625 | con->sock = NULL; | 605 | con->sock = NULL; |
@@ -638,7 +618,7 @@ static struct socket *create_listen_sock(struct connection *con, | |||
638 | 618 | ||
639 | result = sock->ops->listen(sock, 5); | 619 | result = sock->ops->listen(sock, 5); |
640 | if (result < 0) { | 620 | if (result < 0) { |
641 | printk("dlm: Can't listen on port %d\n", dlm_config.tcp_port); | 621 | printk("dlm: Can't listen on port %d\n", dlm_config.ci_tcp_port); |
642 | sock_release(sock); | 622 | sock_release(sock); |
643 | sock = NULL; | 623 | sock = NULL; |
644 | goto create_out; | 624 | goto create_out; |
@@ -709,6 +689,7 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len, | |||
709 | if (!con) | 689 | if (!con) |
710 | return NULL; | 690 | return NULL; |
711 | 691 | ||
692 | spin_lock(&con->writequeue_lock); | ||
712 | e = list_entry(con->writequeue.prev, struct writequeue_entry, list); | 693 | e = list_entry(con->writequeue.prev, struct writequeue_entry, list); |
713 | if ((&e->list == &con->writequeue) || | 694 | if ((&e->list == &con->writequeue) || |
714 | (PAGE_CACHE_SIZE - e->end < len)) { | 695 | (PAGE_CACHE_SIZE - e->end < len)) { |
@@ -747,6 +728,7 @@ void dlm_lowcomms_commit_buffer(void *mh) | |||
747 | struct connection *con = e->con; | 728 | struct connection *con = e->con; |
748 | int users; | 729 | int users; |
749 | 730 | ||
731 | spin_lock(&con->writequeue_lock); | ||
750 | users = --e->users; | 732 | users = --e->users; |
751 | if (users) | 733 | if (users) |
752 | goto out; | 734 | goto out; |
@@ -754,12 +736,8 @@ void dlm_lowcomms_commit_buffer(void *mh) | |||
754 | kunmap(e->page); | 736 | kunmap(e->page); |
755 | spin_unlock(&con->writequeue_lock); | 737 | spin_unlock(&con->writequeue_lock); |
756 | 738 | ||
757 | if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) { | 739 | if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) { |
758 | spin_lock_bh(&write_sockets_lock); | 740 | queue_work(send_workqueue, &con->swork); |
759 | list_add_tail(&con->write_list, &write_sockets); | ||
760 | spin_unlock_bh(&write_sockets_lock); | ||
761 | |||
762 | wake_up_interruptible(&lowcomms_send_waitq); | ||
763 | } | 741 | } |
764 | return; | 742 | return; |
765 | 743 | ||
@@ -783,7 +761,7 @@ static void send_to_sock(struct connection *con) | |||
783 | struct writequeue_entry *e; | 761 | struct writequeue_entry *e; |
784 | int len, offset; | 762 | int len, offset; |
785 | 763 | ||
786 | down_read(&con->sock_sem); | 764 | mutex_lock(&con->sock_mutex); |
787 | if (con->sock == NULL) | 765 | if (con->sock == NULL) |
788 | goto out_connect; | 766 | goto out_connect; |
789 | 767 | ||
@@ -800,6 +778,7 @@ static void send_to_sock(struct connection *con) | |||
800 | offset = e->offset; | 778 | offset = e->offset; |
801 | BUG_ON(len == 0 && e->users == 0); | 779 | BUG_ON(len == 0 && e->users == 0); |
802 | spin_unlock(&con->writequeue_lock); | 780 | spin_unlock(&con->writequeue_lock); |
781 | kmap(e->page); | ||
803 | 782 | ||
804 | ret = 0; | 783 | ret = 0; |
805 | if (len) { | 784 | if (len) { |
@@ -828,18 +807,18 @@ static void send_to_sock(struct connection *con) | |||
828 | } | 807 | } |
829 | spin_unlock(&con->writequeue_lock); | 808 | spin_unlock(&con->writequeue_lock); |
830 | out: | 809 | out: |
831 | up_read(&con->sock_sem); | 810 | mutex_unlock(&con->sock_mutex); |
832 | return; | 811 | return; |
833 | 812 | ||
834 | send_error: | 813 | send_error: |
835 | up_read(&con->sock_sem); | 814 | mutex_unlock(&con->sock_mutex); |
836 | close_connection(con, false); | 815 | close_connection(con, false); |
837 | lowcomms_connect_sock(con); | 816 | lowcomms_connect_sock(con); |
838 | return; | 817 | return; |
839 | 818 | ||
840 | out_connect: | 819 | out_connect: |
841 | up_read(&con->sock_sem); | 820 | mutex_unlock(&con->sock_mutex); |
842 | lowcomms_connect_sock(con); | 821 | connect_to_sock(con); |
843 | return; | 822 | return; |
844 | } | 823 | } |
845 | 824 | ||
@@ -872,7 +851,6 @@ int dlm_lowcomms_close(int nodeid) | |||
872 | if (con) { | 851 | if (con) { |
873 | clean_one_writequeue(con); | 852 | clean_one_writequeue(con); |
874 | close_connection(con, true); | 853 | close_connection(con, true); |
875 | atomic_set(&con->waiting_requests, 0); | ||
876 | } | 854 | } |
877 | return 0; | 855 | return 0; |
878 | 856 | ||
@@ -880,102 +858,29 @@ out: | |||
880 | return -1; | 858 | return -1; |
881 | } | 859 | } |
882 | 860 | ||
883 | /* API send message call, may queue the request */ | ||
884 | /* N.B. This is the old interface - use the new one for new calls */ | ||
885 | int lowcomms_send_message(int nodeid, char *buf, int len, gfp_t allocation) | ||
886 | { | ||
887 | struct writequeue_entry *e; | ||
888 | char *b; | ||
889 | |||
890 | e = dlm_lowcomms_get_buffer(nodeid, len, allocation, &b); | ||
891 | if (e) { | ||
892 | memcpy(b, buf, len); | ||
893 | dlm_lowcomms_commit_buffer(e); | ||
894 | return 0; | ||
895 | } | ||
896 | return -ENOBUFS; | ||
897 | } | ||
898 | |||
899 | /* Look for activity on active sockets */ | 861 | /* Look for activity on active sockets */ |
900 | static void process_sockets(void) | 862 | static void process_recv_sockets(struct work_struct *work) |
901 | { | 863 | { |
902 | struct list_head *list; | 864 | struct connection *con = container_of(work, struct connection, rwork); |
903 | struct list_head *temp; | 865 | int err; |
904 | int count = 0; | ||
905 | |||
906 | spin_lock_bh(&read_sockets_lock); | ||
907 | list_for_each_safe(list, temp, &read_sockets) { | ||
908 | 866 | ||
909 | struct connection *con = | 867 | clear_bit(CF_READ_PENDING, &con->flags); |
910 | list_entry(list, struct connection, read_list); | 868 | do { |
911 | list_del(&con->read_list); | 869 | err = con->rx_action(con); |
912 | clear_bit(CF_READ_PENDING, &con->flags); | 870 | } while (!err); |
913 | |||
914 | spin_unlock_bh(&read_sockets_lock); | ||
915 | |||
916 | /* This can reach zero if we are processing requests | ||
917 | * as they come in. | ||
918 | */ | ||
919 | if (atomic_read(&con->waiting_requests) == 0) { | ||
920 | spin_lock_bh(&read_sockets_lock); | ||
921 | continue; | ||
922 | } | ||
923 | |||
924 | do { | ||
925 | con->rx_action(con); | ||
926 | |||
927 | /* Don't starve out everyone else */ | ||
928 | if (++count >= MAX_RX_MSG_COUNT) { | ||
929 | cond_resched(); | ||
930 | count = 0; | ||
931 | } | ||
932 | |||
933 | } while (!atomic_dec_and_test(&con->waiting_requests) && | ||
934 | !kthread_should_stop()); | ||
935 | |||
936 | spin_lock_bh(&read_sockets_lock); | ||
937 | } | ||
938 | spin_unlock_bh(&read_sockets_lock); | ||
939 | } | 871 | } |
940 | 872 | ||
941 | /* Try to send any messages that are pending | ||
942 | */ | ||
943 | static void process_output_queue(void) | ||
944 | { | ||
945 | struct list_head *list; | ||
946 | struct list_head *temp; | ||
947 | |||
948 | spin_lock_bh(&write_sockets_lock); | ||
949 | list_for_each_safe(list, temp, &write_sockets) { | ||
950 | struct connection *con = | ||
951 | list_entry(list, struct connection, write_list); | ||
952 | clear_bit(CF_WRITE_PENDING, &con->flags); | ||
953 | list_del(&con->write_list); | ||
954 | |||
955 | spin_unlock_bh(&write_sockets_lock); | ||
956 | send_to_sock(con); | ||
957 | spin_lock_bh(&write_sockets_lock); | ||
958 | } | ||
959 | spin_unlock_bh(&write_sockets_lock); | ||
960 | } | ||
961 | 873 | ||
962 | static void process_state_queue(void) | 874 | static void process_send_sockets(struct work_struct *work) |
963 | { | 875 | { |
964 | struct list_head *list; | 876 | struct connection *con = container_of(work, struct connection, swork); |
965 | struct list_head *temp; | ||
966 | |||
967 | spin_lock_bh(&state_sockets_lock); | ||
968 | list_for_each_safe(list, temp, &state_sockets) { | ||
969 | struct connection *con = | ||
970 | list_entry(list, struct connection, state_list); | ||
971 | list_del(&con->state_list); | ||
972 | clear_bit(CF_CONNECT_PENDING, &con->flags); | ||
973 | spin_unlock_bh(&state_sockets_lock); | ||
974 | 877 | ||
878 | if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { | ||
975 | connect_to_sock(con); | 879 | connect_to_sock(con); |
976 | spin_lock_bh(&state_sockets_lock); | ||
977 | } | 880 | } |
978 | spin_unlock_bh(&state_sockets_lock); | 881 | |
882 | clear_bit(CF_WRITE_PENDING, &con->flags); | ||
883 | send_to_sock(con); | ||
979 | } | 884 | } |
980 | 885 | ||
981 | 886 | ||
@@ -992,109 +897,33 @@ static void clean_writequeues(void) | |||
992 | } | 897 | } |
993 | } | 898 | } |
994 | 899 | ||
995 | static int read_list_empty(void) | 900 | static void work_stop(void) |
996 | { | 901 | { |
997 | int status; | 902 | destroy_workqueue(recv_workqueue); |
998 | 903 | destroy_workqueue(send_workqueue); | |
999 | spin_lock_bh(&read_sockets_lock); | ||
1000 | status = list_empty(&read_sockets); | ||
1001 | spin_unlock_bh(&read_sockets_lock); | ||
1002 | |||
1003 | return status; | ||
1004 | } | ||
1005 | |||
1006 | /* DLM Transport comms receive daemon */ | ||
1007 | static int dlm_recvd(void *data) | ||
1008 | { | ||
1009 | init_waitqueue_entry(&lowcomms_recv_waitq_head, current); | ||
1010 | add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head); | ||
1011 | |||
1012 | while (!kthread_should_stop()) { | ||
1013 | set_current_state(TASK_INTERRUPTIBLE); | ||
1014 | if (read_list_empty()) | ||
1015 | cond_resched(); | ||
1016 | set_current_state(TASK_RUNNING); | ||
1017 | |||
1018 | process_sockets(); | ||
1019 | } | ||
1020 | |||
1021 | return 0; | ||
1022 | } | 904 | } |
1023 | 905 | ||
1024 | static int write_and_state_lists_empty(void) | 906 | static int work_start(void) |
1025 | { | 907 | { |
1026 | int status; | ||
1027 | |||
1028 | spin_lock_bh(&write_sockets_lock); | ||
1029 | status = list_empty(&write_sockets); | ||
1030 | spin_unlock_bh(&write_sockets_lock); | ||
1031 | |||
1032 | spin_lock_bh(&state_sockets_lock); | ||
1033 | if (list_empty(&state_sockets) == 0) | ||
1034 | status = 0; | ||
1035 | spin_unlock_bh(&state_sockets_lock); | ||
1036 | |||
1037 | return status; | ||
1038 | } | ||
1039 | |||
1040 | /* DLM Transport send daemon */ | ||
1041 | static int dlm_sendd(void *data) | ||
1042 | { | ||
1043 | init_waitqueue_entry(&lowcomms_send_waitq_head, current); | ||
1044 | add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head); | ||
1045 | |||
1046 | while (!kthread_should_stop()) { | ||
1047 | set_current_state(TASK_INTERRUPTIBLE); | ||
1048 | if (write_and_state_lists_empty()) | ||
1049 | cond_resched(); | ||
1050 | set_current_state(TASK_RUNNING); | ||
1051 | |||
1052 | process_state_queue(); | ||
1053 | process_output_queue(); | ||
1054 | } | ||
1055 | |||
1056 | return 0; | ||
1057 | } | ||
1058 | |||
1059 | static void daemons_stop(void) | ||
1060 | { | ||
1061 | kthread_stop(recv_task); | ||
1062 | kthread_stop(send_task); | ||
1063 | } | ||
1064 | |||
1065 | static int daemons_start(void) | ||
1066 | { | ||
1067 | struct task_struct *p; | ||
1068 | int error; | 908 | int error; |
1069 | 909 | recv_workqueue = create_workqueue("dlm_recv"); | |
1070 | p = kthread_run(dlm_recvd, NULL, "dlm_recvd"); | 910 | error = IS_ERR(recv_workqueue); |
1071 | error = IS_ERR(p); | ||
1072 | if (error) { | 911 | if (error) { |
1073 | log_print("can't start dlm_recvd %d", error); | 912 | log_print("can't start dlm_recv %d", error); |
1074 | return error; | 913 | return error; |
1075 | } | 914 | } |
1076 | recv_task = p; | ||
1077 | 915 | ||
1078 | p = kthread_run(dlm_sendd, NULL, "dlm_sendd"); | 916 | send_workqueue = create_singlethread_workqueue("dlm_send"); |
1079 | error = IS_ERR(p); | 917 | error = IS_ERR(send_workqueue); |
1080 | if (error) { | 918 | if (error) { |
1081 | log_print("can't start dlm_sendd %d", error); | 919 | log_print("can't start dlm_send %d", error); |
1082 | kthread_stop(recv_task); | 920 | destroy_workqueue(recv_workqueue); |
1083 | return error; | 921 | return error; |
1084 | } | 922 | } |
1085 | send_task = p; | ||
1086 | 923 | ||
1087 | return 0; | 924 | return 0; |
1088 | } | 925 | } |
1089 | 926 | ||
1090 | /* | ||
1091 | * Return the largest buffer size we can cope with. | ||
1092 | */ | ||
1093 | int lowcomms_max_buffer_size(void) | ||
1094 | { | ||
1095 | return PAGE_CACHE_SIZE; | ||
1096 | } | ||
1097 | |||
1098 | void dlm_lowcomms_stop(void) | 927 | void dlm_lowcomms_stop(void) |
1099 | { | 928 | { |
1100 | int i; | 929 | int i; |
@@ -1107,7 +936,7 @@ void dlm_lowcomms_stop(void) | |||
1107 | connections[i]->flags |= 0xFF; | 936 | connections[i]->flags |= 0xFF; |
1108 | } | 937 | } |
1109 | 938 | ||
1110 | daemons_stop(); | 939 | work_stop(); |
1111 | clean_writequeues(); | 940 | clean_writequeues(); |
1112 | 941 | ||
1113 | for (i = 0; i < conn_array_size; i++) { | 942 | for (i = 0; i < conn_array_size; i++) { |
@@ -1159,7 +988,7 @@ int dlm_lowcomms_start(void) | |||
1159 | if (error) | 988 | if (error) |
1160 | goto fail_unlisten; | 989 | goto fail_unlisten; |
1161 | 990 | ||
1162 | error = daemons_start(); | 991 | error = work_start(); |
1163 | if (error) | 992 | if (error) |
1164 | goto fail_unlisten; | 993 | goto fail_unlisten; |
1165 | 994 | ||