Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(agent): agent development #3081

Merged
merged 21 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
e6ce577
feat: add structure for agent (#2964)
mathnogueira Jul 19, 2023
95f573c
feat: create client for receiving trigger requests from the server (#…
mathnogueira Jul 21, 2023
ef143f8
feat: agent client polling methods (#2972)
mathnogueira Jul 21, 2023
1ce6b85
feat: send trigger requests from agent (#2975)
mathnogueira Jul 24, 2023
ae8dae7
feat: agent polling (#2979)
mathnogueira Jul 24, 2023
f231ad4
feat: agent internal collector (#2997)
mathnogueira Jul 28, 2023
1e71c79
feat: agent shutdown (#3022)
mathnogueira Aug 2, 2023
906d9a7
update mod
schoren Aug 9, 2023
897b7b5
fix merge errors;
schoren Aug 9, 2023
75b2891
fix merge errors;
schoren Aug 9, 2023
b033e8f
more fixes
schoren Aug 9, 2023
3450bf0
feat: add agent token to proto and client (#3068)
mathnogueira Aug 16, 2023
c5359eb
add token to shutdown listener method (#3071)
mathnogueira Aug 16, 2023
34809d8
fix(agent): rename traceid response property in proto (#3076)
mathnogueira Aug 17, 2023
eaeb45f
update go.work.sum
mathnogueira Aug 18, 2023
31bf155
feat(agent): add kafka agent proto (#3080)
mathnogueira Aug 18, 2023
1c3f74a
fix module name
mathnogueira Aug 22, 2023
d2f14b2
feat(agent): inject env variables in agent configuration (#3097)
mathnogueira Aug 23, 2023
a8f3270
Merge branch 'main' of github.com:kubeshop/tracetest into feat/agent
mathnogueira Aug 28, 2023
ef9bca2
feat: make agent wait until it's disconnected to exit (#3109)
mathnogueira Aug 30, 2023
6870143
Merge branch 'main' into feat/agent
mathnogueira Aug 30, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .github/workflows/pull-request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,25 @@ jobs:
- name: Run unit tests
run: cd server; make test -B

unit-test-agent:
name: Agent unit tests
runs-on: ubuntu-latest

steps:
- name: Checkout
uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Setup go
uses: actions/setup-go@v3
with:
go-version-file: "go.work"
cache: true
cache-dependency-path: go.work
- name: Run unit tests
run: cd agent; make test -B


unit-test-web:
name: WebUI unit tests
runs-on: ubuntu-latest
Expand Down
22 changes: 22 additions & 0 deletions agent/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Dependencies:
# https://grpc.io/docs/protoc-installation/

REQUIRED_BINS := protoc

build-proto: ensure-dependencies clean-proto
@protoc \
--go_out=./ \
--go_opt=paths=source_relative \
--go-grpc_out=./ \
--go-grpc_opt=paths=source_relative \
proto/orchestrator.proto

ensure-dependencies:
$(foreach bin,$(REQUIRED_BINS),\
$(if $(shell command -v $(bin) 2> /dev/null),,$(error Please install `$(bin)`)))

clean-proto:
@rm -f proto/*.go

test:
go test -timeout 150s -coverprofile=coverage.out ./...
128 changes: 128 additions & 0 deletions agent/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package client

import (
"context"
"fmt"
"os"
"time"

"github.com/kubeshop/tracetest/agent/proto"
"google.golang.org/grpc"
)

type Config struct {
APIKey string
AgentName string
}

type SessionConfig struct {
BatchTimeout time.Duration
AgentIdentification *proto.AgentIdentification
}

type Client struct {
conn *grpc.ClientConn
config Config
sessionConfig *SessionConfig
done chan bool

triggerListener func(context.Context, *proto.TriggerRequest) error
pollListener func(context.Context, *proto.PollingRequest) error
shutdownListener func(context.Context, *proto.ShutdownRequest) error
}

func (c *Client) Start(ctx context.Context) error {
err := c.startup(ctx)
if err != nil {
return err
}

ctx, cancel := context.WithCancel(ctx)
go func() {
<-c.done
// We cannot `defer cancel()` in this case because the start listener functions
// start one goroutine each and don't block the execution of this function.
// Thus, if we cancel the context, all those goroutines will fail.
cancel()
}()

err = c.startTriggerListener(ctx)
if err != nil {
return err
}

err = c.startPollerListener(ctx)
if err != nil {
return err
}

err = c.startShutdownListener(ctx)
if err != nil {
return err
}

return nil
}

func (c *Client) WaitUntilDisconnected() {
<-c.done
}

func (c *Client) SessionConfiguration() *SessionConfig {
if c.sessionConfig == nil {
return nil
}

deferredPtr := *c.sessionConfig
return &deferredPtr
}

func (c *Client) Close() error {
c.done <- true
return c.conn.Close()
}

func (c *Client) OnTriggerRequest(listener func(context.Context, *proto.TriggerRequest) error) {
c.triggerListener = listener
}

func (c *Client) OnPollingRequest(listener func(context.Context, *proto.PollingRequest) error) {
c.pollListener = listener
}

func (c *Client) OnConnectionClosed(listener func(context.Context, *proto.ShutdownRequest) error) {
c.shutdownListener = listener
}

func (c *Client) getConnectionRequest() (*proto.ConnectRequest, error) {
name, err := c.getName()
if err != nil {
return nil, err
}

request := proto.ConnectRequest{
ApiKey: c.config.APIKey,
Name: name,
}

return &request, nil
}

// getName retrieves the name of the agent. By default, it is the host name, however,
// it can be overwritten with an environment variable, or a flag.
func (c *Client) getName() (string, error) {
if name := c.config.AgentName; name != "" {
return name, nil
}

if name := os.Getenv("TRACETEST_AGENT_NAME"); name != "" {
return name, nil
}

hostname, err := os.Hostname()
if err != nil {
return "", fmt.Errorf("could not get hostname: %w", err)
}

return hostname, nil
}
37 changes: 37 additions & 0 deletions agent/client/connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package client

import (
"context"
"fmt"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

func Connect(ctx context.Context, endpoint string, opts ...Option) (*Client, error) {
conn, err := connect(ctx, endpoint)
if err != nil {
return nil, err
}

client := &Client{conn: conn}
for _, opt := range opts {
opt(client)
}

return client, nil
}

func connect(ctx context.Context, endpoint string) (*grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

// TODO: don't use insecure transportation
conn, err := grpc.DialContext(ctx, endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, fmt.Errorf("could not connect to server: %w", err)
}

return conn, nil
}
152 changes: 152 additions & 0 deletions agent/client/mocks/grpc_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package mocks

import (
"context"
"fmt"
"log"
"net"
"sync"

"github.com/kubeshop/tracetest/agent/proto"
"google.golang.org/grpc"
)

type GrpcServerMock struct {
proto.UnimplementedOrchestratorServer
port int
triggerChannel chan *proto.TriggerRequest
pollingChannel chan *proto.PollingRequest
terminationChannel chan *proto.ShutdownRequest

lastTriggerResponse *proto.TriggerResponse
lastPollingResponse *proto.PollingResponse
}

func NewGrpcServer() *GrpcServerMock {
server := &GrpcServerMock{
triggerChannel: make(chan *proto.TriggerRequest),
pollingChannel: make(chan *proto.PollingRequest),
terminationChannel: make(chan *proto.ShutdownRequest),
}
var wg sync.WaitGroup
wg.Add(1)

go server.start(&wg)

wg.Wait()

return server
}

func (s *GrpcServerMock) Addr() string {
return fmt.Sprintf("localhost:%d", s.port)
}

func (s *GrpcServerMock) start(wg *sync.WaitGroup) {
lis, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

s.port = lis.Addr().(*net.TCPAddr).Port

server := grpc.NewServer()
proto.RegisterOrchestratorServer(server, s)

wg.Done()
if err := server.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}

func (s *GrpcServerMock) Connect(ctx context.Context, req *proto.ConnectRequest) (*proto.AgentConfiguration, error) {
return &proto.AgentConfiguration{
Configuration: &proto.SessionConfiguration{
BatchTimeout: 1000,
},
Identification: &proto.AgentIdentification{
Token: "token",
},
}, nil
}

func (s *GrpcServerMock) RegisterTriggerAgent(id *proto.AgentIdentification, stream proto.Orchestrator_RegisterTriggerAgentServer) error {
if id.Token != "token" {
return fmt.Errorf("could not validate token")
}

for {
triggerRequest := <-s.triggerChannel
err := stream.Send(triggerRequest)
if err != nil {
log.Println("could not send trigger request to agent: %w", err)
}

}
}

func (s *GrpcServerMock) SendTriggerResult(ctx context.Context, result *proto.TriggerResponse) (*proto.Empty, error) {
if result.AgentIdentification == nil || result.AgentIdentification.Token != "token" {
return nil, fmt.Errorf("could not validate token")
}

s.lastTriggerResponse = result
return &proto.Empty{}, nil
}

func (s *GrpcServerMock) RegisterPollerAgent(id *proto.AgentIdentification, stream proto.Orchestrator_RegisterPollerAgentServer) error {
if id.Token != "token" {
return fmt.Errorf("could not validate token")
}

for {
pollerRequest := <-s.pollingChannel
err := stream.Send(pollerRequest)
if err != nil {
log.Println("could not send polling request to agent: %w", err)
}
}
}

func (s *GrpcServerMock) SendPolledSpans(ctx context.Context, result *proto.PollingResponse) (*proto.Empty, error) {
if result.AgentIdentification == nil || result.AgentIdentification.Token != "token" {
return nil, fmt.Errorf("could not validate token")
}

s.lastPollingResponse = result
return &proto.Empty{}, nil
}

func (s *GrpcServerMock) RegisterShutdownListener(_ *proto.AgentIdentification, stream proto.Orchestrator_RegisterShutdownListenerServer) error {
for {
shutdownRequest := <-s.terminationChannel
err := stream.Send(shutdownRequest)
if err != nil {
log.Println("could not send polling request to agent: %w", err)
}
}
}

// Test methods

func (s *GrpcServerMock) SendTriggerRequest(request *proto.TriggerRequest) {
s.triggerChannel <- request
}

func (s *GrpcServerMock) SendPollingRequest(request *proto.PollingRequest) {
s.pollingChannel <- request
}

func (s *GrpcServerMock) GetLastTriggerResponse() *proto.TriggerResponse {
return s.lastTriggerResponse
}

func (s *GrpcServerMock) GetLastPollingResponse() *proto.PollingResponse {
return s.lastPollingResponse
}

func (s *GrpcServerMock) TerminateConnection(reason string) {
s.terminationChannel <- &proto.ShutdownRequest{
Reason: reason,
}
}
15 changes: 15 additions & 0 deletions agent/client/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package client

type Option func(*Client)

func WithAPIKey(apiKey string) Option {
return func(c *Client) {
c.config.APIKey = apiKey
}
}

func WithAgentName(name string) Option {
return func(c *Client) {
c.config.AgentName = name
}
}
Loading