Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): adding event emitter #2269

Merged
merged 8 commits into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion local-config/tracetest.provision.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
dataStore:
---
type: DataStore
spec:
name: OpenTelemetry Collector
type: otlp
isdefault: true
72 changes: 4 additions & 68 deletions server/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"log"
"net/http"
"time"

"github.com/gorilla/handlers"
"github.com/gorilla/mux"
Expand All @@ -16,7 +15,6 @@ import (
"github.com/kubeshop/tracetest/server/config"
"github.com/kubeshop/tracetest/server/config/configresource"
"github.com/kubeshop/tracetest/server/config/demoresource"
"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/executor/pollingprofile"
"github.com/kubeshop/tracetest/server/executor/trigger"
httpServer "github.com/kubeshop/tracetest/server/http"
Expand Down Expand Up @@ -142,16 +140,16 @@ func (app *App) Start(opts ...appOption) error {
testdb.WithDB(db),
)

if err != nil {
log.Fatal(err)
}

subscriptionManager := subscription.NewManager()
app.subscribeToConfigChanges(subscriptionManager)

configRepo := configresource.NewRepository(db, configresource.WithPublisher(subscriptionManager))
configFromDB := configRepo.Current(ctx)

if err != nil {
log.Fatal(err)
}

tracer, err := tracing.NewTracer(ctx, app.cfg)
if err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -321,68 +319,6 @@ func registerDemosResource(repository *demoresource.Repository, router *mux.Rout
provisioner.AddResourceProvisioner(manager)
}

type facadeConfig interface {
PoolingRetryDelay() time.Duration
PoolingMaxWaitTimeForTraceDuration() time.Duration
}

func newRunnerFacades(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved to ./server/app/facade.go to keep all concerns about the façade there.

ppRepo *pollingprofile.Repository,
testDB model.Repository,
appTracer trace.Tracer,
tracer trace.Tracer,
subscriptionManager *subscription.Manager,
triggerRegistry *trigger.Registry,
) *runnerFacade {

execTestUpdater := (executor.CompositeUpdater{}).
Add(executor.NewDBUpdater(testDB)).
Add(executor.NewSubscriptionUpdater(subscriptionManager))

assertionRunner := executor.NewAssertionRunner(
execTestUpdater,
executor.NewAssertionExecutor(tracer),
executor.InstrumentedOutputProcessor(tracer),
subscriptionManager,
)

pollerExecutor := executor.NewPollerExecutor(
ppRepo,
tracer,
execTestUpdater,
tracedb.Factory(testDB),
testDB,
)

tracePoller := executor.NewTracePoller(
pollerExecutor,
ppRepo,
execTestUpdater,
assertionRunner,
subscriptionManager,
)

runner := executor.NewPersistentRunner(
triggerRegistry,
testDB,
execTestUpdater,
tracePoller,
tracer,
subscriptionManager,
tracedb.Factory(testDB),
testDB,
)

transactionRunner := executor.NewTransactionRunner(runner, testDB, subscriptionManager)

return &runnerFacade{
runner: runner,
transactionRunner: transactionRunner,
assertionRunner: assertionRunner,
tracePoller: tracePoller,
}
}

func getTriggerRegistry(tracer, appTracer trace.Tracer) *trigger.Registry {
triggerReg := trigger.NewRegsitry(tracer, appTracer)
triggerReg.Add(trigger.HTTP())
Expand Down
68 changes: 68 additions & 0 deletions server/app/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ import (
"context"

"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/executor/pollingprofile"
"github.com/kubeshop/tracetest/server/executor/trigger"
"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/subscription"
"github.com/kubeshop/tracetest/server/tracedb"
"go.opentelemetry.io/otel/trace"
)

type runnerFacade struct {
Expand All @@ -25,3 +30,66 @@ func (rf runnerFacade) RunTransaction(ctx context.Context, tr model.Transaction,
func (rf runnerFacade) RunAssertions(ctx context.Context, request executor.AssertionRequest) {
rf.assertionRunner.RunAssertions(ctx, request)
}

func newRunnerFacades(
ppRepo *pollingprofile.Repository,
testDB model.Repository,
appTracer trace.Tracer,
tracer trace.Tracer,
subscriptionManager *subscription.Manager,
triggerRegistry *trigger.Registry,
) *runnerFacade {
eventEmitter := executor.NewEventEmitter(testDB, subscriptionManager)

execTestUpdater := (executor.CompositeUpdater{}).
Add(executor.NewDBUpdater(testDB)).
Add(executor.NewSubscriptionUpdater(subscriptionManager))

assertionRunner := executor.NewAssertionRunner(
execTestUpdater,
executor.NewAssertionExecutor(tracer),
executor.InstrumentedOutputProcessor(tracer),
subscriptionManager,
eventEmitter,
)

pollerExecutor := executor.NewPollerExecutor(
ppRepo,
tracer,
execTestUpdater,
tracedb.Factory(testDB),
testDB,
)

tracePoller := executor.NewTracePoller(
pollerExecutor,
ppRepo,
execTestUpdater,
assertionRunner,
subscriptionManager,
)

runner := executor.NewPersistentRunner(
triggerRegistry,
testDB,
execTestUpdater,
tracePoller,
tracer,
subscriptionManager,
tracedb.Factory(testDB),
testDB,
)

transactionRunner := executor.NewTransactionRunner(
runner,
testDB,
subscriptionManager,
)

return &runnerFacade{
runner: runner,
transactionRunner: transactionRunner,
assertionRunner: assertionRunner,
tracePoller: tracePoller,
}
}
1 change: 1 addition & 0 deletions server/executor/assertion_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func NewAssertionRunner(
assertionExecutor AssertionExecutor,
op OutputsProcessorFn,
subscriptionManager *subscription.Manager,
eventEmitter EventEmitter,
) AssertionRunner {
return &defaultAssertionRunner{
outputsProcessor: op,
Expand Down
38 changes: 38 additions & 0 deletions server/executor/eventemitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package executor

import (
"context"

"github.com/kubeshop/tracetest/server/model"
)

type EventEmitter interface {
Emit(ctx context.Context, event model.TestRunEvent) error
}

type publisher interface {
Publish(eventID string, message any)
}

type internalEventEmitter struct {
repository model.TestRunEventRepository
publisher publisher
}

func NewEventEmitter(repository model.TestRunEventRepository, publisher publisher) EventEmitter {
return &internalEventEmitter{
repository: repository,
publisher: publisher,
}
}

func (em *internalEventEmitter) Emit(ctx context.Context, event model.TestRunEvent) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the next PRs we can evolve this method to also manage/start trace spans with each phase that we designed.

err := em.repository.CreateTestRunEvent(ctx, event)
if err != nil {
return err
}

em.publisher.Publish(event.ResourceID(), event)

return nil
}
153 changes: 153 additions & 0 deletions server/executor/eventemitter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package executor_test

import (
"context"
"errors"
"testing"

"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/id"
"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/subscription"
"github.com/kubeshop/tracetest/server/testdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestEventEmitter_SuccessfulScenario(t *testing.T) {
// Given I have a test run event

run := model.NewRun()

test := model.Test{
ID: id.ID("some-test"),
ServiceUnderTest: model.Trigger{
Type: model.TriggerTypeHTTP,
},
}

testRunEvent := model.TestRunEvent{
TestID: test.ID,
RunID: run.ID,
Type: "EVENT_1",
Stage: model.StageTrigger,
Title: "OP 1",
Description: "This happened",
}

// When I emit this event successfully
repository := getTestRunEventRepositoryMock(t, false)
subscriptionManager, subscriber := getSubscriptionManagerMock(t, testRunEvent)

eventEmitter := executor.NewEventEmitter(repository, subscriptionManager)

err := eventEmitter.Emit(context.Background(), testRunEvent)
require.NoError(t, err)

// Then I expect that it was persisted
assert.Len(t, repository.events, 1)
assert.Equal(t, testRunEvent.Title, repository.events[0].Title)
assert.Equal(t, testRunEvent.Stage, repository.events[0].Stage)
assert.Equal(t, testRunEvent.Description, repository.events[0].Description)

// And that it was sent to subscribers
assert.Len(t, subscriber.events, 1)
assert.Equal(t, testRunEvent.Title, subscriber.events[0].Title)
assert.Equal(t, testRunEvent.Stage, subscriber.events[0].Stage)
assert.Equal(t, testRunEvent.Description, subscriber.events[0].Description)
}

func TestEventEmitter_FailedScenario(t *testing.T) {
// Given I have a test run event

run := model.NewRun()

test := model.Test{
ID: id.ID("some-test"),
ServiceUnderTest: model.Trigger{
Type: model.TriggerTypeHTTP,
},
}

testRunEvent := model.TestRunEvent{
TestID: test.ID,
RunID: run.ID,
Type: "EVENT_1",
Stage: model.StageTrigger,
Title: "OP 1",
Description: "This happened",
}

// When I emit this event and it fails
repository := getTestRunEventRepositoryMock(t, true)
subscriptionManager, subscriber := getSubscriptionManagerMock(t, testRunEvent)

eventEmitter := executor.NewEventEmitter(repository, subscriptionManager)

err := eventEmitter.Emit(context.Background(), testRunEvent)
require.Error(t, err)

// Then I expect that it was not persisted
assert.Len(t, repository.events, 0)

// And that it was not sent to subscribers
assert.Len(t, subscriber.events, 0)
}

// TestRunEventRepository
type testRunEventRepositoryMock struct {
testdb.MockRepository
events []model.TestRunEvent
returnError bool
// ...
}

func (m *testRunEventRepositoryMock) CreateTestRunEvent(ctx context.Context, event model.TestRunEvent) error {
if m.returnError {
return errors.New("error on persistence")
}

m.events = append(m.events, event)
return nil
}

func getTestRunEventRepositoryMock(t *testing.T, returnError bool) *testRunEventRepositoryMock {
t.Helper()

mock := new(testRunEventRepositoryMock)
mock.T = t
mock.Test(t)

mock.events = []model.TestRunEvent{}
mock.returnError = returnError

return mock
}

// TestRunEventSubscriber
type testRunEventSubscriber struct {
events []model.TestRunEvent
}

func (s *testRunEventSubscriber) ID() string {
return "some-id"
}

func (s *testRunEventSubscriber) Notify(message subscription.Message) error {
event := message.Content.(model.TestRunEvent)
s.events = append(s.events, event)
return nil
}

func getSubscriptionManagerMock(t *testing.T, event model.TestRunEvent) (*subscription.Manager, *testRunEventSubscriber) {
t.Helper()

subscriptionManager := subscription.NewManager()
subscriber := &testRunEventSubscriber{
events: []model.TestRunEvent{},
}

subscriptionManager.Subscribe(event.ResourceID(), subscriber)

return subscriptionManager, subscriber
}
Loading