Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add new client-side attribute 'maxReconnectAttempts' #126

Merged
merged 4 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"crypto/x509"
"encoding/pem"
"fmt"
"math"
"net"
"os"
"strings"
Expand All @@ -45,7 +46,8 @@ const (
defaultReconnectInterval = 3e8 // 300ms
connectInterval = 5e8 // 500ms
connectTimeout = 3e9
defaultMaxReconnectAttempts = 10
defaultMaxReconnectAttempts = 50
maxBackOffTimes = 10
)

var (
Expand Down Expand Up @@ -208,14 +210,18 @@ func (c *client) dialUDP() Session {
}

// check connection alive by write/read action
conn.SetWriteDeadline(time.Now().Add(1e9))
if err := conn.SetWriteDeadline(time.Now().Add(1e9)); err != nil {
log.Warnf("failed to set write deadline: %+v", err)
}
if length, err = conn.Write(connectPingPackage[:]); err != nil {
conn.Close()
log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, perrors.WithStack(err))
<-gxtime.After(connectInterval)
continue
}
conn.SetReadDeadline(time.Now().Add(1e9))
if err := conn.SetReadDeadline(time.Now().Add(1e9)); err != nil {
log.Warnf("failed to set read deadline: %+v", err)
}
length, err = conn.Read(buf)
if netErr, ok := perrors.Cause(err).(net.Error); ok && netErr.Timeout() {
err = nil
Expand Down Expand Up @@ -424,32 +430,32 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) {
// a for-loop connect to make sure the connection pool is valid
func (c *client) reConnect() {
var (
sessionNum, maxReconnectAttempts, reconnectAttempts, reconnectInterval int
maxReconnectInterval int64
sessionNum, reconnectAttempts int
maxReconnectInterval int64
)
maxReconnectAttempts = c.number
reconnectInterval = c.reconnectInterval
reconnectInterval := c.reconnectInterval
if reconnectInterval == 0 {
reconnectInterval = defaultReconnectInterval
}
maxReconnectAttempts := c.maxReconnectAttempts
if maxReconnectAttempts == 0 {
maxReconnectAttempts = defaultMaxReconnectAttempts
}
connPoolSize := c.number
for {
if c.IsClosed() {
log.Warnf("client{peer:%s} goroutine exit now.", c.addr)
break
}

sessionNum = c.sessionNum()
if maxReconnectAttempts <= sessionNum || maxReconnectAttempts < reconnectAttempts {
if connPoolSize <= sessionNum || maxReconnectAttempts < reconnectAttempts {
//exit reconnect when the number of connection pools is sufficient or the current reconnection attempts exceeds the max reconnection attempts.
break
}
c.connect()
reconnectAttempts++
if reconnectAttempts > defaultMaxReconnectAttempts {
maxReconnectInterval = int64(defaultMaxReconnectAttempts) * int64(reconnectInterval)
} else {
maxReconnectInterval = int64(reconnectAttempts) * int64(reconnectInterval)
}
maxReconnectInterval = int64(math.Min(float64(reconnectAttempts), float64(maxBackOffTimes))) * int64(reconnectInterval)
<-gxtime.After(time.Duration(maxReconnectInterval))
}
}
Expand Down
1 change: 1 addition & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func TestTCPClient(t *testing.T) {
WithServerAddress(addr.String()),
WithReconnectInterval(5e8),
WithConnectionNumber(1),
WithReconnectAttempts(10),
)
assert.NotNil(t, clt)
assert.True(t, clt.ID() > 0)
Expand Down
33 changes: 17 additions & 16 deletions examples/echo/tcp-echo/client/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,23 @@ var conf *Config

type (
GettySessionParam struct {
CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"`
TcpNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"`
TcpKeepAlive bool `default:"true" yaml:"tcp_keep_alive" json:"tcp_keep_alive,omitempty"`
KeepAlivePeriod string `default:"180s" yaml:"keep_alive_period" json:"keep_alive_period,omitempty"`
keepAlivePeriod time.Duration
TcpRBufSize int `default:"262144" yaml:"tcp_r_buf_size" json:"tcp_r_buf_size,omitempty"`
TcpWBufSize int `default:"65536" yaml:"tcp_w_buf_size" json:"tcp_w_buf_size,omitempty"`
PkgWQSize int `default:"1024" yaml:"pkg_wq_size" json:"pkg_wq_size,omitempty"`
TcpReadTimeout string `default:"1s" yaml:"tcp_read_timeout" json:"tcp_read_timeout,omitempty"`
tcpReadTimeout time.Duration
TcpWriteTimeout string `default:"5s" yaml:"tcp_write_timeout" json:"tcp_write_timeout,omitempty"`
tcpWriteTimeout time.Duration
WaitTimeout string `default:"7s" yaml:"wait_timeout" json:"wait_timeout,omitempty"`
waitTimeout time.Duration
MaxMsgLen int `default:"1024" yaml:"max_msg_len" json:"max_msg_len,omitempty"`
SessionName string `default:"echo-client" yaml:"session_name" json:"session_name,omitempty"`
CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"`
TcpNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"`
TcpKeepAlive bool `default:"true" yaml:"tcp_keep_alive" json:"tcp_keep_alive,omitempty"`
KeepAlivePeriod string `default:"180s" yaml:"keep_alive_period" json:"keep_alive_period,omitempty"`
keepAlivePeriod time.Duration
TcpRBufSize int `default:"262144" yaml:"tcp_r_buf_size" json:"tcp_r_buf_size,omitempty"`
TcpWBufSize int `default:"65536" yaml:"tcp_w_buf_size" json:"tcp_w_buf_size,omitempty"`
PkgWQSize int `default:"1024" yaml:"pkg_wq_size" json:"pkg_wq_size,omitempty"`
TcpReadTimeout string `default:"1s" yaml:"tcp_read_timeout" json:"tcp_read_timeout,omitempty"`
tcpReadTimeout time.Duration
TcpWriteTimeout string `default:"5s" yaml:"tcp_write_timeout" json:"tcp_write_timeout,omitempty"`
tcpWriteTimeout time.Duration
WaitTimeout string `default:"7s" yaml:"wait_timeout" json:"wait_timeout,omitempty"`
waitTimeout time.Duration
MaxMsgLen int `default:"1024" yaml:"max_msg_len" json:"max_msg_len,omitempty"`
SessionName string `default:"echo-client" yaml:"session_name" json:"session_name,omitempty"`
TcpMaxReconnectAttempts int `default:"10" yaml:"tcp_max_reconnect_attempts" json:"tcp_max_reconnect_attempts,omitempty"`
}

// Config holds supported types by the multiconfig package
Expand Down
2 changes: 1 addition & 1 deletion examples/echo/tcp-echo/client/app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func newSession(session getty.Session) error {
func initClient() {
clientOpts := []getty.ClientOption{getty.WithServerAddress(gxnet.HostAddress(conf.ServerHost, conf.ServerPort))}
clientOpts = append(clientOpts, getty.WithClientTaskPool(taskPool))

clientOpts = append(clientOpts, getty.WithReconnectAttempts(conf.GettySessionParam.TcpMaxReconnectAttempts))
if conf.ConnectionNum != 0 {
clientOpts = append(clientOpts, getty.WithConnectionNumber(conf.ConnectionNum))
}
Expand Down
17 changes: 13 additions & 4 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ func WithServerTlsConfigBuilder(tlsConfigBuilder TlsConfigBuilder) ServerOption
type ClientOption func(*ClientOptions)

type ClientOptions struct {
addr string
number int
reconnectInterval int // reConnect Interval

addr string
number int
reconnectInterval int // reConnect Interval
maxReconnectAttempts int // max reconnect attempts
// tls
sslEnabled bool
tlsConfigBuilder TlsConfigBuilder
Expand Down Expand Up @@ -168,3 +168,12 @@ func WithClientTlsConfigBuilder(tlsConfigBuilder TlsConfigBuilder) ClientOption
o.tlsConfigBuilder = tlsConfigBuilder
}
}

// WithReconnectAttempts @maxReconnectAttempts is max reconnect attempts.
func WithReconnectAttempts(maxReconnectAttempts int) ClientOption {
return func(o *ClientOptions) {
if 0 < maxReconnectAttempts {
o.maxReconnectAttempts = maxReconnectAttempts
}
}
}
8 changes: 6 additions & 2 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,8 +856,12 @@ func (s *session) stop() {
// let read/Write timeout asap
now := time.Now()
if conn := s.Conn(); conn != nil {
conn.SetReadDeadline(now.Add(s.ReadTimeout()))
conn.SetWriteDeadline(now.Add(s.WriteTimeout()))
if err := conn.SetReadDeadline(now.Add(s.ReadTimeout())); err != nil {
log.Warnf("failed to set read deadline: %+v", err)
}
if err := conn.SetWriteDeadline(now.Add(s.WriteTimeout())); err != nil {
log.Warnf("failed to set write deadline: %+v", err)
}
}
close(s.done)
clt, cltFound := s.GetAttribute(sessionClientKey).(*client)
Expand Down
Loading