Skip to content

Commit

Permalink
fix(agent): use goroutines for processing requests from control plane (
Browse files Browse the repository at this point in the history
  • Loading branch information
schoren authored May 16, 2024
1 parent 5931da3 commit f07ac86
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 20 deletions.
12 changes: 7 additions & 5 deletions agent/client/workflow_listen_for_ds_connection_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ func (c *Client) startDataStoreConnectionTestListener(ctx context.Context) error

// we want a new context per request, not to reuse the one from the stream
ctx := telemetry.InjectMetadataIntoContext(context.Background(), req.Metadata)
err = c.dataStoreConnectionListener(ctx, &req)
if err != nil {
logger.Error("could not handle data store connection test request", zap.Error(err))
fmt.Println(err.Error())
}
go func() {
err = c.dataStoreConnectionListener(ctx, &req)
if err != nil {
logger.Error("could not handle data store connection test request", zap.Error(err))
fmt.Println(err.Error())
}
}()
}
}()
return nil
Expand Down
12 changes: 7 additions & 5 deletions agent/client/workflow_listen_for_otlp_connection_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ func (c *Client) startOTLPConnectionTestListener(ctx context.Context) error {

// we want a new context per request, not to reuse the one from the stream
ctx := telemetry.InjectMetadataIntoContext(context.Background(), req.Metadata)
err = c.otlpConnectionTestListener(ctx, &req)
if err != nil {
logger.Error("could not handle otlp connection test request", zap.Error(err))
fmt.Println(err.Error())
}
go func() {
err = c.otlpConnectionTestListener(ctx, &req)
if err != nil {
logger.Error("could not handle otlp connection test request", zap.Error(err))
fmt.Println(err.Error())
}
}()
}
}()
return nil
Expand Down
12 changes: 7 additions & 5 deletions agent/client/workflow_listen_for_poll_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ func (c *Client) startPollerListener(ctx context.Context) error {

// we want a new context per request, not to reuse the one from the stream
ctx := telemetry.InjectMetadataIntoContext(context.Background(), req.Metadata)
err = c.pollListener(ctx, &req)
if err != nil {
logger.Error("could not handle poll request", zap.Error(err))
fmt.Println(err.Error())
}
go func() {
err = c.pollListener(ctx, &req)
if err != nil {
logger.Error("could not handle poll request", zap.Error(err))
fmt.Println(err.Error())
}
}()
}
}()
return nil
Expand Down
12 changes: 7 additions & 5 deletions agent/client/workflow_listen_for_trigger_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ func (c *Client) startTriggerListener(ctx context.Context) error {

// we want a new context per request, not to reuse the one from the stream
ctx := telemetry.InjectMetadataIntoContext(context.Background(), req.Metadata)
err = c.triggerListener(ctx, &req)
if err != nil {
logger.Error("could not handle trigger request", zap.Error(err))
fmt.Println(err.Error())
}
go func() {
err = c.triggerListener(ctx, &req)
if err != nil {
logger.Error("could not handle trigger request", zap.Error(err))
fmt.Println(err.Error())
}
}()
}
}()
return nil
Expand Down

0 comments on commit f07ac86

Please sign in to comment.