Skip to content

Commit

Permalink
Merge pull request #117 from No-SilverBullet/fix/reconnect
Browse files Browse the repository at this point in the history
fix issue116: limit the reconnect times or  duration
  • Loading branch information
AlexStocks authored Apr 25, 2024
2 parents ce6736e + b33c1e6 commit 6a6e1d1
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 9 deletions.
22 changes: 15 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ var (
sessionClientKey = "session-client-owner"
connectPingPackage = []byte("connect-ping")

clientID = EndPointID(0)
clientID = EndPointID(0)
ignoreReconnectKey = "ignore-reconnect"
)

type Client interface {
Expand Down Expand Up @@ -397,6 +398,7 @@ func (c *client) connect() {
c.ssMap[ss] = struct{}{}
c.Unlock()
ss.SetAttribute(sessionClientKey, c)
ss.SetAttribute(ignoreReconnectKey, false)
break
}
// don't distinguish between tcp connection and websocket connection. Because
Expand All @@ -421,8 +423,10 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) {

// a for-loop connect to make sure the connection pool is valid
func (c *client) reConnect() {
var num, max, times, interval int

var (
num, max, times, interval int
maxDuration int64
)
max = c.number
interval = c.reconnectInterval
if interval == 0 {
Expand All @@ -435,15 +439,18 @@ func (c *client) reConnect() {
}

num = c.sessionNum()
if max <= num {
if max <= num || max < times {
//Exit when the number of connection pools is sufficient or the reconnection times exceeds the connections numbers.
break
}
c.connect()
times++
if maxTimes < times {
times = maxTimes
if times > maxTimes {
maxDuration = int64(maxTimes) * int64(interval)
} else {
maxDuration = int64(times) * int64(interval)
}
<-gxtime.After(time.Duration(int64(times) * int64(interval)))
<-gxtime.After(time.Duration(maxDuration))
}
}

Expand All @@ -457,6 +464,7 @@ func (c *client) stop() {
c.Lock()
for s := range c.ssMap {
s.RemoveAttribute(sessionClientKey)
s.RemoveAttribute(ignoreReconnectKey)
s.Close()
}
c.ssMap = nil
Expand Down
7 changes: 5 additions & 2 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,8 @@ func (s *session) handleTCPPackage() error {
}
if perrors.Cause(err) == io.EOF {
log.Infof("%s, session.conn read EOF, client send over, session exit", s.sessionToken())
//when read EOF, means that the peer has closed the connection, stop to reconnect to maintain the connection pool.
s.SetAttribute(ignoreReconnectKey, true)
err = nil
exit = true
if bufLen != 0 {
Expand Down Expand Up @@ -858,8 +860,9 @@ func (s *session) stop() {
conn.SetWriteDeadline(now.Add(s.WriteTimeout()))
}
close(s.done)
c := s.GetAttribute(sessionClientKey)
if clt, ok := c.(*client); ok {
clt, cltFound := s.GetAttribute(sessionClientKey).(*client)
ignoreReconnect, flagFound := s.GetAttribute(ignoreReconnectKey).(bool)
if cltFound && flagFound && !ignoreReconnect {
clt.reConnect()
}
})
Expand Down

0 comments on commit 6a6e1d1

Please sign in to comment.