Skip to content

Commit

Permalink
chore(server): Updating data store test to use queues (#3166)
Browse files Browse the repository at this point in the history
* chore(server): Updating data store test to use queues

* chore(server): Updating data store test to use queues

* fix: build

* Adding postgres pipelines

* fix: agent unit tests

* using testconnection job instead of executor job

* cleanup

* PR comments
  • Loading branch information
xoscar authored Sep 20, 2023
1 parent 23aff08 commit 6e64d9c
Show file tree
Hide file tree
Showing 20 changed files with 2,339 additions and 439 deletions.
16 changes: 13 additions & 3 deletions agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ type Client struct {
sessionConfig *SessionConfig
done chan bool

triggerListener func(context.Context, *proto.TriggerRequest) error
pollListener func(context.Context, *proto.PollingRequest) error
shutdownListener func(context.Context, *proto.ShutdownRequest) error
triggerListener func(context.Context, *proto.TriggerRequest) error
pollListener func(context.Context, *proto.PollingRequest) error
shutdownListener func(context.Context, *proto.ShutdownRequest) error
dataStoreConnectionListener func(context.Context, *proto.DataStoreConnectionTestRequest) error
}

func (c *Client) Start(ctx context.Context) error {
Expand Down Expand Up @@ -61,6 +62,11 @@ func (c *Client) Start(ctx context.Context) error {
return err
}

err = c.startDataStoreConnectionTestListener(ctx)
if err != nil {
return err
}

c.startHearthBeat(ctx)

return nil
Expand Down Expand Up @@ -88,6 +94,10 @@ func (c *Client) OnTriggerRequest(listener func(context.Context, *proto.TriggerR
c.triggerListener = listener
}

func (c *Client) OnDataStoreTestConnectionRequest(listener func(context.Context, *proto.DataStoreConnectionTestRequest) error) {
c.dataStoreConnectionListener = listener
}

func (c *Client) OnPollingRequest(listener func(context.Context, *proto.PollingRequest) error) {
c.pollListener = listener
}
Expand Down
30 changes: 23 additions & 7 deletions agent/client/mocks/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,22 @@ import (

type GrpcServerMock struct {
proto.UnimplementedOrchestratorServer
port int
triggerChannel chan *proto.TriggerRequest
pollingChannel chan *proto.PollingRequest
terminationChannel chan *proto.ShutdownRequest
port int
triggerChannel chan *proto.TriggerRequest
pollingChannel chan *proto.PollingRequest
terminationChannel chan *proto.ShutdownRequest
dataStoreTestChannel chan *proto.DataStoreConnectionTestRequest

lastTriggerResponse *proto.TriggerResponse
lastPollingResponse *proto.PollingResponse
}

func NewGrpcServer() *GrpcServerMock {
server := &GrpcServerMock{
triggerChannel: make(chan *proto.TriggerRequest),
pollingChannel: make(chan *proto.PollingRequest),
terminationChannel: make(chan *proto.ShutdownRequest),
triggerChannel: make(chan *proto.TriggerRequest),
pollingChannel: make(chan *proto.PollingRequest),
terminationChannel: make(chan *proto.ShutdownRequest),
dataStoreTestChannel: make(chan *proto.DataStoreConnectionTestRequest),
}
var wg sync.WaitGroup
wg.Add(1)
Expand Down Expand Up @@ -108,6 +110,20 @@ func (s *GrpcServerMock) RegisterPollerAgent(id *proto.AgentIdentification, stre
}
}

func (s *GrpcServerMock) RegisterDataStoreConnectionTestAgent(id *proto.AgentIdentification, stream proto.Orchestrator_RegisterDataStoreConnectionTestAgentServer) error {
if id.Token != "token" {
return fmt.Errorf("could not validate token")
}

for {
dsTestRequest := <-s.dataStoreTestChannel
err := stream.Send(dsTestRequest)
if err != nil {
log.Println("could not send polling request to agent: %w", err)
}
}
}

func (s *GrpcServerMock) SendPolledSpans(ctx context.Context, result *proto.PollingResponse) (*proto.Empty, error) {
if result.AgentIdentification == nil || result.AgentIdentification.Token != "token" {
return nil, fmt.Errorf("could not validate token")
Expand Down
37 changes: 37 additions & 0 deletions agent/client/workflow_listen_for_ds_connection_tests.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package client

import (
"context"
"fmt"
"io"
"log"

"github.com/kubeshop/tracetest/agent/proto"
)

func (c *Client) startDataStoreConnectionTestListener(ctx context.Context) error {
client := proto.NewOrchestratorClient(c.conn)

stream, err := client.RegisterDataStoreConnectionTestAgent(ctx, c.sessionConfig.AgentIdentification)
if err != nil {
return fmt.Errorf("could not open agent stream: %w", err)
}

go func() {
for {
req := proto.DataStoreConnectionTestRequest{}
err := stream.RecvMsg(&req)
if err == io.EOF {
return
}

if err != nil {
log.Fatal("could not get message from trigger stream: %w", err)
}

// TODO: Get ctx from request
c.dataStoreConnectionListener(context.Background(), &req)
}
}()
return nil
}
21 changes: 21 additions & 0 deletions agent/client/workflow_send_ds_connection_result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package client

import (
"context"
"fmt"

"github.com/kubeshop/tracetest/agent/proto"
)

func (c *Client) SendDataStoreConnectionResult(ctx context.Context, response *proto.DataStoreConnectionTestResponse) error {
client := proto.NewOrchestratorClient(c.conn)

response.AgentIdentification = c.sessionConfig.AgentIdentification

_, err := client.SendDataStoreConnectionTestResult(ctx, response)
if err != nil {
return fmt.Errorf("could not send data store connection result request: %w", err)
}

return nil
}
2 changes: 2 additions & 0 deletions agent/initialization/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ func NewClient(ctx context.Context, config config.Config) (*client.Client, error

triggerWorker := workers.NewTriggerWorker(client)
pollingWorker := workers.NewPollerWorker(client)
dataStoreTestConnectionWorker := workers.NewTestConnectionWorker(client)

client.OnDataStoreTestConnectionRequest(dataStoreTestConnectionWorker.Test)
client.OnTriggerRequest(triggerWorker.Trigger)
client.OnPollingRequest(pollingWorker.Poll)
client.OnConnectionClosed(func(ctx context.Context, sr *proto.ShutdownRequest) error {
Expand Down
Loading

0 comments on commit 6e64d9c

Please sign in to comment.