diff --git a/agent/auto-config/auto_config.go b/agent/auto-config/auto_config.go index bc0d3f54dc37..aef01002e44b 100644 --- a/agent/auto-config/auto_config.go +++ b/agent/auto-config/auto_config.go @@ -298,7 +298,7 @@ func (ac *AutoConfig) getInitialConfigurationOnce(ctx context.Context, csr strin } ac.logger.Debug("making AutoConfig.InitialConfiguration RPC", "addr", addr.String()) - if err = ac.acConfig.DirectRPC.RPC(ac.config.Datacenter, ac.config.NodeName, &addr, "AutoConfig.InitialConfiguration", &request, &resp, time.Time{}); err != nil { + if err = ac.acConfig.DirectRPC.RPC(ac.config.Datacenter, ac.config.NodeName, &addr, "AutoConfig.InitialConfiguration", &request, &resp, 0); err != nil { ac.logger.Error("AutoConfig.InitialConfiguration RPC failed", "addr", addr.String(), "error", err) continue } diff --git a/agent/auto-config/auto_config_test.go b/agent/auto-config/auto_config_test.go index f906c142fda2..4e02cc875f6e 100644 --- a/agent/auto-config/auto_config_test.go +++ b/agent/auto-config/auto_config_test.go @@ -265,7 +265,7 @@ func TestInitialConfiguration_cancelled(t *testing.T) { "AutoConfig.InitialConfiguration", &expectedRequest, mock.Anything, - time.Time{}).Return(fmt.Errorf("injected error")).Times(0).Maybe() + time.Duration(0)).Return(fmt.Errorf("injected error")).Times(0).Maybe() mcfg.serverProvider.On("FindLANServer").Return(nil).Times(0).Maybe() ac, err := New(mcfg.Config) @@ -395,7 +395,7 @@ func TestInitialConfiguration_success(t *testing.T) { "AutoConfig.InitialConfiguration", &expectedRequest, &pbautoconf.AutoConfigResponse{}, - time.Time{}).Return(nil).Run(populateResponse) + time.Duration(0)).Return(nil).Run(populateResponse) ac, err := New(mcfg.Config) require.NoError(t, err) @@ -481,7 +481,7 @@ func TestInitialConfiguration_retries(t *testing.T) { "AutoConfig.InitialConfiguration", &expectedRequest, &pbautoconf.AutoConfigResponse{}, - time.Time{}).Return(fmt.Errorf("injected failure")).Times(0) + 
time.Duration(0)).Return(fmt.Errorf("injected failure")).Times(0) mcfg.directRPC.On( "RPC", "dc1", @@ -490,7 +490,7 @@ func TestInitialConfiguration_retries(t *testing.T) { "AutoConfig.InitialConfiguration", &expectedRequest, &pbautoconf.AutoConfigResponse{}, - time.Time{}).Return(fmt.Errorf("injected failure")).Times(0) + time.Duration(0)).Return(fmt.Errorf("injected failure")).Times(0) mcfg.directRPC.On( "RPC", "dc1", @@ -499,7 +499,7 @@ func TestInitialConfiguration_retries(t *testing.T) { "AutoConfig.InitialConfiguration", &expectedRequest, &pbautoconf.AutoConfigResponse{}, - time.Time{}).Return(fmt.Errorf("injected failure")).Times(0) + time.Duration(0)).Return(fmt.Errorf("injected failure")).Times(0) mcfg.directRPC.On( "RPC", "dc1", @@ -508,7 +508,7 @@ func TestInitialConfiguration_retries(t *testing.T) { "AutoConfig.InitialConfiguration", &expectedRequest, &pbautoconf.AutoConfigResponse{}, - time.Time{}).Return(fmt.Errorf("injected failure")).Once() + time.Duration(0)).Return(fmt.Errorf("injected failure")).Once() mcfg.directRPC.On( "RPC", "dc1", @@ -517,7 +517,7 @@ func TestInitialConfiguration_retries(t *testing.T) { "AutoConfig.InitialConfiguration", &expectedRequest, &pbautoconf.AutoConfigResponse{}, - time.Time{}).Return(nil).Run(populateResponse).Once() + time.Duration(0)).Return(nil).Run(populateResponse).Once() ac, err := New(mcfg.Config) require.NoError(t, err) @@ -804,7 +804,7 @@ func startedAutoConfig(t *testing.T, autoEncrypt bool) testAutoConfig { "AutoConfig.InitialConfiguration", &expectedRequest, &pbautoconf.AutoConfigResponse{}, - time.Time{}).Return(nil).Run(populateResponse).Once() + time.Duration(0)).Return(nil).Run(populateResponse).Once() } else { expectedRequest := structs.CASignRequest{ WriteRequest: structs.WriteRequest{Token: originalToken}, @@ -825,7 +825,7 @@ func startedAutoConfig(t *testing.T, autoEncrypt bool) testAutoConfig { "AutoEncrypt.Sign", &expectedRequest, &structs.SignedResponse{}, - 
time.Time{}).Return(nil).Run(populateResponse) + time.Duration(0)).Return(nil).Run(populateResponse) } ac, err := New(mcfg.Config) @@ -1100,7 +1100,7 @@ func TestFallback(t *testing.T) { "AutoConfig.InitialConfiguration", &expectedRequest, &pbautoconf.AutoConfigResponse{}, - time.Time{}).Return(nil).Run(populateResponse).Once() + time.Duration(0)).Return(nil).Run(populateResponse).Once() // this gets called when InitialConfiguration is invoked to record the token from the // auto-config response which is how the Fallback for auto-config works diff --git a/agent/auto-config/auto_encrypt.go b/agent/auto-config/auto_encrypt.go index 3eb7b8a3501e..510565076f8d 100644 --- a/agent/auto-config/auto_encrypt.go +++ b/agent/auto-config/auto_encrypt.go @@ -5,7 +5,6 @@ import ( "fmt" "net" "strings" - "time" "github.com/hashicorp/consul/agent/structs" ) @@ -57,7 +56,7 @@ func (ac *AutoConfig) autoEncryptInitialCertsOnce(ctx context.Context, csr, key } ac.logger.Debug("making AutoEncrypt.Sign RPC", "addr", addr.String()) - err = ac.acConfig.DirectRPC.RPC(ac.config.Datacenter, ac.config.NodeName, &addr, "AutoEncrypt.Sign", &request, &resp, time.Time{}) + err = ac.acConfig.DirectRPC.RPC(ac.config.Datacenter, ac.config.NodeName, &addr, "AutoEncrypt.Sign", &request, &resp, 0) if err != nil { ac.logger.Error("AutoEncrypt.Sign RPC failed", "addr", addr.String(), "error", err) continue diff --git a/agent/auto-config/auto_encrypt_test.go b/agent/auto-config/auto_encrypt_test.go index 6bef134a4c3f..0bdc4f560e78 100644 --- a/agent/auto-config/auto_encrypt_test.go +++ b/agent/auto-config/auto_encrypt_test.go @@ -203,7 +203,7 @@ func TestAutoEncrypt_InitialCerts(t *testing.T) { "AutoEncrypt.Sign", &request, &structs.SignedResponse{}, - time.Time{}, + time.Duration(0), ).Once().Return(fmt.Errorf("injected error")) // second failure mcfg.directRPC.On("RPC", @@ -213,7 +213,7 @@ func TestAutoEncrypt_InitialCerts(t *testing.T) { "AutoEncrypt.Sign", &request, &structs.SignedResponse{}, - 
time.Time{}, + time.Duration(0), ).Once().Return(fmt.Errorf("injected error")) // third times is successfuly (second attempt to first server) mcfg.directRPC.On("RPC", @@ -223,7 +223,7 @@ func TestAutoEncrypt_InitialCerts(t *testing.T) { "AutoEncrypt.Sign", &request, &structs.SignedResponse{}, - time.Time{}, + time.Duration(0), ).Once().Return(nil).Run(func(args mock.Arguments) { resp, ok := args.Get(5).(*structs.SignedResponse) require.True(t, ok) @@ -312,7 +312,7 @@ func TestAutoEncrypt_InitialConfiguration(t *testing.T) { "AutoEncrypt.Sign", &expectedRequest, &structs.SignedResponse{}, - time.Time{}).Return(nil).Run(populateResponse) + time.Duration(0)).Return(nil).Run(populateResponse) ac, err := New(mcfg.Config) require.NoError(t, err) @@ -522,7 +522,7 @@ func TestAutoEncrypt_Fallback(t *testing.T) { "AutoEncrypt.Sign", &expectedRequest, &structs.SignedResponse{}, - time.Time{}).Return(nil).Run(populateResponse).Once() + time.Duration(0)).Return(nil).Run(populateResponse).Once() testAC.mcfg.expectInitialTLS(t, "autoconf", "dc1", testAC.originalToken, secondCA, &secondRoots, thirdCert, testAC.extraCerts) diff --git a/agent/auto-config/config.go b/agent/auto-config/config.go index 7c4c2d4f37d9..3638ed47a6c0 100644 --- a/agent/auto-config/config.go +++ b/agent/auto-config/config.go @@ -20,7 +20,7 @@ import ( // the agent gets configured, it can go through the normal RPC means of selecting a available // server automatically. 
type DirectRPC interface { - RPC(dc string, node string, addr net.Addr, method string, args interface{}, reply interface{}, deadline time.Time) error + RPC(dc string, node string, addr net.Addr, method string, args interface{}, reply interface{}, timeout time.Duration) error } // Cache is an interface to represent the methods of the diff --git a/agent/auto-config/mock_test.go b/agent/auto-config/mock_test.go index 1b1b3380391d..89123b9ab279 100644 --- a/agent/auto-config/mock_test.go +++ b/agent/auto-config/mock_test.go @@ -30,22 +30,22 @@ func newMockDirectRPC(t *testing.T) *mockDirectRPC { return &m } -func (m *mockDirectRPC) RPC(dc string, node string, addr net.Addr, method string, args interface{}, reply interface{}, deadline time.Time) error { +func (m *mockDirectRPC) RPC(dc string, node string, addr net.Addr, method string, args interface{}, reply interface{}, timeout time.Duration) error { var retValues mock.Arguments if method == "AutoConfig.InitialConfiguration" { req := args.(*pbautoconf.AutoConfigRequest) csr := req.CSR req.CSR = "" - retValues = m.Called(dc, node, addr, method, args, reply, deadline) + retValues = m.Called(dc, node, addr, method, args, reply, timeout) req.CSR = csr } else if method == "AutoEncrypt.Sign" { req := args.(*structs.CASignRequest) csr := req.CSR req.CSR = "" - retValues = m.Called(dc, node, addr, method, args, reply, deadline) + retValues = m.Called(dc, node, addr, method, args, reply, timeout) req.CSR = csr } else { - retValues = m.Called(dc, node, addr, method, args, reply, deadline) + retValues = m.Called(dc, node, addr, method, args, reply, timeout) } return retValues.Error(0) diff --git a/agent/consul/client.go b/agent/consul/client.go index 21c884a0be83..b55ad47e59b6 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -270,15 +270,13 @@ TRY: // Use the zero value for RPCInfo if the request doesn't implement RPCInfo info, _ := args.(structs.RPCInfo) - var deadline time.Time + timeout := time.Duration(0) 
if info != nil { - deadline = time.Now().Add(info.Timeout(c.config.RPCHoldTimeout, c.config.MaxQueryTime, c.config.DefaultQueryTime)) - } else { - deadline = time.Time{} + timeout = info.Timeout(c.config.RPCHoldTimeout, c.config.MaxQueryTime, c.config.DefaultQueryTime) } // Make the request. - rpcErr := c.connPool.RPC(c.config.Datacenter, server.ShortName, server.Addr, method, args, reply, deadline) + rpcErr := c.connPool.RPC(c.config.Datacenter, server.ShortName, server.Addr, method, args, reply, timeout) if rpcErr == nil { return nil } diff --git a/agent/consul/client_test.go b/agent/consul/client_test.go index 93834b0e8b55..9a6c1d9cb8af 100644 --- a/agent/consul/client_test.go +++ b/agent/consul/client_test.go @@ -531,6 +531,7 @@ func newDefaultDeps(t *testing.T, c *Config) Deps { MaxStreams: 4, TLSConfigurator: tls, Datacenter: c.Datacenter, + Timeout: c.RPCHoldTimeout, } return Deps{ diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 30e1f066eb48..0621e2a34348 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -569,7 +569,7 @@ func (s *Server) ForwardRPC(method string, info structs.RPCInfo, reply interface } forwardToLeader := func(leader *metadata.Server) error { return s.connPool.RPC(s.config.Datacenter, leader.ShortName, leader.Addr, - method, info, reply, time.Time{}) + method, info, reply, 0) } return s.forwardRPC(info, forwardToDC, forwardToLeader) } @@ -765,7 +765,7 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{ metrics.IncrCounterWithLabels([]string{"rpc", "cross-dc"}, 1, []metrics.Label{{Name: "datacenter", Value: dc}}) - if err := s.connPool.RPC(dc, server.ShortName, server.Addr, method, args, reply, time.Time{}); err != nil { + if err := s.connPool.RPC(dc, server.ShortName, server.Addr, method, args, reply, 0); err != nil { manager.NotifyFailedServer(server) s.rpcLogger().Error("RPC failed to server in DC", "server", server.Addr, diff --git a/agent/consul/server_serf.go 
b/agent/consul/server_serf.go index e540f3f55393..1ee14eff8364 100644 --- a/agent/consul/server_serf.go +++ b/agent/consul/server_serf.go @@ -375,7 +375,7 @@ func (s *Server) maybeBootstrap() { // Retry with exponential backoff to get peer status from this server for attempt := uint(0); attempt < maxPeerRetries; attempt++ { if err := s.connPool.RPC(s.config.Datacenter, server.ShortName, server.Addr, - "Status.Peers", &structs.DCSpecificRequest{Datacenter: s.config.Datacenter}, &peers, time.Time{}); err != nil { + "Status.Peers", &structs.DCSpecificRequest{Datacenter: s.config.Datacenter}, &peers, 0); err != nil { nextRetry := (1 << attempt) * time.Second s.logger.Error("Failed to confirm peer status for server (will retry).", "server", server.Name, diff --git a/agent/consul/stats_fetcher.go b/agent/consul/stats_fetcher.go index 0a8aad11fa05..56bdf34a126c 100644 --- a/agent/consul/stats_fetcher.go +++ b/agent/consul/stats_fetcher.go @@ -4,7 +4,6 @@ import ( "context" "net" "sync" - "time" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/structs" @@ -62,7 +61,7 @@ func (f *StatsFetcher) fetch(server *autopilot.Server, replyCh chan *autopilot.S return } - err = f.pool.RPC(f.datacenter, server.Name, addr, "Status.RaftStats", &args, &reply, time.Time{}) + err = f.pool.RPC(f.datacenter, server.Name, addr, "Status.RaftStats", &args, &reply, 0) if err != nil { f.logger.Warn("error getting server health from server", "server", server.Name, diff --git a/agent/pool/pool.go b/agent/pool/pool.go index 2b354719f860..7fa15da3a91b 100644 --- a/agent/pool/pool.go +++ b/agent/pool/pool.go @@ -30,7 +30,7 @@ type muxSession interface { // streamClient is used to wrap a stream with an RPC client type StreamClient struct { - stream net.Conn + stream *TimeoutConn codec rpc.ClientCodec } @@ -55,6 +55,47 @@ type Conn struct { clientLock sync.Mutex } +// TimeoutConn wraps net.Conn with a default timeout. 
+// ReadTimeout and WriteTimeout, when non-zero, override DefaultTimeout for the next Read or Write only; each is reset to zero once consumed. +type TimeoutConn struct { + net.Conn + DefaultTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration +} + +func (c *TimeoutConn) Read(b []byte) (int, error) { + timeout := c.ReadTimeout + c.ReadTimeout = 0 + if timeout == 0 { + timeout = c.DefaultTimeout + } + deadline := time.Time{} + if timeout > 0 { + deadline = time.Now().Add(timeout) + } + if err := c.Conn.SetReadDeadline(deadline); err != nil { + return 0, err + } + return c.Conn.Read(b) +} + +func (c *TimeoutConn) Write(b []byte) (int, error) { + timeout := c.WriteTimeout + c.WriteTimeout = 0 + if timeout == 0 { + timeout = c.DefaultTimeout + } + deadline := time.Time{} + if timeout > 0 { + deadline = time.Now().Add(timeout) + } + if err := c.Conn.SetWriteDeadline(deadline); err != nil { + return 0, err + } + return c.Conn.Write(b) +} + func (c *Conn) Close() error { return c.session.Close() } @@ -78,12 +119,14 @@ func (c *Conn) getClient() (*StreamClient, error) { return nil, err } + timeoutStream := &TimeoutConn{stream, c.pool.Timeout, 0, 0} + // Create the RPC client - codec := msgpackrpc.NewCodecFromHandle(true, true, stream, structs.MsgpackHandle) + codec := msgpackrpc.NewCodecFromHandle(true, true, timeoutStream, structs.MsgpackHandle) // Return a new stream client sc := &StreamClient{ - stream: stream, + stream: timeoutStream, codec: codec, } return sc, nil @@ -100,7 +143,7 @@ func (c *Conn) returnClient(client *StreamClient) { // If this is a Yamux stream, shrink the internal buffers so that // we can GC the idle memory - if ys, ok := client.stream.(*yamux.Stream); ok { + if ys, ok := client.stream.Conn.(*yamux.Stream); ok { ys.Shrink() } } @@ -132,6 +175,9 @@ type ConnPool struct { // TODO: consider refactoring to accept a full yamux.Config instead of a logger Logger *log.Logger + // The default timeout for stream reads/writes + Timeout time.Duration + // The maximum time to keep a connection open MaxTime 
time.Duration @@ -324,7 +370,7 @@ func (p *ConnPool) dial( tlsRPCType RPCType, ) (net.Conn, HalfCloser, error) { // Try to dial the conn - d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: DefaultDialTimeout} + d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: p.Timeout} conn, err := d.Dial("tcp", addr.String()) if err != nil { return nil, nil, err @@ -538,7 +584,7 @@ func (p *ConnPool) RPC( method string, args interface{}, reply interface{}, - deadline time.Time, + timeout time.Duration, ) error { if nodeName == "" { return fmt.Errorf("pool: ConnPool.RPC requires a node name") @@ -551,7 +597,7 @@ func (p *ConnPool) RPC( if method == "AutoEncrypt.Sign" || method == "AutoConfig.InitialConfiguration" { return p.rpcInsecure(dc, addr, method, args, reply) } else { - return p.rpc(dc, nodeName, addr, method, args, reply, deadline) + return p.rpc(dc, nodeName, addr, method, args, reply, timeout) } } @@ -581,7 +627,7 @@ func (p *ConnPool) rpcInsecure(dc string, addr net.Addr, method string, args int return nil } -func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string, args interface{}, reply interface{}, deadline time.Time) error { +func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string, args interface{}, reply interface{}, timeout time.Duration) error { p.once.Do(p.init) // Get a usable client @@ -590,10 +636,8 @@ func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string, return fmt.Errorf("rpc error getting client: %w", err) } - if !deadline.IsZero() { - if err = sc.stream.SetDeadline(deadline); err != nil { - return fmt.Errorf("rpc error setting client deadline: %w", err) - } + if timeout > 0 { + sc.stream.ReadTimeout = timeout } // Make the RPC call @@ -612,11 +656,7 @@ func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string, p.releaseConn(conn) return fmt.Errorf("rpc error making call: %w", err) } - if !deadline.IsZero() { - if err = sc.stream.SetDeadline(time.Time{}); err != 
nil { - return fmt.Errorf("rpc error resetting client deadline: %w", err) - } - } + // Done with the connection conn.returnClient(sc) p.releaseConn(conn) @@ -627,7 +667,7 @@ func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string, // returns true if healthy, false if an error occurred func (p *ConnPool) Ping(dc string, nodeName string, addr net.Addr) (bool, error) { var out struct{} - err := p.RPC(dc, nodeName, addr, "Status.Ping", struct{}{}, &out, time.Time{}) + err := p.RPC(dc, nodeName, addr, "Status.Ping", struct{}{}, &out, 0) return err == nil, err } diff --git a/agent/setup.go b/agent/setup.go index 82543e7fab8b..817420c1e8af 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -169,6 +169,7 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}), TLSConfigurator: tls, Datacenter: config.Datacenter, + Timeout: config.RPCHoldTimeout, } if config.ServerMode { pool.MaxTime = 2 * time.Minute