Skip to content

Commit

Permalink
fix: make agent reconnect on server shutdown (#3864)
Browse files Browse the repository at this point in the history
* fix: make agent reconnect on server shutdown

* fix tests
  • Loading branch information
mathnogueira authored May 22, 2024
1 parent 86d4bc3 commit c99062d
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 15 deletions.
31 changes: 26 additions & 5 deletions agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,22 @@ func (c *Client) reconnect() error {
return c.Start(context.Background())
}

func (c *Client) handleDisconnectionError(inputErr error) (bool, error) {
if !isConnectionError(inputErr) {
// if it's nil or any error other than the one we care about, return it and let the caller handle it
type request interface {
String() string
}

func (c *Client) handleDisconnectionError(inputErr error, req request) (bool, error) {
if !isConnectionError(inputErr, req) {
// if any error other than the one we care about, return it and let the caller handle it
return false, inputErr
}

log.Printf("Reconnecting agent due to error: %s", inputErr.Error())
errMsg := "stream was closed by the server"
if inputErr != nil {
errMsg = inputErr.Error()
}

log.Printf("Reconnecting agent due to error: %s", errMsg)
err := retry.Do(func() error {
return c.reconnect()
})
Expand All @@ -241,8 +250,20 @@ func (c *Client) handleDisconnectionError(inputErr error) (bool, error) {
return true, nil
}

func isConnectionError(err error) bool {
func isConnectionError(err error, req request) bool {
if err == nil && req == nil {
return false
}

if err == nil && req.String() == "" {
// If `err` is nil and the request is empty, it means that `stream.RecvMsg` returned without a message.
// This is a very good indicative that the stream got closed by the server (probably the pod was killed)
// So, in this case, we force the client to reconnect to the server.
return true
}

if err == nil {
// if error is nil, but the request it not empty, it means that `stream.RecvMsg` ran successfully
return false
}

Expand Down
2 changes: 1 addition & 1 deletion agent/client/workflow_listen_for_ds_connection_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (c *Client) startDataStoreConnectionTestListener(ctx context.Context) error
return
}

reconnected, err := c.handleDisconnectionError(err)
reconnected, err := c.handleDisconnectionError(err, &req)
if reconnected {
logger.Warn("reconnected to data store connection stream")
return
Expand Down
2 changes: 1 addition & 1 deletion agent/client/workflow_listen_for_otlp_connection_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (c *Client) startOTLPConnectionTestListener(ctx context.Context) error {
return
}

reconnected, err := c.handleDisconnectionError(err)
reconnected, err := c.handleDisconnectionError(err, &req)
if reconnected {
logger.Warn("reconnected to otlp connection stream")
return
Expand Down
2 changes: 1 addition & 1 deletion agent/client/workflow_listen_for_poll_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (c *Client) startPollerListener(ctx context.Context) error {
return
}

reconnected, err := c.handleDisconnectionError(err)
reconnected, err := c.handleDisconnectionError(err, &req)
if reconnected {
logger.Warn("reconnected to poller stream")
return
Expand Down
2 changes: 1 addition & 1 deletion agent/client/workflow_listen_for_stop_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (c *Client) startStopListener(ctx context.Context) error {
return
}

reconnected, err := c.handleDisconnectionError(err)
reconnected, err := c.handleDisconnectionError(err, &req)
if reconnected {
logger.Warn("reconnected to stop stream")
return
Expand Down
2 changes: 1 addition & 1 deletion agent/client/workflow_listen_for_trigger_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (c *Client) startTriggerListener(ctx context.Context) error {
return
}

reconnected, err := c.handleDisconnectionError(err)
reconnected, err := c.handleDisconnectionError(err, &req)
if reconnected {
logger.Warn("reconnected to stop stream")
return
Expand Down
2 changes: 1 addition & 1 deletion agent/client/workflow_ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (c *Client) startHeartBeat(ctx context.Context) error {
return
}

reconnected, err := c.handleDisconnectionError(err)
reconnected, err := c.handleDisconnectionError(err, nil)
if reconnected {
log.Println("reconnected to ping stream")
return
Expand Down
8 changes: 4 additions & 4 deletions agent/client/workflow_shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func (c *Client) startShutdownListener(ctx context.Context) error {

go func() {
for {
resp := proto.ShutdownRequest{}
err := stream.RecvMsg(&resp)
req := proto.ShutdownRequest{}
err := stream.RecvMsg(&req)

if err != nil {
logger.Error("could not get message from shutdown stream", zap.Error(err))
Expand All @@ -36,7 +36,7 @@ func (c *Client) startShutdownListener(ctx context.Context) error {
return
}

reconnected, err := c.handleDisconnectionError(err)
reconnected, err := c.handleDisconnectionError(err, &req)
if reconnected {
logger.Warn("reconnected to shutdown stream")
return
Expand All @@ -50,7 +50,7 @@ func (c *Client) startShutdownListener(ctx context.Context) error {
}

// TODO: get context from request
err = c.shutdownListener(context.Background(), &resp)
err = c.shutdownListener(context.Background(), &req)
if err != nil {
logger.Error("could not handle shutdown request", zap.Error(err))
fmt.Println(err.Error())
Expand Down

0 comments on commit c99062d

Please sign in to comment.