Skip to content

Commit

Permalink
fix(agent): Graceful Stop (#3219)
Browse files Browse the repository at this point in the history
* fix(agent): Graceful Stop

* fix(agent): Graceful Stop
  • Loading branch information
xoscar authored Oct 5, 2023
1 parent e034bce commit ecbbdcb
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 9 deletions.
6 changes: 6 additions & 0 deletions agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"strings"
"time"

"github.com/kubeshop/tracetest/agent/proto"
Expand Down Expand Up @@ -38,6 +39,7 @@ func (c *Client) Start(ctx context.Context) error {
return err
}

c.done = make(chan bool)
ctx, cancel := context.WithCancel(ctx)
go func() {
<-c.done
Expand Down Expand Up @@ -138,3 +140,7 @@ func (c *Client) getName() (string, error) {

return hostname, nil
}

func isCancelledError(err error) bool {
return err != nil && strings.Contains(err.Error(), "context canceled")
}
5 changes: 3 additions & 2 deletions agent/client/workflow_listen_for_ds_connection_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"context"
"errors"
"fmt"
"io"
"log"
Expand All @@ -21,12 +22,12 @@ func (c *Client) startDataStoreConnectionTestListener(ctx context.Context) error
for {
req := proto.DataStoreConnectionTestRequest{}
err := stream.RecvMsg(&req)
if err == io.EOF {
if errors.Is(err, io.EOF) || isCancelledError(err) {
return
}

if err != nil {
log.Fatal("could not get message from trigger stream: %w", err)
log.Fatal("could not get message from ds connection stream: %w", err)
}

// TODO: Get ctx from request
Expand Down
5 changes: 3 additions & 2 deletions agent/client/workflow_listen_for_poll_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"context"
"errors"
"fmt"
"io"
"log"
Expand All @@ -21,12 +22,12 @@ func (c *Client) startPollerListener(ctx context.Context) error {
for {
resp := proto.PollingRequest{}
err := stream.RecvMsg(&resp)
if err == io.EOF {
if errors.Is(err, io.EOF) || isCancelledError(err) {
return
}

if err != nil {
log.Fatal("could not get message from trigger stream: %w", err)
log.Fatal("could not get message from polling stream: %w", err)
}

// TODO: Get ctx from request
Expand Down
3 changes: 2 additions & 1 deletion agent/client/workflow_listen_for_trigger_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"context"
"errors"
"fmt"
"io"
"log"
Expand All @@ -21,7 +22,7 @@ func (c *Client) startTriggerListener(ctx context.Context) error {
for {
resp := proto.TriggerRequest{}
err := stream.RecvMsg(&resp)
if err == io.EOF {
if errors.Is(err, io.EOF) || isCancelledError(err) {
return
}

Expand Down
6 changes: 4 additions & 2 deletions agent/client/workflow_shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"context"
"errors"
"fmt"
"io"
"log"
Expand All @@ -21,12 +22,13 @@ func (c *Client) startShutdownListener(ctx context.Context) error {
for {
resp := proto.ShutdownRequest{}
err := stream.RecvMsg(&resp)
if err == io.EOF {

if errors.Is(err, io.EOF) || isCancelledError(err) {
return
}

if err != nil {
log.Fatal("could not get message from trigger stream: %w", err)
log.Fatal("could not get shutdown listener: %w", err)
}

// TODO: get context from request
Expand Down
4 changes: 2 additions & 2 deletions cli/config/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (c Configurator) organizationSelector(ctx context.Context, cfg Config) (str
options := make([]cliUI.Option, len(elements))
for i, org := range elements {
options[i] = cliUI.Option{
Text: org.Name,
Text: fmt.Sprintf("%s (%s)", org.Name, org.ID),
Fn: func(o Entry) func(ui cliUI.UI) {
return func(ui cliUI.UI) {
orgID = o.ID
Expand Down Expand Up @@ -72,7 +72,7 @@ func (c Configurator) environmentSelector(ctx context.Context, cfg Config) (stri
options := make([]cliUI.Option, len(elements))
for i, env := range elements {
options[i] = cliUI.Option{
Text: env.Name,
Text: fmt.Sprintf("%s (%s)", env.Name, env.ID),
Fn: func(e Entry) func(ui cliUI.UI) {
return func(ui cliUI.UI) {
envID = e.ID
Expand Down

0 comments on commit ecbbdcb

Please sign in to comment.