aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm')
-rw-r--r--fs/dlm/lowcomms.c104
1 files changed, 75 insertions, 29 deletions
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 56015c9e8d00..a4fad32bb788 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -607,15 +607,56 @@ static void sctp_init_failed(void)
607 mutex_unlock(&connections_lock); 607 mutex_unlock(&connections_lock);
608} 608}
609 609
610static void retry_failed_sctp_send(struct connection *recv_con,
611 struct sctp_send_failed *sn_send_failed,
612 char *buf)
613{
614 int len = sn_send_failed->ssf_length - sizeof(struct sctp_send_failed);
615 struct dlm_mhandle *mh;
616 struct connection *con;
617 char *retry_buf;
618 int nodeid = sn_send_failed->ssf_info.sinfo_ppid;
619
620 log_print("Retry sending %d bytes to node id %d", len, nodeid);
621
622 con = nodeid2con(nodeid, 0);
623 if (!con) {
624 log_print("Could not look up con for nodeid %d\n",
625 nodeid);
626 return;
627 }
628
629 mh = dlm_lowcomms_get_buffer(nodeid, len, GFP_NOFS, &retry_buf);
630 if (!mh) {
631 log_print("Could not allocate buf for retry.");
632 return;
633 }
634 memcpy(retry_buf, buf + sizeof(struct sctp_send_failed), len);
635 dlm_lowcomms_commit_buffer(mh);
636
637 /*
638 * If we got a assoc changed event before the send failed event then
639 * we only need to retry the send.
640 */
641 if (con->sctp_assoc) {
642 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
643 queue_work(send_workqueue, &con->swork);
644 } else
645 sctp_init_failed_foreach(con);
646}
647
610/* Something happened to an association */ 648/* Something happened to an association */
611static void process_sctp_notification(struct connection *con, 649static void process_sctp_notification(struct connection *con,
612 struct msghdr *msg, char *buf) 650 struct msghdr *msg, char *buf)
613{ 651{
614 union sctp_notification *sn = (union sctp_notification *)buf; 652 union sctp_notification *sn = (union sctp_notification *)buf;
615 653
616 if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) { 654 switch (sn->sn_header.sn_type) {
655 case SCTP_SEND_FAILED:
656 retry_failed_sctp_send(con, &sn->sn_send_failed, buf);
657 break;
658 case SCTP_ASSOC_CHANGE:
617 switch (sn->sn_assoc_change.sac_state) { 659 switch (sn->sn_assoc_change.sac_state) {
618
619 case SCTP_COMM_UP: 660 case SCTP_COMM_UP:
620 case SCTP_RESTART: 661 case SCTP_RESTART:
621 { 662 {
@@ -713,14 +754,10 @@ static void process_sctp_notification(struct connection *con,
713 } 754 }
714 break; 755 break;
715 756
716 /* We don't know which INIT failed, so clear the PENDING flags
717 * on them all. if assoc_id is zero then it will then try
718 * again */
719
720 case SCTP_CANT_STR_ASSOC: 757 case SCTP_CANT_STR_ASSOC:
721 { 758 {
759 /* Will retry init when we get the send failed notification */
722 log_print("Can't start SCTP association - retrying"); 760 log_print("Can't start SCTP association - retrying");
723 sctp_init_failed();
724 } 761 }
725 break; 762 break;
726 763
@@ -729,6 +766,8 @@ static void process_sctp_notification(struct connection *con,
729 (int)sn->sn_assoc_change.sac_assoc_id, 766 (int)sn->sn_assoc_change.sac_assoc_id,
730 sn->sn_assoc_change.sac_state); 767 sn->sn_assoc_change.sac_state);
731 } 768 }
769 default:
770 ; /* fall through */
732 } 771 }
733} 772}
734 773
@@ -988,6 +1027,24 @@ static void free_entry(struct writequeue_entry *e)
988 kfree(e); 1027 kfree(e);
989} 1028}
990 1029
1030/*
1031 * writequeue_entry_complete - try to delete and free write queue entry
1032 * @e: write queue entry to try to delete
1033 * @completed: bytes completed
1034 *
1035 * writequeue_lock must be held.
1036 */
1037static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
1038{
1039 e->offset += completed;
1040 e->len -= completed;
1041
1042 if (e->len == 0 && e->users == 0) {
1043 list_del(&e->list);
1044 free_entry(e);
1045 }
1046}
1047
991/* Initiate an SCTP association. 1048/* Initiate an SCTP association.
992 This is a special case of send_to_sock() in that we don't yet have a 1049 This is a special case of send_to_sock() in that we don't yet have a
993 peeled-off socket for this association, so we use the listening socket 1050 peeled-off socket for this association, so we use the listening socket
@@ -1007,16 +1064,14 @@ static void sctp_init_assoc(struct connection *con)
1007 int addrlen; 1064 int addrlen;
1008 struct kvec iov[1]; 1065 struct kvec iov[1];
1009 1066
1067 mutex_lock(&con->sock_mutex);
1010 if (test_and_set_bit(CF_INIT_PENDING, &con->flags)) 1068 if (test_and_set_bit(CF_INIT_PENDING, &con->flags))
1011 return; 1069 goto unlock;
1012
1013 if (con->retries++ > MAX_CONNECT_RETRIES)
1014 return;
1015 1070
1016 if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr, 1071 if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr,
1017 con->try_new_addr)) { 1072 con->try_new_addr)) {
1018 log_print("no address for nodeid %d", con->nodeid); 1073 log_print("no address for nodeid %d", con->nodeid);
1019 return; 1074 goto unlock;
1020 } 1075 }
1021 base_con = nodeid2con(0, 0); 1076 base_con = nodeid2con(0, 0);
1022 BUG_ON(base_con == NULL); 1077 BUG_ON(base_con == NULL);
@@ -1034,17 +1089,17 @@ static void sctp_init_assoc(struct connection *con)
1034 if (list_empty(&con->writequeue)) { 1089 if (list_empty(&con->writequeue)) {
1035 spin_unlock(&con->writequeue_lock); 1090 spin_unlock(&con->writequeue_lock);
1036 log_print("writequeue empty for nodeid %d", con->nodeid); 1091 log_print("writequeue empty for nodeid %d", con->nodeid);
1037 return; 1092 goto unlock;
1038 } 1093 }
1039 1094
1040 e = list_first_entry(&con->writequeue, struct writequeue_entry, list); 1095 e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
1041 len = e->len; 1096 len = e->len;
1042 offset = e->offset; 1097 offset = e->offset;
1043 spin_unlock(&con->writequeue_lock);
1044 1098
1045 /* Send the first block off the write queue */ 1099 /* Send the first block off the write queue */
1046 iov[0].iov_base = page_address(e->page)+offset; 1100 iov[0].iov_base = page_address(e->page)+offset;
1047 iov[0].iov_len = len; 1101 iov[0].iov_len = len;
1102 spin_unlock(&con->writequeue_lock);
1048 1103
1049 if (rem_addr.ss_family == AF_INET) { 1104 if (rem_addr.ss_family == AF_INET) {
1050 struct sockaddr_in *sin = (struct sockaddr_in *)&rem_addr; 1105 struct sockaddr_in *sin = (struct sockaddr_in *)&rem_addr;
@@ -1060,7 +1115,7 @@ static void sctp_init_assoc(struct connection *con)
1060 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); 1115 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
1061 sinfo = CMSG_DATA(cmsg); 1116 sinfo = CMSG_DATA(cmsg);
1062 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); 1117 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
1063 sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid()); 1118 sinfo->sinfo_ppid = cpu_to_le32(con->nodeid);
1064 outmessage.msg_controllen = cmsg->cmsg_len; 1119 outmessage.msg_controllen = cmsg->cmsg_len;
1065 sinfo->sinfo_flags |= SCTP_ADDR_OVER; 1120 sinfo->sinfo_flags |= SCTP_ADDR_OVER;
1066 1121
@@ -1075,15 +1130,12 @@ static void sctp_init_assoc(struct connection *con)
1075 } 1130 }
1076 else { 1131 else {
1077 spin_lock(&con->writequeue_lock); 1132 spin_lock(&con->writequeue_lock);
1078 e->offset += ret; 1133 writequeue_entry_complete(e, ret);
1079 e->len -= ret;
1080
1081 if (e->len == 0 && e->users == 0) {
1082 list_del(&e->list);
1083 free_entry(e);
1084 }
1085 spin_unlock(&con->writequeue_lock); 1134 spin_unlock(&con->writequeue_lock);
1086 } 1135 }
1136
1137unlock:
1138 mutex_unlock(&con->sock_mutex);
1087} 1139}
1088 1140
1089/* Connect a new socket to its peer */ 1141/* Connect a new socket to its peer */
@@ -1533,13 +1585,7 @@ static void send_to_sock(struct connection *con)
1533 } 1585 }
1534 1586
1535 spin_lock(&con->writequeue_lock); 1587 spin_lock(&con->writequeue_lock);
1536 e->offset += ret; 1588 writequeue_entry_complete(e, ret);
1537 e->len -= ret;
1538
1539 if (e->len == 0 && e->users == 0) {
1540 list_del(&e->list);
1541 free_entry(e);
1542 }
1543 } 1589 }
1544 spin_unlock(&con->writequeue_lock); 1590 spin_unlock(&con->writequeue_lock);
1545out: 1591out: