diff --git a/local-config/tracetest.provision.yaml b/local-config/tracetest.provision.yaml index 998c2ca7e0..90c851a62f 100644 --- a/local-config/tracetest.provision.yaml +++ b/local-config/tracetest.provision.yaml @@ -1,2 +1,6 @@ -dataStore: +--- +type: DataStore +spec: + name: OpenTelemetry Collector type: otlp + isdefault: true diff --git a/server/app/app.go b/server/app/app.go index 835e999930..ef2d9e4cb2 100644 --- a/server/app/app.go +++ b/server/app/app.go @@ -7,7 +7,6 @@ import ( "fmt" "log" "net/http" - "time" "github.com/gorilla/handlers" "github.com/gorilla/mux" @@ -16,7 +15,6 @@ import ( "github.com/kubeshop/tracetest/server/config" "github.com/kubeshop/tracetest/server/config/configresource" "github.com/kubeshop/tracetest/server/config/demoresource" - "github.com/kubeshop/tracetest/server/executor" "github.com/kubeshop/tracetest/server/executor/pollingprofile" "github.com/kubeshop/tracetest/server/executor/trigger" httpServer "github.com/kubeshop/tracetest/server/http" @@ -142,16 +140,16 @@ func (app *App) Start(opts ...appOption) error { testdb.WithDB(db), ) + if err != nil { + log.Fatal(err) + } + subscriptionManager := subscription.NewManager() app.subscribeToConfigChanges(subscriptionManager) configRepo := configresource.NewRepository(db, configresource.WithPublisher(subscriptionManager)) configFromDB := configRepo.Current(ctx) - if err != nil { - log.Fatal(err) - } - tracer, err := tracing.NewTracer(ctx, app.cfg) if err != nil { log.Fatal(err) @@ -321,68 +319,6 @@ func registerDemosResource(repository *demoresource.Repository, router *mux.Rout provisioner.AddResourceProvisioner(manager) } -type facadeConfig interface { - PoolingRetryDelay() time.Duration - PoolingMaxWaitTimeForTraceDuration() time.Duration -} - -func newRunnerFacades( - ppRepo *pollingprofile.Repository, - testDB model.Repository, - appTracer trace.Tracer, - tracer trace.Tracer, - subscriptionManager *subscription.Manager, - triggerRegistry *trigger.Registry, -) *runnerFacade { - - execTestUpdater := (executor.CompositeUpdater{}). - Add(executor.NewDBUpdater(testDB)). - Add(executor.NewSubscriptionUpdater(subscriptionManager)) - - assertionRunner := executor.NewAssertionRunner( - execTestUpdater, - executor.NewAssertionExecutor(tracer), - executor.InstrumentedOutputProcessor(tracer), - subscriptionManager, - ) - - pollerExecutor := executor.NewPollerExecutor( - ppRepo, - tracer, - execTestUpdater, - tracedb.Factory(testDB), - testDB, - ) - - tracePoller := executor.NewTracePoller( - pollerExecutor, - ppRepo, - execTestUpdater, - assertionRunner, - subscriptionManager, - ) - - runner := executor.NewPersistentRunner( - triggerRegistry, - testDB, - execTestUpdater, - tracePoller, - tracer, - subscriptionManager, - tracedb.Factory(testDB), - testDB, - ) - - transactionRunner := executor.NewTransactionRunner(runner, testDB, subscriptionManager) - - return &runnerFacade{ - runner: runner, - transactionRunner: transactionRunner, - assertionRunner: assertionRunner, - tracePoller: tracePoller, - } -} - func getTriggerRegistry(tracer, appTracer trace.Tracer) *trigger.Registry { triggerReg := trigger.NewRegsitry(tracer, appTracer) triggerReg.Add(trigger.HTTP()) diff --git a/server/app/facade.go b/server/app/facade.go index 980617c68f..5c0e098e93 100644 --- a/server/app/facade.go +++ b/server/app/facade.go @@ -4,7 +4,12 @@ import ( "context" "github.com/kubeshop/tracetest/server/executor" + "github.com/kubeshop/tracetest/server/executor/pollingprofile" + "github.com/kubeshop/tracetest/server/executor/trigger" "github.com/kubeshop/tracetest/server/model" + "github.com/kubeshop/tracetest/server/subscription" + "github.com/kubeshop/tracetest/server/tracedb" + "go.opentelemetry.io/otel/trace" ) type runnerFacade struct { @@ -25,3 +30,66 @@ func (rf runnerFacade) RunTransaction(ctx context.Context, tr model.Transaction, func (rf runnerFacade) RunAssertions(ctx context.Context, request executor.AssertionRequest) { rf.assertionRunner.RunAssertions(ctx, request) } + +func newRunnerFacades( + ppRepo *pollingprofile.Repository, + testDB model.Repository, + appTracer trace.Tracer, + tracer trace.Tracer, + subscriptionManager *subscription.Manager, + triggerRegistry *trigger.Registry, +) *runnerFacade { + eventEmitter := executor.NewEventEmitter(testDB, subscriptionManager) + + execTestUpdater := (executor.CompositeUpdater{}). + Add(executor.NewDBUpdater(testDB)). + Add(executor.NewSubscriptionUpdater(subscriptionManager)) + + assertionRunner := executor.NewAssertionRunner( + execTestUpdater, + executor.NewAssertionExecutor(tracer), + executor.InstrumentedOutputProcessor(tracer), + subscriptionManager, + eventEmitter, + ) + + pollerExecutor := executor.NewPollerExecutor( + ppRepo, + tracer, + execTestUpdater, + tracedb.Factory(testDB), + testDB, + ) + + tracePoller := executor.NewTracePoller( + pollerExecutor, + ppRepo, + execTestUpdater, + assertionRunner, + subscriptionManager, + ) + + runner := executor.NewPersistentRunner( + triggerRegistry, + testDB, + execTestUpdater, + tracePoller, + tracer, + subscriptionManager, + tracedb.Factory(testDB), + testDB, + ) + + transactionRunner := executor.NewTransactionRunner( + runner, + testDB, + subscriptionManager, + ) + + return &runnerFacade{ + runner: runner, + transactionRunner: transactionRunner, + assertionRunner: assertionRunner, + tracePoller: tracePoller, + } +} diff --git a/server/executor/assertion_runner.go b/server/executor/assertion_runner.go index 7973c4447a..f2b45f89bf 100644 --- a/server/executor/assertion_runner.go +++ b/server/executor/assertion_runner.go @@ -46,6 +46,7 @@ func NewAssertionRunner( assertionExecutor AssertionExecutor, op OutputsProcessorFn, subscriptionManager *subscription.Manager, + eventEmitter EventEmitter, ) AssertionRunner { return &defaultAssertionRunner{ outputsProcessor: op, diff --git a/server/executor/eventemitter.go b/server/executor/eventemitter.go new file mode 100644 index 0000000000..fee954ce1f --- /dev/null +++ b/server/executor/eventemitter.go @@ -0,0 +1,38 @@ +package executor + +import ( + "context" + + "github.com/kubeshop/tracetest/server/model" +) + +type EventEmitter interface { + Emit(ctx context.Context, event model.TestRunEvent) error +} + +type publisher interface { + Publish(eventID string, message any) +} + +type internalEventEmitter struct { + repository model.TestRunEventRepository + publisher publisher +} + +func NewEventEmitter(repository model.TestRunEventRepository, publisher publisher) EventEmitter { + return &internalEventEmitter{ + repository: repository, + publisher: publisher, + } +} + +func (em *internalEventEmitter) Emit(ctx context.Context, event model.TestRunEvent) error { + err := em.repository.CreateTestRunEvent(ctx, event) + if err != nil { + return err + } + + em.publisher.Publish(event.ResourceID(), event) + + return nil +} diff --git a/server/executor/eventemitter_test.go b/server/executor/eventemitter_test.go new file mode 100644 index 0000000000..1c835305d1 --- /dev/null +++ b/server/executor/eventemitter_test.go @@ -0,0 +1,153 @@ +package executor_test + +import ( + "context" + "errors" + "testing" + + "github.com/kubeshop/tracetest/server/executor" + "github.com/kubeshop/tracetest/server/id" + "github.com/kubeshop/tracetest/server/model" + "github.com/kubeshop/tracetest/server/subscription" + "github.com/kubeshop/tracetest/server/testdb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEventEmitter_SuccessfulScenario(t *testing.T) { + // Given I have a test run event + + run := model.NewRun() + + test := model.Test{ + ID: id.ID("some-test"), + ServiceUnderTest: model.Trigger{ + Type: model.TriggerTypeHTTP, + }, + } + + testRunEvent := model.TestRunEvent{ + TestID: test.ID, + RunID: run.ID, + Type: "EVENT_1", + Stage: model.StageTrigger, + Title: "OP 1", + Description: "This happened", + } + + // When I emit this event successfully + repository := getTestRunEventRepositoryMock(t, false) + subscriptionManager, subscriber := getSubscriptionManagerMock(t, testRunEvent) + + eventEmitter := executor.NewEventEmitter(repository, subscriptionManager) + + err := eventEmitter.Emit(context.Background(), testRunEvent) + require.NoError(t, err) + + // Then I expect that it was persisted + assert.Len(t, repository.events, 1) + assert.Equal(t, testRunEvent.Title, repository.events[0].Title) + assert.Equal(t, testRunEvent.Stage, repository.events[0].Stage) + assert.Equal(t, testRunEvent.Description, repository.events[0].Description) + + // And that it was sent to subscribers + assert.Len(t, subscriber.events, 1) + assert.Equal(t, testRunEvent.Title, subscriber.events[0].Title) + assert.Equal(t, testRunEvent.Stage, subscriber.events[0].Stage) + assert.Equal(t, testRunEvent.Description, subscriber.events[0].Description) +} + +func TestEventEmitter_FailedScenario(t *testing.T) { + // Given I have a test run event + + run := model.NewRun() + + test := model.Test{ + ID: id.ID("some-test"), + ServiceUnderTest: model.Trigger{ + Type: model.TriggerTypeHTTP, + }, + } + + testRunEvent := model.TestRunEvent{ + TestID: test.ID, + RunID: run.ID, + Type: "EVENT_1", + Stage: model.StageTrigger, + Title: "OP 1", + Description: "This happened", + } + + // When I emit this event and it fails + repository := getTestRunEventRepositoryMock(t, true) + subscriptionManager, subscriber := getSubscriptionManagerMock(t, testRunEvent) + + eventEmitter := executor.NewEventEmitter(repository, subscriptionManager) + + err := eventEmitter.Emit(context.Background(), testRunEvent) + require.Error(t, err) + + // Then I expect that it was not persisted + assert.Len(t, repository.events, 0) + + // And that it was not sent to subscribers + assert.Len(t, subscriber.events, 0) +} + +// TestRunEventRepository +type testRunEventRepositoryMock struct { + testdb.MockRepository + events []model.TestRunEvent + returnError bool + // ... +} + +func (m *testRunEventRepositoryMock) CreateTestRunEvent(ctx context.Context, event model.TestRunEvent) error { + if m.returnError { + return errors.New("error on persistence") + } + + m.events = append(m.events, event) + return nil +} + +func getTestRunEventRepositoryMock(t *testing.T, returnError bool) *testRunEventRepositoryMock { + t.Helper() + + mock := new(testRunEventRepositoryMock) + mock.T = t + mock.Test(t) + + mock.events = []model.TestRunEvent{} + mock.returnError = returnError + + return mock +} + +// TestRunEventSubscriber +type testRunEventSubscriber struct { + events []model.TestRunEvent +} + +func (s *testRunEventSubscriber) ID() string { + return "some-id" +} + +func (s *testRunEventSubscriber) Notify(message subscription.Message) error { + event := message.Content.(model.TestRunEvent) + s.events = append(s.events, event) + return nil +} + +func getSubscriptionManagerMock(t *testing.T, event model.TestRunEvent) (*subscription.Manager, *testRunEventSubscriber) { + t.Helper() + + subscriptionManager := subscription.NewManager() + subscriber := &testRunEventSubscriber{ + events: []model.TestRunEvent{}, + } + + subscriptionManager.Subscribe(event.ResourceID(), subscriber) + + return subscriptionManager, subscriber +} diff --git a/server/model/test_run_event.go b/server/model/test_run_event.go index 96c8d0fef4..4665e1a02d 100644 --- a/server/model/test_run_event.go +++ b/server/model/test_run_event.go @@ -1,6 +1,7 @@ package model import ( + "fmt" "time" "github.com/kubeshop/tracetest/server/id" @@ -54,6 +55,10 @@ type TestRunEvent struct { Outputs []OutputInfo } +func (e TestRunEvent) ResourceID() string { + return fmt.Sprintf("test/%s/run/%d/event", e.TestID, e.RunID) +} + type PollingInfo struct { Type PollingType ReasonNextIteration string