aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm
diff options
context:
space:
mode:
authorMarcelo Ricardo Leitner <marcelo.leitner@gmail.com>2015-08-11 18:22:23 -0400
committerDavid Teigland <teigland@redhat.com>2015-08-17 17:22:20 -0400
commitee44b4bc054afc586c92558a225055ef9fd25d17 (patch)
tree54015a32f73ec8cd75eddf28b32c42e34d2242a3 /fs/dlm
parent356344c4c36dc960f90a3457dd67fe2efcf92417 (diff)
dlm: use sctp 1-to-1 API
DLM is using 1-to-many API but in a 1-to-1 fashion. That is, it's not needed but this causes it to use sctp_do_peeloff() to mimic an kernel_accept() and this causes a symbol dependency on sctp module. By switching it to 1-to-1 API we can avoid this dependency and also reduce quite a lot of SCTP-specific code in lowcomms.c. The caveat is that now DLM won't always use the same src port. It will choose a random one, just like TCP code. This allows the peers to attempt simultaneous connections, which now are handled just like for TCP. Even more sharing between TCP and SCTP code on DLM is possible, but it is intentionally left for a later commit. Note that for using nodes with this commit, you have to have at least the early fixes on this patchset otherwise it will trigger some issues on old nodes. Signed-off-by: Marcelo Ricardo Leitner <marcelo.leitner@gmail.com> Signed-off-by: David Teigland <teigland@redhat.com>
Diffstat (limited to 'fs/dlm')
-rw-r--r--fs/dlm/lowcomms.c671
1 files changed, 237 insertions, 434 deletions
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 54a0031067de..856d750be96b 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -120,12 +120,10 @@ struct connection {
120 struct cbuf cb; 120 struct cbuf cb;
121 int retries; 121 int retries;
122#define MAX_CONNECT_RETRIES 3 122#define MAX_CONNECT_RETRIES 3
123 int sctp_assoc;
124 struct hlist_node list; 123 struct hlist_node list;
125 struct connection *othercon; 124 struct connection *othercon;
126 struct work_struct rwork; /* Receive workqueue */ 125 struct work_struct rwork; /* Receive workqueue */
127 struct work_struct swork; /* Send workqueue */ 126 struct work_struct swork; /* Send workqueue */
128 bool try_new_addr;
129}; 127};
130#define sock2con(x) ((struct connection *)(x)->sk_user_data) 128#define sock2con(x) ((struct connection *)(x)->sk_user_data)
131 129
@@ -252,26 +250,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
252 return con; 250 return con;
253} 251}
254 252
255/* This is a bit drastic, but only called when things go wrong */
256static struct connection *assoc2con(int assoc_id)
257{
258 int i;
259 struct connection *con;
260
261 mutex_lock(&connections_lock);
262
263 for (i = 0 ; i < CONN_HASH_SIZE; i++) {
264 hlist_for_each_entry(con, &connection_hash[i], list) {
265 if (con->sctp_assoc == assoc_id) {
266 mutex_unlock(&connections_lock);
267 return con;
268 }
269 }
270 }
271 mutex_unlock(&connections_lock);
272 return NULL;
273}
274
275static struct dlm_node_addr *find_node_addr(int nodeid) 253static struct dlm_node_addr *find_node_addr(int nodeid)
276{ 254{
277 struct dlm_node_addr *na; 255 struct dlm_node_addr *na;
@@ -322,14 +300,14 @@ static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
322 spin_lock(&dlm_node_addrs_spin); 300 spin_lock(&dlm_node_addrs_spin);
323 na = find_node_addr(nodeid); 301 na = find_node_addr(nodeid);
324 if (na && na->addr_count) { 302 if (na && na->addr_count) {
303 memcpy(&sas, na->addr[na->curr_addr_index],
304 sizeof(struct sockaddr_storage));
305
325 if (try_new_addr) { 306 if (try_new_addr) {
326 na->curr_addr_index++; 307 na->curr_addr_index++;
327 if (na->curr_addr_index == na->addr_count) 308 if (na->curr_addr_index == na->addr_count)
328 na->curr_addr_index = 0; 309 na->curr_addr_index = 0;
329 } 310 }
330
331 memcpy(&sas, na->addr[na->curr_addr_index ],
332 sizeof(struct sockaddr_storage));
333 } 311 }
334 spin_unlock(&dlm_node_addrs_spin); 312 spin_unlock(&dlm_node_addrs_spin);
335 313
@@ -459,18 +437,23 @@ static inline void lowcomms_connect_sock(struct connection *con)
459 437
460static void lowcomms_state_change(struct sock *sk) 438static void lowcomms_state_change(struct sock *sk)
461{ 439{
462 if (sk->sk_state == TCP_ESTABLISHED) 440 /* SCTP layer is not calling sk_data_ready when the connection
441 * is done, so we catch the signal through here. Also, it
442 * doesn't switch socket state when entering shutdown, so we
443 * skip the write in that case.
444 */
445 if (sk->sk_shutdown) {
446 if (sk->sk_shutdown == RCV_SHUTDOWN)
447 lowcomms_data_ready(sk);
448 } else if (sk->sk_state == TCP_ESTABLISHED) {
463 lowcomms_write_space(sk); 449 lowcomms_write_space(sk);
450 }
464} 451}
465 452
466int dlm_lowcomms_connect_node(int nodeid) 453int dlm_lowcomms_connect_node(int nodeid)
467{ 454{
468 struct connection *con; 455 struct connection *con;
469 456
470 /* with sctp there's no connecting without sending */
471 if (dlm_config.ci_protocol != 0)
472 return 0;
473
474 if (nodeid == dlm_our_nodeid()) 457 if (nodeid == dlm_our_nodeid())
475 return 0; 458 return 0;
476 459
@@ -542,264 +525,6 @@ static void close_connection(struct connection *con, bool and_other,
542 mutex_unlock(&con->sock_mutex); 525 mutex_unlock(&con->sock_mutex);
543} 526}
544 527
545/* We only send shutdown messages to nodes that are not part of the cluster
546 * or if we get multiple connections from a node.
547 */
548static void sctp_send_shutdown(sctp_assoc_t associd)
549{
550 static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
551 struct msghdr outmessage;
552 struct cmsghdr *cmsg;
553 struct sctp_sndrcvinfo *sinfo;
554 int ret;
555 struct connection *con;
556
557 con = nodeid2con(0,0);
558 BUG_ON(con == NULL);
559
560 outmessage.msg_name = NULL;
561 outmessage.msg_namelen = 0;
562 outmessage.msg_control = outcmsg;
563 outmessage.msg_controllen = sizeof(outcmsg);
564 outmessage.msg_flags = MSG_EOR;
565
566 cmsg = CMSG_FIRSTHDR(&outmessage);
567 cmsg->cmsg_level = IPPROTO_SCTP;
568 cmsg->cmsg_type = SCTP_SNDRCV;
569 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
570 outmessage.msg_controllen = cmsg->cmsg_len;
571 sinfo = CMSG_DATA(cmsg);
572 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
573
574 sinfo->sinfo_flags |= MSG_EOF;
575 sinfo->sinfo_assoc_id = associd;
576
577 ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0);
578
579 if (ret != 0)
580 log_print("send EOF to node failed: %d", ret);
581}
582
583static void sctp_init_failed_foreach(struct connection *con)
584{
585
586 /*
587 * Don't try to recover base con and handle race where the
588 * other node's assoc init creates a assoc and we get that
589 * notification, then we get a notification that our attempt
590 * failed due. This happens when we are still trying the primary
591 * address, but the other node has already tried secondary addrs
592 * and found one that worked.
593 */
594 if (!con->nodeid || con->sctp_assoc)
595 return;
596
597 log_print("Retrying SCTP association init for node %d\n", con->nodeid);
598
599 con->try_new_addr = true;
600 con->sctp_assoc = 0;
601 if (test_and_clear_bit(CF_INIT_PENDING, &con->flags)) {
602 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
603 queue_work(send_workqueue, &con->swork);
604 }
605}
606
607/* INIT failed but we don't know which node...
608 restart INIT on all pending nodes */
609static void sctp_init_failed(void)
610{
611 mutex_lock(&connections_lock);
612
613 foreach_conn(sctp_init_failed_foreach);
614
615 mutex_unlock(&connections_lock);
616}
617
618static void retry_failed_sctp_send(struct connection *recv_con,
619 struct sctp_send_failed *sn_send_failed,
620 char *buf)
621{
622 int len = sn_send_failed->ssf_length - sizeof(struct sctp_send_failed);
623 struct dlm_mhandle *mh;
624 struct connection *con;
625 char *retry_buf;
626 int nodeid = sn_send_failed->ssf_info.sinfo_ppid;
627
628 log_print("Retry sending %d bytes to node id %d", len, nodeid);
629
630 if (!nodeid) {
631 log_print("Shouldn't resend data via listening connection.");
632 return;
633 }
634
635 con = nodeid2con(nodeid, 0);
636 if (!con) {
637 log_print("Could not look up con for nodeid %d\n",
638 nodeid);
639 return;
640 }
641
642 mh = dlm_lowcomms_get_buffer(nodeid, len, GFP_NOFS, &retry_buf);
643 if (!mh) {
644 log_print("Could not allocate buf for retry.");
645 return;
646 }
647 memcpy(retry_buf, buf + sizeof(struct sctp_send_failed), len);
648 dlm_lowcomms_commit_buffer(mh);
649
650 /*
651 * If we got a assoc changed event before the send failed event then
652 * we only need to retry the send.
653 */
654 if (con->sctp_assoc) {
655 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
656 queue_work(send_workqueue, &con->swork);
657 } else
658 sctp_init_failed_foreach(con);
659}
660
661/* Something happened to an association */
662static void process_sctp_notification(struct connection *con,
663 struct msghdr *msg, char *buf)
664{
665 union sctp_notification *sn = (union sctp_notification *)buf;
666 struct linger linger;
667
668 switch (sn->sn_header.sn_type) {
669 case SCTP_SEND_FAILED:
670 retry_failed_sctp_send(con, &sn->sn_send_failed, buf);
671 break;
672 case SCTP_ASSOC_CHANGE:
673 switch (sn->sn_assoc_change.sac_state) {
674 case SCTP_COMM_UP:
675 case SCTP_RESTART:
676 {
677 /* Check that the new node is in the lockspace */
678 struct sctp_prim prim;
679 int nodeid;
680 int prim_len, ret;
681 int addr_len;
682 struct connection *new_con;
683
684 /*
685 * We get this before any data for an association.
686 * We verify that the node is in the cluster and
687 * then peel off a socket for it.
688 */
689 if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
690 log_print("COMM_UP for invalid assoc ID %d",
691 (int)sn->sn_assoc_change.sac_assoc_id);
692 sctp_init_failed();
693 return;
694 }
695 memset(&prim, 0, sizeof(struct sctp_prim));
696 prim_len = sizeof(struct sctp_prim);
697 prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
698
699 ret = kernel_getsockopt(con->sock,
700 IPPROTO_SCTP,
701 SCTP_PRIMARY_ADDR,
702 (char*)&prim,
703 &prim_len);
704 if (ret < 0) {
705 log_print("getsockopt/sctp_primary_addr on "
706 "new assoc %d failed : %d",
707 (int)sn->sn_assoc_change.sac_assoc_id,
708 ret);
709
710 /* Retry INIT later */
711 new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
712 if (new_con)
713 clear_bit(CF_CONNECT_PENDING, &con->flags);
714 return;
715 }
716 make_sockaddr(&prim.ssp_addr, 0, &addr_len);
717 if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
718 unsigned char *b=(unsigned char *)&prim.ssp_addr;
719 log_print("reject connect from unknown addr");
720 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
721 b, sizeof(struct sockaddr_storage));
722 sctp_send_shutdown(prim.ssp_assoc_id);
723 return;
724 }
725
726 new_con = nodeid2con(nodeid, GFP_NOFS);
727 if (!new_con)
728 return;
729
730 if (new_con->sock) {
731 log_print("reject connect from node %d: "
732 "already has a connection.",
733 nodeid);
734 sctp_send_shutdown(prim.ssp_assoc_id);
735 return;
736 }
737
738 /* Peel off a new sock */
739 lock_sock(con->sock->sk);
740 ret = sctp_do_peeloff(con->sock->sk,
741 sn->sn_assoc_change.sac_assoc_id,
742 &new_con->sock);
743 release_sock(con->sock->sk);
744 if (ret < 0) {
745 log_print("Can't peel off a socket for "
746 "connection %d to node %d: err=%d",
747 (int)sn->sn_assoc_change.sac_assoc_id,
748 nodeid, ret);
749 return;
750 }
751 add_sock(new_con->sock, new_con);
752
753 linger.l_onoff = 1;
754 linger.l_linger = 0;
755 ret = kernel_setsockopt(new_con->sock, SOL_SOCKET, SO_LINGER,
756 (char *)&linger, sizeof(linger));
757 if (ret < 0)
758 log_print("set socket option SO_LINGER failed");
759
760 log_print("connecting to %d sctp association %d",
761 nodeid, (int)sn->sn_assoc_change.sac_assoc_id);
762
763 new_con->sctp_assoc = sn->sn_assoc_change.sac_assoc_id;
764 new_con->try_new_addr = false;
765 /* Send any pending writes */
766 clear_bit(CF_CONNECT_PENDING, &new_con->flags);
767 clear_bit(CF_INIT_PENDING, &new_con->flags);
768 if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) {
769 queue_work(send_workqueue, &new_con->swork);
770 }
771 if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags))
772 queue_work(recv_workqueue, &new_con->rwork);
773 }
774 break;
775
776 case SCTP_COMM_LOST:
777 case SCTP_SHUTDOWN_COMP:
778 {
779 con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
780 if (con) {
781 con->sctp_assoc = 0;
782 }
783 }
784 break;
785
786 case SCTP_CANT_STR_ASSOC:
787 {
788 /* Will retry init when we get the send failed notification */
789 log_print("Can't start SCTP association - retrying");
790 }
791 break;
792
793 default:
794 log_print("unexpected SCTP assoc change id=%d state=%d",
795 (int)sn->sn_assoc_change.sac_assoc_id,
796 sn->sn_assoc_change.sac_state);
797 }
798 default:
799 ; /* fall through */
800 }
801}
802
803/* Data received from remote end */ 528/* Data received from remote end */
804static int receive_from_sock(struct connection *con) 529static int receive_from_sock(struct connection *con)
805{ 530{
@@ -810,7 +535,6 @@ static int receive_from_sock(struct connection *con)
810 int r; 535 int r;
811 int call_again_soon = 0; 536 int call_again_soon = 0;
812 int nvec; 537 int nvec;
813 char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
814 538
815 mutex_lock(&con->sock_mutex); 539 mutex_lock(&con->sock_mutex);
816 540
@@ -830,11 +554,6 @@ static int receive_from_sock(struct connection *con)
830 cbuf_init(&con->cb, PAGE_CACHE_SIZE); 554 cbuf_init(&con->cb, PAGE_CACHE_SIZE);
831 } 555 }
832 556
833 /* Only SCTP needs these really */
834 memset(&incmsg, 0, sizeof(incmsg));
835 msg.msg_control = incmsg;
836 msg.msg_controllen = sizeof(incmsg);
837
838 /* 557 /*
839 * iov[0] is the bit of the circular buffer between the current end 558 * iov[0] is the bit of the circular buffer between the current end
840 * point (cb.base + cb.len) and the end of the buffer. 559 * point (cb.base + cb.len) and the end of the buffer.
@@ -860,31 +579,20 @@ static int receive_from_sock(struct connection *con)
860 MSG_DONTWAIT | MSG_NOSIGNAL); 579 MSG_DONTWAIT | MSG_NOSIGNAL);
861 if (ret <= 0) 580 if (ret <= 0)
862 goto out_close; 581 goto out_close;
582 else if (ret == len)
583 call_again_soon = 1;
863 584
864 /* Process SCTP notifications */
865 if (msg.msg_flags & MSG_NOTIFICATION) {
866 msg.msg_control = incmsg;
867 msg.msg_controllen = sizeof(incmsg);
868
869 process_sctp_notification(con, &msg,
870 page_address(con->rx_page) + con->cb.base);
871 mutex_unlock(&con->sock_mutex);
872 return 0;
873 }
874 BUG_ON(con->nodeid == 0); 585 BUG_ON(con->nodeid == 0);
875 586
876 if (ret == len)
877 call_again_soon = 1;
878 cbuf_add(&con->cb, ret); 587 cbuf_add(&con->cb, ret);
879 ret = dlm_process_incoming_buffer(con->nodeid, 588 ret = dlm_process_incoming_buffer(con->nodeid,
880 page_address(con->rx_page), 589 page_address(con->rx_page),
881 con->cb.base, con->cb.len, 590 con->cb.base, con->cb.len,
882 PAGE_CACHE_SIZE); 591 PAGE_CACHE_SIZE);
883 if (ret == -EBADMSG) { 592 if (ret == -EBADMSG) {
884 log_print("lowcomms: addr=%p, base=%u, len=%u, " 593 log_print("lowcomms: addr=%p, base=%u, len=%u, read=%d",
885 "iov_len=%u, iov_base[0]=%p, read=%d", 594 page_address(con->rx_page), con->cb.base,
886 page_address(con->rx_page), con->cb.base, con->cb.len, 595 con->cb.len, r);
887 len, iov[0].iov_base, r);
888 } 596 }
889 if (ret < 0) 597 if (ret < 0)
890 goto out_close; 598 goto out_close;
@@ -1050,6 +758,120 @@ accept_err:
1050 return result; 758 return result;
1051} 759}
1052 760
761int sctp_accept_from_sock(struct connection *con)
762{
763 /* Check that the new node is in the lockspace */
764 struct sctp_prim prim;
765 int nodeid;
766 int prim_len, ret;
767 int addr_len;
768 struct connection *newcon;
769 struct connection *addcon;
770 struct socket *newsock;
771
772 mutex_lock(&connections_lock);
773 if (!dlm_allow_conn) {
774 mutex_unlock(&connections_lock);
775 return -1;
776 }
777 mutex_unlock(&connections_lock);
778
779 mutex_lock_nested(&con->sock_mutex, 0);
780
781 ret = kernel_accept(con->sock, &newsock, O_NONBLOCK);
782 if (ret < 0)
783 goto accept_err;
784
785 memset(&prim, 0, sizeof(struct sctp_prim));
786 prim_len = sizeof(struct sctp_prim);
787
788 ret = kernel_getsockopt(newsock, IPPROTO_SCTP, SCTP_PRIMARY_ADDR,
789 (char *)&prim, &prim_len);
790 if (ret < 0) {
791 log_print("getsockopt/sctp_primary_addr failed: %d", ret);
792 goto accept_err;
793 }
794
795 make_sockaddr(&prim.ssp_addr, 0, &addr_len);
796 if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
797 unsigned char *b = (unsigned char *)&prim.ssp_addr;
798
799 log_print("reject connect from unknown addr");
800 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
801 b, sizeof(struct sockaddr_storage));
802 goto accept_err;
803 }
804
805 newcon = nodeid2con(nodeid, GFP_NOFS);
806 if (!newcon) {
807 ret = -ENOMEM;
808 goto accept_err;
809 }
810
811 mutex_lock_nested(&newcon->sock_mutex, 1);
812
813 if (newcon->sock) {
814 struct connection *othercon = newcon->othercon;
815
816 if (!othercon) {
817 othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
818 if (!othercon) {
819 log_print("failed to allocate incoming socket");
820 mutex_unlock(&newcon->sock_mutex);
821 ret = -ENOMEM;
822 goto accept_err;
823 }
824 othercon->nodeid = nodeid;
825 othercon->rx_action = receive_from_sock;
826 mutex_init(&othercon->sock_mutex);
827 INIT_WORK(&othercon->swork, process_send_sockets);
828 INIT_WORK(&othercon->rwork, process_recv_sockets);
829 set_bit(CF_IS_OTHERCON, &othercon->flags);
830 }
831 if (!othercon->sock) {
832 newcon->othercon = othercon;
833 othercon->sock = newsock;
834 newsock->sk->sk_user_data = othercon;
835 add_sock(newsock, othercon);
836 addcon = othercon;
837 } else {
838 printk("Extra connection from node %d attempted\n", nodeid);
839 ret = -EAGAIN;
840 mutex_unlock(&newcon->sock_mutex);
841 goto accept_err;
842 }
843 } else {
844 newsock->sk->sk_user_data = newcon;
845 newcon->rx_action = receive_from_sock;
846 add_sock(newsock, newcon);
847 addcon = newcon;
848 }
849
850 log_print("connected to %d", nodeid);
851
852 mutex_unlock(&newcon->sock_mutex);
853
854 /*
855 * Add it to the active queue in case we got data
856 * between processing the accept adding the socket
857 * to the read_sockets list
858 */
859 if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
860 queue_work(recv_workqueue, &addcon->rwork);
861 mutex_unlock(&con->sock_mutex);
862
863 return 0;
864
865accept_err:
866 mutex_unlock(&con->sock_mutex);
867 if (newsock)
868 sock_release(newsock);
869 if (ret != -EAGAIN)
870 log_print("error accepting connection from node: %d", ret);
871
872 return ret;
873}
874
1053static void free_entry(struct writequeue_entry *e) 875static void free_entry(struct writequeue_entry *e)
1054{ 876{
1055 __free_page(e->page); 877 __free_page(e->page);
@@ -1074,96 +896,127 @@ static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
1074 } 896 }
1075} 897}
1076 898
899/*
900 * sctp_bind_addrs - bind a SCTP socket to all our addresses
901 */
902static int sctp_bind_addrs(struct connection *con, uint16_t port)
903{
904 struct sockaddr_storage localaddr;
905 int i, addr_len, result = 0;
906
907 for (i = 0; i < dlm_local_count; i++) {
908 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
909 make_sockaddr(&localaddr, port, &addr_len);
910
911 if (!i)
912 result = kernel_bind(con->sock,
913 (struct sockaddr *)&localaddr,
914 addr_len);
915 else
916 result = kernel_setsockopt(con->sock, SOL_SCTP,
917 SCTP_SOCKOPT_BINDX_ADD,
918 (char *)&localaddr, addr_len);
919
920 if (result < 0) {
921 log_print("Can't bind to %d addr number %d, %d.\n",
922 port, i + 1, result);
923 break;
924 }
925 }
926 return result;
927}
928
1077/* Initiate an SCTP association. 929/* Initiate an SCTP association.
1078 This is a special case of send_to_sock() in that we don't yet have a 930 This is a special case of send_to_sock() in that we don't yet have a
1079 peeled-off socket for this association, so we use the listening socket 931 peeled-off socket for this association, so we use the listening socket
1080 and add the primary IP address of the remote node. 932 and add the primary IP address of the remote node.
1081 */ 933 */
1082static void sctp_init_assoc(struct connection *con) 934static void sctp_connect_to_sock(struct connection *con)
1083{ 935{
1084 struct sockaddr_storage rem_addr; 936 struct sockaddr_storage daddr;
1085 char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; 937 int one = 1;
1086 struct msghdr outmessage; 938 int result;
1087 struct cmsghdr *cmsg; 939 int addr_len;
1088 struct sctp_sndrcvinfo *sinfo; 940 struct socket *sock;
1089 struct connection *base_con; 941
1090 struct writequeue_entry *e; 942 if (con->nodeid == 0) {
1091 int len, offset; 943 log_print("attempt to connect sock 0 foiled");
1092 int ret; 944 return;
1093 int addrlen; 945 }
1094 struct kvec iov[1];
1095 946
1096 mutex_lock(&con->sock_mutex); 947 mutex_lock(&con->sock_mutex);
1097 if (test_and_set_bit(CF_INIT_PENDING, &con->flags))
1098 goto unlock;
1099 948
1100 if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr, 949 /* Some odd races can cause double-connects, ignore them */
1101 con->try_new_addr)) { 950 if (con->retries++ > MAX_CONNECT_RETRIES)
951 goto out;
952
953 if (con->sock) {
954 log_print("node %d already connected.", con->nodeid);
955 goto out;
956 }
957
958 memset(&daddr, 0, sizeof(daddr));
959 result = nodeid_to_addr(con->nodeid, &daddr, NULL, true);
960 if (result < 0) {
1102 log_print("no address for nodeid %d", con->nodeid); 961 log_print("no address for nodeid %d", con->nodeid);
1103 goto unlock; 962 goto out;
1104 } 963 }
1105 base_con = nodeid2con(0, 0);
1106 BUG_ON(base_con == NULL);
1107 964
1108 make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen); 965 /* Create a socket to communicate with */
966 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
967 SOCK_STREAM, IPPROTO_SCTP, &sock);
968 if (result < 0)
969 goto socket_err;
1109 970
1110 outmessage.msg_name = &rem_addr; 971 sock->sk->sk_user_data = con;
1111 outmessage.msg_namelen = addrlen; 972 con->rx_action = receive_from_sock;
1112 outmessage.msg_control = outcmsg; 973 con->connect_action = sctp_connect_to_sock;
1113 outmessage.msg_controllen = sizeof(outcmsg); 974 add_sock(sock, con);
1114 outmessage.msg_flags = MSG_EOR;
1115 975
1116 spin_lock(&con->writequeue_lock); 976 /* Bind to all addresses. */
977 if (sctp_bind_addrs(con, 0))
978 goto bind_err;
1117 979
1118 if (list_empty(&con->writequeue)) { 980 make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);
1119 spin_unlock(&con->writequeue_lock);
1120 log_print("writequeue empty for nodeid %d", con->nodeid);
1121 goto unlock;
1122 }
1123 981
1124 e = list_first_entry(&con->writequeue, struct writequeue_entry, list); 982 log_print("connecting to %d", con->nodeid);
1125 len = e->len;
1126 offset = e->offset;
1127 983
1128 /* Send the first block off the write queue */ 984 /* Turn off Nagle's algorithm */
1129 iov[0].iov_base = page_address(e->page)+offset; 985 kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1130 iov[0].iov_len = len; 986 sizeof(one));
1131 spin_unlock(&con->writequeue_lock);
1132 987
1133 if (rem_addr.ss_family == AF_INET) { 988 result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
1134 struct sockaddr_in *sin = (struct sockaddr_in *)&rem_addr; 989 O_NONBLOCK);
1135 log_print("Trying to connect to %pI4", &sin->sin_addr.s_addr); 990 if (result == -EINPROGRESS)
1136 } else { 991 result = 0;
1137 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&rem_addr; 992 if (result == 0)
1138 log_print("Trying to connect to %pI6", &sin6->sin6_addr); 993 goto out;
1139 }
1140 994
1141 cmsg = CMSG_FIRSTHDR(&outmessage);
1142 cmsg->cmsg_level = IPPROTO_SCTP;
1143 cmsg->cmsg_type = SCTP_SNDRCV;
1144 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
1145 sinfo = CMSG_DATA(cmsg);
1146 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
1147 sinfo->sinfo_ppid = cpu_to_le32(con->nodeid);
1148 outmessage.msg_controllen = cmsg->cmsg_len;
1149 sinfo->sinfo_flags |= SCTP_ADDR_OVER;
1150 995
1151 ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len); 996bind_err:
1152 if (ret < 0) { 997 con->sock = NULL;
1153 log_print("Send first packet to node %d failed: %d", 998 sock_release(sock);
1154 con->nodeid, ret);
1155 999
1156 /* Try again later */ 1000socket_err:
1001 /*
1002 * Some errors are fatal and this list might need adjusting. For other
1003 * errors we try again until the max number of retries is reached.
1004 */
1005 if (result != -EHOSTUNREACH &&
1006 result != -ENETUNREACH &&
1007 result != -ENETDOWN &&
1008 result != -EINVAL &&
1009 result != -EPROTONOSUPPORT) {
1010 log_print("connect %d try %d error %d", con->nodeid,
1011 con->retries, result);
1012 mutex_unlock(&con->sock_mutex);
1013 msleep(1000);
1157 clear_bit(CF_CONNECT_PENDING, &con->flags); 1014 clear_bit(CF_CONNECT_PENDING, &con->flags);
1158 clear_bit(CF_INIT_PENDING, &con->flags); 1015 lowcomms_connect_sock(con);
1159 } 1016 return;
1160 else {
1161 spin_lock(&con->writequeue_lock);
1162 writequeue_entry_complete(e, ret);
1163 spin_unlock(&con->writequeue_lock);
1164 } 1017 }
1165 1018
1166unlock: 1019out:
1167 mutex_unlock(&con->sock_mutex); 1020 mutex_unlock(&con->sock_mutex);
1168} 1021}
1169 1022
@@ -1343,37 +1196,11 @@ static void init_local(void)
1343 } 1196 }
1344} 1197}
1345 1198
1346/* Bind to an IP address. SCTP allows multiple address so it can do
1347 multi-homing */
1348static int add_sctp_bind_addr(struct connection *sctp_con,
1349 struct sockaddr_storage *addr,
1350 int addr_len, int num)
1351{
1352 int result = 0;
1353
1354 if (num == 1)
1355 result = kernel_bind(sctp_con->sock,
1356 (struct sockaddr *) addr,
1357 addr_len);
1358 else
1359 result = kernel_setsockopt(sctp_con->sock, SOL_SCTP,
1360 SCTP_SOCKOPT_BINDX_ADD,
1361 (char *)addr, addr_len);
1362
1363 if (result < 0)
1364 log_print("Can't bind to port %d addr number %d",
1365 dlm_config.ci_tcp_port, num);
1366
1367 return result;
1368}
1369
1370/* Initialise SCTP socket and bind to all interfaces */ 1199/* Initialise SCTP socket and bind to all interfaces */
1371static int sctp_listen_for_all(void) 1200static int sctp_listen_for_all(void)
1372{ 1201{
1373 struct socket *sock = NULL; 1202 struct socket *sock = NULL;
1374 struct sockaddr_storage localaddr; 1203 int result = -EINVAL;
1375 struct sctp_event_subscribe subscribe;
1376 int result = -EINVAL, num = 1, i, addr_len;
1377 struct connection *con = nodeid2con(0, GFP_NOFS); 1204 struct connection *con = nodeid2con(0, GFP_NOFS);
1378 int bufsize = NEEDED_RMEM; 1205 int bufsize = NEEDED_RMEM;
1379 int one = 1; 1206 int one = 1;
@@ -1384,33 +1211,17 @@ static int sctp_listen_for_all(void)
1384 log_print("Using SCTP for communications"); 1211 log_print("Using SCTP for communications");
1385 1212
1386 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, 1213 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
1387 SOCK_SEQPACKET, IPPROTO_SCTP, &sock); 1214 SOCK_STREAM, IPPROTO_SCTP, &sock);
1388 if (result < 0) { 1215 if (result < 0) {
1389 log_print("Can't create comms socket, check SCTP is loaded"); 1216 log_print("Can't create comms socket, check SCTP is loaded");
1390 goto out; 1217 goto out;
1391 } 1218 }
1392 1219
1393 /* Listen for events */
1394 memset(&subscribe, 0, sizeof(subscribe));
1395 subscribe.sctp_data_io_event = 1;
1396 subscribe.sctp_association_event = 1;
1397 subscribe.sctp_send_failure_event = 1;
1398 subscribe.sctp_shutdown_event = 1;
1399 subscribe.sctp_partial_delivery_event = 1;
1400
1401 result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE, 1220 result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
1402 (char *)&bufsize, sizeof(bufsize)); 1221 (char *)&bufsize, sizeof(bufsize));
1403 if (result) 1222 if (result)
1404 log_print("Error increasing buffer space on socket %d", result); 1223 log_print("Error increasing buffer space on socket %d", result);
1405 1224
1406 result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
1407 (char *)&subscribe, sizeof(subscribe));
1408 if (result < 0) {
1409 log_print("Failed to set SCTP_EVENTS on socket: result=%d",
1410 result);
1411 goto create_delsock;
1412 }
1413
1414 result = kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one, 1225 result = kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one,
1415 sizeof(one)); 1226 sizeof(one));
1416 if (result < 0) 1227 if (result < 0)
@@ -1420,19 +1231,12 @@ static int sctp_listen_for_all(void)
1420 sock->sk->sk_user_data = con; 1231 sock->sk->sk_user_data = con;
1421 con->sock = sock; 1232 con->sock = sock;
1422 con->sock->sk->sk_data_ready = lowcomms_data_ready; 1233 con->sock->sk->sk_data_ready = lowcomms_data_ready;
1423 con->rx_action = receive_from_sock; 1234 con->rx_action = sctp_accept_from_sock;
1424 con->connect_action = sctp_init_assoc; 1235 con->connect_action = sctp_connect_to_sock;
1425
1426 /* Bind to all interfaces. */
1427 for (i = 0; i < dlm_local_count; i++) {
1428 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
1429 make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
1430 1236
1431 result = add_sctp_bind_addr(con, &localaddr, addr_len, num); 1237 /* Bind to all addresses. */
1432 if (result) 1238 if (sctp_bind_addrs(con, dlm_config.ci_tcp_port))
1433 goto create_delsock; 1239 goto create_delsock;
1434 ++num;
1435 }
1436 1240
1437 result = sock->ops->listen(sock, 5); 1241 result = sock->ops->listen(sock, 5);
1438 if (result < 0) { 1242 if (result < 0) {
@@ -1636,8 +1440,7 @@ send_error:
1636 1440
1637out_connect: 1441out_connect:
1638 mutex_unlock(&con->sock_mutex); 1442 mutex_unlock(&con->sock_mutex);
1639 if (!test_bit(CF_INIT_PENDING, &con->flags)) 1443 lowcomms_connect_sock(con);
1640 lowcomms_connect_sock(con);
1641} 1444}
1642 1445
1643static void clean_one_writequeue(struct connection *con) 1446static void clean_one_writequeue(struct connection *con)