Skip to content

Commit

Permalink
fix: make test connection async on trace polling step (#3534)
Browse files Browse the repository at this point in the history
* fix: make test connection async on trace polling step

* Update server/executor/tracepollerworker/starter_worker.go

Co-authored-by: Matheus Nogueira <[email protected]>

---------

Co-authored-by: Matheus Nogueira <[email protected]>
  • Loading branch information
danielbdias and mathnogueira authored Jan 18, 2024
1 parent 805f6f3 commit 121ec57
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 31 deletions.
2 changes: 1 addition & 1 deletion agent/workers/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (w *PollerWorker) poll(ctx context.Context, request *proto.PollingRequest)
w.logger.Error("Invalid datastore", zap.Error(err))
return err
}
w.logger.Debug("Converted datastore", zap.Any("datastore", datastoreConfig))
w.logger.Debug("Converted datastore", zap.Any("datastore", datastoreConfig), zap.Any("originalDatastore", request.Datastore))

if datastoreConfig == nil {
w.logger.Error("Invalid datastore: nil")
Expand Down
33 changes: 17 additions & 16 deletions server/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,22 @@ func (app *App) Start(opts ...appOption) error {
registerOtlpServer(app, tracesRepo, runRepo, eventEmitter, dataStoreRepo, subscriptionManager, tracer)
}

testConnectionDriverFactory := pipeline.NewDriverFactory[testconnection.Job](natsConn)
dsTestListener := testconnection.NewListener()
dsTestPipeline := buildDataStoreTestPipeline(
testConnectionDriverFactory,
dsTestListener,
tracer,
tracedbFactory,
app.cfg,
meter,
)

dsTestPipeline.Start()
app.registerStopFn(func() {
dsTestPipeline.Stop()
})

executorDriverFactory := pipeline.NewDriverFactory[executor.Job](natsConn)
testPipeline := buildTestPipeline(
executorDriverFactory,
Expand All @@ -262,6 +278,7 @@ func (app *App) Start(opts ...appOption) error {
subscriptionManager,
triggerRegistry,
tracedbFactory,
dsTestPipeline,
app.cfg,
meter,
)
Expand All @@ -283,22 +300,6 @@ func (app *App) Start(opts ...appOption) error {
testSuitePipeline.Stop()
})

testConnectionDriverFactory := pipeline.NewDriverFactory[testconnection.Job](natsConn)
dsTestListener := testconnection.NewListener()
dsTestPipeline := buildDataStoreTestPipeline(
testConnectionDriverFactory,
dsTestListener,
tracer,
tracedbFactory,
app.cfg,
meter,
)

dsTestPipeline.Start()
app.registerStopFn(func() {
dsTestPipeline.Stop()
})

err = analytics.SendEvent("Server Started", "beacon", "", nil)
if err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions server/app/test_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/kubeshop/tracetest/server/pkg/pipeline"
"github.com/kubeshop/tracetest/server/subscription"
"github.com/kubeshop/tracetest/server/test"
"github.com/kubeshop/tracetest/server/testconnection"
"github.com/kubeshop/tracetest/server/tracedb"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
Expand All @@ -33,6 +34,7 @@ func buildTestPipeline(
subscriptionManager subscription.Manager,
triggerRegistry *trigger.Registry,
tracedbFactory tracedb.FactoryFunc,
dataStoreTestPipeline *testconnection.DataStoreTestPipeline,
appConfig *config.AppConfig,
meter metric.Meter,
) *executor.TestPipeline {
Expand Down Expand Up @@ -66,6 +68,7 @@ func buildTestPipeline(
execTestUpdater,
subscriptionManager,
tracer,
dataStoreTestPipeline,
)

traceFetcherWorker := tracepollerworker.NewFetcherWorker(
Expand Down
66 changes: 52 additions & 14 deletions server/executor/tracepollerworker/starter_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,31 @@ import (
"errors"
"fmt"
"log"
"sync"

"github.com/kubeshop/tracetest/server/datastore"
"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/model/events"
"github.com/kubeshop/tracetest/server/pkg/pipeline"
"github.com/kubeshop/tracetest/server/resourcemanager"
"github.com/kubeshop/tracetest/server/subscription"
"github.com/kubeshop/tracetest/server/testconnection"
"github.com/kubeshop/tracetest/server/tracedb"
"go.opentelemetry.io/otel/trace"
)

type tracePollerStarterWorker struct {
state *workerState
outputQueue pipeline.Enqueuer[executor.Job]
state *workerState
dsTestPipeline dataStorePipeline
outputQueue pipeline.Enqueuer[executor.Job]
}

type dataStorePipeline interface {
Run(context.Context, testconnection.Job)
NewJob(context.Context, datastore.DataStore) testconnection.Job
Subscribe(string, testconnection.NotifierFn) error
Unsubscribe(string)
}

func NewStarterWorker(
Expand All @@ -28,6 +39,7 @@ func NewStarterWorker(
updater executor.RunUpdater,
subscriptionManager subscription.Manager,
tracer trace.Tracer,
dsTestPipeline dataStorePipeline,
) *tracePollerStarterWorker {
state := &workerState{
eventEmitter: eventEmitter,
Expand All @@ -38,7 +50,10 @@ func NewStarterWorker(
tracer: tracer,
}

return &tracePollerStarterWorker{state: state}
return &tracePollerStarterWorker{
state: state,
dsTestPipeline: dsTestPipeline, // this is necessary just for this worker
}
}

func (w *tracePollerStarterWorker) SetInputQueue(queue pipeline.Enqueuer[executor.Job]) {
Expand Down Expand Up @@ -85,29 +100,52 @@ func (w *tracePollerStarterWorker) ProcessItem(ctx context.Context, job executor

emitEvent(ctx, w.state, events.TraceFetchingStart(job.Test.ID, job.Run.ID))

err = w.testConnection(ctx, traceDB, &job)
endpoints := traceDB.GetEndpoints()
ds, err := w.state.dsRepo.Current(ctx)
if err != nil {
wrappedError := fmt.Errorf("could not get current datastore: %w", err)
handleError(ctx, job, wrappedError, w.state, span)
return
}

connectionResult, err := w.testConnection(ctx, traceDB, ds)
if err != nil {
log.Printf("[TracePoller] TestConnection error: %s", err.Error())
handleError(ctx, job, err, w.state, span)
return
}

if connectionResult != nil {
emitEvent(ctx, w.state, events.TraceDataStoreConnectionInfo(job.Test.ID, job.Run.ID, *connectionResult))
}

emitEvent(ctx, w.state, events.TracePollingStart(job.Test.ID, job.Run.ID, string(ds.Type), endpoints))

w.outputQueue.Enqueue(ctx, job)
}

func (w *tracePollerStarterWorker) testConnection(ctx context.Context, traceDB tracedb.TraceDB, job *executor.Job) error {
if testableTraceDB, ok := traceDB.(tracedb.TestableTraceDB); ok {
connectionResult := testableTraceDB.TestConnection(ctx)

emitEvent(ctx, w.state, events.TraceDataStoreConnectionInfo(job.Test.ID, job.Run.ID, connectionResult))
func (w *tracePollerStarterWorker) testConnection(ctx context.Context, traceDB tracedb.TraceDB, ds datastore.DataStore) (*model.ConnectionResult, error) {
_, ok := traceDB.(tracedb.TestableTraceDB)
if !ok {
return nil, nil
}

endpoints := traceDB.GetEndpoints()
ds, err := w.state.dsRepo.Current(ctx)
job := w.dsTestPipeline.NewJob(ctx, ds)

wg := sync.WaitGroup{}
err := w.dsTestPipeline.Subscribe(job.ID, func(result testconnection.Job) {
job = result
wg.Done()
})

if err != nil {
return fmt.Errorf("could not get current datastore: %w", err)
return nil, err
}

emitEvent(ctx, w.state, events.TracePollingStart(job.Test.ID, job.Run.ID, string(ds.Type), endpoints))
wg.Add(1)
w.dsTestPipeline.Run(ctx, job)
wg.Wait()
w.dsTestPipeline.Unsubscribe(job.ID)

return nil
return &job.TestResult, nil
}

0 comments on commit 121ec57

Please sign in to comment.