aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChuck Lever <cel@citi.umich.edu>2005-08-11 16:25:53 -0400
committerTrond Myklebust <Trond.Myklebust@netapp.com>2005-09-23 12:38:29 -0400
commitb0d93ad511ce2f37823a07c7a3258117a431f5fb (patch)
treea9fc753bf821d1d71c345f58e056511dfd0cf5ac
parentc7b2cae8a634015b72941ba2fc6c4bc9b8d3a129 (diff)
[PATCH] RPC: separate TCP and UDP transport connection logic
Create separate connection worker functions for managing UDP and TCP transport sockets. This eliminates several dependencies on "xprt->stream". Test-plan: Destructive testing (unplugging the network temporarily). Connectathon with v2, v3, and v4. Version: Thu, 11 Aug 2005 16:08:18 -0400 Signed-off-by: Chuck Lever <cel@netapp.com> Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
-rw-r--r--net/sunrpc/xprtsock.c164
1 files changed, 91 insertions, 73 deletions
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 70a772d7a796..f91529787b9b 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -836,102 +836,118 @@ static int xs_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
836 return err; 836 return err;
837} 837}
838 838
839static struct socket *xs_create(struct rpc_xprt *xprt, int proto, int resvport) 839/**
840 * xs_udp_connect_worker - set up a UDP socket
841 * @args: RPC transport to connect
842 *
843 * Invoked by a work queue tasklet.
844 */
845static void xs_udp_connect_worker(void *args)
840{ 846{
841 struct socket *sock; 847 struct rpc_xprt *xprt = (struct rpc_xprt *) args;
842 int type, err; 848 struct socket *sock = xprt->sock;
843 849 int err, status = -EIO;
844 dprintk("RPC: xs_create(%s %d)\n",
845 (proto == IPPROTO_UDP)? "udp" : "tcp", proto);
846 850
847 type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM; 851 if (xprt->shutdown || xprt->addr.sin_port == 0)
852 goto out;
848 853
849 if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) { 854 dprintk("RPC: xs_udp_connect_worker for xprt %p\n", xprt);
850 dprintk("RPC: can't create socket (%d).\n", -err);
851 return NULL;
852 }
853 855
854 /* If the caller has the capability, bind to a reserved port */ 856 /* Start by resetting any existing state */
855 if (resvport && xs_bindresvport(xprt, sock) < 0) 857 xs_close(xprt);
856 goto failed;
857 858
858 return sock; 859 if ((err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
860 dprintk("RPC: can't create UDP transport socket (%d).\n", -err);
861 goto out;
862 }
859 863
860failed: 864 if (xprt->resvport && xs_bindresvport(xprt, sock) < 0) {
861 sock_release(sock); 865 sock_release(sock);
862 return NULL; 866 goto out;
863} 867 }
864 868
865static void xs_bind(struct rpc_xprt *xprt, struct socket *sock) 869 if (!xprt->inet) {
866{ 870 struct sock *sk = sock->sk;
867 struct sock *sk = sock->sk;
868 871
869 if (xprt->inet) 872 write_lock_bh(&sk->sk_callback_lock);
870 return;
871 873
872 write_lock_bh(&sk->sk_callback_lock); 874 sk->sk_user_data = xprt;
873 sk->sk_user_data = xprt; 875 xprt->old_data_ready = sk->sk_data_ready;
874 xprt->old_data_ready = sk->sk_data_ready; 876 xprt->old_state_change = sk->sk_state_change;
875 xprt->old_state_change = sk->sk_state_change; 877 xprt->old_write_space = sk->sk_write_space;
876 xprt->old_write_space = sk->sk_write_space;
877 if (xprt->prot == IPPROTO_UDP) {
878 sk->sk_data_ready = xs_udp_data_ready; 878 sk->sk_data_ready = xs_udp_data_ready;
879 sk->sk_write_space = xs_udp_write_space; 879 sk->sk_write_space = xs_udp_write_space;
880 sk->sk_no_check = UDP_CSUM_NORCV; 880 sk->sk_no_check = UDP_CSUM_NORCV;
881
881 xprt_set_connected(xprt); 882 xprt_set_connected(xprt);
882 } else {
883 tcp_sk(sk)->nonagle = 1; /* disable Nagle's algorithm */
884 sk->sk_data_ready = xs_tcp_data_ready;
885 sk->sk_state_change = xs_tcp_state_change;
886 sk->sk_write_space = xs_tcp_write_space;
887 xprt_clear_connected(xprt);
888 }
889 883
890 /* Reset to new socket */ 884 /* Reset to new socket */
891 xprt->sock = sock; 885 xprt->sock = sock;
892 xprt->inet = sk; 886 xprt->inet = sk;
893 write_unlock_bh(&sk->sk_callback_lock);
894 887
895 return; 888 write_unlock_bh(&sk->sk_callback_lock);
889 }
890 xs_set_buffer_size(xprt);
891 status = 0;
892out:
893 xprt_wake_pending_tasks(xprt, status);
894 xprt_clear_connecting(xprt);
896} 895}
897 896
898/** 897/**
899 * xs_connect_worker - try to connect a socket to a remote endpoint 898 * xs_tcp_connect_worker - connect a TCP socket to a remote endpoint
900 * @args: RPC transport to connect 899 * @args: RPC transport to connect
901 * 900 *
902 * Invoked by a work queue tasklet. 901 * Invoked by a work queue tasklet.
903 */ 902 */
904static void xs_connect_worker(void *args) 903static void xs_tcp_connect_worker(void *args)
905{ 904{
906 struct rpc_xprt *xprt = (struct rpc_xprt *)args; 905 struct rpc_xprt *xprt = (struct rpc_xprt *)args;
907 struct socket *sock = xprt->sock; 906 struct socket *sock = xprt->sock;
908 int status = -EIO; 907 int err, status = -EIO;
909 908
910 if (xprt->shutdown || xprt->addr.sin_port == 0) 909 if (xprt->shutdown || xprt->addr.sin_port == 0)
911 goto out; 910 goto out;
912 911
913 dprintk("RPC: xs_connect_worker xprt %p\n", xprt); 912 dprintk("RPC: xs_tcp_connect_worker for xprt %p\n", xprt);
914 913
915 /* 914 /* Start by resetting any existing socket state */
916 * Start by resetting any existing state
917 */
918 xs_close(xprt); 915 xs_close(xprt);
919 sock = xs_create(xprt, xprt->prot, xprt->resvport); 916
920 if (sock == NULL) { 917 if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
921 /* couldn't create socket or bind to reserved port; 918 dprintk("RPC: can't create TCP transport socket (%d).\n", -err);
922 * this is likely a permanent error, so cause an abort */
923 goto out; 919 goto out;
924 } 920 }
925 xs_bind(xprt, sock);
926 xs_set_buffer_size(xprt);
927 921
928 status = 0; 922 if (xprt->resvport && xs_bindresvport(xprt, sock) < 0) {
929 if (!xprt->stream) 923 sock_release(sock);
930 goto out; 924 goto out;
925 }
931 926
932 /* 927 if (!xprt->inet) {
933 * Tell the socket layer to start connecting... 928 struct sock *sk = sock->sk;
934 */ 929
930 write_lock_bh(&sk->sk_callback_lock);
931
932 sk->sk_user_data = xprt;
933 xprt->old_data_ready = sk->sk_data_ready;
934 xprt->old_state_change = sk->sk_state_change;
935 xprt->old_write_space = sk->sk_write_space;
936 sk->sk_data_ready = xs_tcp_data_ready;
937 sk->sk_state_change = xs_tcp_state_change;
938 sk->sk_write_space = xs_tcp_write_space;
939 tcp_sk(sk)->nonagle = 1;
940
941 xprt_clear_connected(xprt);
942
943 /* Reset to new socket */
944 xprt->sock = sock;
945 xprt->inet = sk;
946
947 write_unlock_bh(&sk->sk_callback_lock);
948 }
949
950 /* Tell the socket layer to start connecting... */
935 status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr, 951 status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
936 sizeof(xprt->addr), O_NONBLOCK); 952 sizeof(xprt->addr), O_NONBLOCK);
937 dprintk("RPC: %p connect status %d connected %d sock state %d\n", 953 dprintk("RPC: %p connect status %d connected %d sock state %d\n",
@@ -959,18 +975,20 @@ static void xs_connect(struct rpc_task *task)
959{ 975{
960 struct rpc_xprt *xprt = task->tk_xprt; 976 struct rpc_xprt *xprt = task->tk_xprt;
961 977
962 if (!xprt_test_and_set_connecting(xprt)) { 978 if (xprt_test_and_set_connecting(xprt))
963 if (xprt->sock != NULL) { 979 return;
964 dprintk("RPC: xs_connect delayed xprt %p\n", xprt); 980
965 schedule_delayed_work(&xprt->connect_worker, 981 if (xprt->sock != NULL) {
982 dprintk("RPC: xs_connect delayed xprt %p\n", xprt);
983 schedule_delayed_work(&xprt->connect_worker,
966 RPC_REESTABLISH_TIMEOUT); 984 RPC_REESTABLISH_TIMEOUT);
967 } else { 985 } else {
968 dprintk("RPC: xs_connect scheduled xprt %p\n", xprt); 986 dprintk("RPC: xs_connect scheduled xprt %p\n", xprt);
969 schedule_work(&xprt->connect_worker); 987 schedule_work(&xprt->connect_worker);
970 /* flush_scheduled_work can sleep... */ 988
971 if (!RPC_IS_ASYNC(task)) 989 /* flush_scheduled_work can sleep... */
972 flush_scheduled_work(); 990 if (!RPC_IS_ASYNC(task))
973 } 991 flush_scheduled_work();
974 } 992 }
975} 993}
976 994
@@ -1013,7 +1031,7 @@ int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
1013 /* XXX: header size can vary due to auth type, IPv6, etc. */ 1031 /* XXX: header size can vary due to auth type, IPv6, etc. */
1014 xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); 1032 xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
1015 1033
1016 INIT_WORK(&xprt->connect_worker, xs_connect_worker, xprt); 1034 INIT_WORK(&xprt->connect_worker, xs_udp_connect_worker, xprt);
1017 1035
1018 xprt->ops = &xs_ops; 1036 xprt->ops = &xs_ops;
1019 1037
@@ -1052,7 +1070,7 @@ int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to)
1052 xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0; 1070 xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
1053 xprt->max_payload = (1U << 31) - 1; 1071 xprt->max_payload = (1U << 31) - 1;
1054 1072
1055 INIT_WORK(&xprt->connect_worker, xs_connect_worker, xprt); 1073 INIT_WORK(&xprt->connect_worker, xs_tcp_connect_worker, xprt);
1056 1074
1057 xprt->ops = &xs_ops; 1075 xprt->ops = &xs_ops;
1058 1076