Skip to content

Commit

Permalink
Fixed shutdown and reconnection logic for dmsg.Client.
Browse files Browse the repository at this point in the history
  • Loading branch information
林志宇 committed Jun 13, 2019
1 parent caa31bd commit c63a547
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 32 deletions.
66 changes: 35 additions & 31 deletions pkg/dmsg/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,19 @@ func (c *ClientConn) handleRequestFrame(ctx context.Context, accept chan<- *Tran
// Serve handles incoming frames.
// Remote-initiated tps that are successfully created are pushing into 'accept' and exposed via 'Client.Accept()'.
func (c *ClientConn) Serve(ctx context.Context, accept chan<- *Transport) (err error) {
defer c.wg.Done()

log := c.log.WithField("remoteServer", c.remoteSrv)

log.WithField("connCount", incrementServeCount()).
Infoln("ServingConn")
log.WithField("connCount", incrementServeCount()).Infoln("ServingConn")
defer func() {
log.WithError(err).
WithField("connCount", decrementServeCount()).
Infoln("ClosingConn")
log.WithError(err).WithField("connCount", decrementServeCount()).Infoln("ClosingConn")
c.mx.Lock()
for _, tp := range c.tps {
if tp != nil {
go tp.Close() //nolint:errcheck
}
}
_ = c.Conn.Close() //nolint:errcheck
c.mx.Unlock()
c.wg.Done()
}()

closeConn := func(log *logrus.Entry) {
Expand Down Expand Up @@ -247,18 +250,17 @@ func (c *ClientConn) DialTransport(ctx context.Context, clientPK cipher.PubKey)

// Close closes the connection to dms_server.
func (c *ClientConn) Close() error {
c.log.WithField("remoteServer", c.remoteSrv).Infoln("ClosingConnection")
c.once.Do(func() { close(c.done) })
c.mx.Lock()
for _, tp := range c.tps {
if tp != nil {
_ = tp.Close()
}
}
err := c.Conn.Close()
c.mx.Unlock()
closed := false
c.once.Do(func() {
closed = true
c.log.WithField("remoteServer", c.remoteSrv).Infoln("ClosingConnection")
close(c.done)
})
c.wg.Wait()
return err
if !closed {
return ErrClientClosed
}
return nil
}

// Client implements transport.Factory
Expand Down Expand Up @@ -428,20 +430,22 @@ func (c *Client) findOrConnectToServer(ctx context.Context, srvPK cipher.PubKey)
conn := NewClientConn(c.log, nc, c.pk, srvPK)
c.setConn(ctx, conn)
go func() {
if err := conn.Serve(ctx, c.accept); err != nil {
conn.log.WithError(err).WithField("remoteServer", srvPK).Warn("connected with server closed")
c.delConn(ctx, srvPK)
err := conn.Serve(ctx, c.accept)
conn.log.WithError(err).WithField("remoteServer", srvPK).Warn("connected with server closed")
c.delConn(ctx, srvPK)

// reconnect logic.
t := time.NewTimer(time.Second * 2)
select {
case <-c.done:
case <-ctx.Done():
case <-t.C:
conn.log.WithField("remoteServer", srvPK).Warn("Reconnecting")
_, _ = c.findOrConnectToServer(ctx, srvPK) //nolint:errcheck
// reconnect logic.
retryServerConnect:
select {
case <-c.done:
case <-ctx.Done():
case <-time.After(time.Second * 3):
conn.log.WithField("remoteServer", srvPK).Warn("Reconnecting")
if _, err := c.findOrConnectToServer(ctx, srvPK); err != nil {
conn.log.WithError(err).WithField("remoteServer", srvPK).Warn("ReconnectionFailed")
goto retryServerConnect
}
return
conn.log.WithField("remoteServer", srvPK).Warn("ReconnectionSucceeded")
}
}()
return conn, nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/dmsg/testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ Note that even though `messaging-discovery` is also considered to be an entity o
- Then:
- It should work as expected still.

#### Handling `msg.Server` Failures

**`reconnect_to_server_should_succeed`**

- Given:
Expand All @@ -47,4 +49,3 @@ Note that even though `messaging-discovery` is also considered to be an entity o

We should test the robustness of the system under different conditions and random order of events. These tests should be written consisiting of x-number of servers, clients and a single discovery.

TODO.

0 comments on commit c63a547

Please sign in to comment.