Skip to content

Commit

Permalink
feat(server): adding event emitter (#2269)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielbdias authored Mar 29, 2023
1 parent b907493 commit f56d33e
Show file tree
Hide file tree
Showing 7 changed files with 274 additions and 69 deletions.
6 changes: 5 additions & 1 deletion local-config/tracetest.provision.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
dataStore:
---
type: DataStore
spec:
name: OpenTelemetry Collector
type: otlp
isdefault: true
72 changes: 4 additions & 68 deletions server/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"log"
"net/http"
"time"

"github.com/gorilla/handlers"
"github.com/gorilla/mux"
Expand All @@ -16,7 +15,6 @@ import (
"github.com/kubeshop/tracetest/server/config"
"github.com/kubeshop/tracetest/server/config/configresource"
"github.com/kubeshop/tracetest/server/config/demoresource"
"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/executor/pollingprofile"
"github.com/kubeshop/tracetest/server/executor/trigger"
httpServer "github.com/kubeshop/tracetest/server/http"
Expand Down Expand Up @@ -142,16 +140,16 @@ func (app *App) Start(opts ...appOption) error {
testdb.WithDB(db),
)

if err != nil {
log.Fatal(err)
}

subscriptionManager := subscription.NewManager()
app.subscribeToConfigChanges(subscriptionManager)

configRepo := configresource.NewRepository(db, configresource.WithPublisher(subscriptionManager))
configFromDB := configRepo.Current(ctx)

if err != nil {
log.Fatal(err)
}

tracer, err := tracing.NewTracer(ctx, app.cfg)
if err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -321,68 +319,6 @@ func registerDemosResource(repository *demoresource.Repository, router *mux.Rout
provisioner.AddResourceProvisioner(manager)
}

type facadeConfig interface {
PoolingRetryDelay() time.Duration
PoolingMaxWaitTimeForTraceDuration() time.Duration
}

func newRunnerFacades(
ppRepo *pollingprofile.Repository,
testDB model.Repository,
appTracer trace.Tracer,
tracer trace.Tracer,
subscriptionManager *subscription.Manager,
triggerRegistry *trigger.Registry,
) *runnerFacade {

execTestUpdater := (executor.CompositeUpdater{}).
Add(executor.NewDBUpdater(testDB)).
Add(executor.NewSubscriptionUpdater(subscriptionManager))

assertionRunner := executor.NewAssertionRunner(
execTestUpdater,
executor.NewAssertionExecutor(tracer),
executor.InstrumentedOutputProcessor(tracer),
subscriptionManager,
)

pollerExecutor := executor.NewPollerExecutor(
ppRepo,
tracer,
execTestUpdater,
tracedb.Factory(testDB),
testDB,
)

tracePoller := executor.NewTracePoller(
pollerExecutor,
ppRepo,
execTestUpdater,
assertionRunner,
subscriptionManager,
)

runner := executor.NewPersistentRunner(
triggerRegistry,
testDB,
execTestUpdater,
tracePoller,
tracer,
subscriptionManager,
tracedb.Factory(testDB),
testDB,
)

transactionRunner := executor.NewTransactionRunner(runner, testDB, subscriptionManager)

return &runnerFacade{
runner: runner,
transactionRunner: transactionRunner,
assertionRunner: assertionRunner,
tracePoller: tracePoller,
}
}

func getTriggerRegistry(tracer, appTracer trace.Tracer) *trigger.Registry {
triggerReg := trigger.NewRegsitry(tracer, appTracer)
triggerReg.Add(trigger.HTTP())
Expand Down
68 changes: 68 additions & 0 deletions server/app/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ import (
"context"

"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/executor/pollingprofile"
"github.com/kubeshop/tracetest/server/executor/trigger"
"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/subscription"
"github.com/kubeshop/tracetest/server/tracedb"
"go.opentelemetry.io/otel/trace"
)

type runnerFacade struct {
Expand All @@ -25,3 +30,66 @@ func (rf runnerFacade) RunTransaction(ctx context.Context, tr model.Transaction,
func (rf runnerFacade) RunAssertions(ctx context.Context, request executor.AssertionRequest) {
rf.assertionRunner.RunAssertions(ctx, request)
}

func newRunnerFacades(
ppRepo *pollingprofile.Repository,
testDB model.Repository,
appTracer trace.Tracer,
tracer trace.Tracer,
subscriptionManager *subscription.Manager,
triggerRegistry *trigger.Registry,
) *runnerFacade {
eventEmitter := executor.NewEventEmitter(testDB, subscriptionManager)

execTestUpdater := (executor.CompositeUpdater{}).
Add(executor.NewDBUpdater(testDB)).
Add(executor.NewSubscriptionUpdater(subscriptionManager))

assertionRunner := executor.NewAssertionRunner(
execTestUpdater,
executor.NewAssertionExecutor(tracer),
executor.InstrumentedOutputProcessor(tracer),
subscriptionManager,
eventEmitter,
)

pollerExecutor := executor.NewPollerExecutor(
ppRepo,
tracer,
execTestUpdater,
tracedb.Factory(testDB),
testDB,
)

tracePoller := executor.NewTracePoller(
pollerExecutor,
ppRepo,
execTestUpdater,
assertionRunner,
subscriptionManager,
)

runner := executor.NewPersistentRunner(
triggerRegistry,
testDB,
execTestUpdater,
tracePoller,
tracer,
subscriptionManager,
tracedb.Factory(testDB),
testDB,
)

transactionRunner := executor.NewTransactionRunner(
runner,
testDB,
subscriptionManager,
)

return &runnerFacade{
runner: runner,
transactionRunner: transactionRunner,
assertionRunner: assertionRunner,
tracePoller: tracePoller,
}
}
1 change: 1 addition & 0 deletions server/executor/assertion_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func NewAssertionRunner(
assertionExecutor AssertionExecutor,
op OutputsProcessorFn,
subscriptionManager *subscription.Manager,
eventEmitter EventEmitter,
) AssertionRunner {
return &defaultAssertionRunner{
outputsProcessor: op,
Expand Down
38 changes: 38 additions & 0 deletions server/executor/eventemitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package executor

import (
"context"

"github.com/kubeshop/tracetest/server/model"
)

type EventEmitter interface {
Emit(ctx context.Context, event model.TestRunEvent) error
}

type publisher interface {
Publish(eventID string, message any)
}

type internalEventEmitter struct {
repository model.TestRunEventRepository
publisher publisher
}

func NewEventEmitter(repository model.TestRunEventRepository, publisher publisher) EventEmitter {
return &internalEventEmitter{
repository: repository,
publisher: publisher,
}
}

func (em *internalEventEmitter) Emit(ctx context.Context, event model.TestRunEvent) error {
err := em.repository.CreateTestRunEvent(ctx, event)
if err != nil {
return err
}

em.publisher.Publish(event.ResourceID(), event)

return nil
}
153 changes: 153 additions & 0 deletions server/executor/eventemitter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package executor_test

import (
"context"
"errors"
"testing"

"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/id"
"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/subscription"
"github.com/kubeshop/tracetest/server/testdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestEventEmitter_SuccessfulScenario(t *testing.T) {
// Given I have a test run event

run := model.NewRun()

test := model.Test{
ID: id.ID("some-test"),
ServiceUnderTest: model.Trigger{
Type: model.TriggerTypeHTTP,
},
}

testRunEvent := model.TestRunEvent{
TestID: test.ID,
RunID: run.ID,
Type: "EVENT_1",
Stage: model.StageTrigger,
Title: "OP 1",
Description: "This happened",
}

// When I emit this event successfully
repository := getTestRunEventRepositoryMock(t, false)
subscriptionManager, subscriber := getSubscriptionManagerMock(t, testRunEvent)

eventEmitter := executor.NewEventEmitter(repository, subscriptionManager)

err := eventEmitter.Emit(context.Background(), testRunEvent)
require.NoError(t, err)

// Then I expect that it was persisted
assert.Len(t, repository.events, 1)
assert.Equal(t, testRunEvent.Title, repository.events[0].Title)
assert.Equal(t, testRunEvent.Stage, repository.events[0].Stage)
assert.Equal(t, testRunEvent.Description, repository.events[0].Description)

// And that it was sent to subscribers
assert.Len(t, subscriber.events, 1)
assert.Equal(t, testRunEvent.Title, subscriber.events[0].Title)
assert.Equal(t, testRunEvent.Stage, subscriber.events[0].Stage)
assert.Equal(t, testRunEvent.Description, subscriber.events[0].Description)
}

func TestEventEmitter_FailedScenario(t *testing.T) {
// Given I have a test run event

run := model.NewRun()

test := model.Test{
ID: id.ID("some-test"),
ServiceUnderTest: model.Trigger{
Type: model.TriggerTypeHTTP,
},
}

testRunEvent := model.TestRunEvent{
TestID: test.ID,
RunID: run.ID,
Type: "EVENT_1",
Stage: model.StageTrigger,
Title: "OP 1",
Description: "This happened",
}

// When I emit this event and it fails
repository := getTestRunEventRepositoryMock(t, true)
subscriptionManager, subscriber := getSubscriptionManagerMock(t, testRunEvent)

eventEmitter := executor.NewEventEmitter(repository, subscriptionManager)

err := eventEmitter.Emit(context.Background(), testRunEvent)
require.Error(t, err)

// Then I expect that it was not persisted
assert.Len(t, repository.events, 0)

// And that it was not sent to subscribers
assert.Len(t, subscriber.events, 0)
}

// TestRunEventRepository
type testRunEventRepositoryMock struct {
testdb.MockRepository
events []model.TestRunEvent
returnError bool
// ...
}

func (m *testRunEventRepositoryMock) CreateTestRunEvent(ctx context.Context, event model.TestRunEvent) error {
if m.returnError {
return errors.New("error on persistence")
}

m.events = append(m.events, event)
return nil
}

func getTestRunEventRepositoryMock(t *testing.T, returnError bool) *testRunEventRepositoryMock {
t.Helper()

mock := new(testRunEventRepositoryMock)
mock.T = t
mock.Test(t)

mock.events = []model.TestRunEvent{}
mock.returnError = returnError

return mock
}

// TestRunEventSubscriber
type testRunEventSubscriber struct {
events []model.TestRunEvent
}

func (s *testRunEventSubscriber) ID() string {
return "some-id"
}

func (s *testRunEventSubscriber) Notify(message subscription.Message) error {
event := message.Content.(model.TestRunEvent)
s.events = append(s.events, event)
return nil
}

func getSubscriptionManagerMock(t *testing.T, event model.TestRunEvent) (*subscription.Manager, *testRunEventSubscriber) {
t.Helper()

subscriptionManager := subscription.NewManager()
subscriber := &testRunEventSubscriber{
events: []model.TestRunEvent{},
}

subscriptionManager.Subscribe(event.ResourceID(), subscriber)

return subscriptionManager, subscriber
}
Loading

0 comments on commit f56d33e

Please sign in to comment.