diff --git a/pkg/dmsg/client.go b/pkg/dmsg/client.go index cd8f97cb5e..b70a967415 100644 --- a/pkg/dmsg/client.go +++ b/pkg/dmsg/client.go @@ -156,16 +156,19 @@ func (c *ClientConn) handleRequestFrame(ctx context.Context, accept chan<- *Tran // Serve handles incoming frames. // Remote-initiated tps that are successfully created are pushing into 'accept' and exposed via 'Client.Accept()'. func (c *ClientConn) Serve(ctx context.Context, accept chan<- *Transport) (err error) { - defer c.wg.Done() - log := c.log.WithField("remoteServer", c.remoteSrv) - - log.WithField("connCount", incrementServeCount()). - Infoln("ServingConn") + log.WithField("connCount", incrementServeCount()).Infoln("ServingConn") defer func() { - log.WithError(err). - WithField("connCount", decrementServeCount()). - Infoln("ClosingConn") + log.WithError(err).WithField("connCount", decrementServeCount()).Infoln("ClosingConn") + c.mx.Lock() + for _, tp := range c.tps { + if tp != nil { + go tp.Close() //nolint:errcheck + } + } + _ = c.Conn.Close() //nolint:errcheck + c.mx.Unlock() + c.wg.Done() }() closeConn := func(log *logrus.Entry) { @@ -247,18 +250,17 @@ func (c *ClientConn) DialTransport(ctx context.Context, clientPK cipher.PubKey) // Close closes the connection to dms_server. func (c *ClientConn) Close() error { - c.log.WithField("remoteServer", c.remoteSrv).Infoln("ClosingConnection") - c.once.Do(func() { close(c.done) }) - c.mx.Lock() - for _, tp := range c.tps { - if tp != nil { - _ = tp.Close() - } - } - err := c.Conn.Close() - c.mx.Unlock() + closed := false + c.once.Do(func() { + closed = true + c.log.WithField("remoteServer", c.remoteSrv).Infoln("ClosingConnection") + close(c.done) + }) c.wg.Wait() - return err + if !closed { + return ErrClientClosed + } + return nil } // Client implements transport.Factory @@ -428,20 +430,22 @@ func (c *Client) findOrConnectToServer(ctx context.Context, srvPK cipher.PubKey) conn := NewClientConn(c.log, nc, c.pk, srvPK) c.setConn(ctx, conn) go func() { - if err := conn.Serve(ctx, c.accept); err != nil { - conn.log.WithError(err).WithField("remoteServer", srvPK).Warn("connected with server closed") - c.delConn(ctx, srvPK) + err := conn.Serve(ctx, c.accept) + conn.log.WithError(err).WithField("remoteServer", srvPK).Warn("connected with server closed") + c.delConn(ctx, srvPK) - // reconnect logic. - t := time.NewTimer(time.Second * 2) - select { - case <-c.done: - case <-ctx.Done(): - case <-t.C: - conn.log.WithField("remoteServer", srvPK).Warn("Reconnecting") - _, _ = c.findOrConnectToServer(ctx, srvPK) //nolint:errcheck + // reconnect logic. + retryServerConnect: + select { + case <-c.done: + case <-ctx.Done(): + case <-time.After(time.Second * 3): + conn.log.WithField("remoteServer", srvPK).Warn("Reconnecting") + if _, err := c.findOrConnectToServer(ctx, srvPK); err != nil { + conn.log.WithError(err).WithField("remoteServer", srvPK).Warn("ReconnectionFailed") + goto retryServerConnect } - return + conn.log.WithField("remoteServer", srvPK).Warn("ReconnectionSucceeded") } }() return conn, nil diff --git a/pkg/dmsg/testing.md b/pkg/dmsg/testing.md index 272b9c4888..58f0a6848c 100644 --- a/pkg/dmsg/testing.md +++ b/pkg/dmsg/testing.md @@ -33,6 +33,8 @@ Note that even though `messaging-discovery` is also considered to be an entity o - Then: - It should work as expected still. +#### Handling `msg.Server` Failures + **`reconnect_to_server_should_succeed`** - Given: @@ -47,4 +49,3 @@ Note that even though `messaging-discovery` is also considered to be an entity o We should test the robustness of the system under different conditions and random order of events. These tests should be written consisiting of x-number of servers, clients and a single discovery. -TODO. \ No newline at end of file