aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm
diff options
context:
space:
mode:
authorMike Christie <michaelc@cs.wisc.edu>2013-06-14 05:56:13 -0400
committerDavid Teigland <teigland@redhat.com>2013-06-14 14:07:11 -0400
commit5d6898714fe2ce485e95ac74479ed40ebd8d5748 (patch)
tree83a1adc26581662bfa64d4177d2c71913822efea /fs/dlm
parent98e1b60ecc441625c91013e88f14cbd1b3c1fa08 (diff)
dlm: retry failed SCTP sends
Currently if a SCTP send fails, we lose the data we were trying to send because the writequeue_entry is released when we do the send. When this happens other nodes will then hang waiting for a reply. This adds support for SCTP to retry the send operation. I also removed the retry limit for SCTP use, because we want to make sure we try every path during init time and for longer failures we want to continually retry in case paths come back up while trying other paths. We will do this until userspace tells us to stop. Signed-off-by: Mike Christie <michaelc@cs.wisc.edu> Signed-off-by: David Teigland <teigland@redhat.com>
Diffstat (limited to 'fs/dlm')
-rw-r--r--fs/dlm/lowcomms.c104
1 files changed, 75 insertions, 29 deletions
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 56015c9e8d00..a4fad32bb788 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -607,15 +607,56 @@ static void sctp_init_failed(void)
607 mutex_unlock(&connections_lock); 607 mutex_unlock(&connections_lock);
608} 608}
609 609
610static void retry_failed_sctp_send(struct connection *recv_con,
611 struct sctp_send_failed *sn_send_failed,
612 char *buf)
613{
614 int len = sn_send_failed->ssf_length - sizeof(struct sctp_send_failed);
615 struct dlm_mhandle *mh;
616 struct connection *con;
617 char *retry_buf;
618 int nodeid = sn_send_failed->ssf_info.sinfo_ppid;
619
620 log_print("Retry sending %d bytes to node id %d", len, nodeid);
621
622 con = nodeid2con(nodeid, 0);
623 if (!con) {
624 log_print("Could not look up con for nodeid %d\n",
625 nodeid);
626 return;
627 }
628
629 mh = dlm_lowcomms_get_buffer(nodeid, len, GFP_NOFS, &retry_buf);
630 if (!mh) {
631 log_print("Could not allocate buf for retry.");
632 return;
633 }
634 memcpy(retry_buf, buf + sizeof(struct sctp_send_failed), len);
635 dlm_lowcomms_commit_buffer(mh);
636
637 /*
638 * If we got a assoc changed event before the send failed event then
639 * we only need to retry the send.
640 */
641 if (con->sctp_assoc) {
642 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
643 queue_work(send_workqueue, &con->swork);
644 } else
645 sctp_init_failed_foreach(con);
646}
647
610/* Something happened to an association */ 648/* Something happened to an association */
611static void process_sctp_notification(struct connection *con, 649static void process_sctp_notification(struct connection *con,
612 struct msghdr *msg, char *buf) 650 struct msghdr *msg, char *buf)
613{ 651{
614 union sctp_notification *sn = (union sctp_notification *)buf; 652 union sctp_notification *sn = (union sctp_notification *)buf;
615 653
616 if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) { 654 switch (sn->sn_header.sn_type) {
655 case SCTP_SEND_FAILED:
656 retry_failed_sctp_send(con, &sn->sn_send_failed, buf);
657 break;
658 case SCTP_ASSOC_CHANGE:
617 switch (sn->sn_assoc_change.sac_state) { 659 switch (sn->sn_assoc_change.sac_state) {
618
619 case SCTP_COMM_UP: 660 case SCTP_COMM_UP:
620 case SCTP_RESTART: 661 case SCTP_RESTART:
621 { 662 {
@@ -713,14 +754,10 @@ static void process_sctp_notification(struct connection *con,
713 } 754 }
714 break; 755 break;
715 756
716 /* We don't know which INIT failed, so clear the PENDING flags
717 * on them all. if assoc_id is zero then it will then try
718 * again */
719
720 case SCTP_CANT_STR_ASSOC: 757 case SCTP_CANT_STR_ASSOC:
721 { 758 {
759 /* Will retry init when we get the send failed notification */
722 log_print("Can't start SCTP association - retrying"); 760 log_print("Can't start SCTP association - retrying");
723 sctp_init_failed();
724 } 761 }
725 break; 762 break;
726 763
@@ -729,6 +766,8 @@ static void process_sctp_notification(struct connection *con,
729 (int)sn->sn_assoc_change.sac_assoc_id, 766 (int)sn->sn_assoc_change.sac_assoc_id,
730 sn->sn_assoc_change.sac_state); 767 sn->sn_assoc_change.sac_state);
731 } 768 }
769 default:
770 ; /* fall through */
732 } 771 }
733} 772}
734 773
@@ -988,6 +1027,24 @@ static void free_entry(struct writequeue_entry *e)
988 kfree(e); 1027 kfree(e);
989} 1028}
990 1029
1030/*
1031 * writequeue_entry_complete - try to delete and free write queue entry
1032 * @e: write queue entry to try to delete
1033 * @completed: bytes completed
1034 *
1035 * writequeue_lock must be held.
1036 */
1037static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
1038{
1039 e->offset += completed;
1040 e->len -= completed;
1041
1042 if (e->len == 0 && e->users == 0) {
1043 list_del(&e->list);
1044 free_entry(e);
1045 }
1046}
1047
991/* Initiate an SCTP association. 1048/* Initiate an SCTP association.
992 This is a special case of send_to_sock() in that we don't yet have a 1049 This is a special case of send_to_sock() in that we don't yet have a
993 peeled-off socket for this association, so we use the listening socket 1050 peeled-off socket for this association, so we use the listening socket
@@ -1007,16 +1064,14 @@ static void sctp_init_assoc(struct connection *con)
1007 int addrlen; 1064 int addrlen;
1008 struct kvec iov[1]; 1065 struct kvec iov[1];
1009 1066
1067 mutex_lock(&con->sock_mutex);
1010 if (test_and_set_bit(CF_INIT_PENDING, &con->flags)) 1068 if (test_and_set_bit(CF_INIT_PENDING, &con->flags))
1011 return; 1069 goto unlock;
1012
1013 if (con->retries++ > MAX_CONNECT_RETRIES)
1014 return;
1015 1070
1016 if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr, 1071 if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr,
1017 con->try_new_addr)) { 1072 con->try_new_addr)) {
1018 log_print("no address for nodeid %d", con->nodeid); 1073 log_print("no address for nodeid %d", con->nodeid);
1019 return; 1074 goto unlock;
1020 } 1075 }
1021 base_con = nodeid2con(0, 0); 1076 base_con = nodeid2con(0, 0);
1022 BUG_ON(base_con == NULL); 1077 BUG_ON(base_con == NULL);
@@ -1034,17 +1089,17 @@ static void sctp_init_assoc(struct connection *con)
1034 if (list_empty(&con->writequeue)) { 1089 if (list_empty(&con->writequeue)) {
1035 spin_unlock(&con->writequeue_lock); 1090 spin_unlock(&con->writequeue_lock);
1036 log_print("writequeue empty for nodeid %d", con->nodeid); 1091 log_print("writequeue empty for nodeid %d", con->nodeid);
1037 return; 1092 goto unlock;
1038 } 1093 }
1039 1094
1040 e = list_first_entry(&con->writequeue, struct writequeue_entry, list); 1095 e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
1041 len = e->len; 1096 len = e->len;
1042 offset = e->offset; 1097 offset = e->offset;
1043 spin_unlock(&con->writequeue_lock);
1044 1098
1045 /* Send the first block off the write queue */ 1099 /* Send the first block off the write queue */
1046 iov[0].iov_base = page_address(e->page)+offset; 1100 iov[0].iov_base = page_address(e->page)+offset;
1047 iov[0].iov_len = len; 1101 iov[0].iov_len = len;
1102 spin_unlock(&con->writequeue_lock);
1048 1103
1049 if (rem_addr.ss_family == AF_INET) { 1104 if (rem_addr.ss_family == AF_INET) {
1050 struct sockaddr_in *sin = (struct sockaddr_in *)&rem_addr; 1105 struct sockaddr_in *sin = (struct sockaddr_in *)&rem_addr;
@@ -1060,7 +1115,7 @@ static void sctp_init_assoc(struct connection *con)
1060 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); 1115 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
1061 sinfo = CMSG_DATA(cmsg); 1116 sinfo = CMSG_DATA(cmsg);
1062 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); 1117 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
1063 sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid()); 1118 sinfo->sinfo_ppid = cpu_to_le32(con->nodeid);
1064 outmessage.msg_controllen = cmsg->cmsg_len; 1119 outmessage.msg_controllen = cmsg->cmsg_len;
1065 sinfo->sinfo_flags |= SCTP_ADDR_OVER; 1120 sinfo->sinfo_flags |= SCTP_ADDR_OVER;
1066 1121
@@ -1075,15 +1130,12 @@ static void sctp_init_assoc(struct connection *con)
1075 } 1130 }
1076 else { 1131 else {
1077 spin_lock(&con->writequeue_lock); 1132 spin_lock(&con->writequeue_lock);
1078 e->offset += ret; 1133 writequeue_entry_complete(e, ret);
1079 e->len -= ret;
1080
1081 if (e->len == 0 && e->users == 0) {
1082 list_del(&e->list);
1083 free_entry(e);
1084 }
1085 spin_unlock(&con->writequeue_lock); 1134 spin_unlock(&con->writequeue_lock);
1086 } 1135 }
1136
1137unlock:
1138 mutex_unlock(&con->sock_mutex);
1087} 1139}
1088 1140
1089/* Connect a new socket to its peer */ 1141/* Connect a new socket to its peer */
@@ -1533,13 +1585,7 @@ static void send_to_sock(struct connection *con)
1533 } 1585 }
1534 1586
1535 spin_lock(&con->writequeue_lock); 1587 spin_lock(&con->writequeue_lock);
1536 e->offset += ret; 1588 writequeue_entry_complete(e, ret);
1537 e->len -= ret;
1538
1539 if (e->len == 0 && e->users == 0) {
1540 list_del(&e->list);
1541 free_entry(e);
1542 }
1543 } 1589 }
1544 spin_unlock(&con->writequeue_lock); 1590 spin_unlock(&con->writequeue_lock);
1545out: 1591out: