Skip to content

Commit

Permalink
feat(agent): add metrics to agent workers (#3674)
Browse files Browse the repository at this point in the history
* wip

* adding basic metrics for workers

* adding better timeouts for github actions

* fixing agent tests

* update scheduled jobs timeout
  • Loading branch information
danielbdias authored Feb 22, 2024
1 parent d60b884 commit ed7540c
Show file tree
Hide file tree
Showing 10 changed files with 235 additions and 27 deletions.
12 changes: 9 additions & 3 deletions .github/workflows/scheduled-jobs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,20 @@ jobs:
pokeshop-trace-based-tests:
name: Run trace based tests for Pokeshop
runs-on: ubuntu-latest
timeout-minutes: 10
timeout-minutes: 20

steps:
- name: Checkout
uses: actions/checkout@v3

- name: Configure Tracetest CLI
timeout-minutes: 5
uses: kubeshop/tracetest-github-action@v1
with:
token: ${{secrets.TRACETEST_POKESHOP_TOKEN}}

- name: Run synthetic monitoring tests
timeout-minutes: 10
run: |
tracetest run testsuite --file ./testing/synthetic-monitoring/pokeshop/_testsuite.yaml --vars ./testing/synthetic-monitoring/pokeshop/_variableset.yaml
Expand Down Expand Up @@ -65,18 +67,20 @@ jobs:
pokeshop-serverless-trace-based-tests:
name: Run trace based tests for Pokeshop Serverless
runs-on: ubuntu-latest
timeout-minutes: 10
timeout-minutes: 20

steps:
- name: Checkout
uses: actions/checkout@v3

- name: Configure Tracetest CLI
timeout-minutes: 5
uses: kubeshop/tracetest-github-action@v1
with:
token: ${{secrets.TRACETEST_SERVERLESS_POKESHOP_TOKEN}}

- name: Run synthetic monitoring tests
timeout-minutes: 10
run: |
tracetest run testsuite --file ./testing/synthetic-monitoring/pokeshop-serverless/_testsuite.yaml --vars ./testing/synthetic-monitoring/pokeshop-serverless/_variableset.yaml
Expand Down Expand Up @@ -119,18 +123,20 @@ jobs:
otel-demo-trace-based-tests:
name: Run trace based tests for Open Telemetry demo
runs-on: ubuntu-latest
timeout-minutes: 10
timeout-minutes: 20

steps:
- name: Checkout
uses: actions/checkout@v3

- name: Configure Tracetest CLI
timeout-minutes: 5
uses: kubeshop/tracetest-github-action@v1
with:
token: ${{secrets.TRACETEST_OTELDEMO_TOKEN}}

- name: Run synthetic monitoring tests
timeout-minutes: 10
run: |
tracetest run testsuite --file ./testing/synthetic-monitoring/otel-demo/_testsuite.yaml --vars ./testing/synthetic-monitoring/otel-demo/_variableset.yaml
Expand Down
16 changes: 14 additions & 2 deletions agent/runner/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/kubeshop/tracetest/agent/workers"
"github.com/kubeshop/tracetest/agent/workers/poller"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -43,8 +44,14 @@ func StartSession(ctx context.Context, cfg config.Config, observer event.Observe
return nil, err
}

meter, err := telemetry.GetMeter(ctx, cfg.CollectorEndpoint, cfg.Name)
if err != nil {
observer.Error(err)
return nil, err
}

traceCache := collector.NewTraceCache()
controlPlaneClient, err := newControlPlaneClient(ctx, cfg, traceCache, observer, logger, tracer)
controlPlaneClient, err := newControlPlaneClient(ctx, cfg, traceCache, observer, logger, tracer, meter)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -107,7 +114,7 @@ func StartCollector(ctx context.Context, config config.Config, traceCache collec
return collector, nil
}

func newControlPlaneClient(ctx context.Context, config config.Config, traceCache collector.TraceCache, observer event.Observer, logger *zap.Logger, tracer trace.Tracer) (*client.Client, error) {
func newControlPlaneClient(ctx context.Context, config config.Config, traceCache collector.TraceCache, observer event.Observer, logger *zap.Logger, tracer trace.Tracer, meter metric.Meter) (*client.Client, error) {
controlPlaneClient, err := client.Connect(ctx, config.ServerURL,
client.WithAPIKey(config.APIKey),
client.WithAgentName(config.Name),
Expand All @@ -123,7 +130,9 @@ func newControlPlaneClient(ctx context.Context, config config.Config, traceCache
stopWorker := workers.NewStopperWorker(
workers.WithStopperObserver(observer),
workers.WithStopperCancelFuncList(processStopper.CancelMap()),
workers.WithStopperLogger(logger),
workers.WithStopperTracer(tracer),
workers.WithStopperMeter(meter),
)

triggerWorker := workers.NewTriggerWorker(
Expand All @@ -133,6 +142,7 @@ func newControlPlaneClient(ctx context.Context, config config.Config, traceCache
workers.WithTriggerStoppableProcessRunner(processStopper.RunStoppableProcess),
workers.WithTriggerLogger(logger),
workers.WithTriggerTracer(tracer),
workers.WithTriggerMeter(meter),
)

pollingWorker := workers.NewPollerWorker(
Expand All @@ -142,13 +152,15 @@ func newControlPlaneClient(ctx context.Context, config config.Config, traceCache
workers.WithPollerStoppableProcessRunner(processStopper.RunStoppableProcess),
workers.WithPollerLogger(logger),
workers.WithPollerTracer(tracer),
workers.WithPollerMeter(meter),
)

dataStoreTestConnectionWorker := workers.NewTestConnectionWorker(
controlPlaneClient,
workers.WithTestConnectionLogger(logger),
workers.WithTestConnectionObserver(observer),
workers.WithTestConnectionTracer(tracer),
workers.WithTestConnectionMeter(meter),
)

controlPlaneClient.OnDataStoreTestConnectionRequest(dataStoreTestConnectionWorker.Test)
Expand Down
37 changes: 37 additions & 0 deletions agent/telemetry/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package telemetry

import (
"fmt"
"os"

"github.com/kubeshop/tracetest/server/version"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
)

func getAgentServiceName(serviceName string) string {
return fmt.Sprintf("tracetest.agent-%s", serviceName)
}

func getResource(serviceName string) (*resource.Resource, error) {
hostname, err := os.Hostname()
if err != nil {
return nil, fmt.Errorf("could not get OS hostname: %w", err)
}

resource, err := resource.Merge(
resource.Default(),
resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(serviceName),
semconv.HostName(hostname),
semconv.ServiceVersion(version.Version), // TODO: should we consider a version file for the agent?
),
)

if err != nil {
return nil, fmt.Errorf("could not merge resources: %w", err)
}

return resource, nil
}
77 changes: 77 additions & 0 deletions agent/telemetry/meter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package telemetry

import (
"context"
"fmt"
"time"

"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
metricsdk "go.opentelemetry.io/otel/sdk/metric"
)

const (
metricsReaderInterval = 30 * time.Second
metricExporterTimeout = 5 * time.Second
)

func GetNoopMeter() metric.Meter {
return noop.NewMeterProvider().Meter("noop")
}

func GetMeter(ctx context.Context, otelExporterEndpoint, serviceName string) (metric.Meter, error) {
if otelExporterEndpoint == "" {
// fallback, return noop
return GetNoopMeter(), nil
}

realServiceName := getAgentServiceName(serviceName)

provider, err := newMeterProvider(ctx, otelExporterEndpoint, realServiceName)
if err != nil {
return nil, fmt.Errorf("could not create meter provider: %w", err)
}

return provider.Meter("tracetest.agent"), nil
}

func newMeterProvider(ctx context.Context, otelExporterEndpoint, serviceName string) (metric.MeterProvider, error) {
resource, err := getResource(serviceName)
if err != nil {
return nil, fmt.Errorf("could not get resource: %w", err)
}

exporter, err := getMetricExporter(ctx, otelExporterEndpoint)
if err != nil {
return nil, fmt.Errorf("could not create metric exporter: %w", err)
}

periodicReader := metricsdk.NewPeriodicReader(
exporter,
metricsdk.WithInterval(metricsReaderInterval),
)

provider := metricsdk.NewMeterProvider(
metricsdk.WithResource(resource),
metricsdk.WithReader(periodicReader),
)

return provider, nil
}

func getMetricExporter(ctx context.Context, otelExporterEndpoint string) (*otlpmetricgrpc.Exporter, error) {
ctx, cancel := context.WithTimeout(ctx, metricExporterTimeout)
defer cancel()

exporter, err := otlpmetricgrpc.New(ctx,
otlpmetricgrpc.WithEndpoint(otelExporterEndpoint),
otlpmetricgrpc.WithCompressor("gzip"),
otlpmetricgrpc.WithInsecure(),
)
if err != nil {
return nil, fmt.Errorf("could not create metric exporter: %w", err)
}

return exporter, nil
}
22 changes: 8 additions & 14 deletions agent/telemetry/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,25 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdkTrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const spanExporterTimeout = 1 * time.Minute

func GetNoopTracer() trace.Tracer {
return trace.NewNoopTracerProvider().Tracer("noop")
}

func GetTracer(ctx context.Context, otelExporterEndpoint, serviceName string) (trace.Tracer, error) {
if otelExporterEndpoint == "" {
// fallback, return noop
return trace.NewNoopTracerProvider().Tracer("noop"), nil
return GetNoopTracer(), nil
}

realServiceName := fmt.Sprintf("tracetestAgent_%s", serviceName)
realServiceName := getAgentServiceName(serviceName)

spanExporter, err := newSpanExporter(ctx, otelExporterEndpoint)
if err != nil {
Expand Down Expand Up @@ -57,22 +59,14 @@ func newSpanExporter(ctx context.Context, otelExporterEndpoint string) (sdkTrace
}

func newTraceProvider(ctx context.Context, spanExporter sdkTrace.SpanExporter, serviceName string) (*sdkTrace.TracerProvider, error) {
defaultResource := resource.Default()

mergedResource, err := resource.Merge(
defaultResource,
resource.NewWithAttributes(
defaultResource.SchemaURL(),
semconv.ServiceNameKey.String(serviceName),
),
)
resource, err := getResource(serviceName)
if err != nil {
return nil, fmt.Errorf("failed to create otel resource: %w", err)
}

tp := sdkTrace.NewTracerProvider(
sdkTrace.WithBatcher(spanExporter),
sdkTrace.WithResource(mergedResource),
sdkTrace.WithResource(resource),
)

otel.SetTracerProvider(tp)
Expand Down
21 changes: 20 additions & 1 deletion agent/workers/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import (
"github.com/kubeshop/tracetest/agent/client"
"github.com/kubeshop/tracetest/agent/event"
"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/agent/telemetry"
"github.com/kubeshop/tracetest/server/datastore"
"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/tracedb"
"github.com/kubeshop/tracetest/server/tracedb/connection"
"github.com/kubeshop/tracetest/server/traces"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
Expand All @@ -29,6 +31,7 @@ type PollerWorker struct {
observer event.Observer
stoppableProcessRunner StoppableProcessRunner
tracer trace.Tracer
meter metric.Meter
}

type PollerOption func(*PollerWorker)
Expand Down Expand Up @@ -63,13 +66,20 @@ func WithPollerTracer(tracer trace.Tracer) PollerOption {
}
}

func WithPollerMeter(meter metric.Meter) PollerOption {
return func(pw *PollerWorker) {
pw.meter = meter
}
}

func NewPollerWorker(client *client.Client, opts ...PollerOption) *PollerWorker {
pollerWorker := &PollerWorker{
client: client,
sentSpanIDs: gocache.New[string, bool](),
logger: zap.NewNop(),
observer: event.NewNopObserver(),
tracer: trace.NewNoopTracerProvider().Tracer("noop"),
tracer: telemetry.GetNoopTracer(),
meter: telemetry.GetNoopMeter(),
}

for _, opt := range opts {
Expand All @@ -83,6 +93,11 @@ func (w *PollerWorker) Poll(ctx context.Context, request *proto.PollingRequest)
ctx, span := w.tracer.Start(ctx, "PollingRequest Worker operation")
defer span.End()

runCounter, _ := w.meter.Int64Counter("tracetest.agent.pollerworker.runs")
runCounter.Add(ctx, 1)

errorCounter, _ := w.meter.Int64Counter("tracetest.agent.pollerworker.errors")

w.logger.Debug("Received polling request", zap.Any("request", request))
w.observer.StartTracePoll(request)

Expand Down Expand Up @@ -120,9 +135,13 @@ func (w *PollerWorker) Poll(ctx context.Context, request *proto.PollingRequest)

formattedErr := fmt.Errorf("could not report polling error back to the server: %w. Original error: %s", sendErr, err.Error())
span.RecordError(formattedErr)
errorCounter.Add(ctx, 1)

return formattedErr
}

span.RecordError(err)
errorCounter.Add(ctx, 1)
}

w.observer.EndTracePoll(request, nil)
Expand Down
Loading

0 comments on commit ed7540c

Please sign in to comment.