Skip to content

Commit

Permalink
feat(agent): keep track of spans that were generated by tests (#3175)
Browse files Browse the repository at this point in the history
* feat(agent): start watching traces with trace-id that was triggered

* feat(agent): collector watches spans coming from triggered tests

* fix: tests

* fix: cli tests
  • Loading branch information
mathnogueira authored Sep 21, 2023
1 parent 3d54ed1 commit 34529cd
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 23 deletions.
43 changes: 43 additions & 0 deletions agent/collector/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package collector

import (
"sync"

gocache "github.com/Code-Hex/go-generics-cache"
v1 "go.opentelemetry.io/proto/otlp/trace/v1"
)

type TraceCache interface {
Get(string) ([]*v1.Span, bool)
Set(string, []*v1.Span)
}

type traceCache struct {
mutex sync.Mutex
internalCache *gocache.Cache[string, []*v1.Span]
}

// Get implements TraceCache.
func (c *traceCache) Get(traceID string) ([]*v1.Span, bool) {
c.mutex.Lock()
defer c.mutex.Unlock()

return c.internalCache.Get(traceID)
}

// Set implements TraceCache.
func (c *traceCache) Set(traceID string, spans []*v1.Span) {
c.mutex.Lock()
defer c.mutex.Unlock()

existingTraces, _ := c.internalCache.Get(traceID)
spans = append(existingTraces, spans...)

c.internalCache.Set(traceID, spans)
}

func NewTraceCache() TraceCache {
return &traceCache{
internalCache: gocache.New[string, []*v1.Span](),
}
}
39 changes: 32 additions & 7 deletions agent/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,38 @@ type Config struct {
RemoteServerToken string
}

func Start(ctx context.Context, config Config, tracer trace.Tracer) error {
ingester, err := newForwardIngester(ctx, config.BatchTimeout, remoteIngesterConfig{
type CollectorOption func(*remoteIngesterConfig)

func WithTraceCache(traceCache TraceCache) CollectorOption {
return func(ric *remoteIngesterConfig) {
ric.traceCache = traceCache
}
}

type collector struct {
grpcServer stoppable
httpServer stoppable
}

// Stop implements stoppable.
func (c *collector) Stop() {
c.grpcServer.Stop()
c.httpServer.Stop()
}

func Start(ctx context.Context, config Config, tracer trace.Tracer, opts ...CollectorOption) (stoppable, error) {
ingesterConfig := remoteIngesterConfig{
URL: config.RemoteServerURL,
Token: config.RemoteServerToken,
})
}

for _, opt := range opts {
opt(&ingesterConfig)
}

ingester, err := newForwardIngester(ctx, config.BatchTimeout, ingesterConfig)
if err != nil {
return fmt.Errorf("could not start local collector: %w", err)
return nil, fmt.Errorf("could not start local collector: %w", err)
}

grpcServer := otlp.NewGrpcServer(fmt.Sprintf("0.0.0.0:%d", config.GRPCPort), ingester, tracer)
Expand All @@ -41,14 +66,14 @@ func Start(ctx context.Context, config Config, tracer trace.Tracer) error {
})

if err = grpcServer.Start(); err != nil {
return fmt.Errorf("could not start gRPC OTLP listener: %w", err)
return nil, fmt.Errorf("could not start gRPC OTLP listener: %w", err)
}

if err = httpServer.Start(); err != nil {
return fmt.Errorf("could not start HTTP OTLP listener: %w", err)
return nil, fmt.Errorf("could not start HTTP OTLP listener: %w", err)
}

return nil
return &collector{grpcServer: grpcServer, httpServer: httpServer}, nil
}

func onProcessTermination(callback func()) {
Expand Down
78 changes: 77 additions & 1 deletion agent/collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ package collector_test
import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/kubeshop/tracetest/agent/collector"
"github.com/kubeshop/tracetest/agent/collector/mocks"
"github.com/kubeshop/tracetest/server/pkg/id"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"
v1 "go.opentelemetry.io/proto/otlp/trace/v1"
)

func TestCollector(t *testing.T) {
Expand All @@ -19,7 +22,7 @@ func TestCollector(t *testing.T) {

noopTracer := trace.NewNoopTracerProvider().Tracer("noop_tracer")

err = collector.Start(
c, err := collector.Start(
context.Background(),
collector.Config{
HTTPPort: 4318,
Expand All @@ -31,6 +34,8 @@ func TestCollector(t *testing.T) {
)
require.NoError(t, err)

defer c.Stop()

tracer, err := mocks.NewTracer(context.Background(), "localhost:4317")
require.NoError(t, err)

Expand All @@ -51,3 +56,74 @@ func TestCollector(t *testing.T) {
time.Sleep(4 * time.Second)
assert.Len(t, targetServer.ReceivedSpans(), 10)
}

func TestCollectorWatchingSpansFromTest(t *testing.T) {
targetServer, err := mocks.NewOTLPIngestionServer()
require.NoError(t, err)

cache := collector.NewTraceCache()
noopTracer := trace.NewNoopTracerProvider().Tracer("noop_tracer")

c, err := collector.Start(
context.Background(),
collector.Config{
HTTPPort: 4318,
GRPCPort: 4317,
BatchTimeout: 2 * time.Second,
RemoteServerURL: targetServer.Addr(),
},
noopTracer,
collector.WithTraceCache(cache),
)
require.NoError(t, err)

defer c.Stop()

tracer, err := mocks.NewTracer(context.Background(), "localhost:4317")
require.NoError(t, err)

watchedTraceID := id.NewRandGenerator().TraceID()
spanContext := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: watchedTraceID,
})

ctx := trace.ContextWithSpanContext(context.Background(), spanContext)

cache.Set(watchedTraceID.String(), []*v1.Span{})

// 10 spans will be watched and stored in the cache
func(ctx context.Context) {
for i := 0; i < 10; i++ {
spanCtx, span := tracer.Start(ctx, fmt.Sprintf("watched span %d", i))
ctx = spanCtx

defer span.End()
}
}(ctx)

// 10 spans will not be watched neither stored in the cache
func(ctx context.Context) {
for i := 0; i < 10; i++ {
spanCtx, span := tracer.Start(ctx, fmt.Sprintf("span %d", i))
ctx = spanCtx

defer span.End()
}
}(context.Background())

time.Sleep(500 * time.Millisecond)
// Should not have any spans yet, because batch timeout is 2 seconds
assert.Len(t, targetServer.ReceivedSpans(), 0)

// Now after waiting the timeout, it should contain all spans
time.Sleep(4 * time.Second)
assert.Len(t, targetServer.ReceivedSpans(), 20)

cachedSpans, ok := cache.Get(watchedTraceID.String())
assert.True(t, ok)
assert.Len(t, cachedSpans, 10)
for _, span := range cachedSpans {
isWatched := strings.Contains(span.Name, "watched")
assert.True(t, isWatched)
}
}
35 changes: 33 additions & 2 deletions agent/collector/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/kubeshop/tracetest/server/otlp"
"go.opencensus.io/trace"
pb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
v1 "go.opentelemetry.io/proto/otlp/trace/v1"
"google.golang.org/grpc"
Expand All @@ -24,6 +25,7 @@ func newForwardIngester(ctx context.Context, batchTimeout time.Duration, remoteI
RemoteIngester: remoteIngesterConfig,
buffer: &buffer{},
done: make(chan bool),
traceCache: remoteIngesterConfig.traceCache,
}

err := ingester.connectToRemoteServer(ctx)
Expand All @@ -44,11 +46,13 @@ type forwardIngester struct {
client pb.TraceServiceClient
buffer *buffer
done chan bool
traceCache TraceCache
}

type remoteIngesterConfig struct {
URL string
Token string
URL string
Token string
traceCache TraceCache
}

type buffer struct {
Expand All @@ -61,6 +65,12 @@ func (i *forwardIngester) Ingest(ctx context.Context, request *pb.ExportTraceSer
i.buffer.spans = append(i.buffer.spans, request.ResourceSpans...)
i.buffer.mutex.Unlock()

if i.traceCache != nil {
// In case of OTLP datastore, those spans will be polled from this cache instead
// of a real datastore
i.cacheTestSpans(request.ResourceSpans)
}

return &pb.ExportTraceServiceResponse{
PartialSuccess: &pb.ExportTracePartialSuccess{
RejectedSpans: 0,
Expand Down Expand Up @@ -124,6 +134,27 @@ func (i *forwardIngester) forwardSpans(ctx context.Context, spans []*v1.Resource
return nil
}

func (i *forwardIngester) cacheTestSpans(resourceSpans []*v1.ResourceSpans) {
spans := make(map[string][]*v1.Span)
for _, resourceSpan := range resourceSpans {
for _, scopedSpan := range resourceSpan.ScopeSpans {
for _, span := range scopedSpan.Spans {
traceID := trace.TraceID(span.TraceId).String()
spans[traceID] = append(spans[traceID], span)
}
}
}

for traceID, spans := range spans {
if _, ok := i.traceCache.Get(traceID); !ok {
// traceID is not part of a test
continue
}

i.traceCache.Set(traceID, spans)
}
}

func (i *forwardIngester) Stop() {
i.done <- true
}
13 changes: 7 additions & 6 deletions agent/initialization/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"go.opentelemetry.io/otel/trace"
)

func NewClient(ctx context.Context, config config.Config) (*client.Client, error) {
func NewClient(ctx context.Context, config config.Config, traceCache collector.TraceCache) (*client.Client, error) {
client, err := client.Connect(ctx, config.ServerURL,
client.WithAPIKey(config.APIKey),
client.WithAgentName(config.Name),
Expand All @@ -21,7 +21,7 @@ func NewClient(ctx context.Context, config config.Config) (*client.Client, error
return nil, err
}

triggerWorker := workers.NewTriggerWorker(client)
triggerWorker := workers.NewTriggerWorker(client, workers.WithTraceCache(traceCache))
pollingWorker := workers.NewPollerWorker(client)
dataStoreTestConnectionWorker := workers.NewTestConnectionWorker(client)

Expand All @@ -38,7 +38,8 @@ func NewClient(ctx context.Context, config config.Config) (*client.Client, error

// Start the agent with given configuration
func Start(ctx context.Context, config config.Config) error {
client, err := NewClient(ctx, config)
traceCache := collector.NewTraceCache()
client, err := NewClient(ctx, config, traceCache)
if err != nil {
return err
}
Expand All @@ -48,7 +49,7 @@ func Start(ctx context.Context, config config.Config) error {
return err
}

err = startCollector(ctx, config)
err = startCollector(ctx, config, traceCache)
if err != nil {
return err
}
Expand All @@ -57,14 +58,14 @@ func Start(ctx context.Context, config config.Config) error {
return nil
}

func startCollector(ctx context.Context, config config.Config) error {
func startCollector(ctx context.Context, config config.Config, traceCache collector.TraceCache) error {
noopTracer := trace.NewNoopTracerProvider().Tracer("noop")
collectorConfig := collector.Config{
HTTPPort: config.OTLPServer.HTTPPort,
GRPCPort: config.OTLPServer.GRPCPort,
}

err := collector.Start(ctx, collectorConfig, noopTracer)
_, err := collector.Start(ctx, collectorConfig, noopTracer, collector.WithTraceCache(traceCache))
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 34529cd

Please sign in to comment.