diff --git a/client.go b/client.go index 13667df..e3c22c8 100644 --- a/client.go +++ b/client.go @@ -22,20 +22,21 @@ import ( "crypto/x509" "encoding/pem" "fmt" + "math" "net" "os" "strings" "sync" "sync/atomic" "time" -) -import ( - "github.com/dubbogo/gost/bytes" - "github.com/dubbogo/gost/net" + gxbytes "github.com/dubbogo/gost/bytes" + + gxnet "github.com/dubbogo/gost/net" + gxsync "github.com/dubbogo/gost/sync" - gxtime "github.com/dubbogo/gost/time" + gxtime "github.com/dubbogo/gost/time" "github.com/gorilla/websocket" perrors "github.com/pkg/errors" @@ -45,7 +46,8 @@ const ( defaultReconnectInterval = 3e8 // 300ms connectInterval = 5e8 // 500ms connectTimeout = 3e9 - defaultMaxReconnectAttempts = 10 + defaultMaxReconnectAttempts = 50 + maxBackOffTimes = 10 ) var ( @@ -208,14 +210,18 @@ func (c *client) dialUDP() Session { } // check connection alive by write/read action - conn.SetWriteDeadline(time.Now().Add(1e9)) + if err := conn.SetWriteDeadline(time.Now().Add(1e9)); err != nil { + log.Warnf("failed to set write deadline: %+v", err) + } if length, err = conn.Write(connectPingPackage[:]); err != nil { conn.Close() log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, perrors.WithStack(err)) <-gxtime.After(connectInterval) continue } - conn.SetReadDeadline(time.Now().Add(1e9)) + if err := conn.SetReadDeadline(time.Now().Add(1e9)); err != nil { + log.Warnf("failed to set read deadline: %+v", err) + } length, err = conn.Read(buf) if netErr, ok := perrors.Cause(err).(net.Error); ok && netErr.Timeout() { err = nil @@ -424,14 +430,18 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) { // a for-loop connect to make sure the connection pool is valid func (c *client) reConnect() { var ( - sessionNum, maxReconnectAttempts, reconnectAttempts, reconnectInterval int - maxReconnectInterval int64 + sessionNum, reconnectAttempts int + maxReconnectInterval int64 ) - maxReconnectAttempts = c.number - reconnectInterval = c.reconnectInterval + reconnectInterval := c.reconnectInterval if reconnectInterval == 0 { reconnectInterval = defaultReconnectInterval } + maxReconnectAttempts := c.maxReconnectAttempts + if maxReconnectAttempts == 0 { + maxReconnectAttempts = defaultMaxReconnectAttempts + } + connPoolSize := c.number for { if c.IsClosed() { log.Warnf("client{peer:%s} goroutine exit now.", c.addr) @@ -439,17 +449,13 @@ func (c *client) reConnect() { } sessionNum = c.sessionNum() - if maxReconnectAttempts <= sessionNum || maxReconnectAttempts < reconnectAttempts { + if connPoolSize <= sessionNum || maxReconnectAttempts < reconnectAttempts { //exit reconnect when the number of connection pools is sufficient or the current reconnection attempts exceeds the max reconnection attempts. break } c.connect() reconnectAttempts++ - if reconnectAttempts > defaultMaxReconnectAttempts { - maxReconnectInterval = int64(defaultMaxReconnectAttempts) * int64(reconnectInterval) - } else { - maxReconnectInterval = int64(reconnectAttempts) * int64(reconnectInterval) - } + maxReconnectInterval = int64(math.Min(float64(reconnectAttempts), float64(maxBackOffTimes))) * int64(reconnectInterval) <-gxtime.After(time.Duration(maxReconnectInterval)) } } diff --git a/client_test.go b/client_test.go index 779d5fe..0cbb429 100644 --- a/client_test.go +++ b/client_test.go @@ -115,6 +115,7 @@ func TestTCPClient(t *testing.T) { WithServerAddress(addr.String()), WithReconnectInterval(5e8), WithConnectionNumber(1), + WithReconnectAttempts(10), ) assert.NotNil(t, clt) assert.True(t, clt.ID() > 0) diff --git a/examples/echo/tcp-echo/client/app/config.go b/examples/echo/tcp-echo/client/app/config.go index a8d7ff9..c8c50ba 100644 --- a/examples/echo/tcp-echo/client/app/config.go +++ b/examples/echo/tcp-echo/client/app/config.go @@ -38,22 +38,23 @@ var conf *Config type ( GettySessionParam struct { - CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"` - TcpNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"` - TcpKeepAlive bool `default:"true" yaml:"tcp_keep_alive" json:"tcp_keep_alive,omitempty"` - KeepAlivePeriod string `default:"180s" yaml:"keep_alive_period" json:"keep_alive_period,omitempty"` - keepAlivePeriod time.Duration - TcpRBufSize int `default:"262144" yaml:"tcp_r_buf_size" json:"tcp_r_buf_size,omitempty"` - TcpWBufSize int `default:"65536" yaml:"tcp_w_buf_size" json:"tcp_w_buf_size,omitempty"` - PkgWQSize int `default:"1024" yaml:"pkg_wq_size" json:"pkg_wq_size,omitempty"` - TcpReadTimeout string `default:"1s" yaml:"tcp_read_timeout" json:"tcp_read_timeout,omitempty"` - tcpReadTimeout time.Duration - TcpWriteTimeout string `default:"5s" yaml:"tcp_write_timeout" json:"tcp_write_timeout,omitempty"` - tcpWriteTimeout time.Duration - WaitTimeout string `default:"7s" yaml:"wait_timeout" json:"wait_timeout,omitempty"` - waitTimeout time.Duration - MaxMsgLen int `default:"1024" yaml:"max_msg_len" json:"max_msg_len,omitempty"` - SessionName string `default:"echo-client" yaml:"session_name" json:"session_name,omitempty"` + CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"` + TcpNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"` + TcpKeepAlive bool `default:"true" yaml:"tcp_keep_alive" json:"tcp_keep_alive,omitempty"` + KeepAlivePeriod string `default:"180s" yaml:"keep_alive_period" json:"keep_alive_period,omitempty"` + keepAlivePeriod time.Duration + TcpRBufSize int `default:"262144" yaml:"tcp_r_buf_size" json:"tcp_r_buf_size,omitempty"` + TcpWBufSize int `default:"65536" yaml:"tcp_w_buf_size" json:"tcp_w_buf_size,omitempty"` + PkgWQSize int `default:"1024" yaml:"pkg_wq_size" json:"pkg_wq_size,omitempty"` + TcpReadTimeout string `default:"1s" yaml:"tcp_read_timeout" json:"tcp_read_timeout,omitempty"` + tcpReadTimeout time.Duration + TcpWriteTimeout string `default:"5s" yaml:"tcp_write_timeout" json:"tcp_write_timeout,omitempty"` + tcpWriteTimeout time.Duration + WaitTimeout string `default:"7s" yaml:"wait_timeout" json:"wait_timeout,omitempty"` + waitTimeout time.Duration + MaxMsgLen int `default:"1024" yaml:"max_msg_len" json:"max_msg_len,omitempty"` + SessionName string `default:"echo-client" yaml:"session_name" json:"session_name,omitempty"` + TcpMaxReconnectAttempts int `default:"10" yaml:"tcp_max_reconnect_attempts" json:"tcp_max_reconnect_attempts,omitempty"` } // Config holds supported types by the multiconfig package diff --git a/examples/echo/tcp-echo/client/app/main.go b/examples/echo/tcp-echo/client/app/main.go index 95a78be..79ee82c 100644 --- a/examples/echo/tcp-echo/client/app/main.go +++ b/examples/echo/tcp-echo/client/app/main.go @@ -117,7 +117,7 @@ func newSession(session getty.Session) error { func initClient() { clientOpts := []getty.ClientOption{getty.WithServerAddress(gxnet.HostAddress(conf.ServerHost, conf.ServerPort))} clientOpts = append(clientOpts, getty.WithClientTaskPool(taskPool)) - + clientOpts = append(clientOpts, getty.WithReconnectAttempts(conf.GettySessionParam.TcpMaxReconnectAttempts)) if conf.ConnectionNum != 0 { clientOpts = append(clientOpts, getty.WithConnectionNumber(conf.ConnectionNum)) } diff --git a/options.go b/options.go index d376e0c..f42b693 100644 --- a/options.go +++ b/options.go @@ -100,10 +100,10 @@ func WithServerTlsConfigBuilder(tlsConfigBuilder TlsConfigBuilder) ServerOption type ClientOption func(*ClientOptions) type ClientOptions struct { - addr string - number int - reconnectInterval int // reConnect Interval - + addr string + number int + reconnectInterval int // reConnect Interval + maxReconnectAttempts int // max reconnect attempts // tls sslEnabled bool tlsConfigBuilder TlsConfigBuilder @@ -168,3 +168,12 @@ func WithClientTlsConfigBuilder(tlsConfigBuilder TlsConfigBuilder) ClientOption o.tlsConfigBuilder = tlsConfigBuilder } } + +// WithReconnectAttempts @maxReconnectAttempts is max reconnect attempts. +func WithReconnectAttempts(maxReconnectAttempts int) ClientOption { + return func(o *ClientOptions) { + if 0 < maxReconnectAttempts { + o.maxReconnectAttempts = maxReconnectAttempts + } + } +} diff --git a/session.go b/session.go index 7e704b5..d3405ef 100644 --- a/session.go +++ b/session.go @@ -27,13 +27,12 @@ import ( "runtime" "sync" "time" -) -import ( gxbytes "github.com/dubbogo/gost/bytes" + gxcontext "github.com/dubbogo/gost/context" - gxtime "github.com/dubbogo/gost/time" + gxtime "github.com/dubbogo/gost/time" "github.com/gorilla/websocket" perrors "github.com/pkg/errors" @@ -856,8 +855,12 @@ func (s *session) stop() { // let read/Write timeout asap now := time.Now() if conn := s.Conn(); conn != nil { - conn.SetReadDeadline(now.Add(s.ReadTimeout())) - conn.SetWriteDeadline(now.Add(s.WriteTimeout())) + if err := conn.SetReadDeadline(now.Add(s.ReadTimeout())); err != nil { + log.Warnf("failed to set read deadline: %+v", err) + } + if err := conn.SetWriteDeadline(now.Add(s.WriteTimeout())); err != nil { + log.Warnf("failed to set write deadline: %+v", err) + } } close(s.done) clt, cltFound := s.GetAttribute(sessionClientKey).(*client)