diff --git a/flyteadmin_config.yaml b/flyteadmin_config.yaml
index 11749e42d..e62521ada 100644
--- a/flyteadmin_config.yaml
+++ b/flyteadmin_config.yaml
@@ -78,6 +78,14 @@ notifications:
Execution \"{{ name }}\" has {{ phase }} in \"{{ domain }}\". View details at
http://example.com/projects/{{ project }}/domains/{{ domain }}/executions/{{ name }}. {{ error }}
+externalEvents:
+ Enable: false
+ type: gcp
+ gcp:
+ projectId: "foo"
+ eventsPublisher:
+ topicName: "bar"
+ eventTypes: all
Logger:
show-source: true
level: 6
diff --git a/pkg/async/notifications/factory.go b/pkg/async/notifications/factory.go
index 83ee8c663..2293ff926 100644
--- a/pkg/async/notifications/factory.go
+++ b/pkg/async/notifications/factory.go
@@ -148,3 +148,53 @@ func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, sco
return implementations.NewNoopPublish()
}
}
+
+func NewEventsPublisher(config runtimeInterfaces.ExternalEventsConfig, scope promutils.Scope) interfaces.Publisher {
+ if !config.Enable {
+ return implementations.NewNoopPublish()
+ }
+ reconnectAttempts := config.ReconnectAttempts
+ reconnectDelay := time.Duration(config.ReconnectDelaySeconds) * time.Second
+ switch config.Type {
+ case common.AWS:
+ snsConfig := gizmoAWS.SNSConfig{
+ Topic: config.EventsPublisherConfig.TopicName,
+ }
+ snsConfig.Region = config.AWSConfig.Region
+
+ var publisher pubsub.Publisher
+ var err error
+ err = async.Retry(reconnectAttempts, reconnectDelay, func() error {
+ publisher, err = gizmoAWS.NewPublisher(snsConfig)
+ return err
+ })
+
+ // Any persistent errors initiating Publisher with Amazon configurations results in a failed start up.
+ if err != nil {
+ panic(err)
+ }
+ return implementations.NewEventsPublisher(publisher, scope, config.EventsPublisherConfig.EventTypes)
+ case common.GCP:
+ pubsubConfig := gizmoGCP.Config{
+ Topic: config.EventsPublisherConfig.TopicName,
+ }
+ pubsubConfig.ProjectID = config.GCPConfig.ProjectID
+ var publisher pubsub.MultiPublisher
+ var err error
+ err = async.Retry(reconnectAttempts, reconnectDelay, func() error {
+ publisher, err = gizmoGCP.NewPublisher(context.TODO(), pubsubConfig)
+ return err
+ })
+
+ if err != nil {
+ panic(err)
+ }
+ return implementations.NewEventsPublisher(publisher, scope, config.EventsPublisherConfig.EventTypes)
+ case common.Local:
+ fallthrough
+ default:
+ logger.Infof(context.Background(),
+ "Using default noop events publisher implementation for config type [%s]", config.Type)
+ return implementations.NewNoopPublish()
+ }
+}
diff --git a/pkg/async/notifications/implementations/event_publisher.go b/pkg/async/notifications/implementations/event_publisher.go
new file mode 100644
index 000000000..9f306af8e
--- /dev/null
+++ b/pkg/async/notifications/implementations/event_publisher.go
@@ -0,0 +1,105 @@
+package implementations
+
+import (
+ "context"
+
+ "k8s.io/apimachinery/pkg/util/sets"
+
+ "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
+
+ "github.com/lyft/flyteadmin/pkg/async/notifications/interfaces"
+
+ "github.com/NYTimes/gizmo/pubsub"
+ "github.com/golang/protobuf/proto"
+ "github.com/lyft/flytestdlib/logger"
+ "github.com/lyft/flytestdlib/promutils"
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+type eventPublisherSystemMetrics struct {
+ Scope promutils.Scope
+ PublishTotal prometheus.Counter
+ PublishSuccess prometheus.Counter
+ PublishError prometheus.Counter
+}
+
+// TODO: Add a counter that encompasses the publisher stats grouped by project and domain.
+type EventPublisher struct {
+ pub pubsub.Publisher
+ systemMetrics eventPublisherSystemMetrics
+ events sets.String
+}
+
+var taskExecutionReq admin.TaskExecutionEventRequest
+var nodeExecutionReq admin.NodeExecutionEventRequest
+var workflowExecutionReq admin.WorkflowExecutionEventRequest
+
+const (
+ Task = "task"
+ Node = "node"
+ Workflow = "workflow"
+ AllTypes = "all"
+ AllTypesShort = "*"
+)
+
+var supportedEvents = map[string]string{
+ Task: proto.MessageName(&taskExecutionReq),
+ Node: proto.MessageName(&nodeExecutionReq),
+ Workflow: proto.MessageName(&workflowExecutionReq),
+}
+
+// The key is the notification type as defined as an enum.
+func (p *EventPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error {
+ p.systemMetrics.PublishTotal.Inc()
+
+ if !p.shouldPublishEvent(notificationType) {
+ return nil
+ }
+ logger.Debugf(ctx, "Publishing the following message [%+v]", msg)
+
+ err := p.pub.Publish(ctx, notificationType, msg)
+ if err != nil {
+ p.systemMetrics.PublishError.Inc()
+ logger.Errorf(ctx, "Failed to publish a message with key [%s] and message [%s] and error: %v", notificationType, msg.String(), err)
+ } else {
+ p.systemMetrics.PublishSuccess.Inc()
+ }
+ return err
+}
+
+func (p *EventPublisher) shouldPublishEvent(notificationType string) bool {
+ return p.events.Has(notificationType)
+}
+
+func newEventPublisherSystemMetrics(scope promutils.Scope) eventPublisherSystemMetrics {
+ return eventPublisherSystemMetrics{
+ Scope: scope,
+ PublishTotal: scope.MustNewCounter("event_publish_total", "overall count of publish messages"),
+ PublishSuccess: scope.MustNewCounter("event_publish_success", "success count of publish messages"),
+ PublishError: scope.MustNewCounter("event_publish_errors", "count of publish errors"),
+ }
+}
+
+func NewEventsPublisher(pub pubsub.Publisher, scope promutils.Scope, eventTypes []string) interfaces.Publisher {
+ eventSet := sets.NewString()
+
+ for _, event := range eventTypes {
+ if event == AllTypes || event == AllTypesShort {
+ for _, e := range supportedEvents {
+ eventSet = eventSet.Insert(e)
+ }
+ break
+ }
+ if e, found := supportedEvents[event]; found {
+ eventSet = eventSet.Insert(e)
+ } else {
+ logger.Errorf(context.Background(), "Unsupported event type [%s] in the config")
+ }
+ }
+
+ return &EventPublisher{
+ pub: pub,
+ systemMetrics: newEventPublisherSystemMetrics(scope.NewSubScope("events_publisher")),
+ events: eventSet,
+ }
+}
diff --git a/pkg/async/notifications/implementations/event_publisher_test.go b/pkg/async/notifications/implementations/event_publisher_test.go
new file mode 100644
index 000000000..dbfed5607
--- /dev/null
+++ b/pkg/async/notifications/implementations/event_publisher_test.go
@@ -0,0 +1,173 @@
+package implementations
+
+import (
+ "context"
+ "errors"
+ "testing"
+ "time"
+
+ "github.com/golang/protobuf/ptypes"
+ "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
+ "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
+ "github.com/lyft/flyteidl/gen/pb-go/flyteidl/event"
+
+ "github.com/NYTimes/gizmo/pubsub"
+ "github.com/NYTimes/gizmo/pubsub/pubsubtest"
+ "github.com/golang/protobuf/proto"
+ "github.com/lyft/flytestdlib/promutils"
+ "github.com/stretchr/testify/assert"
+)
+
+var testEventPublisher pubsubtest.TestPublisher
+var mockEventPublisher pubsub.Publisher = &testEventPublisher
+
+var executionID = core.WorkflowExecutionIdentifier{
+ Project: "project",
+ Domain: "domain",
+ Name: "name",
+}
+var nodeExecutionID = core.NodeExecutionIdentifier{
+ NodeId: "node id",
+ ExecutionId: &executionID,
+}
+
+var taskID = &core.Identifier{
+ ResourceType: core.ResourceType_TASK,
+ Project: "p",
+ Domain: "d",
+ Version: "v",
+ Name: "n",
+}
+
+var occurredAt = time.Now().UTC()
+var occurredAtProto, _ = ptypes.TimestampProto(occurredAt)
+
+var taskPhase = core.TaskExecution_RUNNING
+
+var retryAttempt = uint32(1)
+
+const requestID = "request id"
+
+var taskRequest = &admin.TaskExecutionEventRequest{
+ RequestId: requestID,
+ Event: &event.TaskExecutionEvent{
+ TaskId: taskID,
+ ParentNodeExecutionId: &nodeExecutionID,
+ RetryAttempt: retryAttempt,
+ Phase: taskPhase,
+ OccurredAt: occurredAtProto,
+ },
+}
+
+var nodeRequest = &admin.NodeExecutionEventRequest{
+ RequestId: requestID,
+ Event: &event.NodeExecutionEvent{
+ ProducerId: "propeller",
+ Id: &nodeExecutionID,
+ OccurredAt: occurredAtProto,
+ Phase: core.NodeExecution_RUNNING,
+ InputUri: "input uri",
+ },
+}
+
+var workflowRequest = &admin.WorkflowExecutionEventRequest{
+ Event: &event.WorkflowExecutionEvent{
+ Phase: core.WorkflowExecution_SUCCEEDED,
+ OutputResult: &event.WorkflowExecutionEvent_OutputUri{
+ OutputUri: "somestring",
+ },
+ ExecutionId: &executionID,
+ },
+}
+
+// This method should be invoked before every test around Publisher.
+func initializeEventPublisher() {
+ testEventPublisher.Published = nil
+ testEventPublisher.GivenError = nil
+ testEventPublisher.FoundError = nil
+}
+
+func TestNewEventsPublisher_EventTypes(t *testing.T) {
+ {
+ tests := []struct {
+ name string
+ eventTypes []string
+ events []proto.Message
+ shouldSendEvent []bool
+ expectedSendCnt int
+ }{
+ {"eventTypes as workflow,node", []string{"workflow", "node"},
+ []proto.Message{workflowRequest, nodeRequest, taskRequest},
+ []bool{true, true, false},
+ 2},
+ {"eventTypes as workflow,task", []string{"workflow", "task"},
+ []proto.Message{workflowRequest, nodeRequest, taskRequest},
+ []bool{true, false, true},
+ 2},
+ {"eventTypes as workflow,task", []string{"node", "task"},
+ []proto.Message{workflowRequest, nodeRequest, taskRequest},
+ []bool{false, true, true},
+ 2},
+ {"eventTypes as task", []string{"task"},
+ []proto.Message{taskRequest},
+ []bool{true},
+ 1},
+ {"eventTypes as node", []string{"node"},
+ []proto.Message{nodeRequest},
+ []bool{true},
+ 1},
+ {"eventTypes as workflow", []string{"workflow"},
+ []proto.Message{workflowRequest},
+ []bool{true},
+ 1},
+ {"eventTypes as workflow", []string{"workflow"},
+ []proto.Message{nodeRequest, taskRequest},
+ []bool{false, false},
+ 0},
+ {"eventTypes as task", []string{"task"},
+ []proto.Message{workflowRequest, nodeRequest},
+ []bool{false, false},
+ 0},
+ {"eventTypes as node", []string{"node"},
+ []proto.Message{workflowRequest, taskRequest},
+ []bool{false, false},
+ 0},
+ {"eventTypes as all", []string{"all"},
+ []proto.Message{workflowRequest, nodeRequest, taskRequest},
+ []bool{true, true, true},
+ 3},
+ {"eventTypes as *", []string{"*"},
+ []proto.Message{workflowRequest, nodeRequest, taskRequest},
+ []bool{true, true, true},
+ 3},
+ }
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ initializeEventPublisher()
+ var currentEventPublisher = NewEventsPublisher(mockEventPublisher, promutils.NewTestScope(), test.eventTypes)
+ var cnt = 0
+ for id, event := range test.events {
+ assert.Nil(t, currentEventPublisher.Publish(context.Background(), proto.MessageName(event),
+ event))
+ if test.shouldSendEvent[id] {
+ assert.Equal(t, proto.MessageName(event), testEventPublisher.Published[cnt].Key)
+ marshalledData, err := proto.Marshal(event)
+ assert.Nil(t, err)
+ assert.Equal(t, marshalledData, testEventPublisher.Published[cnt].Body)
+ cnt++
+ }
+ }
+ assert.Equal(t, test.expectedSendCnt, len(testEventPublisher.Published))
+ })
+ }
+ }
+}
+
+func TestEventPublisher_PublishError(t *testing.T) {
+ initializeEventPublisher()
+ currentEventPublisher := NewEventsPublisher(mockEventPublisher, promutils.NewTestScope(), []string{"*"})
+ var publishError = errors.New("publish() returns an error")
+ testEventPublisher.GivenError = publishError
+ assert.Equal(t, publishError, currentEventPublisher.Publish(context.Background(),
+ proto.MessageName(taskRequest), taskRequest))
+}
diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go
index dfe85367e..1453fb88c 100644
--- a/pkg/manager/impl/execution_manager.go
+++ b/pkg/manager/impl/execution_manager.go
@@ -64,6 +64,7 @@ type executionSystemMetrics struct {
SpecSizeBytes prometheus.Summary
ClosureSizeBytes prometheus.Summary
AcceptanceDelay prometheus.Summary
+ PublishEventError prometheus.Counter
}
type executionUserMetrics struct {
@@ -89,6 +90,7 @@ type ExecutionManager struct {
namedEntityManager interfaces.NamedEntityInterface
resourceManager interfaces.ResourceInterface
qualityOfServiceAllocator executions.QualityOfServiceAllocator
+ eventPublisher notificationInterfaces.Publisher
}
func getExecutionContext(ctx context.Context, id *core.WorkflowExecutionIdentifier) context.Context {
@@ -1004,6 +1006,10 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi
return nil, err
}
}
+ if err := m.eventPublisher.Publish(ctx, proto.MessageName(&request), &request); err != nil {
+ m.systemMetrics.PublishEventError.Inc()
+ logger.Infof(ctx, "error publishing event [%+v] with err: [%v]", request.RequestId, err)
+ }
m.systemMetrics.ExecutionEventsCreated.Inc()
return &admin.WorkflowExecutionEventResponse{}, nil
@@ -1326,20 +1332,12 @@ func newExecutionSystemMetrics(scope promutils.Scope) executionSystemMetrics {
ClosureSizeBytes: scope.MustNewSummary("closure_size_bytes", "size in bytes of serialized execution closure"),
AcceptanceDelay: scope.MustNewSummary("acceptance_delay",
"delay in seconds from when an execution was requested to be created and when it actually was"),
+ PublishEventError: scope.MustNewCounter("publish_event_error",
+ "overall count of publish event errors when invoking publish()"),
}
}
-func NewExecutionManager(
- db repositories.RepositoryInterface,
- config runtimeInterfaces.Configuration,
- storageClient *storage.DataStore,
- workflowExecutor workflowengineInterfaces.Executor,
- systemScope promutils.Scope,
- userScope promutils.Scope,
- publisher notificationInterfaces.Publisher,
- urlData dataInterfaces.RemoteURLInterface,
- workflowManager interfaces.WorkflowInterface,
- namedEntityManager interfaces.NamedEntityInterface) interfaces.ExecutionInterface {
+func NewExecutionManager(db repositories.RepositoryInterface, config runtimeInterfaces.Configuration, storageClient *storage.DataStore, workflowExecutor workflowengineInterfaces.Executor, systemScope promutils.Scope, userScope promutils.Scope, publisher notificationInterfaces.Publisher, urlData dataInterfaces.RemoteURLInterface, workflowManager interfaces.WorkflowInterface, namedEntityManager interfaces.NamedEntityInterface, eventPublisher notificationInterfaces.Publisher) interfaces.ExecutionInterface {
queueAllocator := executions.NewQueueAllocator(config, db)
systemMetrics := newExecutionSystemMetrics(systemScope)
@@ -1369,6 +1367,7 @@ func NewExecutionManager(
namedEntityManager: namedEntityManager,
resourceManager: resourceManager,
qualityOfServiceAllocator: executions.NewQualityOfServiceAllocator(config, resourceManager),
+ eventPublisher: eventPublisher,
}
}
diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go
index f0e5d85ba..1e5dba15e 100644
--- a/pkg/manager/impl/execution_manager_test.go
+++ b/pkg/manager/impl/execution_manager_test.go
@@ -256,9 +256,7 @@ func TestCreateExecution(t *testing.T) {
mockConfig := getMockExecutionsConfigProvider()
mockConfig.(*runtimeMocks.MockConfigurationProvider).AddQualityOfServiceConfiguration(qosProvider)
- execManager := NewExecutionManager(
- repository, mockConfig, getMockStorageForExecTest(context.Background()), mockExecutor,
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, mockConfig, getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, &mockPublisher)
request := testutils.GetExecutionRequest()
request.Spec.Metadata = &admin.ExecutionMetadata{
Principal: "unused - populated from authenticated context",
@@ -332,9 +330,7 @@ func TestCreateExecutionFromWorkflowNode(t *testing.T) {
},
)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
request := testutils.GetExecutionRequest()
request.Spec.Metadata = &admin.ExecutionMetadata{
Mode: admin.ExecutionMetadata_CHILD_WORKFLOW,
@@ -372,8 +368,7 @@ func TestCreateExecution_NoAssignedName(t *testing.T) {
Cluster: testCluster,
}, nil
})
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
request := testutils.GetExecutionRequest()
request.Name = ""
response, err := execManager.CreateExecution(context.Background(), request, requestedAt)
@@ -421,8 +416,7 @@ func TestCreateExecution_TaggedQueue(t *testing.T) {
Cluster: testCluster,
}, nil
})
- execManager := NewExecutionManager(
- repository, configProvider, getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, configProvider, getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
request := testutils.GetExecutionRequest()
response, err := execManager.CreateExecution(context.Background(), request, requestedAt)
@@ -438,9 +432,7 @@ func TestCreateExecution_TaggedQueue(t *testing.T) {
func TestCreateExecutionValidationError(t *testing.T) {
repository := getMockRepositoryForExecTest()
setDefaultLpCallbackForExecTest(repository)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
request := testutils.GetExecutionRequest()
request.Domain = ""
@@ -452,9 +444,7 @@ func TestCreateExecutionValidationError(t *testing.T) {
func TestCreateExecution_InvalidLpIdentifier(t *testing.T) {
repository := getMockRepositoryForExecTest()
setDefaultLpCallbackForExecTest(repository)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
request := testutils.GetExecutionRequest()
request.Spec.LaunchPlan = nil
@@ -466,9 +456,7 @@ func TestCreateExecution_InvalidLpIdentifier(t *testing.T) {
func TestCreateExecutionInCompatibleInputs(t *testing.T) {
repository := getMockRepositoryForExecTest()
setDefaultLpCallbackForExecTest(repository)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
request := testutils.GetExecutionRequest()
request.Inputs = &core.LiteralMap{
@@ -492,8 +480,7 @@ func TestCreateExecutionPropellerFailure(t *testing.T) {
return nil, expectedErr
}
mockExecutor.(*workflowengineMocks.MockExecutor).SetExecuteWorkflowCallback(createFunc)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
request := testutils.GetExecutionRequest()
@@ -512,9 +499,7 @@ func TestCreateExecutionDatabaseFailure(t *testing.T) {
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
request := testutils.GetExecutionRequest()
response, err := execManager.CreateExecution(context.Background(), request, requestedAt)
@@ -577,9 +562,7 @@ func TestCreateExecutionVerifyDbModel(t *testing.T) {
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), storageClient, workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), storageClient, workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
execManager.(*ExecutionManager)._clock = mockClock
@@ -617,9 +600,7 @@ func TestCreateExecutionDefaultNotifications(t *testing.T) {
return nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(),
- mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
response, err := execManager.CreateExecution(context.Background(), request, requestedAt)
assert.Nil(t, err)
@@ -653,9 +634,7 @@ func TestCreateExecutionDisableNotifications(t *testing.T) {
return nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(),
- mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
response, err := execManager.CreateExecution(context.Background(), request, requestedAt)
assert.Nil(t, err)
@@ -720,9 +699,7 @@ func TestCreateExecutionNoNotifications(t *testing.T) {
return nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(exCreateFunc)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
response, err := execManager.CreateExecution(context.Background(), request, requestedAt)
assert.Nil(t, err)
@@ -751,9 +728,7 @@ func TestCreateExecutionDynamicLabelsAndAnnotations(t *testing.T) {
Cluster: testCluster,
}, nil
})
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor,
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
request := testutils.GetExecutionRequest()
request.Spec.Labels = &admin.Labels{
Values: map[string]string{
@@ -833,9 +808,7 @@ func TestRelaunchExecution(t *testing.T) {
// Set up mocks.
repository := getMockRepositoryForExecTest()
setDefaultLpCallbackForExecTest(repository)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
startTime := time.Now()
startTimeProto, _ := ptypes.TimestampProto(startTime)
existingClosure := admin.ExecutionClosure{
@@ -892,9 +865,7 @@ func TestRelaunchExecution_GetExistingFailure(t *testing.T) {
// Set up mocks.
repository := getMockRepositoryForExecTest()
setDefaultLpCallbackForExecTest(repository)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
expectedErr := errors.New("expected error")
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(
@@ -928,9 +899,7 @@ func TestRelaunchExecution_CreateFailure(t *testing.T) {
// Set up mocks.
repository := getMockRepositoryForExecTest()
setDefaultLpCallbackForExecTest(repository)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
startTime := time.Now()
startTimeProto, _ := ptypes.TimestampProto(startTime)
existingClosure := admin.ExecutionClosure{
@@ -1015,9 +984,7 @@ func TestCreateWorkflowEvent(t *testing.T) {
return nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecutionFunc)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, &mockPublisher)
resp, err := execManager.CreateWorkflowEvent(context.Background(), admin.WorkflowExecutionEventRequest{
RequestId: "1",
Event: &event.WorkflowExecutionEvent{
@@ -1055,9 +1022,7 @@ func TestCreateWorkflowEvent_TerminalState(t *testing.T) {
return nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecutionFunc)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
resp, err := execManager.CreateWorkflowEvent(context.Background(), admin.WorkflowExecutionEventRequest{
RequestId: "1",
@@ -1112,9 +1077,7 @@ func TestCreateWorkflowEvent_StartedRunning(t *testing.T) {
return nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecutionFunc)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, &mockPublisher)
occurredAtTimestamp, _ := ptypes.TimestampProto(occurredAt)
resp, err := execManager.CreateWorkflowEvent(context.Background(), admin.WorkflowExecutionEventRequest{
RequestId: "1",
@@ -1153,9 +1116,7 @@ func TestCreateWorkflowEvent_DuplicateRunning(t *testing.T) {
},
)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
occurredAtTimestamp, _ := ptypes.TimestampProto(occurredAt)
resp, err := execManager.CreateWorkflowEvent(context.Background(), admin.WorkflowExecutionEventRequest{
RequestId: "1",
@@ -1196,9 +1157,7 @@ func TestCreateWorkflowEvent_InvalidPhaseChange(t *testing.T) {
},
)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
occurredAtTimestamp, _ := ptypes.TimestampProto(occurredAt)
resp, err := execManager.CreateWorkflowEvent(context.Background(), admin.WorkflowExecutionEventRequest{
RequestId: "1",
@@ -1232,9 +1191,7 @@ func TestCreateWorkflowEvent_InvalidEvent(t *testing.T) {
return nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecutionFunc)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
resp, err := execManager.CreateWorkflowEvent(context.Background(), admin.WorkflowExecutionEventRequest{
RequestId: "1",
Event: &event.WorkflowExecutionEvent{
@@ -1262,9 +1219,7 @@ func TestCreateWorkflowEvent_UpdateModelError(t *testing.T) {
Message: "bar baz",
}
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
resp, err := execManager.CreateWorkflowEvent(context.Background(), admin.WorkflowExecutionEventRequest{
RequestId: "1",
Event: &event.WorkflowExecutionEvent{
@@ -1296,9 +1251,7 @@ func TestCreateWorkflowEvent_DatabaseGetError(t *testing.T) {
Code: "foo",
Message: "bar baz",
}
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
resp, err := execManager.CreateWorkflowEvent(context.Background(), admin.WorkflowExecutionEventRequest{
RequestId: "1",
Event: &event.WorkflowExecutionEvent{
@@ -1332,9 +1285,7 @@ func TestCreateWorkflowEvent_DatabaseUpdateError(t *testing.T) {
return expectedErr
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecutionFunc)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
resp, err := execManager.CreateWorkflowEvent(context.Background(), admin.WorkflowExecutionEventRequest{
RequestId: "1",
Event: &event.WorkflowExecutionEvent{
@@ -1373,9 +1324,7 @@ func TestGetExecution(t *testing.T) {
}, nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
execution, err := execManager.GetExecution(context.Background(), admin.WorkflowExecutionGetRequest{
Id: &executionIdentifier,
})
@@ -1396,9 +1345,7 @@ func TestGetExecution_DatabaseError(t *testing.T) {
return models.Execution{}, expectedErr
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
execution, err := execManager.GetExecution(context.Background(), admin.WorkflowExecutionGetRequest{
Id: &executionIdentifier,
})
@@ -1428,9 +1375,7 @@ func TestGetExecution_TransformerError(t *testing.T) {
}, nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
execution, err := execManager.GetExecution(context.Background(), admin.WorkflowExecutionGetRequest{
Id: &executionIdentifier,
})
@@ -1490,9 +1435,7 @@ func TestListExecutions(t *testing.T) {
}, nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetListCallback(executionListFunc)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
executionList, err := execManager.ListExecutions(context.Background(), admin.ResourceListRequest{
Id: &admin.NamedEntityIdentifier{
@@ -1523,10 +1466,7 @@ func TestListExecutions(t *testing.T) {
}
func TestListExecutions_MissingParameters(t *testing.T) {
- execManager := NewExecutionManager(
- repositoryMocks.NewMockRepository(),
- getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repositoryMocks.NewMockRepository(), getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
_, err := execManager.ListExecutions(context.Background(), admin.ResourceListRequest{
Id: &admin.NamedEntityIdentifier{
Domain: domainValue,
@@ -1563,9 +1503,7 @@ func TestListExecutions_DatabaseError(t *testing.T) {
return interfaces.ExecutionCollectionOutput{}, expectedErr
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetListCallback(executionListFunc)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
_, err := execManager.ListExecutions(context.Background(), admin.ResourceListRequest{
Id: &admin.NamedEntityIdentifier{
Project: projectValue,
@@ -1596,9 +1534,7 @@ func TestListExecutions_TransformerError(t *testing.T) {
}, nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetListCallback(executionListFunc)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
executionList, err := execManager.ListExecutions(context.Background(), admin.ResourceListRequest{
Id: &admin.NamedEntityIdentifier{
@@ -1905,9 +1841,7 @@ func TestTerminateExecution(t *testing.T) {
}, input.ExecutionID))
return nil
})
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor,
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
ctx := context.WithValue(context.Background(), auth.PrincipalContextKey, principal)
resp, err := execManager.TerminateExecution(ctx, admin.ExecutionTerminateRequest{
@@ -1937,9 +1871,7 @@ func TestTerminateExecution_PropellerError(t *testing.T) {
t.Fatal("update should not be called when propeller fails to terminate an execution")
return nil
})
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor,
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
resp, err := execManager.TerminateExecution(context.Background(), admin.ExecutionTerminateRequest{
Id: &core.WorkflowExecutionIdentifier{
@@ -1966,9 +1898,7 @@ func TestTerminateExecution_DatabaseError(t *testing.T) {
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateExecutionCallback(updateExecutionFunc)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
resp, err := execManager.TerminateExecution(context.Background(), admin.ExecutionTerminateRequest{
Id: &core.WorkflowExecutionIdentifier{
Project: "project",
@@ -2056,9 +1986,7 @@ func TestGetExecutionData(t *testing.T) {
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), mockStorage, workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), mockStorage, workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
dataResponse, err := execManager.GetExecutionData(context.Background(), admin.WorkflowExecutionGetDataRequest{
Id: &executionIdentifier,
})
@@ -2087,9 +2015,7 @@ func TestAddLabelsAndAnnotationsRuntimeLimitsObserved(t *testing.T) {
configProvider := getMockExecutionsConfigProvider()
configProvider.(*runtimeMocks.MockConfigurationProvider).AddRegistrationValidationConfiguration(
mockRegistrationValidationConfig)
- execManager := NewExecutionManager(
- repository, configProvider, getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, configProvider, getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
request := testutils.GetExecutionRequest()
request.Spec.Labels = &admin.Labels{
Values: map[string]string{
@@ -2153,9 +2079,7 @@ func TestAddPluginOverrides(t *testing.T) {
}
partiallyPopulatedInputs := workflowengineInterfaces.ExecuteWorkflowInput{}
- execManager := NewExecutionManager(
- db, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(db, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
taskPluginOverrides, err := execManager.(*ExecutionManager).addPluginOverrides(
context.Background(), executionID, workflowName, launchPlanName)
@@ -2186,9 +2110,7 @@ func TestPluginOverrides_ResourceGetFailure(t *testing.T) {
models.Resource, error) {
return models.Resource{}, flyteAdminErrors.NewFlyteAdminErrorf(codes.Aborted, "uh oh")
}
- execManager := NewExecutionManager(
- db, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(db, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
_, err := execManager.(*ExecutionManager).addPluginOverrides(
context.Background(), executionID, workflowName, launchPlanName)
@@ -2217,9 +2139,7 @@ func TestGetExecution_Legacy(t *testing.T) {
}, nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
execution, err := execManager.GetExecution(context.Background(), admin.WorkflowExecutionGetRequest{
Id: &executionIdentifier,
})
@@ -2254,9 +2174,7 @@ func TestGetExecution_LegacyClient_OffloadedData(t *testing.T) {
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc)
storageClient := getMockStorageForExecTest(context.Background())
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), storageClient, workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), storageClient, workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
_ = storageClient.WriteProtobuf(context.Background(), storage.DataReference(shared.UserInputs), storage.Options{}, getLegacySpec().Inputs)
_ = storageClient.WriteProtobuf(context.Background(), storage.DataReference(shared.Inputs), storage.Options{}, getLegacyClosure().ComputedInputs)
execution, err := execManager.GetExecution(context.Background(), admin.WorkflowExecutionGetRequest{
@@ -2316,9 +2234,7 @@ func TestGetExecutionData_LegacyModel(t *testing.T) {
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc)
storageClient := getMockStorageForExecTest(context.Background())
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), storageClient, workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), storageClient, workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
dataResponse, err := execManager.GetExecutionData(context.Background(), admin.WorkflowExecutionGetDataRequest{
Id: &executionIdentifier,
})
@@ -2363,9 +2279,7 @@ func TestCreateExecution_LegacyClient(t *testing.T) {
Cluster: testCluster,
}, nil
})
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor,
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
response, err := execManager.CreateExecution(context.Background(), *getLegacyExecutionRequest(), requestedAt)
assert.Nil(t, err)
@@ -2381,9 +2295,7 @@ func TestRelaunchExecution_LegacyModel(t *testing.T) {
repository := getMockRepositoryForExecTest()
setDefaultLpCallbackForExecTest(repository)
storageClient := getMockStorageForExecTest(context.Background())
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), storageClient, workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), storageClient, workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
startTime := time.Now()
startTimeProto, _ := ptypes.TimestampProto(startTime)
existingClosure := getLegacyClosure()
@@ -2494,9 +2406,7 @@ func TestListExecutions_LegacyModel(t *testing.T) {
}, nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetListCallback(executionListFunc)
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
executionList, err := execManager.ListExecutions(context.Background(), admin.ResourceListRequest{
Id: &admin.NamedEntityIdentifier{
@@ -2698,9 +2608,7 @@ func TestSetDefaults(t *testing.T) {
mockConfig := runtimeMocks.NewMockConfigurationProvider(
testutils.GetApplicationConfigWithDefaultDomains(), nil, nil, &taskConfig,
runtimeMocks.NewMockWhitelistConfiguration(), nil)
- execManager := NewExecutionManager(
- repositoryMocks.NewMockRepository(), mockConfig, getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repositoryMocks.NewMockRepository(), mockConfig, getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
execManager.(*ExecutionManager).setCompiledTaskDefaults(context.Background(), task, "workflow")
assert.True(t, proto.Equal(
&core.Container{
@@ -2767,9 +2675,7 @@ func TestSetDefaults_MissingDefaults(t *testing.T) {
mockConfig := runtimeMocks.NewMockConfigurationProvider(
testutils.GetApplicationConfigWithDefaultDomains(), nil, nil, &taskConfig,
runtimeMocks.NewMockWhitelistConfiguration(), nil)
- execManager := NewExecutionManager(
- repositoryMocks.NewMockRepository(), mockConfig, getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
+ execManager := NewExecutionManager(repositoryMocks.NewMockRepository(), mockConfig, getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil)
execManager.(*ExecutionManager).setCompiledTaskDefaults(context.Background(), task, "workflow")
assert.True(t, proto.Equal(
&core.Container{
@@ -2952,9 +2858,7 @@ func TestCreateSingleTaskExecution(t *testing.T) {
getMockWorkflowConfigProvider(), getMockWorkflowCompiler(), mockStorage,
storagePrefix, mockScope.NewTestScope())
namedEntityManager := NewNamedEntityManager(repository, getMockConfigForNETest(), mockScope.NewTestScope())
- execManager := NewExecutionManager(
- repository, getMockExecutionsConfigProvider(), mockStorage, workflowengineMocks.NewMockExecutor(),
- mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, workflowManager, namedEntityManager)
+ execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), mockStorage, workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, workflowManager, namedEntityManager, nil)
request := admin.ExecutionCreateRequest{
Project: "flytekit",
Domain: "production",
diff --git a/pkg/manager/impl/node_execution_manager.go b/pkg/manager/impl/node_execution_manager.go
index 3e37e9c87..87f2dbbd6 100644
--- a/pkg/manager/impl/node_execution_manager.go
+++ b/pkg/manager/impl/node_execution_manager.go
@@ -4,6 +4,9 @@ import (
"context"
"strconv"
+ "github.com/golang/protobuf/proto"
+ notificationInterfaces "github.com/lyft/flyteadmin/pkg/async/notifications/interfaces"
+
"github.com/lyft/flytestdlib/storage"
"github.com/lyft/flytestdlib/contextutils"
@@ -43,14 +46,16 @@ type nodeExecutionMetrics struct {
ClosureSizeBytes prometheus.Summary
NodeExecutionInputBytes prometheus.Summary
NodeExecutionOutputBytes prometheus.Summary
+ PublishEventError prometheus.Counter
}
type NodeExecutionManager struct {
- db repositories.RepositoryInterface
- config runtimeInterfaces.Configuration
- storageClient *storage.DataStore
- metrics nodeExecutionMetrics
- urlData dataInterfaces.RemoteURLInterface
+ db repositories.RepositoryInterface
+ config runtimeInterfaces.Configuration
+ storageClient *storage.DataStore
+ metrics nodeExecutionMetrics
+ urlData dataInterfaces.RemoteURLInterface
+ eventPublisher notificationInterfaces.Publisher
}
type updateNodeExecutionStatus int
@@ -232,6 +237,11 @@ func (m *NodeExecutionManager) CreateNodeEvent(ctx context.Context, request admi
}
m.metrics.NodeExecutionEventsCreated.Inc()
+ if err := m.eventPublisher.Publish(ctx, proto.MessageName(&request), &request); err != nil {
+ m.metrics.PublishEventError.Inc()
+ logger.Infof(ctx, "error publishing event [%+v] with err: [%v]", request.RequestId, err)
+ }
+
return &admin.NodeExecutionEventResponse{}, nil
}
@@ -427,9 +437,7 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
return response, nil
}
-func NewNodeExecutionManager(
- db repositories.RepositoryInterface, config runtimeInterfaces.Configuration, storageClient *storage.DataStore,
- scope promutils.Scope, urlData dataInterfaces.RemoteURLInterface) interfaces.NodeExecutionInterface {
+func NewNodeExecutionManager(db repositories.RepositoryInterface, config runtimeInterfaces.Configuration, storageClient *storage.DataStore, scope promutils.Scope, urlData dataInterfaces.RemoteURLInterface, eventPublisher notificationInterfaces.Publisher) interfaces.NodeExecutionInterface {
metrics := nodeExecutionMetrics{
Scope: scope,
ActiveNodeExecutions: scope.MustNewGauge("active_node_executions",
@@ -448,12 +456,15 @@ func NewNodeExecutionManager(
"size in bytes of serialized node execution inputs"),
NodeExecutionOutputBytes: scope.MustNewSummary("output_size_bytes",
"size in bytes of serialized node execution outputs"),
+ PublishEventError: scope.MustNewCounter("publish_event_error",
+ "overall count of publish event errors when invoking publish()"),
}
return &NodeExecutionManager{
- db: db,
- config: config,
- storageClient: storageClient,
- metrics: metrics,
- urlData: urlData,
+ db: db,
+ config: config,
+ storageClient: storageClient,
+ metrics: metrics,
+ urlData: urlData,
+ eventPublisher: eventPublisher,
}
}
diff --git a/pkg/manager/impl/node_execution_manager_test.go b/pkg/manager/impl/node_execution_manager_test.go
index 676baa495..aa0306ae0 100644
--- a/pkg/manager/impl/node_execution_manager_test.go
+++ b/pkg/manager/impl/node_execution_manager_test.go
@@ -135,7 +135,8 @@ func TestCreateNodeEvent(t *testing.T) {
return nil
})
nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL)
+ getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL,
+ &mockPublisher)
resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request)
assert.Nil(t, err)
assert.NotNil(t, resp)
@@ -191,7 +192,7 @@ func TestCreateNodeEvent_Update(t *testing.T) {
return nil
})
nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL)
+ getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, &mockPublisher)
resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request)
assert.Nil(t, err)
assert.NotNil(t, resp)
@@ -204,8 +205,7 @@ func TestCreateNodeEvent_MissingExecution(t *testing.T) {
func(ctx context.Context, input interfaces.GetResourceInput) (models.Execution, error) {
return models.Execution{}, expectedErr
})
- nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL)
+ nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, &mockPublisher)
resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request)
assert.EqualError(t, err, "failed to get existing execution id: [project:\"project\""+
" domain:\"domain\" name:\"name\" ] with err: expected error")
@@ -225,8 +225,7 @@ func TestCreateNodeEvent_CreateDatabaseError(t *testing.T) {
func(ctx context.Context, event *models.NodeExecutionEvent, input *models.NodeExecution) error {
return expectedErr
})
- nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL)
+ nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil)
resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request)
assert.EqualError(t, err, expectedErr.Error())
assert.Nil(t, resp)
@@ -261,8 +260,7 @@ func TestCreateNodeEvent_UpdateDatabaseError(t *testing.T) {
func(ctx context.Context, event *models.NodeExecutionEvent, nodeExecution *models.NodeExecution) error {
return expectedErr
})
- nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL)
+ nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil)
resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request)
assert.EqualError(t, err, expectedErr.Error())
assert.Nil(t, resp)
@@ -291,8 +289,7 @@ func TestCreateNodeEvent_UpdateTerminalEventError(t *testing.T) {
StartedAt: &occurredAt,
}, nil
})
- nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL)
+ nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil)
resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request)
assert.Nil(t, resp)
assert.NotNil(t, err)
@@ -327,8 +324,7 @@ func TestCreateNodeEvent_UpdateDuplicateEventError(t *testing.T) {
StartedAt: &occurredAt,
}, nil
})
- nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL)
+ nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil)
resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request)
assert.Equal(t, codes.AlreadyExists, err.(flyteAdminErrors.FlyteAdminError).Code())
assert.Nil(t, resp)
@@ -341,8 +337,7 @@ func TestCreateNodeEvent_FirstEventIsTerminal(t *testing.T) {
func(ctx context.Context, input interfaces.GetNodeExecutionInput) (models.NodeExecution, error) {
return models.NodeExecution{}, flyteAdminErrors.NewFlyteAdminError(codes.NotFound, "foo")
})
- nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL)
+ nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, &mockPublisher)
succeededRequest := admin.NodeExecutionEventRequest{
RequestId: "request id",
Event: &event.NodeExecutionEvent{
@@ -403,8 +398,7 @@ func TestGetNodeExecution(t *testing.T) {
NodeExecutionMetadata: metadataBytes,
}, nil
})
- nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL)
+ nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil)
nodeExecution, err := nodeExecManager.GetNodeExecution(context.Background(), admin.NodeExecutionGetRequest{
Id: &nodeExecutionIdentifier,
})
@@ -467,8 +461,7 @@ func TestGetNodeExecutionParentNode(t *testing.T) {
},
}, nil
})
- nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL)
+ nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil)
nodeExecution, err := nodeExecManager.GetNodeExecution(context.Background(), admin.NodeExecutionGetRequest{
Id: &nodeExecutionIdentifier,
})
@@ -489,8 +482,7 @@ func TestGetNodeExecution_DatabaseError(t *testing.T) {
func(ctx context.Context, input interfaces.GetNodeExecutionInput) (models.NodeExecution, error) {
return models.NodeExecution{}, expectedErr
})
- nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL)
+ nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil)
nodeExecution, err := nodeExecManager.GetNodeExecution(context.Background(), admin.NodeExecutionGetRequest{
Id: &nodeExecutionIdentifier,
})
@@ -517,8 +509,7 @@ func TestGetNodeExecution_TransformerError(t *testing.T) {
Closure: []byte("i'm invalid"),
}, nil
})
- nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL)
+ nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil)
nodeExecution, err := nodeExecManager.GetNodeExecution(context.Background(), admin.NodeExecutionGetRequest{
Id: &nodeExecutionIdentifier,
})
@@ -587,8 +578,7 @@ func TestListNodeExecutionsLevelZero(t *testing.T) {
},
}, nil
})
- nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL)
+ nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil)
nodeExecutions, err := nodeExecManager.ListNodeExecutions(context.Background(), admin.NodeExecutionListRequest{
WorkflowExecutionId: &core.WorkflowExecutionIdentifier{
Project: "project",
@@ -687,8 +677,7 @@ func TestListNodeExecutionsWithParent(t *testing.T) {
},
}, nil
})
- nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL)
+ nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil)
nodeExecutions, err := nodeExecManager.ListNodeExecutions(context.Background(), admin.NodeExecutionListRequest{
WorkflowExecutionId: &core.WorkflowExecutionIdentifier{
Project: "project",
@@ -722,8 +711,7 @@ func TestListNodeExecutionsWithParent(t *testing.T) {
}
func TestListNodeExecutions_InvalidParams(t *testing.T) {
- nodeExecManager := NewNodeExecutionManager(nil, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL)
+ nodeExecManager := NewNodeExecutionManager(nil, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil)
_, err := nodeExecManager.ListNodeExecutions(context.Background(), admin.NodeExecutionListRequest{
Filters: "eq(execution.project, project)",
})
@@ -749,8 +737,7 @@ func TestListNodeExecutions_DatabaseError(t *testing.T) {
interfaces.NodeExecutionCollectionOutput, error) {
return interfaces.NodeExecutionCollectionOutput{}, expectedErr
})
- nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL)
+ nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil)
nodeExecutions, err := nodeExecManager.ListNodeExecutions(context.Background(), admin.NodeExecutionListRequest{
WorkflowExecutionId: &core.WorkflowExecutionIdentifier{
Project: "project",
@@ -788,8 +775,7 @@ func TestListNodeExecutions_TransformerError(t *testing.T) {
},
}, nil
})
- nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL)
+ nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil)
nodeExecutions, err := nodeExecManager.ListNodeExecutions(context.Background(), admin.NodeExecutionListRequest{
WorkflowExecutionId: &core.WorkflowExecutionIdentifier{
Project: "project",
@@ -817,8 +803,7 @@ func TestListNodeExecutions_NothingToReturn(t *testing.T) {
listExecutionsCalled = true
return interfaces.ExecutionCollectionOutput{}, nil
})
- nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL)
+ nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil)
_, err := nodeExecManager.ListNodeExecutions(context.Background(), admin.NodeExecutionListRequest{
WorkflowExecutionId: &core.WorkflowExecutionIdentifier{
Project: "project",
@@ -915,8 +900,7 @@ func TestListNodeExecutionsForTask(t *testing.T) {
},
}, nil
})
- nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL)
+ nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil)
nodeExecutions, err := nodeExecManager.ListNodeExecutionsForTask(context.Background(), admin.NodeExecutionForTaskListRequest{
TaskExecutionId: &core.TaskExecutionIdentifier{
NodeExecutionId: &core.NodeExecutionIdentifier{
@@ -1041,8 +1025,7 @@ func TestGetNodeExecutionData(t *testing.T) {
}
return fmt.Errorf("unexpected call to find value in storage [%v]", reference.String())
}
- nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), mockStorage,
- mockScope.NewTestScope(), mockNodeExecutionRemoteURL)
+ nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), mockStorage, mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil)
dataResponse, err := nodeExecManager.GetNodeExecutionData(context.Background(), admin.NodeExecutionGetDataRequest{
Id: &nodeExecutionIdentifier,
})
diff --git a/pkg/manager/impl/task_execution_manager.go b/pkg/manager/impl/task_execution_manager.go
index ddb927e7c..4a44790fe 100644
--- a/pkg/manager/impl/task_execution_manager.go
+++ b/pkg/manager/impl/task_execution_manager.go
@@ -5,6 +5,9 @@ import (
"fmt"
"strconv"
+ "github.com/golang/protobuf/proto"
+ notificationInterfaces "github.com/lyft/flyteadmin/pkg/async/notifications/interfaces"
+
"github.com/lyft/flytestdlib/storage"
"github.com/lyft/flytestdlib/contextutils"
@@ -40,14 +43,16 @@ type taskExecutionMetrics struct {
ClosureSizeBytes prometheus.Summary
TaskExecutionInputBytes prometheus.Summary
TaskExecutionOutputBytes prometheus.Summary
+ PublishEventError prometheus.Counter
}
type TaskExecutionManager struct {
- db repositories.RepositoryInterface
- config runtimeInterfaces.Configuration
- storageClient *storage.DataStore
- metrics taskExecutionMetrics
- urlData dataInterfaces.RemoteURLInterface
+ db repositories.RepositoryInterface
+ config runtimeInterfaces.Configuration
+ storageClient *storage.DataStore
+ metrics taskExecutionMetrics
+ urlData dataInterfaces.RemoteURLInterface
+ notificationClient notificationInterfaces.Publisher
}
func getTaskExecutionContext(ctx context.Context, identifier *core.TaskExecutionIdentifier) context.Context {
@@ -173,6 +178,11 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req
m.metrics.TaskExecutionsTerminated.Inc()
}
+ if err = m.notificationClient.Publish(ctx, proto.MessageName(&request), &request); err != nil {
+ m.metrics.PublishEventError.Inc()
+ logger.Infof(ctx, "error publishing event [%+v] with err: [%v]", request.RequestId, err)
+ }
+
m.metrics.TaskExecutionEventsCreated.Inc()
logger.Debugf(ctx, "Successfully recorded task execution event [%v]", request.Event)
// TODO: we will want to return some scope information here soon!
@@ -310,9 +320,7 @@ func (m *TaskExecutionManager) GetTaskExecutionData(
return response, nil
}
-func NewTaskExecutionManager(
- db repositories.RepositoryInterface, config runtimeInterfaces.Configuration, storageClient *storage.DataStore,
- scope promutils.Scope, urlData dataInterfaces.RemoteURLInterface) interfaces.TaskExecutionInterface {
+func NewTaskExecutionManager(db repositories.RepositoryInterface, config runtimeInterfaces.Configuration, storageClient *storage.DataStore, scope promutils.Scope, urlData dataInterfaces.RemoteURLInterface, publisher notificationInterfaces.Publisher) interfaces.TaskExecutionInterface {
metrics := taskExecutionMetrics{
Scope: scope,
ActiveTaskExecutions: scope.MustNewGauge("active_executions",
@@ -333,12 +341,15 @@ func NewTaskExecutionManager(
"size in bytes of serialized node execution inputs"),
TaskExecutionOutputBytes: scope.MustNewSummary("output_size_bytes",
"size in bytes of serialized node execution outputs"),
+ PublishEventError: scope.MustNewCounter("publish_event_error",
+ "overall count of publish event errors when invoking publish()"),
}
return &TaskExecutionManager{
- db: db,
- config: config,
- storageClient: storageClient,
- metrics: metrics,
- urlData: urlData,
+ db: db,
+ config: config,
+ storageClient: storageClient,
+ metrics: metrics,
+ urlData: urlData,
+ notificationClient: publisher,
}
}
diff --git a/pkg/manager/impl/task_execution_manager_test.go b/pkg/manager/impl/task_execution_manager_test.go
index 9cfb181b5..e7af57a34 100644
--- a/pkg/manager/impl/task_execution_manager_test.go
+++ b/pkg/manager/impl/task_execution_manager_test.go
@@ -175,8 +175,7 @@ func TestCreateTaskEvent(t *testing.T) {
}, input)
return nil
})
- taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL)
+ taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil)
resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest)
assert.True(t, getTaskCalled)
assert.True(t, createTaskCalled)
@@ -289,8 +288,7 @@ func TestCreateTaskEvent_Update(t *testing.T) {
OutputUri: expectedOutputResult.OutputUri,
}
- taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL)
+ taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, &mockPublisher)
resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest)
assert.True(t, getTaskCalled)
assert.True(t, updateTaskCalled)
@@ -305,8 +303,7 @@ func TestCreateTaskEvent_MissingExecution(t *testing.T) {
func(ctx context.Context, input interfaces.GetNodeExecutionInput) (models.NodeExecution, error) {
return models.NodeExecution{}, expectedErr
})
- taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL)
+ taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil)
resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest)
assert.EqualError(t, err, "failed to get existing node execution id: [node_id:\"node-id\""+
" execution_id: ] "+
@@ -326,8 +323,7 @@ func TestCreateTaskEvent_CreateDatabaseError(t *testing.T) {
func(ctx context.Context, input models.TaskExecution) error {
return expectedErr
})
- taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL)
+ taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil)
resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest)
assert.EqualError(t, err, expectedErr.Error())
assert.Nil(t, resp)
@@ -367,8 +363,7 @@ func TestCreateTaskEvent_UpdateDatabaseError(t *testing.T) {
func(ctx context.Context, execution models.TaskExecution) error {
return expectedErr
})
- nodeExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL)
+ nodeExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil)
resp, err := nodeExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest)
assert.EqualError(t, err, expectedErr.Error())
assert.Nil(t, resp)
@@ -401,8 +396,7 @@ func TestCreateTaskEvent_UpdateTerminalEventError(t *testing.T) {
}, nil
})
taskEventRequest.Event.Phase = core.TaskExecution_RUNNING
- taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL)
+ taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil)
resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest)
assert.Nil(t, resp)
@@ -469,8 +463,7 @@ func TestCreateTaskEvent_PhaseVersionChange(t *testing.T) {
taskEventRequest.Event.PhaseVersion = uint32(1)
taskEventRequest.Event.OccurredAt = taskEventUpdatedAtProto
- taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL)
+ taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, &mockPublisher)
resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest)
assert.True(t, getTaskCalled)
assert.True(t, updateTaskCalled)
@@ -536,8 +529,7 @@ func TestGetTaskExecution(t *testing.T) {
},
}, nil
})
- taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL)
+ taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil)
taskExecution, err := taskExecManager.GetTaskExecution(context.Background(), admin.TaskExecutionGetRequest{
Id: &core.TaskExecutionIdentifier{
TaskId: sampleTaskID,
@@ -587,8 +579,7 @@ func TestGetTaskExecution_TransformerError(t *testing.T) {
Closure: []byte("i'm an invalid task closure"),
}, nil
})
- taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL)
+ taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil)
taskExecution, err := taskExecManager.GetTaskExecution(context.Background(), admin.TaskExecutionGetRequest{
Id: &core.TaskExecutionIdentifier{
TaskId: sampleTaskID,
@@ -698,8 +689,7 @@ func TestListTaskExecutions(t *testing.T) {
},
}, nil
})
- taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL)
+ taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil)
taskExecutions, err := taskExecManager.ListTaskExecutions(context.Background(), admin.TaskExecutionListRequest{
NodeExecutionId: &core.NodeExecutionIdentifier{
NodeId: "nodey b",
@@ -770,8 +760,7 @@ func TestListTaskExecutions_NoFilters(t *testing.T) {
listTaskCalled = true
return interfaces.TaskExecutionCollectionOutput{}, nil
})
- taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL)
+ taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil)
_, err := taskExecManager.ListTaskExecutions(context.Background(), admin.TaskExecutionListRequest{
Token: "1",
Limit: 99,
@@ -789,8 +778,7 @@ func TestListTaskExecutions_NoLimit(t *testing.T) {
getTaskCalled = true
return interfaces.TaskExecutionCollectionOutput{}, nil
})
- taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL)
+ taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil)
_, err := taskExecManager.ListTaskExecutions(context.Background(), admin.TaskExecutionListRequest{
Limit: 0,
})
@@ -821,8 +809,7 @@ func TestListTaskExecutions_NothingToReturn(t *testing.T) {
listTasksCalled = true
return interfaces.TaskCollectionOutput{}, nil
})
- taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(),
- getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL)
+ taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil)
_, err := taskExecManager.ListTaskExecutions(context.Background(), admin.TaskExecutionListRequest{
NodeExecutionId: &core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
@@ -924,8 +911,7 @@ func TestGetTaskExecutionData(t *testing.T) {
}
return fmt.Errorf("unexpected call to find value in storage [%v]", reference.String())
}
- taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), mockStorage,
- mockScope.NewTestScope(), mockTaskExecutionRemoteURL)
+ taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), mockStorage, mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil)
dataResponse, err := taskExecManager.GetTaskExecutionData(context.Background(), admin.TaskExecutionGetDataRequest{
Id: &core.TaskExecutionIdentifier{
TaskId: sampleTaskID,
diff --git a/pkg/rpc/adminservice/base.go b/pkg/rpc/adminservice/base.go
index 376233868..806970fa4 100644
--- a/pkg/rpc/adminservice/base.go
+++ b/pkg/rpc/adminservice/base.go
@@ -99,6 +99,7 @@ func NewAdminServer(kubeConfig, master string) *AdminService {
publisher := notifications.NewNotificationsPublisher(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope)
processor := notifications.NewNotificationsProcessor(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope)
+ eventPublisher := notifications.NewEventsPublisher(*configuration.ApplicationConfiguration().GetExternalEventsConfig(), adminScope)
go func() {
logger.Info(context.Background(), "Started processing notifications.")
processor.StartProcessing()
@@ -131,10 +132,7 @@ func NewAdminServer(kubeConfig, master string) *AdminService {
db, configuration, workflowengine.NewCompiler(), dataStorageClient, applicationConfiguration.MetadataStoragePrefix,
adminScope.NewSubScope("workflow_manager"))
namedEntityManager := manager.NewNamedEntityManager(db, configuration, adminScope.NewSubScope("named_entity_manager"))
- executionManager := manager.NewExecutionManager(
- db, configuration, dataStorageClient, workflowExecutor, adminScope.NewSubScope("execution_manager"),
- adminScope.NewSubScope("user_execution_metrics"), publisher, urlData, workflowManager,
- namedEntityManager)
+ executionManager := manager.NewExecutionManager(db, configuration, dataStorageClient, workflowExecutor, adminScope.NewSubScope("execution_manager"), adminScope.NewSubScope("user_execution_metrics"), publisher, urlData, workflowManager, namedEntityManager, eventPublisher)
scheduledWorkflowExecutor := workflowScheduler.GetWorkflowExecutor(executionManager, launchPlanManager)
logger.Info(context.Background(), "Successfully initialized a new scheduled workflow executor")
@@ -160,10 +158,10 @@ func NewAdminServer(kubeConfig, master string) *AdminService {
LaunchPlanManager: launchPlanManager,
ExecutionManager: executionManager,
NamedEntityManager: namedEntityManager,
- NodeExecutionManager: manager.NewNodeExecutionManager(
- db, configuration, dataStorageClient, adminScope.NewSubScope("node_execution_manager"), urlData),
- TaskExecutionManager: manager.NewTaskExecutionManager(
- db, configuration, dataStorageClient, adminScope.NewSubScope("task_execution_manager"), urlData),
+ NodeExecutionManager: manager.NewNodeExecutionManager(db, configuration, dataStorageClient,
+ adminScope.NewSubScope("node_execution_manager"), urlData, eventPublisher),
+ TaskExecutionManager: manager.NewTaskExecutionManager(db, configuration, dataStorageClient,
+ adminScope.NewSubScope("task_execution_manager"), urlData, eventPublisher),
ProjectManager: manager.NewProjectManager(db, configuration),
ResourceManager: resources.NewResourceManager(db, configuration.ApplicationConfiguration()),
Metrics: InitMetrics(adminScope),
diff --git a/pkg/runtime/application_config_provider.go b/pkg/runtime/application_config_provider.go
index 58032b67e..1ad15c8ef 100644
--- a/pkg/runtime/application_config_provider.go
+++ b/pkg/runtime/application_config_provider.go
@@ -16,6 +16,7 @@ const scheduler = "scheduler"
const remoteData = "remoteData"
const notifications = "notifications"
const domains = "domains"
+const externalEvents = "externalEvents"
var databaseConfig = config.MustRegisterSection(database, &interfaces.DbConfigSection{})
var flyteAdminConfig = config.MustRegisterSection(flyteAdmin, &interfaces.ApplicationConfig{})
@@ -23,6 +24,7 @@ var schedulerConfig = config.MustRegisterSection(scheduler, &interfaces.Schedule
var remoteDataConfig = config.MustRegisterSection(remoteData, &interfaces.RemoteDataConfig{})
var notificationsConfig = config.MustRegisterSection(notifications, &interfaces.NotificationsConfig{})
var domainsConfig = config.MustRegisterSection(domains, &interfaces.DomainsConfig{})
+var externalEventsConfig = config.MustRegisterSection(externalEvents, &interfaces.ExternalEventsConfig{})
// Implementation of an interfaces.ApplicationConfiguration
type ApplicationConfigurationProvider struct{}
@@ -72,6 +74,11 @@ func (p *ApplicationConfigurationProvider) GetNotificationsConfig() *interfaces.
func (p *ApplicationConfigurationProvider) GetDomainsConfig() *interfaces.DomainsConfig {
return domainsConfig.GetConfig().(*interfaces.DomainsConfig)
}
+
+func (p *ApplicationConfigurationProvider) GetExternalEventsConfig() *interfaces.ExternalEventsConfig {
+ return externalEventsConfig.GetConfig().(*interfaces.ExternalEventsConfig)
+}
+
func NewApplicationConfigurationProvider() interfaces.ApplicationConfiguration {
return &ApplicationConfigurationProvider{}
}
diff --git a/pkg/runtime/interfaces/application_configuration.go b/pkg/runtime/interfaces/application_configuration.go
index d00878522..6cffceb29 100644
--- a/pkg/runtime/interfaces/application_configuration.go
+++ b/pkg/runtime/interfaces/application_configuration.go
@@ -146,6 +146,29 @@ type NotificationsEmailerConfig struct {
Body string `json:"body"`
}
+// This section handles configuration for the workflow notifications pipeline.
+type EventsPublisherConfig struct {
+ // The topic which events should be published, e.g. node, task, workflow
+ TopicName string `json:"topicName"`
+ // Event types: task, node, workflow executions
+ EventTypes []string `json:"eventTypes"`
+}
+
+type ExternalEventsConfig struct {
+ Enable bool `json:"enable"`
+ // Defines the cloud provider that backs the scheduler. In the absence of a specification the no-op, 'local'
+ // scheme is used.
+ Type string `json:"type"`
+ AWSConfig AWSConfig `json:"aws"`
+ GCPConfig GCPConfig `json:"gcp"`
+ // Publish events to a pubsub tops
+ EventsPublisherConfig EventsPublisherConfig `json:"eventsPublisher"`
+ // Number of times to attempt recreating a notifications processor client should there be any disruptions.
+ ReconnectAttempts int `json:"reconnectAttempts"`
+ // Specifies the time interval to wait before attempting to reconnect the notifications processor client.
+ ReconnectDelaySeconds int `json:"reconnectDelaySeconds"`
+}
+
// Configuration specific to notifications handling
type NotificationsConfig struct {
// Defines the cloud provider that backs the scheduler. In the absence of a specification the no-op, 'local'
@@ -182,4 +205,5 @@ type ApplicationConfiguration interface {
GetRemoteDataConfig() *RemoteDataConfig
GetNotificationsConfig() *NotificationsConfig
GetDomainsConfig() *DomainsConfig
+ GetExternalEventsConfig() *ExternalEventsConfig
}
diff --git a/pkg/runtime/mocks/mock_application_provider.go b/pkg/runtime/mocks/mock_application_provider.go
index 0d9973dbc..33ac9a9b9 100644
--- a/pkg/runtime/mocks/mock_application_provider.go
+++ b/pkg/runtime/mocks/mock_application_provider.go
@@ -5,12 +5,13 @@ import (
)
type MockApplicationProvider struct {
- dbConfig interfaces.DbConfig
- topLevelConfig interfaces.ApplicationConfig
- schedulerConfig interfaces.SchedulerConfig
- remoteDataConfig interfaces.RemoteDataConfig
- notificationsConfig interfaces.NotificationsConfig
- domainsConfig interfaces.DomainsConfig
+ dbConfig interfaces.DbConfig
+ topLevelConfig interfaces.ApplicationConfig
+ schedulerConfig interfaces.SchedulerConfig
+ remoteDataConfig interfaces.RemoteDataConfig
+ notificationsConfig interfaces.NotificationsConfig
+ domainsConfig interfaces.DomainsConfig
+ externalEventsConfig interfaces.ExternalEventsConfig
}
func (p *MockApplicationProvider) GetDbConfig() interfaces.DbConfig {
@@ -60,3 +61,11 @@ func (p *MockApplicationProvider) GetDomainsConfig() *interfaces.DomainsConfig {
func (p *MockApplicationProvider) SetDomainsConfig(domainsConfig interfaces.DomainsConfig) {
p.domainsConfig = domainsConfig
}
+
+func (p *MockApplicationProvider) SetExternalEventsConfig(externalEventsConfig interfaces.ExternalEventsConfig) {
+ p.externalEventsConfig = externalEventsConfig
+}
+
+func (p *MockApplicationProvider) GetExternalEventsConfig() *interfaces.ExternalEventsConfig {
+ return &p.externalEventsConfig
+}