diff options
author | Chuck Lever <cel@citi.umich.edu> | 2005-08-11 16:25:53 -0400 |
---|---|---|
committer | Trond Myklebust <Trond.Myklebust@netapp.com> | 2005-09-23 12:38:29 -0400 |
commit | b0d93ad511ce2f37823a07c7a3258117a431f5fb (patch) | |
tree | a9fc753bf821d1d71c345f58e056511dfd0cf5ac /net/sunrpc | |
parent | c7b2cae8a634015b72941ba2fc6c4bc9b8d3a129 (diff) |
[PATCH] RPC: separate TCP and UDP transport connection logic
Create separate connection worker functions for managing UDP and TCP
transport sockets. This eliminates several dependencies on "xprt->stream".
Test-plan:
Destructive testing (unplugging the network temporarily). Connectathon with
v2, v3, and v4.
Version: Thu, 11 Aug 2005 16:08:18 -0400
Signed-off-by: Chuck Lever <cel@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
Diffstat (limited to 'net/sunrpc')
-rw-r--r-- | net/sunrpc/xprtsock.c | 164 |
1 files changed, 91 insertions, 73 deletions
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 70a772d7a796..f91529787b9b 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c | |||
@@ -836,102 +836,118 @@ static int xs_bindresvport(struct rpc_xprt *xprt, struct socket *sock) | |||
836 | return err; | 836 | return err; |
837 | } | 837 | } |
838 | 838 | ||
839 | static struct socket *xs_create(struct rpc_xprt *xprt, int proto, int resvport) | 839 | /** |
840 | * xs_udp_connect_worker - set up a UDP socket | ||
841 | * @args: RPC transport to connect | ||
842 | * | ||
843 | * Invoked by a work queue tasklet. | ||
844 | */ | ||
845 | static void xs_udp_connect_worker(void *args) | ||
840 | { | 846 | { |
841 | struct socket *sock; | 847 | struct rpc_xprt *xprt = (struct rpc_xprt *) args; |
842 | int type, err; | 848 | struct socket *sock = xprt->sock; |
843 | 849 | int err, status = -EIO; | |
844 | dprintk("RPC: xs_create(%s %d)\n", | ||
845 | (proto == IPPROTO_UDP)? "udp" : "tcp", proto); | ||
846 | 850 | ||
847 | type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM; | 851 | if (xprt->shutdown || xprt->addr.sin_port == 0) |
852 | goto out; | ||
848 | 853 | ||
849 | if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) { | 854 | dprintk("RPC: xs_udp_connect_worker for xprt %p\n", xprt); |
850 | dprintk("RPC: can't create socket (%d).\n", -err); | ||
851 | return NULL; | ||
852 | } | ||
853 | 855 | ||
854 | /* If the caller has the capability, bind to a reserved port */ | 856 | /* Start by resetting any existing state */ |
855 | if (resvport && xs_bindresvport(xprt, sock) < 0) | 857 | xs_close(xprt); |
856 | goto failed; | ||
857 | 858 | ||
858 | return sock; | 859 | if ((err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) { |
860 | dprintk("RPC: can't create UDP transport socket (%d).\n", -err); | ||
861 | goto out; | ||
862 | } | ||
859 | 863 | ||
860 | failed: | 864 | if (xprt->resvport && xs_bindresvport(xprt, sock) < 0) { |
861 | sock_release(sock); | 865 | sock_release(sock); |
862 | return NULL; | 866 | goto out; |
863 | } | 867 | } |
864 | 868 | ||
865 | static void xs_bind(struct rpc_xprt *xprt, struct socket *sock) | 869 | if (!xprt->inet) { |
866 | { | 870 | struct sock *sk = sock->sk; |
867 | struct sock *sk = sock->sk; | ||
868 | 871 | ||
869 | if (xprt->inet) | 872 | write_lock_bh(&sk->sk_callback_lock); |
870 | return; | ||
871 | 873 | ||
872 | write_lock_bh(&sk->sk_callback_lock); | 874 | sk->sk_user_data = xprt; |
873 | sk->sk_user_data = xprt; | 875 | xprt->old_data_ready = sk->sk_data_ready; |
874 | xprt->old_data_ready = sk->sk_data_ready; | 876 | xprt->old_state_change = sk->sk_state_change; |
875 | xprt->old_state_change = sk->sk_state_change; | 877 | xprt->old_write_space = sk->sk_write_space; |
876 | xprt->old_write_space = sk->sk_write_space; | ||
877 | if (xprt->prot == IPPROTO_UDP) { | ||
878 | sk->sk_data_ready = xs_udp_data_ready; | 878 | sk->sk_data_ready = xs_udp_data_ready; |
879 | sk->sk_write_space = xs_udp_write_space; | 879 | sk->sk_write_space = xs_udp_write_space; |
880 | sk->sk_no_check = UDP_CSUM_NORCV; | 880 | sk->sk_no_check = UDP_CSUM_NORCV; |
881 | |||
881 | xprt_set_connected(xprt); | 882 | xprt_set_connected(xprt); |
882 | } else { | ||
883 | tcp_sk(sk)->nonagle = 1; /* disable Nagle's algorithm */ | ||
884 | sk->sk_data_ready = xs_tcp_data_ready; | ||
885 | sk->sk_state_change = xs_tcp_state_change; | ||
886 | sk->sk_write_space = xs_tcp_write_space; | ||
887 | xprt_clear_connected(xprt); | ||
888 | } | ||
889 | 883 | ||
890 | /* Reset to new socket */ | 884 | /* Reset to new socket */ |
891 | xprt->sock = sock; | 885 | xprt->sock = sock; |
892 | xprt->inet = sk; | 886 | xprt->inet = sk; |
893 | write_unlock_bh(&sk->sk_callback_lock); | ||
894 | 887 | ||
895 | return; | 888 | write_unlock_bh(&sk->sk_callback_lock); |
889 | } | ||
890 | xs_set_buffer_size(xprt); | ||
891 | status = 0; | ||
892 | out: | ||
893 | xprt_wake_pending_tasks(xprt, status); | ||
894 | xprt_clear_connecting(xprt); | ||
896 | } | 895 | } |
897 | 896 | ||
898 | /** | 897 | /** |
899 | * xs_connect_worker - try to connect a socket to a remote endpoint | 898 | * xs_tcp_connect_worker - connect a TCP socket to a remote endpoint |
900 | * @args: RPC transport to connect | 899 | * @args: RPC transport to connect |
901 | * | 900 | * |
902 | * Invoked by a work queue tasklet. | 901 | * Invoked by a work queue tasklet. |
903 | */ | 902 | */ |
904 | static void xs_connect_worker(void *args) | 903 | static void xs_tcp_connect_worker(void *args) |
905 | { | 904 | { |
906 | struct rpc_xprt *xprt = (struct rpc_xprt *)args; | 905 | struct rpc_xprt *xprt = (struct rpc_xprt *)args; |
907 | struct socket *sock = xprt->sock; | 906 | struct socket *sock = xprt->sock; |
908 | int status = -EIO; | 907 | int err, status = -EIO; |
909 | 908 | ||
910 | if (xprt->shutdown || xprt->addr.sin_port == 0) | 909 | if (xprt->shutdown || xprt->addr.sin_port == 0) |
911 | goto out; | 910 | goto out; |
912 | 911 | ||
913 | dprintk("RPC: xs_connect_worker xprt %p\n", xprt); | 912 | dprintk("RPC: xs_tcp_connect_worker for xprt %p\n", xprt); |
914 | 913 | ||
915 | /* | 914 | /* Start by resetting any existing socket state */ |
916 | * Start by resetting any existing state | ||
917 | */ | ||
918 | xs_close(xprt); | 915 | xs_close(xprt); |
919 | sock = xs_create(xprt, xprt->prot, xprt->resvport); | 916 | |
920 | if (sock == NULL) { | 917 | if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) { |
921 | /* couldn't create socket or bind to reserved port; | 918 | dprintk("RPC: can't create TCP transport socket (%d).\n", -err); |
922 | * this is likely a permanent error, so cause an abort */ | ||
923 | goto out; | 919 | goto out; |
924 | } | 920 | } |
925 | xs_bind(xprt, sock); | ||
926 | xs_set_buffer_size(xprt); | ||
927 | 921 | ||
928 | status = 0; | 922 | if (xprt->resvport && xs_bindresvport(xprt, sock) < 0) { |
929 | if (!xprt->stream) | 923 | sock_release(sock); |
930 | goto out; | 924 | goto out; |
925 | } | ||
931 | 926 | ||
932 | /* | 927 | if (!xprt->inet) { |
933 | * Tell the socket layer to start connecting... | 928 | struct sock *sk = sock->sk; |
934 | */ | 929 | |
930 | write_lock_bh(&sk->sk_callback_lock); | ||
931 | |||
932 | sk->sk_user_data = xprt; | ||
933 | xprt->old_data_ready = sk->sk_data_ready; | ||
934 | xprt->old_state_change = sk->sk_state_change; | ||
935 | xprt->old_write_space = sk->sk_write_space; | ||
936 | sk->sk_data_ready = xs_tcp_data_ready; | ||
937 | sk->sk_state_change = xs_tcp_state_change; | ||
938 | sk->sk_write_space = xs_tcp_write_space; | ||
939 | tcp_sk(sk)->nonagle = 1; | ||
940 | |||
941 | xprt_clear_connected(xprt); | ||
942 | |||
943 | /* Reset to new socket */ | ||
944 | xprt->sock = sock; | ||
945 | xprt->inet = sk; | ||
946 | |||
947 | write_unlock_bh(&sk->sk_callback_lock); | ||
948 | } | ||
949 | |||
950 | /* Tell the socket layer to start connecting... */ | ||
935 | status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr, | 951 | status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr, |
936 | sizeof(xprt->addr), O_NONBLOCK); | 952 | sizeof(xprt->addr), O_NONBLOCK); |
937 | dprintk("RPC: %p connect status %d connected %d sock state %d\n", | 953 | dprintk("RPC: %p connect status %d connected %d sock state %d\n", |
@@ -959,18 +975,20 @@ static void xs_connect(struct rpc_task *task) | |||
959 | { | 975 | { |
960 | struct rpc_xprt *xprt = task->tk_xprt; | 976 | struct rpc_xprt *xprt = task->tk_xprt; |
961 | 977 | ||
962 | if (!xprt_test_and_set_connecting(xprt)) { | 978 | if (xprt_test_and_set_connecting(xprt)) |
963 | if (xprt->sock != NULL) { | 979 | return; |
964 | dprintk("RPC: xs_connect delayed xprt %p\n", xprt); | 980 | |
965 | schedule_delayed_work(&xprt->connect_worker, | 981 | if (xprt->sock != NULL) { |
982 | dprintk("RPC: xs_connect delayed xprt %p\n", xprt); | ||
983 | schedule_delayed_work(&xprt->connect_worker, | ||
966 | RPC_REESTABLISH_TIMEOUT); | 984 | RPC_REESTABLISH_TIMEOUT); |
967 | } else { | 985 | } else { |
968 | dprintk("RPC: xs_connect scheduled xprt %p\n", xprt); | 986 | dprintk("RPC: xs_connect scheduled xprt %p\n", xprt); |
969 | schedule_work(&xprt->connect_worker); | 987 | schedule_work(&xprt->connect_worker); |
970 | /* flush_scheduled_work can sleep... */ | 988 | |
971 | if (!RPC_IS_ASYNC(task)) | 989 | /* flush_scheduled_work can sleep... */ |
972 | flush_scheduled_work(); | 990 | if (!RPC_IS_ASYNC(task)) |
973 | } | 991 | flush_scheduled_work(); |
974 | } | 992 | } |
975 | } | 993 | } |
976 | 994 | ||
@@ -1013,7 +1031,7 @@ int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to) | |||
1013 | /* XXX: header size can vary due to auth type, IPv6, etc. */ | 1031 | /* XXX: header size can vary due to auth type, IPv6, etc. */ |
1014 | xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); | 1032 | xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); |
1015 | 1033 | ||
1016 | INIT_WORK(&xprt->connect_worker, xs_connect_worker, xprt); | 1034 | INIT_WORK(&xprt->connect_worker, xs_udp_connect_worker, xprt); |
1017 | 1035 | ||
1018 | xprt->ops = &xs_ops; | 1036 | xprt->ops = &xs_ops; |
1019 | 1037 | ||
@@ -1052,7 +1070,7 @@ int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to) | |||
1052 | xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0; | 1070 | xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0; |
1053 | xprt->max_payload = (1U << 31) - 1; | 1071 | xprt->max_payload = (1U << 31) - 1; |
1054 | 1072 | ||
1055 | INIT_WORK(&xprt->connect_worker, xs_connect_worker, xprt); | 1073 | INIT_WORK(&xprt->connect_worker, xs_tcp_connect_worker, xprt); |
1056 | 1074 | ||
1057 | xprt->ops = &xs_ops; | 1075 | xprt->ops = &xs_ops; |
1058 | 1076 | ||